Validate volume status again when snapshot created

In a multitask environment, the volume can be deleted
after the snapshot record is created. Add the validation
right after the creation to avoid undeletable snapshot.

Closes-Bug: #1739995

Change-Id: Icbe86473243871996483fdc96c079b8861bac2e5
This commit is contained in:
TommyLike 2017-12-25 09:19:20 +08:00
parent de4b1f1354
commit 7ccfb2c8d9
7 changed files with 54 additions and 11 deletions

View File

@ -228,7 +228,8 @@ class Snapshot(cleanable.CinderCleanableObject, base.CinderObject,
self.obj_reset_changes()
def destroy(self):
updated_values = db.snapshot_destroy(self._context, self.id)
with self.obj_as_admin():
updated_values = db.snapshot_destroy(self._context, self.id)
self.update(updated_values)
self.obj_reset_changes(updated_values.keys())

View File

@ -117,6 +117,7 @@ class SnapshotMetaDataTest(test.TestCase):
self.mock_object(scheduler_rpcapi.SchedulerAPI, 'create_snapshot')
self.mock_object(cinder.db, 'snapshot_get', return_snapshot)
self.mock_object(self.volume_api, 'update_snapshot_metadata')
self.patch('cinder.objects.volume.Volume.refresh')
self.ext_mgr = extensions.ExtensionManager()
self.ext_mgr.extensions = {}

View File

@ -244,7 +244,8 @@ class SnapshotApiTest(test.TestCase):
else:
self.assertNotIn('count', res_dict)
def test_snapshot_list_with_sort_name(self):
@mock.patch('cinder.objects.volume.Volume.refresh')
def test_snapshot_list_with_sort_name(self, mock_refresh):
self._create_snapshot(name='test1')
self._create_snapshot(name='test2')
@ -260,7 +261,8 @@ class SnapshotApiTest(test.TestCase):
self.assertEqual('test2', res_dict['snapshots'][0]['name'])
self.assertEqual('test1', res_dict['snapshots'][1]['name'])
def test_snapshot_list_with_one_metadata_in_filter(self):
@mock.patch('cinder.objects.volume.Volume.refresh')
def test_snapshot_list_with_one_metadata_in_filter(self, mock_refresh):
# Create snapshot with metadata key1: value1
metadata = {"key1": "val1"}
self._create_snapshot(metadata=metadata)
@ -289,7 +291,9 @@ class SnapshotApiTest(test.TestCase):
# verify no snapshot is returned
self.assertEqual(0, len(res_dict['snapshots']))
def test_snapshot_list_with_multiple_metadata_in_filter(self):
@mock.patch('cinder.objects.volume.Volume.refresh')
def test_snapshot_list_with_multiple_metadata_in_filter(self,
mock_refresh):
# Create snapshot with metadata key1: value1, key11: value11
metadata = {"key1": "val1", "key11": "val11"}
self._create_snapshot(metadata=metadata)
@ -350,7 +354,9 @@ class SnapshotApiTest(test.TestCase):
mock.ANY, 'snapshot',
support_like)
def test_snapshot_list_with_metadata_unsupported_microversion(self):
@mock.patch('cinder.objects.volume.Volume.refresh')
def test_snapshot_list_with_metadata_unsupported_microversion(
self, mock_refresh):
# Create snapshot with metadata key1: value1
metadata = {"key1": "val1"}
self._create_snapshot(metadata=metadata)

View File

@ -124,10 +124,12 @@ class TestSnapshot(test_objects.BaseObjectsTestCase):
'status': 'deleted',
'deleted': True,
'deleted_at': utcnow_mock.return_value}
snapshot = objects.Snapshot(context=self.context, id=fake.SNAPSHOT_ID)
snapshot = objects.Snapshot(context=self.context,
id=fake.SNAPSHOT_ID)
snapshot.destroy()
snapshot_destroy.assert_called_once_with(self.context,
fake.SNAPSHOT_ID)
self.assertTrue(snapshot_destroy.called)
admin_context = snapshot_destroy.call_args[0][0]
self.assertTrue(admin_context.is_admin)
self.assertTrue(snapshot.deleted)
self.assertEqual(fields.SnapshotStatus.DELETED, snapshot.status)
self.assertEqual(utcnow_mock.return_value.replace(tzinfo=pytz.UTC),

View File

@ -199,6 +199,25 @@ class SnapshotTestCase(base.BaseVolumeTestCase):
'fake_name',
'fake_description')
@mock.patch('cinder.objects.volume.Volume.get_by_id')
def test_create_snapshot_in_db_invalid_volume_status(self, mock_get):
test_volume1 = tests_utils.create_volume(
self.context,
status='available',
host=CONF.host)
test_volume2 = tests_utils.create_volume(
self.context,
status='deleting',
host=CONF.host)
mock_get.return_value = test_volume2
volume_api = cinder.volume.api.API()
self.assertRaises(exception.InvalidVolume,
volume_api.create_snapshot_in_db,
self.context, test_volume1, "fake_snapshot_name",
"fake_description", False, {}, None,
commit_quota=False)
def test_create_snapshot_failed_maintenance(self):
"""Test exception handling when create snapshot in maintenance."""
test_volume = tests_utils.create_volume(

View File

@ -1531,7 +1531,8 @@ class VolumeTestCase(base.BaseVolumeTestCase):
volume_type=db_vol_type)
db.volume_update(self.context, volume_src['id'],
{'host': 'fake_host@fake_backend'})
{'host': 'fake_host@fake_backend',
'status': 'available'})
volume_src = objects.Volume.get_by_id(self.context, volume_src['id'])
snapshot_ref = volume_api.create_snapshot_force(self.context,

View File

@ -873,10 +873,13 @@ class API(base.Base):
msg = _("Snapshot of secondary replica is not allowed.")
raise exception.InvalidVolume(reason=msg)
if ((not force) and (volume['status'] != "available")):
msg = _("Volume %(vol_id)s status must be available, "
valid_status = ["available", "in-use"] if force else ["available"]
if volume['status'] not in valid_status:
msg = _("Volume %(vol_id)s status must be %(status)s, "
"but current status is: "
"%(vol_status)s.") % {'vol_id': volume['id'],
'status': ', '.join(valid_status),
'vol_status': volume['status']}
raise exception.InvalidVolume(reason=msg)
@ -917,7 +920,17 @@ class API(base.Base):
}
snapshot = objects.Snapshot(context=context, **kwargs)
snapshot.create()
volume.refresh()
if volume['status'] not in valid_status:
msg = _("Volume %(vol_id)s status must be %(status)s , "
"but current status is: "
"%(vol_status)s.") % {'vol_id': volume['id'],
'status':
', '.join(valid_status),
'vol_status':
volume['status']}
raise exception.InvalidVolume(reason=msg)
if commit_quota:
QUOTAS.commit(context, reservations)
except Exception: