Reject actions if target resource is locked

- If an action targets a cluster or node that is already locked, reject
  the action with a 409 (Conflict) error. A client-side sketch follows
  the commit metadata below.
- Added a simulated wait time option to the nova test driver so that
  tempest can exercise the resource-locked exception. A usage sketch
  follows the test driver diff below.

Implements: blueprint fail-fast-locked-resource
Change-Id: I27b6996c65ffd1777dad5047f869596373e693c7
Duc Truong 2018-08-28 21:25:19 +00:00
parent b1a0ab8e27
commit a34e45b696
12 changed files with 274 additions and 2 deletions
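For orientation before the per-file diffs, here is a minimal client-side sketch of the fail-fast behaviour this change introduces: an action submitted against a locked cluster is rejected immediately with 409 Conflict instead of waiting behind the lock. The sketch is not part of the change; the endpoint URL, token and request payload are placeholder assumptions modelled on the cluster actions API.

# Illustrative sketch only; URL, token and payload are placeholders.
import requests

SENLIN_API = "http://senlin.example.com/v1"      # assumed endpoint
HEADERS = {"X-Auth-Token": "PLACEHOLDER_TOKEN"}  # assumed auth token

# Ask a cluster that is currently locked by another action to scale out.
resp = requests.post(
    SENLIN_API + "/clusters/test-cluster/actions",
    headers=HEADERS,
    json={"scale_out": {"count": 1}},
)

if resp.status_code == 409:
    # FaultWrapper now maps ResourceIsLocked to HTTPConflict, so the caller
    # sees the rejection right away instead of getting a queued action.
    print("Rejected, target cluster is locked:", resp.text)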

View File

@@ -56,6 +56,8 @@ class FaultWrapper(wsgi.Middleware):
'ProfileOperationFailed': webob.exc.HTTPInternalServerError,
'RequestLimitExceeded': webob.exc.HTTPBadRequest,
'ResourceInUse': webob.exc.HTTPConflict,
'ResourceIsLocked': webob.exc.HTTPConflict,
'ActionConflict': webob.exc.HTTPConflict,
'ResourceNotFound': webob.exc.HTTPNotFound,
}

View File

@@ -128,6 +128,15 @@ class ResourceInUse(SenlinException):
msg_fmt = _("The %(type)s '%(id)s' cannot be deleted: %(reason)s.")
class ResourceIsLocked(SenlinException):
"""Generic exception for resource in use.
The resource type here can be 'cluster', 'node'.
"""
msg_fmt = _("%(action)s for %(type)s '%(id)s' cannot be completed "
"because it is already locked.")
class ProfileNotSpecified(SenlinException):
msg_fmt = _("Profile not specified.")
@@ -180,6 +189,11 @@ class ActionInProgress(SenlinException):
msg_fmt = _("The %(type)s '%(id)s' is in status %(status)s.")
class ActionConflict(SenlinException):
msg_fmt = _("The %(type)s action for target %(target)s conflicts with "
"the following action(s): %(actions)s")
class NodeNotOrphan(SenlinException):
msg_fmt = _("%(message)s")

View File

@@ -129,6 +129,10 @@ def cluster_lock_acquire(cluster_id, action_id, scope):
return IMPL.cluster_lock_acquire(cluster_id, action_id, scope)
def cluster_is_locked(cluster_id):
return IMPL.cluster_is_locked(cluster_id)
def cluster_lock_release(cluster_id, action_id, scope):
return IMPL.cluster_lock_release(cluster_id, action_id, scope)
@@ -141,6 +145,10 @@ def node_lock_acquire(node_id, action_id):
return IMPL.node_lock_acquire(node_id, action_id)
def node_is_locked(node_id):
return IMPL.node_is_locked(node_id)
def node_lock_release(node_id, action_id):
return IMPL.node_lock_release(node_id, action_id)
@@ -334,6 +342,11 @@ def action_get_all_by_owner(context, owner):
return IMPL.action_get_all_by_owner(context, owner)
def action_get_all_active_by_target(context, target_id, project_safe=True):
return IMPL.action_get_all_active_by_target(context, target_id,
project_safe=project_safe)
def action_get_all(context, filters=None, limit=None, marker=None, sort=None,
project_safe=True):
return IMPL.action_get_all(context, filters=filters, sort=sort,

View File

@@ -462,6 +462,14 @@ def cluster_lock_acquire(cluster_id, action_id, scope):
return lock.action_ids
@retry_on_deadlock
def cluster_is_locked(cluster_id):
with session_for_read() as session:
query = session.query(models.ClusterLock)
lock = query.get(cluster_id)
return lock is not None
@retry_on_deadlock
def _release_cluster_lock(session, lock, action_id, scope):
success = False
@@ -529,6 +537,15 @@ def node_lock_acquire(node_id, action_id):
return lock.action_id
@retry_on_deadlock
def node_is_locked(node_id):
with session_for_read() as session:
query = session.query(models.NodeLock)
lock = query.get(node_id)
return lock is not None
@retry_on_deadlock
def node_lock_release(node_id, action_id):
with session_for_write() as session:
@@ -1065,6 +1082,22 @@ def action_get_all_by_owner(context, owner_id):
return query.all()
def action_get_all_active_by_target(context, target_id, project_safe=True):
with session_for_read() as session:
query = session.query(models.Action)
if project_safe:
query = query.filter_by(project=context.project_id)
query = query.filter_by(target=target_id)
query = query.filter(
models.Action.status.in_(
[consts.ACTION_READY,
consts.ACTION_WAITING,
consts.ACTION_RUNNING,
consts.ACTION_WAITING_LIFECYCLE_COMPLETION]))
actions = query.all()
return actions
def action_get_all(context, filters=None, limit=None, marker=None, sort=None,
project_safe=True):

View File

@@ -11,6 +11,7 @@
# under the License.
import copy
import time
from oslo_utils import uuidutils
@@ -161,6 +162,8 @@ class NovaClient(base.DriverBase):
'zoneName': 'nova',
}
self.simulated_waits = {}
def flavor_find(self, name_or_id, ignore_missing=False):
return sdk.FakeResourceObject(self.fake_flavor)
@@ -180,17 +183,33 @@
return sdk.FakeResourceObject(self.keypair)
def server_create(self, **attrs):
self.fake_server_create['id'] = uuidutils.generate_uuid()
self.fake_server_get['id'] = self.fake_server_create['id']
server_id = uuidutils.generate_uuid()
self.fake_server_create['id'] = server_id
self.fake_server_get['id'] = server_id
# save simulated wait time if it was set in metadata
if ('metadata' in attrs and
'simulated_wait_time' in attrs['metadata']):
simulated_wait = attrs['metadata']['simulated_wait_time']
if (isinstance(simulated_wait, int) and simulated_wait > 0):
self.simulated_waits[server_id] = simulated_wait
return sdk.FakeResourceObject(self.fake_server_create)
def server_get(self, server):
return sdk.FakeResourceObject(self.fake_server_get)
def wait_for_server(self, server, timeout=None):
# sleep for simulated wait time if it was supplied during server_create
if server in self.simulated_waits:
time.sleep(self.simulated_waits[server])
return
def wait_for_server_delete(self, server, timeout=None):
# sleep for simulated wait time if it was supplied during server_create
if server in self.simulated_waits:
time.sleep(self.simulated_waits[server])
del self.simulated_waits[server]
return
def server_update(self, server, **attrs):
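Usage sketch (not part of the diff): a tempest-style test can pass the simulated wait through the server metadata of an os.nova.server profile spec, assuming the profile forwards its metadata property to server_create(). The fake driver then sleeps in wait_for_server() and wait_for_server_delete(), holding the lock long enough for a second action to hit the new 409 path. Flavor, image and name below are placeholders; per the driver check above, the value must be a positive integer.

# Illustrative profile spec only; flavor, image and name are placeholders.
spec = {
    "type": "os.nova.server",
    "version": "1.0",
    "properties": {
        "flavor": 1,
        "image": "cirros-test-image",
        "name": "locked-server",
        "metadata": {
            # Recorded by the test driver's server_create(); the driver
            # sleeps for this many seconds in wait_for_server() and
            # wait_for_server_delete(), keeping the lock held.
            "simulated_wait_time": 30,
        },
    },
}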

View File

@@ -25,8 +25,10 @@ from senlin.common import utils
from senlin.engine import dispatcher
from senlin.engine import event as EVENT
from senlin.objects import action as ao
from senlin.objects import cluster_lock as cl
from senlin.objects import cluster_policy as cpo
from senlin.objects import dependency as dobj
from senlin.objects import node_lock as nl
from senlin.policies import base as policy_mod
wallclock = time.time
@@ -247,6 +249,21 @@ class Action(object):
:param dict kwargs: Other keyword arguments for the action.
:return: ID of the action created.
"""
if (action in list(consts.CLUSTER_ACTION_NAMES) and
cl.ClusterLock.is_locked(target)):
raise exception.ResourceIsLocked(
action=action, type='cluster', id=target)
elif (action in list(consts.NODE_ACTION_NAMES) and
nl.NodeLock.is_locked(target)):
raise exception.ResourceIsLocked(
action=action, type='node', id=target)
conflict_actions = ao.Action.get_all_active_by_target(ctx, target)
if conflict_actions:
action_ids = [a['id'] for a in conflict_actions]
raise exception.ActionConflict(
type=action, target=target, actions=",".join(action_ids))
params = {
'user_id': ctx.user_id,
'project_id': ctx.project_id,
@@ -256,6 +273,7 @@
'trusts': ctx.trusts,
}
c = req_context.RequestContext.from_dict(params)
obj = cls(target, action, c, **kwargs)
return obj.store(ctx)
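Caller-side, the checks above mean Action.create() can now raise before anything is stored. A hedged usage sketch follows; it assumes an existing request context ctx and a target cluster_id, and the module alias mirrors the unit tests in this change.

# Sketch only: ctx and cluster_id are assumed to exist in the caller.
from senlin.common import exception
from senlin.engine.actions import base as action_base

try:
    action_id = action_base.Action.create(
        ctx, cluster_id, 'CLUSTER_SCALE_OUT', name='demo_scale_out')
except exception.ResourceIsLocked:
    # The target cluster (or node) is already locked; the API maps this
    # to HTTP 409 via FaultWrapper.
    raise
except exception.ActionConflict:
    # Another READY/WAITING/RUNNING action already targets this resource.
    raise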

View File

@@ -105,6 +105,11 @@ class Action(base.SenlinObject, base.VersionedObjectDictCompat):
objs = db_api.action_get_all_by_owner(context, owner)
return [cls._from_db_object(context, cls(), obj) for obj in objs]
@classmethod
def get_all_active_by_target(cls, context, target):
objs = db_api.action_get_all_active_by_target(context, target)
return [cls._from_db_object(context, cls(), obj) for obj in objs]
@classmethod
def check_status(cls, context, action_id, timestamp):
return db_api.action_check_status(context, action_id, timestamp)

View File

@@ -31,6 +31,10 @@ class ClusterLock(base.SenlinObject, base.VersionedObjectDictCompat):
def acquire(cls, cluster_id, action_id, scope):
return db_api.cluster_lock_acquire(cluster_id, action_id, scope)
@classmethod
def is_locked(cls, cluster_id):
return db_api.cluster_is_locked(cluster_id)
@classmethod
def release(cls, cluster_id, action_id, scope):
return db_api.cluster_lock_release(cluster_id, action_id, scope)

View File

@@ -30,6 +30,10 @@ class NodeLock(base.SenlinObject, base.VersionedObjectDictCompat):
def acquire(cls, node_id, action_id):
return db_api.node_lock_acquire(node_id, action_id)
@classmethod
def is_locked(cls, node_id):
return db_api.node_is_locked(node_id)
@classmethod
def release(cls, node_id, action_id):
return db_api.node_lock_release(node_id, action_id)

View File

@@ -244,6 +244,32 @@ class DBAPIActionTest(base.SenlinTestCase):
for spec in specs:
self.assertIn(spec['name'], names)
def test_action_get_all_active_by_target(self):
specs = [
{'name': 'A01', 'target': 'cluster_001', 'status': 'READY'},
{'name': 'A02', 'target': 'node_001'},
{'name': 'A03', 'target': 'cluster_001', 'status': 'INIT'},
{'name': 'A04', 'target': 'cluster_001', 'status': 'WAITING'},
{'name': 'A05', 'target': 'cluster_001', 'status': 'READY'},
{'name': 'A06', 'target': 'cluster_001', 'status': 'RUNNING'},
{'name': 'A07', 'target': 'cluster_001', 'status': 'SUCCEEDED'},
{'name': 'A08', 'target': 'cluster_001', 'status': 'FAILED'},
{'name': 'A09', 'target': 'cluster_001', 'status': 'CANCELLED'},
{'name': 'A10', 'target': 'cluster_001',
'status': 'WAITING_LIFECYCLE_COMPLETION'},
{'name': 'A11', 'target': 'cluster_001', 'status': 'SUSPENDED'},
]
for spec in specs:
_create_action(self.ctx, **spec)
actions = db_api.action_get_all_active_by_target(self.ctx,
'cluster_001')
self.assertEqual(5, len(actions))
names = [p.name for p in actions]
for name in names:
self.assertIn(name, ['A01', 'A04', 'A05', 'A06', 'A10'])
def test_action_get_all_project_safe(self):
parser.simple_parse(shared.sample_action)
_create_action(self.ctx)

View File

@@ -174,6 +174,27 @@ class DBAPILockTest(base.SenlinTestCase):
observed = db_api.cluster_lock_release(self.cluster.id, UUID1, -1)
self.assertTrue(observed)
def test_cluster_is_locked(self):
# newly created cluster should not be locked
observed = db_api.cluster_is_locked(self.cluster.id)
self.assertFalse(observed)
# lock cluster
observed = db_api.cluster_lock_acquire(self.cluster.id, UUID1, -1)
self.assertIn(UUID1, observed)
# cluster should be locked
observed = db_api.cluster_is_locked(self.cluster.id)
self.assertTrue(observed)
# release cluster lock
observed = db_api.cluster_lock_release(self.cluster.id, UUID1, -1)
self.assertTrue(observed)
# cluster should not be locked anymore
observed = db_api.cluster_is_locked(self.cluster.id)
self.assertFalse(observed)
def test_node_lock_acquire_release(self):
observed = db_api.node_lock_acquire(self.node.id, UUID1)
self.assertEqual(UUID1, observed)
@@ -221,6 +242,27 @@
observed = db_api.node_lock_release(self.node.id, UUID2)
self.assertTrue(observed)
def test_node_is_locked(self):
# newly created node should not be locked
observed = db_api.node_is_locked(self.node.id)
self.assertFalse(observed)
# lock node
observed = db_api.node_lock_acquire(self.node.id, UUID1)
self.assertIn(UUID1, observed)
# node should be locked
observed = db_api.node_is_locked(self.node.id)
self.assertTrue(observed)
# release node lock
observed = db_api.node_lock_release(self.node.id, UUID1)
self.assertTrue(observed)
# node should not be locked anymore
observed = db_api.node_is_locked(self.node.id)
self.assertFalse(observed)
class GCByEngineTest(base.SenlinTestCase):

View File

@@ -29,8 +29,10 @@ from senlin.engine import environment
from senlin.engine import event as EVENT
from senlin.engine import node as node_mod
from senlin.objects import action as ao
from senlin.objects import cluster_lock as cl
from senlin.objects import cluster_policy as cpo
from senlin.objects import dependency as dobj
from senlin.objects import node_lock as nl
from senlin.policies import base as policy_mod
from senlin.tests.unit.common import base
from senlin.tests.unit.common import utils
@@ -245,6 +247,96 @@ class ActionBaseTest(base.SenlinTestCase):
self.assertEqual('FAKE_ID', result)
mock_store.assert_called_once_with(self.ctx)
@mock.patch.object(ab.Action, 'store')
@mock.patch.object(ao.Action, 'get_all_active_by_target')
@mock.patch.object(cl.ClusterLock, 'is_locked')
def test_action_create_lock_cluster_false(self, mock_lock,
mock_active, mock_store):
mock_store.return_value = 'FAKE_ID'
mock_active.return_value = None
mock_lock.return_value = False
result = ab.Action.create(self.ctx, OBJID, 'CLUSTER_CREATE',
name='test')
self.assertEqual('FAKE_ID', result)
mock_store.assert_called_once_with(self.ctx)
mock_active.assert_called_once_with(mock.ANY, OBJID)
@mock.patch.object(ab.Action, 'store')
@mock.patch.object(ao.Action, 'get_all_active_by_target')
@mock.patch.object(cl.ClusterLock, 'is_locked')
def test_action_create_lock_cluster_true(self, mock_lock,
mock_active, mock_store):
mock_store.return_value = 'FAKE_ID'
mock_active.return_value = None
mock_lock.return_value = True
error_message = (
'CLUSTER_CREATE for cluster \'{}\' cannot be completed because '
'it is already locked.').format(OBJID)
with self.assertRaisesRegexp(exception.ResourceIsLocked,
error_message):
ab.Action.create(self.ctx, OBJID, 'CLUSTER_CREATE', name='test')
mock_store.assert_not_called()
mock_active.assert_not_called()
@mock.patch.object(ab.Action, 'store')
@mock.patch.object(ao.Action, 'get_all_active_by_target')
@mock.patch.object(nl.NodeLock, 'is_locked')
def test_action_create_lock_node_false(self, mock_lock,
mock_active, mock_store):
mock_store.return_value = 'FAKE_ID'
mock_active.return_value = None
mock_lock.return_value = False
result = ab.Action.create(self.ctx, OBJID, 'NODE_CREATE',
name='test')
self.assertEqual('FAKE_ID', result)
mock_store.assert_called_once_with(self.ctx)
mock_active.assert_called_once_with(mock.ANY, OBJID)
@mock.patch.object(ab.Action, 'store')
@mock.patch.object(ao.Action, 'get_all_active_by_target')
@mock.patch.object(nl.NodeLock, 'is_locked')
def test_action_create_lock_node_true(self, mock_lock, mock_active,
mock_store):
mock_store.return_value = 'FAKE_ID'
mock_active.return_value = None
mock_lock.return_value = True
error_message = (
'NODE_CREATE for node \'{}\' cannot be completed because '
'it is already locked.').format(OBJID)
with self.assertRaisesRegexp(exception.ResourceIsLocked,
error_message):
ab.Action.create(self.ctx, OBJID, 'NODE_CREATE', name='test')
mock_store.assert_not_called()
mock_active.assert_not_called()
@mock.patch.object(ab.Action, 'store')
@mock.patch.object(ao.Action, 'get_all_active_by_target')
@mock.patch.object(cl.ClusterLock, 'is_locked')
def test_action_create_conflict(self, mock_lock, mock_active, mock_store):
mock_store.return_value = 'FAKE_ID'
uuid1 = 'ce982cd5-26da-4e2c-84e5-be8f720b7478'
uuid2 = 'ce982cd5-26da-4e2c-84e5-be8f720b7479'
mock_active.return_value = [ao.Action(id=uuid1), ao.Action(id=uuid2)]
mock_lock.return_value = False
error_message = (
'The NODE_CREATE action for target {} conflicts with the following'
' action\(s\): {},{}').format(OBJID, uuid1, uuid2)
with self.assertRaisesRegexp(exception.ActionConflict,
error_message):
ab.Action.create(self.ctx, OBJID, 'NODE_CREATE', name='test')
mock_store.assert_not_called()
mock_active.assert_called_once_with(mock.ANY, OBJID)
def test_action_delete(self):
result = ab.Action.delete(self.ctx, 'non-existent')
self.assertIsNone(result)