Fixed dead-locks and race conditions in tasks execution
Also imporved detecting dead-locks in chains, instead of card-coded chains use generic approach - collect knowledge about chains and check a new one against knowledge base. Change-Id: Ie1549c14d95a372a341e1e6e616226f8ee226a03 Partial-Bug: 1618852
This commit is contained in:
parent
f45e49a11a
commit
31fb1923bb
|
@ -17,22 +17,58 @@
|
|||
import threading
|
||||
import traceback
|
||||
|
||||
import networkx
|
||||
|
||||
from nailgun.logger import logger
|
||||
from nailgun.settings import settings
|
||||
|
||||
ALLOWED_LOCKS_CHAINS = [
|
||||
('attributes', 'clusters'),
|
||||
('attributes', 'clusters', 'ip_addr_ranges'),
|
||||
('attributes', 'ip_addr_ranges'),
|
||||
('attributes', 'ip_addrs'),
|
||||
('attributes', 'ip_addrs', 'network_groups'),
|
||||
('attributes', 'ip_addr_ranges', 'node_nic_interfaces'),
|
||||
('clusters', 'nodes'),
|
||||
('tasks', 'clusters'),
|
||||
('tasks', 'clusters', 'nodes'),
|
||||
('tasks', 'nodes'),
|
||||
('nodes', 'node_nic_interfaces'),
|
||||
]
|
||||
|
||||
class LockChainsRegistry(object):
|
||||
def __init__(self):
|
||||
self._chains = networkx.DiGraph()
|
||||
self._cache = {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def register(self, transition):
|
||||
"""Register and new chain.
|
||||
|
||||
:param transition: the name of tables in chain
|
||||
"""
|
||||
# because the number of possible combinations is limited
|
||||
# eventually all of them will be added to cache
|
||||
# and this method does not affect performance
|
||||
tmp = self._chains.copy()
|
||||
tmp.add_path(transition)
|
||||
# if after add transition graph contains cycles
|
||||
# transition has wrong order
|
||||
if networkx.is_directed_acyclic_graph(tmp):
|
||||
result = self._cache[transition] = True
|
||||
self._chains = tmp
|
||||
else:
|
||||
result = self._cache[transition] = False
|
||||
return result
|
||||
|
||||
def is_allowed(self, transition):
|
||||
"""Checks that transition does not introduce potential dead-lock."""
|
||||
try:
|
||||
return self._cache[transition]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
with self._lock:
|
||||
try:
|
||||
# try again with exclusive access
|
||||
return self._cache[transition]
|
||||
except KeyError:
|
||||
return self.register(transition)
|
||||
|
||||
|
||||
_lock_chains_registry = LockChainsRegistry()
|
||||
|
||||
# well known chains
|
||||
_lock_chains_registry.register(('attributes', 'clusters'))
|
||||
_lock_chains_registry.register(('clusters', 'tasks'))
|
||||
_lock_chains_registry.register(('clusters', 'nodes'))
|
||||
|
||||
|
||||
class Lock(object):
|
||||
|
@ -131,8 +167,8 @@ class ObjectsLockingOrderViolation(DeadlockDetectorError):
|
|||
class LockTransitionNotAllowedError(DeadlockDetectorError):
|
||||
def __init__(self):
|
||||
msg = "Possible deadlock found while attempting " \
|
||||
"to lock table: '{0}'. Lock transition is not allowed: {1}. " \
|
||||
"Traceback info: {2}".format(
|
||||
"to lock table: '{0}'. Lock transition is not allowed: {1}.\n" \
|
||||
"Traceback info: {2}\n".format(
|
||||
context.locks[-1].table,
|
||||
', '.join(lock.table for lock in context.locks),
|
||||
self._get_locks_trace()
|
||||
|
@ -182,9 +218,9 @@ def register_lock(table):
|
|||
if len(context.locks) == 1:
|
||||
return lock
|
||||
|
||||
# Checking lock transition is allowed
|
||||
# Checking lock transition is same as in previous visit
|
||||
transition = tuple(l.table for l in context.locks)
|
||||
if transition not in ALLOWED_LOCKS_CHAINS:
|
||||
if not _lock_chains_registry.is_allowed(transition):
|
||||
Lock.propagate_exception(LockTransitionNotAllowedError())
|
||||
|
||||
return lock
|
||||
|
|
|
@ -60,6 +60,7 @@ def upgrade():
|
|||
upgrade_deployment_history_summary()
|
||||
upgrade_node_deployment_info()
|
||||
upgrade_add_task_start_end_time()
|
||||
fix_deployment_history_constraint()
|
||||
|
||||
|
||||
def downgrade():
|
||||
|
@ -470,3 +471,18 @@ def upgrade_add_task_start_end_time():
|
|||
def downgrade_add_task_start_end_time():
|
||||
op.drop_column('tasks', 'time_start')
|
||||
op.drop_column('tasks', 'time_end')
|
||||
|
||||
|
||||
def fix_deployment_history_constraint():
|
||||
# only recreate deployment_history_task_id_fkey with valid properties
|
||||
op.drop_constraint(
|
||||
'deployment_history_task_id_fkey',
|
||||
'deployment_history',
|
||||
type_='foreignkey'
|
||||
)
|
||||
|
||||
op.create_foreign_key(
|
||||
"deployment_history_task_id_fkey",
|
||||
"deployment_history", "tasks",
|
||||
["task_id"], ["id"], ondelete="CASCADE"
|
||||
)
|
||||
|
|
|
@ -42,8 +42,11 @@ class Task(NailgunObject):
|
|||
|
||||
@classmethod
|
||||
def get_by_uid(cls, uid, fail_if_not_found=False, lock_for_update=False):
|
||||
return cls.get_by_uid_excluding_deleted(uid, fail_if_not_found=False,
|
||||
lock_for_update=False)
|
||||
return cls.get_by_uid_excluding_deleted(
|
||||
uid,
|
||||
fail_if_not_found=fail_if_not_found,
|
||||
lock_for_update=lock_for_update
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def create_subtask(cls, instance, name):
|
||||
|
|
|
@ -26,7 +26,6 @@ from oslo_serialization import jsonutils
|
|||
from sqlalchemy import or_
|
||||
|
||||
from nailgun import consts
|
||||
from nailgun.errors import errors as nailgun_errors
|
||||
from nailgun import notifier
|
||||
from nailgun import objects
|
||||
from nailgun.settings import settings
|
||||
|
@ -50,6 +49,37 @@ logger = logging.getLogger('receiverd')
|
|||
|
||||
class NailgunReceiver(object):
|
||||
|
||||
@classmethod
|
||||
def acquire_lock(cls, transaction_uuid):
|
||||
"""Get transaction and acquire exclusive access.
|
||||
|
||||
:param transaction_uuid: the unique identifier of transaction
|
||||
:return: transaction object or None if there is no task with such uid
|
||||
"""
|
||||
# use transaction object to get removed by UI tasks
|
||||
transaction = objects.Transaction.get_by_uuid(transaction_uuid)
|
||||
if not transaction:
|
||||
logger.error("Task '%s' was removed.", transaction_uuid)
|
||||
return
|
||||
|
||||
# the lock order is following: cluster, task
|
||||
if transaction.cluster:
|
||||
objects.Cluster.get_by_uid(
|
||||
transaction.cluster_id,
|
||||
fail_if_not_found=True, lock_for_update=True
|
||||
)
|
||||
|
||||
# read transaction again to ensure
|
||||
# that it was not removed in other session
|
||||
transaction = objects.Transaction.get_by_uuid(
|
||||
transaction_uuid, lock_for_update=True)
|
||||
if not transaction:
|
||||
logger.error(
|
||||
"Race condition detected, task '%s' was removed.",
|
||||
transaction_uuid
|
||||
)
|
||||
return transaction
|
||||
|
||||
@classmethod
|
||||
def remove_nodes_resp(cls, **kwargs):
|
||||
logger.info(
|
||||
|
@ -67,19 +97,9 @@ class NailgunReceiver(object):
|
|||
progress = 100
|
||||
|
||||
# locking task
|
||||
task = objects.Task.get_by_uuid(
|
||||
task_uuid,
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True
|
||||
)
|
||||
|
||||
# locking cluster
|
||||
if task.cluster_id is not None:
|
||||
objects.Cluster.get_by_uid(
|
||||
task.cluster_id,
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True
|
||||
)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return False
|
||||
|
||||
# locking nodes
|
||||
all_nodes = itertools.chain(nodes, error_nodes, inaccessible_nodes)
|
||||
|
@ -226,7 +246,9 @@ class NailgunReceiver(object):
|
|||
)
|
||||
status = kwargs.get('status')
|
||||
task_uuid = kwargs['task_uuid']
|
||||
task = objects.Task.get_by_uuid(task_uuid)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
if status == consts.TASK_STATUSES.ready:
|
||||
logger.info("IBP images from deleted cluster have been removed")
|
||||
|
@ -241,11 +263,10 @@ class NailgunReceiver(object):
|
|||
"RPC method transaction_resp received: %s", jsonutils.dumps(kwargs)
|
||||
)
|
||||
|
||||
transaction = objects.Task.get_by_uuid(
|
||||
kwargs.pop('task_uuid', None),
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True,
|
||||
)
|
||||
# TODO(bgaifullin) move lock to transaction manager
|
||||
transaction = cls.acquire_lock(kwargs.pop('task_uuid', None))
|
||||
if not transaction:
|
||||
return
|
||||
manager = transactions.TransactionsManager(transaction.cluster.id)
|
||||
manager.process(transaction, kwargs)
|
||||
|
||||
|
@ -261,18 +282,9 @@ class NailgunReceiver(object):
|
|||
status = kwargs.get('status')
|
||||
progress = kwargs.get('progress')
|
||||
|
||||
task = objects.Task.get_by_uuid(
|
||||
task_uuid,
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True
|
||||
)
|
||||
|
||||
# lock cluster
|
||||
objects.Cluster.get_by_uid(
|
||||
task.cluster_id,
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True
|
||||
)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
if not status:
|
||||
status = task.status
|
||||
|
@ -377,11 +389,9 @@ class NailgunReceiver(object):
|
|||
progress = kwargs.get('progress')
|
||||
nodes = kwargs.get('nodes', [])
|
||||
|
||||
task = objects.Task.get_by_uuid(
|
||||
task_uuid,
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True
|
||||
)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
# we should remove master node from the nodes since it requires
|
||||
# special handling and won't work with old code
|
||||
|
@ -432,11 +442,9 @@ class NailgunReceiver(object):
|
|||
status = kwargs.get('status')
|
||||
progress = kwargs.get('progress')
|
||||
|
||||
task = objects.Task.get_by_uuid(
|
||||
task_uuid,
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True
|
||||
)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
q_nodes = objects.NodeCollection.filter_by_id_list(
|
||||
None, task.cache['nodes'])
|
||||
|
@ -667,10 +675,9 @@ class NailgunReceiver(object):
|
|||
status = kwargs.get('status')
|
||||
progress = kwargs.get('progress')
|
||||
|
||||
task = objects.Task.get_by_uuid(
|
||||
task_uuid,
|
||||
fail_if_not_found=True,
|
||||
)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
stopping_task_names = [
|
||||
consts.TASK_NAMES.deploy,
|
||||
|
@ -693,13 +700,6 @@ class NailgunReceiver(object):
|
|||
'id'
|
||||
).all()
|
||||
|
||||
# Locking cluster
|
||||
objects.Cluster.get_by_uid(
|
||||
task.cluster_id,
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True
|
||||
)
|
||||
|
||||
if not stop_tasks:
|
||||
logger.warning("stop_deployment_resp: deployment tasks \
|
||||
not found for environment '%s'!", task.cluster_id)
|
||||
|
@ -821,18 +821,9 @@ class NailgunReceiver(object):
|
|||
status = kwargs.get('status')
|
||||
progress = kwargs.get('progress')
|
||||
|
||||
task = objects.Task.get_by_uuid(
|
||||
task_uuid,
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True
|
||||
)
|
||||
|
||||
# Locking cluster
|
||||
objects.Cluster.get_by_uid(
|
||||
task.cluster_id,
|
||||
fail_if_not_found=True,
|
||||
lock_for_update=True
|
||||
)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
if status == consts.TASK_STATUSES.ready:
|
||||
# restoring pending changes
|
||||
|
@ -922,8 +913,9 @@ class NailgunReceiver(object):
|
|||
status = kwargs.get('status')
|
||||
progress = kwargs.get('progress')
|
||||
|
||||
# We simply check that each node received all vlans for cluster
|
||||
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
result = []
|
||||
# We expect that 'nodes' contains all nodes which we test.
|
||||
|
@ -1049,7 +1041,10 @@ class NailgunReceiver(object):
|
|||
jsonutils.dumps(kwargs))
|
||||
)
|
||||
task_uuid = kwargs.get('task_uuid')
|
||||
task = objects.task.Task.get_by_uuid(uuid=task_uuid)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
if kwargs.get('status'):
|
||||
task.status = kwargs['status']
|
||||
task.progress = kwargs.get('progress', 0)
|
||||
|
@ -1116,6 +1111,10 @@ class NailgunReceiver(object):
|
|||
status = kwargs.get('status')
|
||||
progress = kwargs.get('progress')
|
||||
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
nodes_uids = [node['uid'] for node in nodes]
|
||||
nodes_db = db().query(Node).filter(Node.id.in_(nodes_uids)).all()
|
||||
nodes_map = dict((str(node.id), node) for node in nodes_db)
|
||||
|
@ -1142,7 +1141,6 @@ class NailgunReceiver(object):
|
|||
error_msg = '\n'.join(messages) if messages else error_msg
|
||||
logger.debug('Check dhcp message %s', error_msg)
|
||||
|
||||
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
|
||||
objects.Task.update_verify_networks(task, status, progress,
|
||||
error_msg, result)
|
||||
|
||||
|
@ -1157,7 +1155,9 @@ class NailgunReceiver(object):
|
|||
status = kwargs.get('status')
|
||||
progress = kwargs.get('progress')
|
||||
|
||||
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
release_info = task.cache['args']['release_info']
|
||||
release_id = release_info['release_id']
|
||||
|
@ -1200,7 +1200,9 @@ class NailgunReceiver(object):
|
|||
error = kwargs.get('error')
|
||||
msg = kwargs.get('msg')
|
||||
|
||||
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
if status == 'error':
|
||||
notifier.notify('error', error)
|
||||
|
@ -1229,8 +1231,9 @@ class NailgunReceiver(object):
|
|||
error = kwargs.get('error')
|
||||
message = kwargs.get('msg')
|
||||
|
||||
task = objects.Task.get_by_uuid(
|
||||
task_uuid, fail_if_not_found=True, lock_for_update=True)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
if status not in (consts.TASK_STATUSES.ready,
|
||||
consts.TASK_STATUSES.error):
|
||||
|
@ -1281,8 +1284,9 @@ class NailgunReceiver(object):
|
|||
status = consts.TASK_STATUSES.ready
|
||||
progress = 100
|
||||
|
||||
task = objects.Task.get_by_uuid(
|
||||
task_uuid, fail_if_not_found=True)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
failed_response_nodes = {
|
||||
n['uid']: n for n in response if n['status'] != 0
|
||||
|
@ -1364,22 +1368,19 @@ class NailgunReceiver(object):
|
|||
jsonutils.dumps(kwargs))
|
||||
|
||||
task_uuid = kwargs.get('task_uuid')
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
try:
|
||||
task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True,
|
||||
lock_for_update=True)
|
||||
if task.status == consts.TASK_STATUSES.pending:
|
||||
objects.Task.update(
|
||||
task, {'status': consts.TASK_STATUSES.running})
|
||||
logger.debug("Task '%s' is acknowledged as running",
|
||||
task_uuid)
|
||||
else:
|
||||
logger.debug("Task '%s' in status '%s' can not "
|
||||
"be acknowledged as running", task_uuid,
|
||||
task.status)
|
||||
except nailgun_errors.ObjectNotFound:
|
||||
logger.warning("Task '%s' acknowledgement as running failed "
|
||||
"due to task doesn't exist in DB", task_uuid)
|
||||
if task.status == consts.TASK_STATUSES.pending:
|
||||
objects.Task.update(
|
||||
task, {'status': consts.TASK_STATUSES.running})
|
||||
logger.debug("Task '%s' is acknowledged as running",
|
||||
task_uuid)
|
||||
else:
|
||||
logger.debug("Task '%s' in status '%s' can not "
|
||||
"be acknowledged as running", task_uuid,
|
||||
task.status)
|
||||
|
||||
@classmethod
|
||||
def update_dnsmasq_resp(cls, **kwargs):
|
||||
|
@ -1391,8 +1392,9 @@ class NailgunReceiver(object):
|
|||
error = kwargs.get('error', '')
|
||||
message = kwargs.get('msg', '')
|
||||
|
||||
task = objects.Task.get_by_uuid(
|
||||
task_uuid, fail_if_not_found=True, lock_for_update=True)
|
||||
task = cls.acquire_lock(task_uuid)
|
||||
if not task:
|
||||
return
|
||||
|
||||
data = {'status': status, 'progress': 100, 'message': message}
|
||||
if status == consts.TASK_STATUSES.error:
|
||||
|
|
|
@ -847,28 +847,43 @@ class StopDeploymentTaskManager(TaskManager):
|
|||
return task
|
||||
|
||||
|
||||
class ResetEnvironmentTaskManager(TaskManager):
|
||||
class ClearTaskHistory(TaskManager):
|
||||
def clear_tasks_history(self, force=False):
|
||||
try:
|
||||
self.check_running_task(delete_obsolete=False)
|
||||
except errors.TaskAlreadyRunning:
|
||||
if not force:
|
||||
raise
|
||||
|
||||
logger.error(
|
||||
u"Force stop running tasks for cluster %s", self.cluster.name
|
||||
)
|
||||
running_tasks = objects.TaskCollection.all_in_progress(
|
||||
self.cluster.id
|
||||
)
|
||||
for task in running_tasks:
|
||||
# Force set task to finished state and update action log
|
||||
TaskHelper.set_ready_if_not_finished(task)
|
||||
|
||||
# clear tasks history
|
||||
cluster_tasks = objects.TaskCollection.get_cluster_tasks(
|
||||
self.cluster.id
|
||||
)
|
||||
cluster_tasks.delete(synchronize_session='fetch')
|
||||
|
||||
|
||||
class ResetEnvironmentTaskManager(ClearTaskHistory):
|
||||
|
||||
def execute(self, force=False, **kwargs):
|
||||
try:
|
||||
self.check_running_task(
|
||||
delete_obsolete=objects.Task.hard_delete
|
||||
)
|
||||
self.clear_tasks_history(force=force)
|
||||
except errors.TaskAlreadyRunning:
|
||||
if force:
|
||||
logger.error(
|
||||
u"Reset cluster '{0}' "
|
||||
u"while deployment is still running."
|
||||
.format(self.cluster.name)
|
||||
)
|
||||
|
||||
else:
|
||||
raise errors.DeploymentAlreadyStarted(
|
||||
"Can't reset environment '{0}' when "
|
||||
"running deployment task exists.".format(
|
||||
self.cluster.id
|
||||
)
|
||||
raise errors.DeploymentAlreadyStarted(
|
||||
"Can't reset environment '{0}' when "
|
||||
"running deployment task exists.".format(
|
||||
self.cluster.id
|
||||
)
|
||||
)
|
||||
|
||||
# FIXME(aroma): remove updating of 'deployed_before'
|
||||
# when stop action is reworked. 'deployed_before'
|
||||
|
@ -1111,33 +1126,17 @@ class VerifyNetworksTaskManager(TaskManager):
|
|||
return task
|
||||
|
||||
|
||||
class ClusterDeletionManager(TaskManager):
|
||||
|
||||
class ClusterDeletionManager(ClearTaskHistory):
|
||||
def execute(self, force=False, **kwargs):
|
||||
try:
|
||||
self.check_running_task(
|
||||
delete_obsolete=objects.Task.hard_delete
|
||||
)
|
||||
self.clear_tasks_history(force=force)
|
||||
except errors.TaskAlreadyRunning:
|
||||
if force:
|
||||
logger.warning(
|
||||
u"Deletion cluster '{0}' "
|
||||
u"while deployment is still running."
|
||||
.format(self.cluster.name)
|
||||
)
|
||||
running_tasks = objects.TaskCollection.all_in_progress(
|
||||
raise errors.DeploymentAlreadyStarted(
|
||||
u"Can't delete environment '{0}' when "
|
||||
u"running deployment task exists.".format(
|
||||
self.cluster.id
|
||||
)
|
||||
for task in running_tasks:
|
||||
# Force set task to finished state and update action log
|
||||
TaskHelper.set_ready_if_not_finished(task)
|
||||
else:
|
||||
raise errors.DeploymentAlreadyStarted(
|
||||
u"Can't delete environment '{0}' when "
|
||||
u"running deployment task exists.".format(
|
||||
self.cluster.id
|
||||
)
|
||||
)
|
||||
)
|
||||
# locking nodes
|
||||
nodes = objects.NodeCollection.filter_by(
|
||||
None,
|
||||
|
|
|
@ -91,16 +91,23 @@ class TestDeadlockDetector(BaseTestCase):
|
|||
db().rollback()
|
||||
self.assertEquals(0, len(dd.context.locks))
|
||||
|
||||
def test_unknown_locks_chain_failed(self):
|
||||
def test_different_order_in_chains_detected(self):
|
||||
db().query(models.Release).with_lockmode('update').all()
|
||||
db().query(models.Node).with_lockmode('update').all()
|
||||
db().rollback()
|
||||
|
||||
db().query(models.Node).with_lockmode('update').all()
|
||||
self.assertRaises(
|
||||
dd.LockTransitionNotAllowedError,
|
||||
db().query(models.Node).with_lockmode, 'update'
|
||||
db().query(models.Release).with_lockmode, 'update'
|
||||
)
|
||||
db().rollback()
|
||||
|
||||
db().query(models.Task).with_lockmode('update').all()
|
||||
db().query(models.Cluster).with_lockmode('update').all()
|
||||
db().query(models.Task).with_lockmode('update').all()
|
||||
db().query(models.Node).with_lockmode('update').all()
|
||||
db().rollback()
|
||||
|
||||
db().query(models.Node).with_lockmode('update').all()
|
||||
self.assertRaises(
|
||||
dd.LockTransitionNotAllowedError,
|
||||
|
|
Loading…
Reference in New Issue