Implement compare-and-swap for instance update

This patch reworks nova.db.sqlalchemy.api._instance_update to remove a
race condition that exists in a critical section of code that
retrieves the instance record from the database, does some checks of
both the new values and existing values of instance fields, and then
tries to update the database with the new values.

We update the value using update_on_match, which does an optimistic,
atomic, lock-free, compare-and-swap on the object. If the update
fails, we do some additional checks to determine the specific error,
which maintains the previous behaviour.

_instance_update() is simplified by having it return only the updated
instance_ref. instance_update_and_get_original() is updated to fetch
the old value itself before calling _instance_update().

This patch was originally submitted as change
I9cd0f4b620e639b238555983bf6d58deafbaefeb, but that was reverted. The
original version was optimistic about not racing in
instance_update_and_get_original between fetching an instance and
updating it, in which case it raised a different exception to the
previous behaviour. In practice we were hit that race frequently in
the gate, and the unexpectedly different exception was causing gate
failures. This new version adds a retry in this case, and raises the
expected exception. The frequency of hitting that race highlights the
importance of this patch, as it is otherwise unhandled.

This new version also updates the exception hierarchy so that all
exceptions raised by _instance_update inherit from a single exception:
InstanceUpdateConflict. UnexpectedVMStateError is removed, as it had no
users. An InstanceUpdateConflict is now raised instead.

The new code trivially allows the caller to check any property when
updating an instance, not just the two prescribed. This new version
enables this by allowing the 'expected' dict to be passed in, as well
as maintaining the behaviour of pulling expected values from the
update dict. This has numerous potential applications, for example
atomically changing the host of an instance during evacuation.

The new version adds numerous additional tests.

Closes-bug: #1297375
Change-Id: I293da6f320dd8f3474ce2a9c907298e1fb348181
This commit is contained in:
Matthew Booth 2014-12-11 17:52:08 +00:00
parent 65d4884199
commit 2a875644cc
8 changed files with 346 additions and 90 deletions

View File

@ -730,17 +730,18 @@ def instance_get_all_hung_in_rebooting(context, reboot_window):
return IMPL.instance_get_all_hung_in_rebooting(context, reboot_window)
def instance_update(context, instance_uuid, values):
def instance_update(context, instance_uuid, values, expected=None):
"""Set the given properties on an instance and update it.
Raises NotFound if instance does not exist.
"""
return IMPL.instance_update(context, instance_uuid, values)
return IMPL.instance_update(context, instance_uuid, values,
expected=expected)
def instance_update_and_get_original(context, instance_uuid, values,
columns_to_join=None):
columns_to_join=None, expected=None):
"""Set the given properties on an instance and update it. Return
a shallow copy of the original instance reference, as well as the
updated one.
@ -754,7 +755,8 @@ def instance_update_and_get_original(context, instance_uuid, values,
Raises NotFound if instance does not exist.
"""
rv = IMPL.instance_update_and_get_original(context, instance_uuid, values,
columns_to_join=columns_to_join)
columns_to_join=columns_to_join,
expected=expected)
return rv

View File

