Fix deadlocks and race conditions in task execution

Also improve deadlock detection in lock chains: instead of
hard-coded chains, use a generic approach - collect knowledge
about observed chains and check each new one against that
knowledge base.
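
The idea, as a minimal sketch (assuming networkx 1.x, where
DiGraph.add_path exists; illustrative only, the real implementation
is LockChainsRegistry below):

    import networkx

    _chains = networkx.DiGraph()

    def is_allowed(transition):
        # every observed lock chain becomes a path in a directed graph;
        # a new chain is safe only if adding it keeps the graph acyclic,
        # i.e. it cannot close a cycle of conflicting lock orders
        candidate = _chains.copy()
        candidate.add_path(transition)  # e.g. ('clusters', 'tasks')
        if not networkx.is_directed_acyclic_graph(candidate):
            return False
        _chains.add_path(transition)  # remember the ordering
        return True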

Change-Id: Ie1549c14d95a372a341e1e6e616226f8ee226a03
Partial-Bug: 1618852
Bulat Gaifullin 2016-09-02 13:43:46 +03:00
parent f45e49a11a
commit 31fb1923bb
6 changed files with 216 additions and 153 deletions

View File

@@ -17,22 +17,58 @@
import threading
import traceback

+import networkx

from nailgun.logger import logger
from nailgun.settings import settings
-ALLOWED_LOCKS_CHAINS = [
-    ('attributes', 'clusters'),
-    ('attributes', 'clusters', 'ip_addr_ranges'),
-    ('attributes', 'ip_addr_ranges'),
-    ('attributes', 'ip_addrs'),
-    ('attributes', 'ip_addrs', 'network_groups'),
-    ('attributes', 'ip_addr_ranges', 'node_nic_interfaces'),
-    ('clusters', 'nodes'),
-    ('tasks', 'clusters'),
-    ('tasks', 'clusters', 'nodes'),
-    ('tasks', 'nodes'),
-    ('nodes', 'node_nic_interfaces'),
-]
+class LockChainsRegistry(object):
+    def __init__(self):
+        self._chains = networkx.DiGraph()
+        self._cache = {}
+        self._lock = threading.Lock()
+
+    def register(self, transition):
+        """Register a new chain.
+
+        :param transition: the names of the tables in the chain
+        """
+        # because the number of possible combinations is limited,
+        # eventually all of them will be added to the cache,
+        # so this method does not affect performance
+        tmp = self._chains.copy()
+        tmp.add_path(transition)
+        # if the graph contains cycles after adding the transition,
+        # the transition has a wrong lock order
+        if networkx.is_directed_acyclic_graph(tmp):
+            result = self._cache[transition] = True
+            self._chains = tmp
+        else:
+            result = self._cache[transition] = False
+        return result
+
+    def is_allowed(self, transition):
+        """Check that the transition introduces no potential deadlock."""
+        try:
+            return self._cache[transition]
+        except KeyError:
+            pass
+        with self._lock:
+            try:
+                # try again with exclusive access
+                return self._cache[transition]
+            except KeyError:
+                return self.register(transition)
+
+
+_lock_chains_registry = LockChainsRegistry()
+# well-known chains
+_lock_chains_registry.register(('attributes', 'clusters'))
+_lock_chains_registry.register(('clusters', 'tasks'))
+_lock_chains_registry.register(('clusters', 'nodes'))
class Lock(object):
@@ -131,8 +167,8 @@ class ObjectsLockingOrderViolation(DeadlockDetectorError):
class LockTransitionNotAllowedError(DeadlockDetectorError):
    def __init__(self):
        msg = "Possible deadlock found while attempting " \
-             "to lock table: '{0}'. Lock transition is not allowed: {1}. " \
-             "Traceback info: {2}".format(
+             "to lock table: '{0}'. Lock transition is not allowed: {1}.\n" \
+             "Traceback info: {2}\n".format(
                  context.locks[-1].table,
                  ', '.join(lock.table for lock in context.locks),
                  self._get_locks_trace()
@@ -182,9 +218,9 @@ def register_lock(table):
    if len(context.locks) == 1:
        return lock

-    # Checking lock transition is allowed
+    # Checking lock transition is same as in previous visit
    transition = tuple(l.table for l in context.locks)
-    if transition not in ALLOWED_LOCKS_CHAINS:
+    if not _lock_chains_registry.is_allowed(transition):
        Lock.propagate_exception(LockTransitionNotAllowedError())
    return lock
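
To illustrate, here is how a registry behaves once a first chain is
recorded (a hypothetical interactive session, not part of the diff):

    reg = LockChainsRegistry()
    reg.register(('clusters', 'tasks'))     # True: first ordering observed
    reg.is_allowed(('clusters', 'nodes'))   # True: no conflicting order
    reg.is_allowed(('tasks', 'clusters'))   # False: the reversed order would
                                            # close the cycle
                                            # clusters -> tasks -> clusters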

View File

@@ -60,6 +60,7 @@ def upgrade():
    upgrade_deployment_history_summary()
    upgrade_node_deployment_info()
    upgrade_add_task_start_end_time()
+    fix_deployment_history_constraint()


def downgrade():
@@ -470,3 +471,18 @@ def upgrade_add_task_start_end_time():
def downgrade_add_task_start_end_time():
    op.drop_column('tasks', 'time_start')
    op.drop_column('tasks', 'time_end')


+def fix_deployment_history_constraint():
+    # recreate deployment_history_task_id_fkey with ON DELETE CASCADE
+    # so that deleting a task also removes its history records
+    op.drop_constraint(
+        'deployment_history_task_id_fkey',
+        'deployment_history',
+        type_='foreignkey'
+    )
+    op.create_foreign_key(
+        "deployment_history_task_id_fkey",
+        "deployment_history", "tasks",
+        ["task_id"], ["id"], ondelete="CASCADE"
+    )
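
For reference, a matching downgrade would recreate the constraint
without the CASCADE rule; a sketch under that assumption (not part of
the diff shown):

    def downgrade_deployment_history_constraint():
        # hypothetical inverse: restore the foreign key without
        # ON DELETE CASCADE
        op.drop_constraint(
            'deployment_history_task_id_fkey',
            'deployment_history',
            type_='foreignkey'
        )
        op.create_foreign_key(
            'deployment_history_task_id_fkey',
            'deployment_history', 'tasks',
            ['task_id'], ['id']
        )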

View File

@@ -42,8 +42,11 @@ class Task(NailgunObject):
    @classmethod
    def get_by_uid(cls, uid, fail_if_not_found=False, lock_for_update=False):
-        return cls.get_by_uid_excluding_deleted(uid, fail_if_not_found=False,
-                                                lock_for_update=False)
+        return cls.get_by_uid_excluding_deleted(
+            uid,
+            fail_if_not_found=fail_if_not_found,
+            lock_for_update=lock_for_update
+        )

    @classmethod
    def create_subtask(cls, instance, name):
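
Previously both keyword arguments were silently discarded, so callers
asking for a locked row never actually acquired it; after the fix a
call like the following (illustrative) takes the row lock as requested:

    # with the fix, lock_for_update propagates and the row is locked
    # (SELECT ... FOR UPDATE) until the transaction ends
    task = objects.Task.get_by_uid(
        uid, fail_if_not_found=True, lock_for_update=True)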

View File

@@ -26,7 +26,6 @@ from oslo_serialization import jsonutils
from sqlalchemy import or_

from nailgun import consts
-from nailgun.errors import errors as nailgun_errors
from nailgun import notifier
from nailgun import objects
from nailgun.settings import settings
@@ -50,6 +49,37 @@ logger = logging.getLogger('receiverd')
class NailgunReceiver(object):

+    @classmethod
+    def acquire_lock(cls, transaction_uuid):
+        """Get transaction and acquire exclusive access.
+
+        :param transaction_uuid: the unique identifier of the transaction
+        :return: the transaction object, or None if there is no task
+                 with such a uid
+        """
+        # use the transaction object to also get tasks removed via the UI
+        transaction = objects.Transaction.get_by_uuid(transaction_uuid)
+        if not transaction:
+            logger.error("Task '%s' was removed.", transaction_uuid)
+            return
+        # the lock order is the following: cluster, then task
+        if transaction.cluster:
+            objects.Cluster.get_by_uid(
+                transaction.cluster_id,
+                fail_if_not_found=True, lock_for_update=True
+            )
+        # read the transaction again to ensure that it was not
+        # removed in another session
+        transaction = objects.Transaction.get_by_uuid(
+            transaction_uuid, lock_for_update=True)
+        if not transaction:
+            logger.error(
+                "Race condition detected, task '%s' was removed.",
+                transaction_uuid
+            )
+        return transaction
+
    @classmethod
    def remove_nodes_resp(cls, **kwargs):
        logger.info(
@@ -67,19 +97,9 @@ class NailgunReceiver(object):
        progress = 100

-        # locking task
-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
-        # locking cluster
-        if task.cluster_id is not None:
-            objects.Cluster.get_by_uid(
-                task.cluster_id,
-                fail_if_not_found=True,
-                lock_for_update=True
-            )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return False

        # locking nodes
        all_nodes = itertools.chain(nodes, error_nodes, inaccessible_nodes)
@@ -226,7 +246,9 @@ class NailgunReceiver(object):
        )
        status = kwargs.get('status')
        task_uuid = kwargs['task_uuid']

-        task = objects.Task.get_by_uuid(task_uuid)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        if status == consts.TASK_STATUSES.ready:
            logger.info("IBP images from deleted cluster have been removed")
@@ -241,11 +263,10 @@ class NailgunReceiver(object):
"RPC method transaction_resp received: %s", jsonutils.dumps(kwargs)
)
transaction = objects.Task.get_by_uuid(
kwargs.pop('task_uuid', None),
fail_if_not_found=True,
lock_for_update=True,
)
# TODO(bgaifullin) move lock to transaction manager
transaction = cls.acquire_lock(kwargs.pop('task_uuid', None))
if not transaction:
return
manager = transactions.TransactionsManager(transaction.cluster.id)
manager.process(transaction, kwargs)
@@ -261,18 +282,9 @@ class NailgunReceiver(object):
        status = kwargs.get('status')
        progress = kwargs.get('progress')

-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
-
-        # lock cluster
-        objects.Cluster.get_by_uid(
-            task.cluster_id,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        if not status:
            status = task.status
@@ -377,11 +389,9 @@ class NailgunReceiver(object):
        progress = kwargs.get('progress')
        nodes = kwargs.get('nodes', [])

-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        # we should remove master node from the nodes since it requires
        # special handling and won't work with old code
@@ -432,11 +442,9 @@ class NailgunReceiver(object):
        status = kwargs.get('status')
        progress = kwargs.get('progress')

-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        q_nodes = objects.NodeCollection.filter_by_id_list(
            None, task.cache['nodes'])
@@ -667,10 +675,9 @@ class NailgunReceiver(object):
        status = kwargs.get('status')
        progress = kwargs.get('progress')

-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-        )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        stopping_task_names = [
            consts.TASK_NAMES.deploy,
@@ -693,13 +700,6 @@ class NailgunReceiver(object):
            'id'
        ).all()

-        # Locking cluster
-        objects.Cluster.get_by_uid(
-            task.cluster_id,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
-
        if not stop_tasks:
            logger.warning("stop_deployment_resp: deployment tasks \
not found for environment '%s'!", task.cluster_id)
@@ -821,18 +821,9 @@ class NailgunReceiver(object):
        status = kwargs.get('status')
        progress = kwargs.get('progress')

-        task = objects.Task.get_by_uuid(
-            task_uuid,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
-
-        # Locking cluster
-        objects.Cluster.get_by_uid(
-            task.cluster_id,
-            fail_if_not_found=True,
-            lock_for_update=True
-        )
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        if status == consts.TASK_STATUSES.ready:
            # restoring pending changes
@@ -922,8 +913,9 @@ class NailgunReceiver(object):
        status = kwargs.get('status')
        progress = kwargs.get('progress')

        # We simply check that each node received all vlans for cluster
-        task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        result = []
        # We expect that 'nodes' contains all nodes which we test.
@@ -1049,7 +1041,10 @@ class NailgunReceiver(object):
                    jsonutils.dumps(kwargs))
        )
        task_uuid = kwargs.get('task_uuid')

-        task = objects.task.Task.get_by_uuid(uuid=task_uuid)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        if kwargs.get('status'):
            task.status = kwargs['status']
        task.progress = kwargs.get('progress', 0)
@@ -1116,6 +1111,10 @@ class NailgunReceiver(object):
        status = kwargs.get('status')
        progress = kwargs.get('progress')

+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return
+
        nodes_uids = [node['uid'] for node in nodes]
        nodes_db = db().query(Node).filter(Node.id.in_(nodes_uids)).all()
        nodes_map = dict((str(node.id), node) for node in nodes_db)
@@ -1142,7 +1141,6 @@ class NailgunReceiver(object):
        error_msg = '\n'.join(messages) if messages else error_msg
        logger.debug('Check dhcp message %s', error_msg)

-        task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
        objects.Task.update_verify_networks(task, status, progress,
                                            error_msg, result)
@@ -1157,7 +1155,9 @@ class NailgunReceiver(object):
        status = kwargs.get('status')
        progress = kwargs.get('progress')

-        task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        release_info = task.cache['args']['release_info']
        release_id = release_info['release_id']
@@ -1200,7 +1200,9 @@ class NailgunReceiver(object):
        error = kwargs.get('error')
        msg = kwargs.get('msg')

-        task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        if status == 'error':
            notifier.notify('error', error)
@@ -1229,8 +1231,9 @@ class NailgunReceiver(object):
        error = kwargs.get('error')
        message = kwargs.get('msg')

-        task = objects.Task.get_by_uuid(
-            task_uuid, fail_if_not_found=True, lock_for_update=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        if status not in (consts.TASK_STATUSES.ready,
                          consts.TASK_STATUSES.error):
@@ -1281,8 +1284,9 @@ class NailgunReceiver(object):
        status = consts.TASK_STATUSES.ready
        progress = 100

-        task = objects.Task.get_by_uuid(
-            task_uuid, fail_if_not_found=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        failed_response_nodes = {
            n['uid']: n for n in response if n['status'] != 0
@@ -1364,22 +1368,19 @@ class NailgunReceiver(object):
                     jsonutils.dumps(kwargs))
        task_uuid = kwargs.get('task_uuid')

+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

-        try:
-            task = objects.Task.get_by_uuid(task_uuid, fail_if_not_found=True,
-                                            lock_for_update=True)
-            if task.status == consts.TASK_STATUSES.pending:
-                objects.Task.update(
-                    task, {'status': consts.TASK_STATUSES.running})
-                logger.debug("Task '%s' is acknowledged as running",
-                             task_uuid)
-            else:
-                logger.debug("Task '%s' in status '%s' can not "
-                             "be acknowledged as running", task_uuid,
-                             task.status)
-        except nailgun_errors.ObjectNotFound:
-            logger.warning("Task '%s' acknowledgement as running failed "
-                           "due to task doesn't exist in DB", task_uuid)
+        if task.status == consts.TASK_STATUSES.pending:
+            objects.Task.update(
+                task, {'status': consts.TASK_STATUSES.running})
+            logger.debug("Task '%s' is acknowledged as running",
+                         task_uuid)
+        else:
+            logger.debug("Task '%s' in status '%s' can not "
+                         "be acknowledged as running", task_uuid,
+                         task.status)

    @classmethod
    def update_dnsmasq_resp(cls, **kwargs):
@@ -1391,8 +1392,9 @@ class NailgunReceiver(object):
        error = kwargs.get('error', '')
        message = kwargs.get('msg', '')

-        task = objects.Task.get_by_uuid(
-            task_uuid, fail_if_not_found=True, lock_for_update=True)
+        task = cls.acquire_lock(task_uuid)
+        if not task:
+            return

        data = {'status': status, 'progress': 100, 'message': message}
        if status == consts.TASK_STATUSES.error:
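
All of the handlers above now funnel through acquire_lock, which takes
the locks in one fixed order (cluster first, then task) and tolerates
concurrent deletion. A condensed sketch of the shared calling pattern
(hypothetical handler, not from the diff):

    @classmethod
    def example_resp(cls, **kwargs):
        # one helper acquires the locks in a fixed order and returns
        # None if the task vanished in a concurrent session
        task = cls.acquire_lock(kwargs['task_uuid'])
        if not task:
            return  # task was deleted; drop the stale RPC response
        objects.Task.update(task, {
            'status': kwargs.get('status', task.status),
            'progress': kwargs.get('progress', 0),
        })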

View File

@@ -847,28 +847,43 @@ class StopDeploymentTaskManager(TaskManager):
        return task


-class ResetEnvironmentTaskManager(TaskManager):
+class ClearTaskHistory(TaskManager):
+
+    def clear_tasks_history(self, force=False):
+        try:
+            self.check_running_task(delete_obsolete=False)
+        except errors.TaskAlreadyRunning:
+            if not force:
+                raise
+            logger.error(
+                u"Force stop running tasks for cluster %s", self.cluster.name
+            )
+            running_tasks = objects.TaskCollection.all_in_progress(
+                self.cluster.id
+            )
+            for task in running_tasks:
+                # Force set task to finished state and update action log
+                TaskHelper.set_ready_if_not_finished(task)
+
+        # clear tasks history
+        cluster_tasks = objects.TaskCollection.get_cluster_tasks(
+            self.cluster.id
+        )
+        cluster_tasks.delete(synchronize_session='fetch')
+
+
+class ResetEnvironmentTaskManager(ClearTaskHistory):
    def execute(self, force=False, **kwargs):
        try:
-            self.check_running_task(
-                delete_obsolete=objects.Task.hard_delete
-            )
+            self.clear_tasks_history(force=force)
        except errors.TaskAlreadyRunning:
-            if force:
-                logger.error(
-                    u"Reset cluster '{0}' "
-                    u"while deployment is still running."
-                    .format(self.cluster.name)
-                )
-            else:
-                raise errors.DeploymentAlreadyStarted(
-                    "Can't reset environment '{0}' when "
-                    "running deployment task exists.".format(
-                        self.cluster.id
-                    )
-                )
+            raise errors.DeploymentAlreadyStarted(
+                "Can't reset environment '{0}' when "
+                "running deployment task exists.".format(
+                    self.cluster.id
+                )
+            )

        # FIXME(aroma): remove updating of 'deployed_before'
        # when stop action is reworked. 'deployed_before'
@@ -1111,33 +1126,17 @@ class VerifyNetworksTaskManager(TaskManager):
        return task


-class ClusterDeletionManager(TaskManager):
+class ClusterDeletionManager(ClearTaskHistory):

    def execute(self, force=False, **kwargs):
        try:
-            self.check_running_task(
-                delete_obsolete=objects.Task.hard_delete
-            )
+            self.clear_tasks_history(force=force)
        except errors.TaskAlreadyRunning:
-            if force:
-                logger.warning(
-                    u"Deletion cluster '{0}' "
-                    u"while deployment is still running."
-                    .format(self.cluster.name)
-                )
-                running_tasks = objects.TaskCollection.all_in_progress(
-                    self.cluster.id
-                )
-                for task in running_tasks:
-                    # Force set task to finished state and update action log
-                    TaskHelper.set_ready_if_not_finished(task)
-            else:
-                raise errors.DeploymentAlreadyStarted(
-                    u"Can't delete environment '{0}' when "
-                    u"running deployment task exists.".format(
-                        self.cluster.id
-                    )
-                )
+            raise errors.DeploymentAlreadyStarted(
+                u"Can't delete environment '{0}' when "
+                u"running deployment task exists.".format(
+                    self.cluster.id
+                )
+            )

        # locking nodes
        nodes = objects.NodeCollection.filter_by(
            None,
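
Both managers now share one behavior through ClearTaskHistory: without
force, a running task aborts the operation with DeploymentAlreadyStarted;
with force, running tasks are forcibly marked finished and the cluster's
task history is purged. A hypothetical call sequence (the constructor
argument is assumed, not shown in the diff):

    manager = ClusterDeletionManager(cluster_id=cluster.id)
    try:
        manager.execute(force=False)  # raises if a deployment is running
    except errors.DeploymentAlreadyStarted:
        # retry, force-finishing running tasks and clearing history
        manager.execute(force=True)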

View File

@@ -91,16 +91,23 @@ class TestDeadlockDetector(BaseTestCase):
        db().rollback()
        self.assertEquals(0, len(dd.context.locks))

-    def test_unknown_locks_chain_failed(self):
+    def test_different_order_in_chains_detected(self):
+        db().query(models.Release).with_lockmode('update').all()
+        db().query(models.Node).with_lockmode('update').all()
+        db().rollback()
+
        db().query(models.Node).with_lockmode('update').all()
        self.assertRaises(
            dd.LockTransitionNotAllowedError,
-            db().query(models.Node).with_lockmode, 'update'
+            db().query(models.Release).with_lockmode, 'update'
        )
        db().rollback()

        db().query(models.Task).with_lockmode('update').all()
        db().query(models.Cluster).with_lockmode('update').all()
        db().query(models.Task).with_lockmode('update').all()
        db().query(models.Node).with_lockmode('update').all()
        db().rollback()

        db().query(models.Node).with_lockmode('update').all()
        self.assertRaises(
            dd.LockTransitionNotAllowedError,