Persist image format to a file, to prevent attacks based on changing it

The attack is based on creating a raw image that looks like a qcow2
image, and taking advantage of the code that used 'qemu-img info' to
autodetect the image format.

Now we store the image format to a 'disk.info' file, for Qcow2 and Raw
images, and only autodetect for images that have never been written to
that file.

SecurityImpact

Co-authored-by: Nikola Dipanov <ndipanov@redhat.com>
Closes-bug: #1221190
Change-Id: I2016efdb3f49a44ec4d677ac596eacc97871f30a
This commit is contained in:
David Ripton 2014-01-28 16:38:51 -05:00 committed by Nikola Dipanov
parent e064d6d9f4
commit dc8de42606
6 changed files with 519 additions and 186 deletions

View File

@ -463,6 +463,14 @@ class InvalidDiskFormat(Invalid):
msg_fmt = _("Disk format %(disk_format)s is not acceptable")
class InvalidDiskInfo(Invalid):
msg_fmt = _("Disk info file is invalid: %(reason)s")
class DiskInfoReadWriteFail(Invalid):
msg_fmt = _("Failed to read or write disk info file: %(reason)s")
class ImageUnacceptable(Invalid):
msg_fmt = _("Image %(image_id)s is unacceptable: %(reason)s")

View File

@ -14,6 +14,8 @@
# under the License.
import os
import shutil
import tempfile
import fixtures
from oslo.config import cfg
@ -26,13 +28,13 @@ from nova.openstack.common import uuidutils
from nova import test
from nova.tests import fake_processutils
from nova.tests.virt.libvirt import fake_libvirt_utils
from nova import utils
from nova.virt.libvirt import imagebackend
CONF = cfg.CONF
class _ImageTestCase(object):
INSTANCES_PATH = '/instances_path'
def mock_create_image(self, image):
def create_image(fn, base, size, *args, **kwargs):
@ -41,10 +43,13 @@ class _ImageTestCase(object):
def setUp(self):
super(_ImageTestCase, self).setUp()
self.INSTANCES_PATH = tempfile.mkdtemp(suffix='instances')
self.flags(disable_process_locking=True,
instances_path=self.INSTANCES_PATH)
self.INSTANCE = {'name': 'instance',
'uuid': uuidutils.generate_uuid()}
self.DISK_INFO_PATH = os.path.join(self.INSTANCES_PATH,
self.INSTANCE['uuid'], 'disk.info')
self.NAME = 'fake.vm'
self.TEMPLATE = 'template'
@ -62,6 +67,78 @@ class _ImageTestCase(object):
'nova.virt.libvirt.imagebackend.libvirt_utils',
fake_libvirt_utils))
def fake_chown(path, owner_uid=None):
return None
self.stubs.Set(utils, 'chown', fake_chown)
def tearDown(self):
super(_ImageTestCase, self).tearDown()
shutil.rmtree(self.INSTANCES_PATH)
def test_prealloc_image(self):
CONF.set_override('preallocate_images', 'space')
fake_processutils.fake_execute_clear_log()
fake_processutils.stub_out_processutils_execute(self.stubs)
image = self.image_class(self.INSTANCE, self.NAME)
def fake_fetch(target, *args, **kwargs):
return
self.stubs.Set(os.path, 'exists', lambda _: True)
self.stubs.Set(os, 'access', lambda p, w: True)
# Call twice to verify testing fallocate is only called once.
image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
self.assertEqual(fake_processutils.fake_execute_get_log(),
['fallocate -n -l 1 %s.fallocate_test' % self.PATH,
'fallocate -n -l %s %s' % (self.SIZE, self.PATH),
'fallocate -n -l %s %s' % (self.SIZE, self.PATH)])
def test_prealloc_image_without_write_access(self):
CONF.set_override('preallocate_images', 'space')
fake_processutils.fake_execute_clear_log()
fake_processutils.stub_out_processutils_execute(self.stubs)
image = self.image_class(self.INSTANCE, self.NAME)
def fake_fetch(target, *args, **kwargs):
return
self.stubs.Set(image, 'check_image_exists', lambda: True)
self.stubs.Set(image, '_can_fallocate', lambda: True)
self.stubs.Set(os.path, 'exists', lambda _: True)
self.stubs.Set(os, 'access', lambda p, w: False)
# Testing fallocate is only called when user has write access.
image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
self.assertEqual(fake_processutils.fake_execute_get_log(), [])
class RawTestCase(_ImageTestCase, test.NoDBTestCase):
SIZE = 1024
def setUp(self):
self.image_class = imagebackend.Raw
super(RawTestCase, self).setUp()
self.stubs.Set(imagebackend.Raw, 'correct_format', lambda _: None)
def fake_chown(path, owner_uid=None):
return None
self.stubs.Set(utils, 'chown', fake_chown)
def prepare_mocks(self):
fn = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(imagebackend.utils.synchronized,
'__call__')
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
return fn
def test_cache(self):
self.mox.StubOutWithMock(os.path, 'exists')
if self.OLD_STYLE_INSTANCE_PATH:
@ -127,6 +204,270 @@ class _ImageTestCase(object):
self.mox.VerifyAll()
def test_create_image(self):
fn = self.prepare_mocks()
fn(target=self.TEMPLATE_PATH, max_size=None, image_id=None)
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, self.PATH)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, None, image_id=None)
self.mox.VerifyAll()
def test_create_image_generated(self):
fn = self.prepare_mocks()
fn(target=self.PATH)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, None)
self.mox.VerifyAll()
def test_create_image_extend(self):
fn = self.prepare_mocks()
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH, image_id=None)
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, self.PATH)
imagebackend.disk.extend(self.PATH, self.SIZE, use_cow=False)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE, image_id=None)
self.mox.VerifyAll()
def test_correct_format(self):
self.stubs.UnsetAll()
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(imagebackend.images, 'qemu_img_info')
def fake_chown(path, owner_uid=None):
return None
self.stubs.Set(utils, 'chown', fake_chown)
os.path.exists(self.PATH).AndReturn(True)
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
info = self.mox.CreateMockAnything()
info.file_format = 'foo'
imagebackend.images.qemu_img_info(self.PATH).AndReturn(info)
os.path.exists(CONF.instances_path).AndReturn(True)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME, path=self.PATH)
self.assertEqual(image.driver_format, 'foo')
self.mox.VerifyAll()
def test_resolve_driver_format(self):
image = self.image_class(self.INSTANCE, self.NAME)
driver_format = image.resolve_driver_format()
self.assertEqual(driver_format, 'raw')
class Qcow2TestCase(_ImageTestCase, test.NoDBTestCase):
SIZE = units.Gi
def setUp(self):
self.image_class = imagebackend.Qcow2
super(Qcow2TestCase, self).setUp()
self.QCOW2_BASE = (self.TEMPLATE_PATH +
'_%d' % (self.SIZE / units.Gi))
def fake_chown(path, owner_uid=None):
return None
self.stubs.Set(utils, 'chown', fake_chown)
def prepare_mocks(self):
fn = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(imagebackend.utils.synchronized,
'__call__')
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
'create_cow_image')
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
return fn
def test_cache(self):
self.mox.StubOutWithMock(os.path, 'exists')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
os.path.exists(CONF.instances_path).AndReturn(True)
os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
os.path.exists(self.PATH).AndReturn(False)
fn = self.mox.CreateMockAnything()
fn(target=self.TEMPLATE_PATH)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
self.mock_create_image(image)
image.cache(fn, self.TEMPLATE)
self.mox.VerifyAll()
def test_cache_image_exists(self):
self.mox.StubOutWithMock(os.path, 'exists')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
os.path.exists(self.PATH).AndReturn(True)
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.cache(None, self.TEMPLATE)
self.mox.VerifyAll()
def test_cache_base_dir_exists(self):
self.mox.StubOutWithMock(os.path, 'exists')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
os.path.exists(self.PATH).AndReturn(False)
fn = self.mox.CreateMockAnything()
fn(target=self.TEMPLATE_PATH)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
self.mock_create_image(image)
image.cache(fn, self.TEMPLATE)
self.mox.VerifyAll()
def test_cache_template_exists(self):
self.mox.StubOutWithMock(os.path, 'exists')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
os.path.exists(self.PATH).AndReturn(False)
fn = self.mox.CreateMockAnything()
fn(target=self.TEMPLATE_PATH)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
self.mock_create_image(image)
image.cache(fn, self.TEMPLATE)
self.mox.VerifyAll()
def test_create_image(self):
fn = self.prepare_mocks()
fn(max_size=None, target=self.TEMPLATE_PATH)
imagebackend.libvirt_utils.create_cow_image(self.TEMPLATE_PATH,
self.PATH)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, None)
self.mox.VerifyAll()
def test_create_image_with_size(self):
fn = self.prepare_mocks()
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(os.path, 'exists')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
os.path.exists(self.PATH).AndReturn(False)
os.path.exists(self.PATH).AndReturn(False)
imagebackend.libvirt_utils.create_cow_image(self.TEMPLATE_PATH,
self.PATH)
imagebackend.disk.extend(self.PATH, self.SIZE, use_cow=True)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
self.mox.VerifyAll()
def test_create_image_too_small(self):
fn = self.prepare_mocks()
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(imagebackend.disk, 'get_disk_size')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
imagebackend.disk.get_disk_size(self.TEMPLATE_PATH
).AndReturn(self.SIZE)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
self.assertRaises(exception.FlavorDiskTooSmall,
image.create_image, fn, self.TEMPLATE_PATH, 1)
self.mox.VerifyAll()
def test_generate_resized_backing_files(self):
fn = self.prepare_mocks()
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
'get_disk_backing_file')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
os.path.exists(CONF.instances_path).AndReturn(True)
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
os.path.exists(self.PATH).AndReturn(True)
imagebackend.libvirt_utils.get_disk_backing_file(self.PATH)\
.AndReturn(self.QCOW2_BASE)
os.path.exists(self.QCOW2_BASE).AndReturn(False)
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH,
self.QCOW2_BASE)
imagebackend.disk.extend(self.QCOW2_BASE, self.SIZE, use_cow=True)
os.path.exists(self.PATH).AndReturn(True)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
self.mox.VerifyAll()
def test_qcow2_exists_and_has_no_backing_file(self):
fn = self.prepare_mocks()
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
'get_disk_backing_file')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
os.path.exists(self.PATH).AndReturn(True)
imagebackend.libvirt_utils.get_disk_backing_file(self.PATH)\
.AndReturn(None)
os.path.exists(self.PATH).AndReturn(True)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
self.mox.VerifyAll()
def test_resolve_driver_format(self):
image = self.image_class(self.INSTANCE, self.NAME)
driver_format = image.resolve_driver_format()
self.assertEqual(driver_format, 'qcow2')
def test_prealloc_image(self):
CONF.set_override('preallocate_images', 'space')
@ -170,189 +511,6 @@ class _ImageTestCase(object):
self.assertEqual(fake_processutils.fake_execute_get_log(), [])
class RawTestCase(_ImageTestCase, test.NoDBTestCase):
SIZE = 1024
def setUp(self):
self.image_class = imagebackend.Raw
super(RawTestCase, self).setUp()
self.stubs.Set(imagebackend.Raw, 'correct_format', lambda _: None)
def prepare_mocks(self):
fn = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(imagebackend.utils.synchronized,
'__call__')
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
return fn
def test_create_image(self):
fn = self.prepare_mocks()
fn(target=self.TEMPLATE_PATH, max_size=None, image_id=None)
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, self.PATH)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, None, image_id=None)
self.mox.VerifyAll()
def test_create_image_generated(self):
fn = self.prepare_mocks()
fn(target=self.PATH)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, None)
self.mox.VerifyAll()
def test_create_image_extend(self):
fn = self.prepare_mocks()
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH, image_id=None)
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, self.PATH)
imagebackend.disk.extend(self.PATH, self.SIZE, use_cow=False)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE, image_id=None)
self.mox.VerifyAll()
def test_correct_format(self):
info = self.mox.CreateMockAnything()
self.stubs.UnsetAll()
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(imagebackend.images, 'qemu_img_info')
os.path.exists(self.PATH).AndReturn(True)
info = self.mox.CreateMockAnything()
info.file_format = 'foo'
imagebackend.images.qemu_img_info(self.PATH).AndReturn(info)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME, path=self.PATH)
self.assertEqual(image.driver_format, 'foo')
self.mox.VerifyAll()
class Qcow2TestCase(_ImageTestCase, test.NoDBTestCase):
SIZE = units.Gi
def setUp(self):
self.image_class = imagebackend.Qcow2
super(Qcow2TestCase, self).setUp()
self.QCOW2_BASE = (self.TEMPLATE_PATH +
'_%d' % (self.SIZE / units.Gi))
def prepare_mocks(self):
fn = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(imagebackend.utils.synchronized,
'__call__')
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
'create_cow_image')
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
return fn
def test_create_image(self):
fn = self.prepare_mocks()
fn(max_size=None, target=self.TEMPLATE_PATH)
imagebackend.libvirt_utils.create_cow_image(self.TEMPLATE_PATH,
self.PATH)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, None)
self.mox.VerifyAll()
def test_create_image_with_size(self):
fn = self.prepare_mocks()
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(os.path, 'exists')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
os.path.exists(self.PATH).AndReturn(False)
os.path.exists(self.PATH).AndReturn(False)
imagebackend.libvirt_utils.create_cow_image(self.TEMPLATE_PATH,
self.PATH)
imagebackend.disk.extend(self.PATH, self.SIZE, use_cow=True)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
self.mox.VerifyAll()
def test_create_image_too_small(self):
fn = self.prepare_mocks()
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(imagebackend.disk, 'get_disk_size')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
imagebackend.disk.get_disk_size(self.TEMPLATE_PATH
).AndReturn(self.SIZE)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
self.assertRaises(exception.FlavorDiskTooSmall,
image.create_image, fn, self.TEMPLATE_PATH, 1)
self.mox.VerifyAll()
def test_generate_resized_backing_files(self):
fn = self.prepare_mocks()
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
'get_disk_backing_file')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
os.path.exists(self.PATH).AndReturn(True)
imagebackend.libvirt_utils.get_disk_backing_file(self.PATH)\
.AndReturn(self.QCOW2_BASE)
os.path.exists(self.QCOW2_BASE).AndReturn(False)
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH,
self.QCOW2_BASE)
imagebackend.disk.extend(self.QCOW2_BASE, self.SIZE, use_cow=True)
os.path.exists(self.PATH).AndReturn(True)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
self.mox.VerifyAll()
def test_qcow2_exists_and_has_no_backing_file(self):
fn = self.prepare_mocks()
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
'get_disk_backing_file')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
os.path.exists(self.PATH).AndReturn(True)
imagebackend.libvirt_utils.get_disk_backing_file(self.PATH)\
.AndReturn(None)
os.path.exists(self.PATH).AndReturn(True)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
self.mox.VerifyAll()
class LvmTestCase(_ImageTestCase, test.NoDBTestCase):
VG = 'FakeVG'
TEMPLATE_SIZE = 512
@ -428,6 +586,56 @@ class LvmTestCase(_ImageTestCase, test.NoDBTestCase):
self.mox.VerifyAll()
def test_cache(self):
self.mox.StubOutWithMock(os.path, 'exists')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
os.path.exists(self.PATH).AndReturn(False)
fn = self.mox.CreateMockAnything()
fn(target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
self.mock_create_image(image)
image.cache(fn, self.TEMPLATE)
self.mox.VerifyAll()
def test_cache_image_exists(self):
self.mox.StubOutWithMock(os.path, 'exists')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
os.path.exists(self.PATH).AndReturn(True)
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
image.cache(None, self.TEMPLATE)
self.mox.VerifyAll()
def test_cache_base_dir_exists(self):
self.mox.StubOutWithMock(os.path, 'exists')
if self.OLD_STYLE_INSTANCE_PATH:
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
os.path.exists(self.PATH).AndReturn(False)
fn = self.mox.CreateMockAnything()
fn(target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
self.mock_create_image(image)
image.cache(fn, self.TEMPLATE)
self.mox.VerifyAll()
def test_create_image(self):
self._create_image(False)
@ -595,6 +803,20 @@ class RbdTestCase(_ImageTestCase, test.NoDBTestCase):
self.mox.VerifyAll()
def test_cache_base_dir_exists(self):
self.mox.StubOutWithMock(os.path, 'exists')
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
fn = self.mox.CreateMockAnything()
fn(target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
self.mock_create_image(image)
image.cache(fn, self.TEMPLATE)
self.mox.VerifyAll()
def test_create_image(self):
fn = self.prepare_mocks()
fn(max_size=None, rbd=self.rbd, target=self.TEMPLATE_PATH)
@ -663,6 +885,13 @@ class BackendTestCase(test.NoDBTestCase):
'uuid': uuidutils.generate_uuid()}
NAME = 'fake-name.suffix'
def setUp(self):
super(BackendTestCase, self).setUp()
def fake_chown(path, owner_uid=None):
return None
self.stubs.Set(utils, 'chown', fake_chown)
def get_image(self, use_cow, image_type):
return imagebackend.Backend(use_cow).image(self.INSTANCE,
self.NAME,

View File

@ -409,6 +409,9 @@ class LibvirtConnTestCase(test.TestCase):
self.stubs.Set(libvirt_driver.disk, 'extend', fake_extend)
self.stubs.Set(imagebackend.Image, 'resolve_driver_format',
imagebackend.Image._get_driver_format)
class FakeConn():
def baselineCPU(self, cpu, flag):
"""Add new libvirt API."""

View File

@ -32,6 +32,7 @@ from nova.tests import utils as test_utils
from nova.tests.virt.libvirt import fake_libvirt_utils
from nova.virt import event as virtevent
from nova.virt import fake
from nova.virt.libvirt import imagebackend
LOG = logging.getLogger(__name__)
@ -204,6 +205,12 @@ class _VirtDriverTestCase(_FakeDriverBackendTestCase):
fake.FakeVirtAPI())
self.ctxt = test_utils.get_test_admin_context()
self.image_service = fake_image.FakeImageService()
# NOTE(dripton): resolve_driver_format does some file reading and
# writing and chowning that complicate testing too much by requiring
# using real directories with proper permissions. Just stub it out
# here; we test it in test_imagebackend.py
self.stubs.Set(imagebackend.Image, 'resolve_driver_format',
imagebackend.Image._get_driver_format)
def _get_running_instance(self, obj=False):
instance_ref = test_utils.get_test_instance(obj=obj)

View File

@ -761,6 +761,20 @@ def temporary_chown(path, owner_uid=None):
execute('chown', orig_uid, path, run_as_root=True)
def chown(path, owner_uid=None):
"""chown a path.
:param owner_uid: UID of owner (defaults to current user)
"""
if owner_uid is None:
owner_uid = os.getuid()
orig_uid = os.stat(path).st_uid
if orig_uid != owner_uid:
execute('chown', owner_uid, path, run_as_root=True)
@contextlib.contextmanager
def tempdir(**kwargs):
argdict = kwargs.copy()

View File

@ -104,6 +104,11 @@ class Image(object):
self.is_block_dev = is_block_dev
self.preallocate = False
# NOTE(dripton): We store lines of json (path, disk_format) in this
# file, for some image types, to prevent attacks based on changing the
# disk_format.
self.disk_info_path = None
# NOTE(mikal): We need a lock directory which is shared along with
# instance files, to cover the scenario where multiple compute nodes
# are trying to create a base file at the same time
@ -240,6 +245,65 @@ class Image(object):
def snapshot_extract(self, target, out_format):
raise NotImplementedError()
def _get_driver_format(self):
return self.driver_format
def resolve_driver_format(self):
"""Return the driver format for self.path.
First checks self.disk_info_path for an entry.
If it's not there, calls self._get_driver_format(), and then
stores the result in self.disk_info_path
See https://bugs.launchpad.net/nova/+bug/1221190
"""
def _dict_from_line(line):
if not line:
return {}
try:
return jsonutils.loads(line)
except (TypeError, ValueError) as e:
msg = (_("Could not load line %(line)s, got error "
"%(error)s") %
{'line': line, 'error': unicode(e)})
raise exception.InvalidDiskInfo(reason=msg)
@utils.synchronized(self.disk_info_path, external=False,
lock_path=self.lock_path)
def write_to_disk_info_file():
# Use os.open to create it without group or world write permission.
fd = os.open(self.disk_info_path, os.O_RDWR | os.O_CREAT, 0o644)
with os.fdopen(fd, "r+") as disk_info_file:
line = disk_info_file.read().rstrip()
dct = _dict_from_line(line)
if self.path in dct:
msg = _("Attempted overwrite of an existing value.")
raise exception.InvalidDiskInfo(reason=msg)
dct.update({self.path: driver_format})
disk_info_file.seek(0)
disk_info_file.truncate()
disk_info_file.write('%s\n' % jsonutils.dumps(dct))
# Ensure the file is always owned by the nova user so qemu can't
# write it.
utils.chown(self.disk_info_path, owner_uid=os.getuid())
try:
if (self.disk_info_path is not None and
os.path.exists(self.disk_info_path)):
with open(self.disk_info_path) as disk_info_file:
line = disk_info_file.read().rstrip()
dct = _dict_from_line(line)
for path, driver_format in dct.iteritems():
if path == self.path:
return driver_format
driver_format = self._get_driver_format()
if self.disk_info_path is not None:
fileutils.ensure_tree(os.path.dirname(self.disk_info_path))
write_to_disk_info_file()
except OSError as e:
raise exception.DiskInfoReadWriteFail(reason=unicode(e))
return driver_format
class Raw(Image):
def __init__(self, instance=None, disk_name=None, path=None):
@ -249,12 +313,17 @@ class Raw(Image):
os.path.join(libvirt_utils.get_instance_path(instance),
disk_name))
self.preallocate = CONF.preallocate_images != 'none'
self.disk_info_path = os.path.join(os.path.dirname(self.path),
'disk.info')
self.correct_format()
def _get_driver_format(self):
data = images.qemu_img_info(self.path)
return data.file_format or 'raw'
def correct_format(self):
if os.path.exists(self.path):
data = images.qemu_img_info(self.path)
self.driver_format = data.file_format or 'raw'
self.driver_format = self.resolve_driver_format()
def create_image(self, prepare_template, base, size, *args, **kwargs):
filename = os.path.split(base)[-1]
@ -293,6 +362,9 @@ class Qcow2(Image):
os.path.join(libvirt_utils.get_instance_path(instance),
disk_name))
self.preallocate = CONF.preallocate_images != 'none'
self.disk_info_path = os.path.join(os.path.dirname(self.path),
'disk.info')
self.resolve_driver_format()
def create_image(self, prepare_template, base, size, *args, **kwargs):
filename = os.path.split(base)[-1]