@ -30,6 +30,7 @@ from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db import options as oslo_db_options
from oslo_db.sqlalchemy import session as db_session
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_log import log as logging
from oslo_utils import excutils
@ -2423,15 +2424,29 @@ def instance_get_all_hung_in_rebooting(context, reboot_window):
manual_joins=[])
@require_context
def instance_update(context, instance_uuid, values):
instance_ref = _instance_update(context, instance_uuid, values)[1]
return instance_ref
def _retry_instance_update():
"""Wrap with oslo_db_api.wrap_db_retry, and also retry on
UnknownInstanceUpdateConflict.
"""
exception_checker = \
lambda exc: isinstance(exc, (exception.UnknownInstanceUpdateConflict,))
return oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True,
exception_checker=exception_checker)
@require_context
@_retry_instance_update()
def instance_update(context, instance_uuid, values, expected=None):
session = get_session()
with session.begin():
return _instance_update(context, session, instance_uuid,
values, expected)
@require_context
@_retry_instance_update()
def instance_update_and_get_original(context, instance_uuid, values,
columns_to_join=None):
columns_to_join=None, expected=None):
"""Set the given properties on an instance and update it. Return
a shallow copy of the original instance reference, as well as the
updated one.
@ -2448,9 +2463,14 @@ def instance_update_and_get_original(context, instance_uuid, values,
Raises NotFound if instance does not exist.
"""
return _instance_update(context, instance_uuid, values,
copy_old_instance=True,
columns_to_join=columns_to_join)
session = get_session()
with session.begin():
instance_ref = _instance_get_by_uuid(context, instance_uuid,
columns_to_join=columns_to_join,
session=session)
return (copy.copy(instance_ref),
_instance_update(context, session, instance_uuid, values,
expected, original=instance_ref))
# NOTE(danms): This updates the instance's metadata list in-place and in
@ -2487,73 +2507,122 @@ def _instance_metadata_update_in_place(context, instance, metadata_type, model,
instance[metadata_type].append(newitem)
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def _instance_update(context, instance_uuid, values, copy_old_instance=False,
columns_to_join=None):
session = get_session()
def _instance_update(context, session, instance_uuid, values, expected,
original=None):
if not uuidutils.is_uuid_like(instance_uuid):
raise exception.InvalidUUID(instance_uuid)
with session.begin():
instance_ref = _instance_get_by_uuid(context, instance_uuid,
session=session,
columns_to_join=columns_to_join)
if "expected_task_state" in values:
# it is not a db column so always pop out
expected = values.pop("expected_task_state")
if not isinstance(expected, (tuple, list, set)):
expected = (expected,)
actual_state = instance_ref["task_state"]
if actual_state not in expected:
if actual_state == task_states.DELETING:
raise exception.UnexpectedDeletingTaskStateError(
actual=actual_state, expected=expected)
else:
raise exception.UnexpectedTaskStateError(
actual=actual_state, expected=expected)
if "expected_vm_state" in values:
expected = values.pop("expected_vm_state")
if not isinstance(expected, (tuple, list, set)):
expected = (expected,)
actual_state = instance_ref["vm_state"]
if actual_state not in expected:
raise exception.UnexpectedVMStateError(actual=actual_state,
expected=expected)
if expected is None:
expected = {}
else:
# Coerce all single values to singleton lists
expected = {k: [None] if v is None else sqlalchemyutils.to_list(v)
for (k, v) in six.iteritems(expected)}
instance_hostname = instance_ref['hostname'] or ''
if ("hostname" in values and
values["hostname"].lower() != instance_hostname.lower()):
_validate_unique_server_name(context,
session,
values['hostname'])
# Extract 'expected_' values from values dict, as these aren't actually
# updates
for field in ('task_state', 'vm_state'):
expected_field = 'expected_%s' % field
if expected_field in values:
value = values.pop(expected_field, None)
# Coerce all single values to singleton lists
if value is None:
expected[field] = [None]
else:
expected[field] = sqlalchemyutils.to_list(value)
if copy_old_instance:
old_instance_ref = copy.copy(instance_ref)
# Values which need to be updated separately
metadata = values.pop('metadata', None)
system_metadata = values.pop('system_metadata', None)
_handle_objects_related_type_conversions(values)
# Hostname is potentially unique, but this is enforced in code rather
# than the DB. The query below races, but the number of users of
# osapi_compute_unique_server_name_scope is small, and a robust fix
# will be complex. This is intentionally left as is for the moment.
if 'hostname' in values:
_validate_unique_server_name(context, session, values['hostname'])
compare = models.Instance(uuid=instance_uuid, **expected)
try:
instance_ref = model_query(context, models.Instance,
project_only=True, session=session).\
update_on_match(compare, 'uuid', values)
except update_match.NoRowsMatched:
# Update failed. Try to find why and raise a specific error.
# We should get here only because our expected values were not current
# when update_on_match executed. Having failed, we now have a hint that
# the values are out of date and should check them.
# This code is made more complex because we are using repeatable reads.
# If we have previously read the original instance in the current
# transaction, reading it again will return the same data, even though
# the above update failed because it has changed: it is not possible to
# determine what has changed in this transaction. In this case we raise
# UnknownInstanceUpdateConflict, which will cause the operation to be
# retried in a new transaction.
# Because of the above, if we have previously read the instance in the
# current transaction it will have been passed as 'original', and there
# is no point refreshing it. If we have not previously read the
# instance, we can fetch it here and we will get fresh data.
if original is None:
original = _instance_get_by_uuid(context, instance_uuid,
session=session)
conflicts_expected = {}
conflicts_actual = {}
for (field, expected_values) in six.iteritems(expected):
actual = original[field]
if actual not in expected_values:
conflicts_expected[field] = expected_values
conflicts_actual[field] = actual
# Exception properties
exc_props = {
'instance_uuid': instance_uuid,
'expected': conflicts_expected,
'actual': conflicts_actual
}
# There was a conflict, but something (probably the MySQL read view,
# but possibly an exceptionally unlikely second race) is preventing us
# from seeing what it is. When we go round again we'll get a fresh
# transaction and a fresh read view.
if len(conflicts_actual) == 0:
raise exception.UnknownInstanceUpdateConflict(**exc_props)
# Task state gets special handling for convenience. We raise the
# specific error UnexpectedDeletingTaskStateError or
# UnexpectedTaskStateError as appropriate
if 'task_state' in conflicts_actual:
conflict_task_state = conflicts_actual['task_state']
if conflict_task_state == task_states.DELETING:
exc = exception.UnexpectedDeletingTaskStateError
else:
exc = exception.UnexpectedTaskStateError
# Everything else is an InstanceUpdateConflict
else:
old_instance_ref = None
exc = exception.InstanceUpdateConflict
metadata = values.get('metadata')
if metadata is not None:
_instance_metadata_update_in_place(context, instance_ref,
'metadata',
models.InstanceMetadata,
values.pop('metadata'),
session)
raise exc(**exc_props)
system_metadata = values.get('system_metadata')
if system_metadata is not None:
_instance_metadata_update_in_place(context, instance_ref,
'system_metadata',
models.InstanceSystemMetadata,
values.pop('system_metadata'),
session)
if metadata is not None:
_instance_metadata_update_in_place(context, instance_ref,
'metadata',
models.InstanceMetadata,
metadata, session)
_handle_objects_related_type_conversions(values)
instance_ref.update(values)
session.add(instance_ref)
if system_metadata is not None:
_instance_metadata_update_in_place(context, instance_ref,
'system_metadata',
models.InstanceSystemMetadata,
system_metadata, session)
return (old_instance_ref, instance_ref)
return instance_ref
def instance_add_security_group(context, instance_uuid, security_group_id):

View File

@ -1426,9 +1426,18 @@ class InstanceUserDataMalformed(NovaException):
msg_fmt = _("User data needs to be valid base 64.")
class UnexpectedTaskStateError(NovaException):
msg_fmt = _("Unexpected task state: expecting %(expected)s but "
"the actual state is %(actual)s")
class InstanceUpdateConflict(NovaException):
msg_fmt = _("Conflict updating instance %(instance_uuid)s. "
"Expected: %(expected)s. Actual: %(actual)s")
class UnknownInstanceUpdateConflict(InstanceUpdateConflict):
msg_fmt = _("Conflict updating instance %(instance_uuid)s, but we were "
"unable to determine the cause")
class UnexpectedTaskStateError(InstanceUpdateConflict):
pass
class UnexpectedDeletingTaskStateError(UnexpectedTaskStateError):
@ -1444,11 +1453,6 @@ class InstanceActionEventNotFound(NovaException):
msg_fmt = _("Event %(event)s not found for action id %(action_id)s")
class UnexpectedVMStateError(NovaException):
msg_fmt = _("Unexpected VM state: expecting %(expected)s but "
"the actual state is %(actual)s")
class CryptoCAFileNotFound(FileNotFound):
msg_fmt = _("The CA file for %(project)s could not be found")

View File

@ -1786,7 +1786,9 @@ class ComputeTestCase(BaseTestCase):
with mock.patch.object(instance, 'save') as mock_save:
mock_save.side_effect = exception.UnexpectedDeletingTaskStateError(
actual='foo', expected='bar')
instance_uuid=instance['uuid'],
expected={'task_state': 'bar'},
actual={'task_state': 'foo'})
self.compute.build_and_run_instance(self.context, instance, {}, {},
{}, block_device_mapping=[])
self.assertTrue(mock_save.called)
@ -3159,7 +3161,9 @@ class ComputeTestCase(BaseTestCase):
def test_snapshot_fails_with_task_state_error(self):
deleting_state_error = exception.UnexpectedDeletingTaskStateError(
expected=task_states.IMAGE_SNAPSHOT, actual=task_states.DELETING)
instance_uuid='fake_uuid',
expected={'task_state': task_states.IMAGE_SNAPSHOT},
actual={'task_state': task_states.DELETING})
self._test_snapshot_deletes_image_on_failure(
'error', deleting_state_error)
self.assertTrue(self.fake_image_delete_called)

