diff --git a/nailgun/nailgun/lcm/transaction_serializer.py b/nailgun/nailgun/lcm/transaction_serializer.py
index 953512e6af..3d339d1a86 100644
--- a/nailgun/nailgun/lcm/transaction_serializer.py
+++ b/nailgun/nailgun/lcm/transaction_serializer.py
@@ -15,6 +15,7 @@
 # under the License.
 
 from distutils.version import StrictVersion
+import multiprocessing
 
 import six
 
@@ -30,6 +31,93 @@ from nailgun.utils.role_resolver import NameMatchingPolicy
 
 
 # but there is no chance to re-use TasksSerializer until bug
 # https://bugs.launchpad.net/fuel/+bug/1562292 is not fixed
+
+def _serialize_task_for_node(factory, node_and_task):
+    node_id, task = node_and_task
+    logger.debug(
+        "applying task '%s' for node: %s", task['id'], node_id
+    )
+    try:
+        task_serializer = factory.create_serializer(task)
+        serialized = task_serializer.serialize(node_id)
+        return node_id, serialized
+    except Exception:
+        logger.exception(
+            "failed to serialize task '%s' for node: %s", task['id'], node_id
+        )
+        raise
+
+
+def _initialize_worker(serializers_factory, context):
+    globals()['__factory'] = serializers_factory(context)
+
+
+def _serialize_task_for_node_in_worker(node_and_task):
+    return _serialize_task_for_node(globals()['__factory'], node_and_task)
+
+
+class SingleWorkerConcurrencyPolicy(object):
+    def execute(self, context, serializers_factory, tasks):
+        """Executes task serialization synchronously, task by task.
+
+        :param context: the transaction context
+        :param serializers_factory: the serializers factory
+        :param tasks: the tasks to serialize
+        :return: the sequence of serialized tasks
+        """
+        factory = serializers_factory(context)
+        return six.moves.map(
+            lambda x: _serialize_task_for_node(factory, x),
+            tasks
+        )
+
+
+class MultiProcessingConcurrencyPolicy(object):
+    def __init__(self, workers_num):
+        self.workers_num = workers_num
+
+    def execute(self, context, serializers_factory, tasks):
+        """Executes task serialization in parallel.
+
+        :param context: the transaction context
+        :param serializers_factory: the serializers factory
+        :param tasks: the tasks to serialize
+        :return: the sequence of serialized tasks
+        """
+        pool = multiprocessing.Pool(
+            processes=self.workers_num,
+            initializer=_initialize_worker,
+            initargs=(serializers_factory, context)
+        )
+
+        try:
+            result = pool.imap_unordered(
+                _serialize_task_for_node_in_worker, tasks
+            )
+            for r in result:
+                yield r
+        except Exception:
+            pool.terminate()
+            raise
+        else:
+            pool.close()
+        finally:
+            pool.join()
+
+
+def get_concurrency_policy():
+    cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR
+    if not cpu_num:
+        try:
+            cpu_num = multiprocessing.cpu_count()
+        except NotImplementedError:
+            cpu_num = 1
+
+    if cpu_num > 1:
+        return MultiProcessingConcurrencyPolicy(cpu_num)
+    return SingleWorkerConcurrencyPolicy()
+
+
 class TransactionSerializer(object):
     """The deploy tasks serializer."""
@@ -44,9 +132,10 @@ class TransactionSerializer(object):
 
     def __init__(self, context, role_resolver):
         self.role_resolver = role_resolver
-        self.factory = self.serializer_factory_class(context)
+        self.context = context
         self.tasks_graph = {}
         self.tasks_dictionary = {}
+        self.concurrency_policy = get_concurrency_policy()
 
     @classmethod
     def serialize(cls, context, tasks, role_resolver):
@@ -95,9 +184,27 @@ class TransactionSerializer(object):
         :param tasks: the deployment tasks
         :return the mapping tasks per node
         """
+        serialized = self.concurrency_policy.execute(
+            self.context,
+            self.serializer_factory_class,
+            self.expand_tasks(tasks)
+        )
 
-        tasks_mapping = {}
+        for node_and_task in serialized:
+            node_id, task = node_and_task
+            node_tasks = self.tasks_graph.setdefault(node_id, {})
+            # de-duplicate the tasks on the node: since a task can be
+            # added again when a group is expanded, overwrite the existing
+            # task if it is skipped and the new one is not.
+            if self.need_update_task(node_tasks, task):
+                node_tasks[task['id']] = task
+
+        # make sure that null node is present
+        self.tasks_graph.setdefault(None, {})
+
+    def expand_tasks(self, tasks):
         groups = []
+        tasks_mapping = {}
 
         for task in tasks:
             if task.get('type') == consts.ORCHESTRATOR_TASK_TYPES.group:
@@ -105,7 +212,8 @@
             else:
                 self.ensure_task_based_deploy_allowed(task)
                 tasks_mapping[task['id']] = task
-                self.process_task(task, self.resolve_nodes(task))
+                for node_id in self.resolve_nodes(task):
+                    yield node_id, task
 
         for task in groups:
             node_ids = self.role_resolver.resolve(
@@ -115,39 +223,13 @@
                 try:
                     sub_task = tasks_mapping[sub_task_id]
                 except KeyError:
-                    raise errors.InvalidData(
-                        'Task %s cannot be resolved', sub_task_id
-                    )
-
+                    msg = 'Task {0} cannot be resolved'.format(sub_task_id)
+                    logger.error(msg)
+                    raise errors.InvalidData(msg)
                 # if group is not excluded, all task should be run as well
                 # otherwise check each task individually
-                self.process_task(sub_task, node_ids)
-
-        # make sure that null node is present
-        self.tasks_graph.setdefault(None, {})
-
-    def process_task(self, task, node_ids):
-        """Processes one task one nodes of cluster.
-
-        :param task: the task instance
-        :param node_ids: the list of nodes, where this tasks should run
-        """
-
-        logger.debug("applying task '%s' for nodes: %s", task['id'], node_ids)
-        task_serializer = self.factory.create_serializer(task)
-        for node_id in node_ids:
-            try:
-                task = task_serializer.serialize(node_id)
-            except Exception:
-                logger.exception("Failed to serialize task %s", task['id'])
-                raise
-
-            node_tasks = self.tasks_graph.setdefault(node_id, {})
-            # de-duplication the tasks on node
-            # since task can be added after expand group need to
-            # overwrite if existed task is skipped and new is not skipped.
-            if self.need_update_task(node_tasks, task):
-                node_tasks[task['id']] = task
+                for node_id in node_ids:
+                    yield node_id, sub_task
 
     def resolve_nodes(self, task):
         if task.get('type') == consts.ORCHESTRATOR_TASK_TYPES.stage:
diff --git a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
index cb852d00cf..031c8818f7 100644
--- a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
+++ b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
@@ -15,6 +15,7 @@
 # under the License.
 
 import mock
+import multiprocessing.dummy
 
 from nailgun import consts
 from nailgun.errors import errors
@@ -391,3 +392,15 @@ class TestTransactionSerializer(BaseUnitTest):
             {'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
              'version': '1.0.0', 'id': 'test'}
         )
+
+    @mock.patch(
+        'nailgun.lcm.transaction_serializer.settings'
+        '.LCM_SERIALIZERS_CONCURRENCY_FACTOR',
+        new=2
+    )
+    @mock.patch(
+        'nailgun.lcm.transaction_serializer.multiprocessing',
+        new=multiprocessing.dummy
+    )
+    def test_multi_processing_serialization(self):
+        self.test_serialize_integration()
diff --git a/nailgun/nailgun/yaql_ext/serializers.py b/nailgun/nailgun/yaql_ext/serializers.py
index caaf0e4eb6..7e9ef879cf 100644
--- a/nailgun/nailgun/yaql_ext/serializers.py
+++ b/nailgun/nailgun/yaql_ext/serializers.py
@@ -12,7 +12,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-
 from oslo_serialization import jsonutils
 import yaml
 from yaql.language import specs
diff --git a/nailgun/tools/env_functions.sh b/nailgun/tools/env_functions.sh
index 0fa0e98077..be21b44edd 100644
--- a/nailgun/tools/env_functions.sh
+++ b/nailgun/tools/env_functions.sh
@@ -61,6 +61,7 @@
 APP_LOG: "${NAILGUN_LOGS}/app.log"
 RPC_CONSUMER_LOG_PATH: "${NAILGUN_LOGS}/receiverd.log"
 ASSASSIN_LOG_PATH: "${NAILGUN_LOGS}/assassind.log"
 STATS_LOGS_PATH: ${NAILGUN_LOGS}
+LCM_SERIALIZERS_CONCURRENCY_FACTOR: 1
 EOL
 }
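
Reviewer note: the core of this change is the worker-initializer pattern in
MultiProcessingConcurrencyPolicy: each pool worker builds its own serializer
factory once via initializer/initargs, and imap_unordered streams results back
as workers finish. Below is a minimal, self-contained sketch of that pattern,
assuming nothing from Nailgun; _init_worker, _run, DoublingFactory and the
sample values are hypothetical stand-ins for the real serializers factory and
transaction context.

import multiprocessing


def _init_worker(factory, context):
    # Build one factory instance per worker process, so the factory state
    # is constructed once instead of being pickled with every task.
    globals()['__factory'] = factory(context)


def _run(task):
    # Runs inside a worker; uses the per-process factory set up above.
    return globals()['__factory'].run(task)


class DoublingFactory(object):
    # Hypothetical stand-in for the serializers factory.
    def __init__(self, context):
        self.context = context

    def run(self, task):
        return task * self.context


if __name__ == '__main__':
    pool = multiprocessing.Pool(
        processes=2,
        initializer=_init_worker,
        initargs=(DoublingFactory, 10)
    )
    try:
        # imap_unordered yields results in completion order, not input
        # order, which is why serialize() keys results by node_id.
        for result in pool.imap_unordered(_run, [1, 2, 3]):
            print(result)
    finally:
        pool.close()
        pool.join()

The same indirection is what makes the unit test work: because the module
refers to multiprocessing by name, patching it with multiprocessing.dummy
swaps processes for threads without touching the policy code.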