View File

@ -1234,7 +1234,9 @@ class _ComputeAPIUnitTestMixIn(object):
self.context, delta, fake_inst).AndReturn(fake_quotas)
exc = exception.UnexpectedTaskStateError(
actual=task_states.RESIZE_REVERTING, expected=None)
instance_uuid=fake_inst['uuid'],
actual={'task_state': task_states.RESIZE_REVERTING},
expected={'task_state': [None]})
fake_inst.save(expected_task_state=[None]).AndRaise(exc)
fake_quotas.rollback()

View File

@ -2973,8 +2973,8 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
def test_build_and_run_unexpecteddeleting_exception(self):
self._test_build_and_run_exceptions(
exception.UnexpectedDeletingTaskStateError(expected='',
actual=''))
exception.UnexpectedDeletingTaskStateError(
instance_uuid='fake_uuid', expected={}, actual={}))
def test_build_and_run_buildabort_exception(self):
self._test_build_and_run_exceptions(exception.BuildAbortException(
@ -3222,7 +3222,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
return_value=self.network_info),
mock.patch.object(self.instance, 'save',
side_effect=exception.UnexpectedDeletingTaskStateError(
actual=task_states.DELETING, expected='None')),
instance_uuid='fake_uuid',
actual={'task_state': task_states.DELETING},
expected={'task_state': None})),
) as (_build_networks_for_instance, save):
try:
@ -3260,8 +3262,10 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
'_build_networks_for_instance') as _build_networks:
exc = exception.UnexpectedDeletingTaskStateError
_build_networks.side_effect = exc(actual=task_states.DELETING,
expected='None')
_build_networks.side_effect = exc(
instance_uuid='fake_uuid',
actual={'task_state': task_states.DELETING},
expected={'task_state': None})
try:
with self.compute._build_resources(self.context, self.instance,
@ -3354,7 +3358,7 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
self, mock_save, mock_build_network, mock_info_wait):
mock_build_network.return_value = self.network_info
mock_save.side_effect = exception.UnexpectedTaskStateError(
expected='', actual='')
instance_uuid='fake_uuid', expected={}, actual={})
try:
with self.compute._build_resources(self.context, self.instance,
self.requested_networks, self.security_groups,

View File

@ -459,8 +459,9 @@ class ConductorTestCase(_BaseTestCase, test.TestCase):
def test_instance_update_expected_exceptions(self):
errors = (exc.InvalidUUID(uuid='foo'),
exc.InstanceNotFound(instance_id=1),
exc.UnexpectedTaskStateError(expected='foo',
actual='bar'))
exc.UnexpectedTaskStateError(instance_uuid='fake_uuid',
expected={'task_state': 'foo'},
actual={'task_state': 'bar'}))
self._test_expected_exceptions(
'instance_update', self.conductor.instance_update,
errors, None, {'foo': 'bar'}, None)

View File

@ -29,6 +29,7 @@ from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import test_base
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_serialization import jsonutils
from oslo_utils import timeutils
@ -48,6 +49,7 @@ from sqlalchemy import Table
from nova import block_device
from nova.compute import arch
from nova.compute import task_states
from nova.compute import vm_states
from nova import context
from nova import db
@ -2465,7 +2467,7 @@ class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
def test_instance_update_with_unexpected_vm_state(self):
instance = self.create_instance_with_args(vm_state='foo')
self.assertRaises(exception.UnexpectedVMStateError,
self.assertRaises(exception.InstanceUpdateConflict,
db.instance_update, self.ctxt, instance['uuid'],
{'host': 'h1', 'expected_vm_state': ('spam', 'bar')})
@ -2532,7 +2534,7 @@ class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
# Make sure instance faults is deleted as well
self.assertEqual(0, len(faults[uuid]))
def test_instance_update_with_and_get_original(self):
def test_instance_update_and_get_original(self):
instance = self.create_instance_with_args(vm_state='building')
(old_ref, new_ref) = db.instance_update_and_get_original(self.ctxt,
instance['uuid'], {'vm_state': 'needscoffee'})
@ -2588,6 +2590,174 @@ class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
# 4. the "old" object is detached from this Session.
self.assertTrue(old_insp.detached)
def test_instance_update_and_get_original_conflict_race(self):
# Ensure that we retry if update_on_match fails for no discernable
# reason
instance = self.create_instance_with_args()
orig_update_on_match = update_match.update_on_match
# Reproduce the conditions of a race between fetching and updating the
# instance by making update_on_match fail for no discernable reason the
# first time it is called, but work normally the second time.
with mock.patch.object(update_match, 'update_on_match',
side_effect=[update_match.NoRowsMatched,
orig_update_on_match]):
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {'metadata': {'mk1': 'mv3'}})
self.assertEqual(update_match.update_on_match.call_count, 2)
def test_instance_update_and_get_original_conflict_race_fallthrough(self):
# Ensure that is update_match continuously fails for no discernable
# reason, we evantually raise UnknownInstanceUpdateConflict
instance = self.create_instance_with_args()
# Reproduce the conditions of a race between fetching and updating the
# instance by making update_on_match fail for no discernable reason.
with mock.patch.object(update_match, 'update_on_match',
side_effect=update_match.NoRowsMatched):
self.assertRaises(exception.UnknownInstanceUpdateConflict,
db.instance_update_and_get_original,
self.ctxt,
instance['uuid'],
{'metadata': {'mk1': 'mv3'}})
def test_instance_update_and_get_original_expected_host(self):
# Ensure that we allow update when expecting a host field
instance = self.create_instance_with_args()
(orig, new) = db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {'host': None},
expected={'host': 'h1'})
self.assertIsNone(new['host'])
def test_instance_update_and_get_original_expected_host_fail(self):
# Ensure that we detect a changed expected host and raise
# InstanceUpdateConflict
instance = self.create_instance_with_args()
try:
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {'host': None},
expected={'host': 'h2'})
except exception.InstanceUpdateConflict as ex:
self.assertEqual(ex.kwargs['instance_uuid'], instance['uuid'])
self.assertEqual(ex.kwargs['actual'], {'host': 'h1'})
self.assertEqual(ex.kwargs['expected'], {'host': ['h2']})
else:
self.fail('InstanceUpdateConflict was not raised')
def test_instance_update_and_get_original_expected_host_none(self):
# Ensure that we allow update when expecting a host field of None
instance = self.create_instance_with_args(host=None)
(old, new) = db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {'host': 'h1'},
expected={'host': None})
self.assertEqual('h1', new['host'])
def test_instance_update_and_get_original_expected_host_none_fail(self):
# Ensure that we detect a changed expected host of None and raise
# InstanceUpdateConflict
instance = self.create_instance_with_args()
try:
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {'host': None},
expected={'host': None})
except exception.InstanceUpdateConflict as ex:
self.assertEqual(ex.kwargs['instance_uuid'], instance['uuid'])
self.assertEqual(ex.kwargs['actual'], {'host': 'h1'})
self.assertEqual(ex.kwargs['expected'], {'host': [None]})
else:
self.fail('InstanceUpdateConflict was not raised')
def test_instance_update_and_get_original_expected_task_state_single_fail(self): # noqa
# Ensure that we detect a changed expected task and raise
# UnexpectedTaskStateError
instance = self.create_instance_with_args()
try:
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {
'host': None,
'expected_task_state': task_states.SCHEDULING
})
except exception.UnexpectedTaskStateError as ex:
self.assertEqual(ex.kwargs['instance_uuid'], instance['uuid'])
self.assertEqual(ex.kwargs['actual'], {'task_state': None})
self.assertEqual(ex.kwargs['expected'],
{'task_state': [task_states.SCHEDULING]})
else:
self.fail('UnexpectedTaskStateError was not raised')
def test_instance_update_and_get_original_expected_task_state_single_pass(self): # noqa
# Ensure that we allow an update when expected task is correct
instance = self.create_instance_with_args()
(orig, new) = db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {
'host': None,
'expected_task_state': None
})
self.assertIsNone(new['host'])
def test_instance_update_and_get_original_expected_task_state_multi_fail(self): # noqa
# Ensure that we detect a changed expected task and raise
# UnexpectedTaskStateError when there are multiple potential expected
# tasks
instance = self.create_instance_with_args()
try:
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {
'host': None,
'expected_task_state': [task_states.SCHEDULING,
task_states.REBUILDING]
})
except exception.UnexpectedTaskStateError as ex:
self.assertEqual(ex.kwargs['instance_uuid'], instance['uuid'])
self.assertEqual(ex.kwargs['actual'], {'task_state': None})
self.assertEqual(ex.kwargs['expected'],
{'task_state': [task_states.SCHEDULING,
task_states.REBUILDING]})
else:
self.fail('UnexpectedTaskStateError was not raised')
def test_instance_update_and_get_original_expected_task_state_multi_pass(self): # noqa
# Ensure that we allow an update when expected task is in a list of
# expected tasks
instance = self.create_instance_with_args()
(orig, new) = db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {
'host': None,
'expected_task_state': [task_states.SCHEDULING, None]
})
self.assertIsNone(new['host'])
def test_instance_update_and_get_original_expected_task_state_deleting(self): # noqa
# Ensure that we raise UnepectedDeletingTaskStateError when task state
# is not as expected, and it is DELETING
instance = self.create_instance_with_args(
task_state=task_states.DELETING)
try:
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {
'host': None,
'expected_task_state': task_states.SCHEDULING
})
except exception.UnexpectedDeletingTaskStateError as ex:
self.assertEqual(ex.kwargs['instance_uuid'], instance['uuid'])
self.assertEqual(ex.kwargs['actual'],
{'task_state': task_states.DELETING})
self.assertEqual(ex.kwargs['expected'],
{'task_state': [task_states.SCHEDULING]})
else:
self.fail('UnexpectedDeletingTaskStateError was not raised')
def test_instance_update_unique_name(self):
context1 = context.RequestContext('user1', 'p1')
context2 = context.RequestContext('user2', 'p2')