Multiprocess tasks serialization
Task serialization takes a long time when the environment
contains many nodes. To reduce serialization time, make the
serialization process work in parallel (serialization time
decreases roughly linearly as the worker pool grows).
DocImpact
Change-Id: Id3753dbc6983256d410e69c98ab02b61ab6bfb7f
Partial-Bug: #1572103
Co-Authored-By: V. Kuklin <vkuklin@mirantis.com>
Co-Authored-By: B. Gaifullin <bgaifullin@mirantis.com>
(cherry picked from commit 02c4283fe7)
This commit is contained in:
parent
e85d2bc955
commit
4efe81f589
|
@ -15,6 +15,7 @@
|
|||
# under the License.
|
||||
|
||||
from distutils.version import StrictVersion
|
||||
import multiprocessing
|
||||
|
||||
import six
|
||||
|
||||
|
@ -30,6 +31,93 @@ from nailgun.utils.role_resolver import NameMatchingPolicy
|
|||
# but there is no chance to re-use TasksSerializer until bug
|
||||
# https://bugs.launchpad.net/fuel/+bug/1562292 is not fixed
|
||||
|
||||
|
||||
def _serialize_task_for_node(factory, node_and_task):
|
||||
node_id, task = node_and_task
|
||||
logger.debug(
|
||||
"applying task '%s' for node: %s", task['id'], node_id
|
||||
)
|
||||
try:
|
||||
task_serializer = factory.create_serializer(task)
|
||||
serialized = task_serializer.serialize(node_id)
|
||||
return node_id, serialized
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"failed to serialize task '%s' for node: %s", task['id'], node_id
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
def _initialize_worker(serializers_factory, context):
|
||||
globals()['__factory'] = serializers_factory(context)
|
||||
|
||||
|
||||
def _serialize_task_for_node_in_worker(node_and_task):
|
||||
return _serialize_task_for_node(globals()['__factory'], node_and_task)
|
||||
|
||||
|
||||
class SingleWorkerConcurrencyPolicy(object):
    """Serializes tasks sequentially within the current process."""

    def execute(self, context, serializers_factory, tasks):
        """Executes task serialization synchronously, task by task.

        :param context: the transaction context
        :param serializers_factory: the serializers factory
        :param tasks: the tasks to serialize
        :return: lazy sequence of serialized tasks
        """
        factory = serializers_factory(context)
        return (
            _serialize_task_for_node(factory, node_and_task)
            for node_and_task in tasks
        )
class MultiProcessingConcurrencyPolicy(object):
    """Serializes tasks in parallel using a pool of worker processes."""

    def __init__(self, workers_num):
        # number of worker processes in the pool
        self.workers_num = workers_num

    def execute(self, context, serializers_factory, tasks):
        """Executes task serialization in parallel.

        This is a generator: the pool is created on first iteration and
        results are yielded in completion order, not input order.

        :param context: the transaction context
        :param serializers_factory: the serializers factory
        :param tasks: the tasks to serialize
        :return: sequence of serialized tasks
        """
        pool = multiprocessing.Pool(
            processes=self.workers_num,
            initializer=_initialize_worker,
            initargs=(serializers_factory, context)
        )

        try:
            result = pool.imap_unordered(
                _serialize_task_for_node_in_worker, tasks
            )
            for r in result:
                yield r
        except BaseException:
            # BaseException (not Exception) so that GeneratorExit --
            # raised here when the consumer stops iterating early --
            # and KeyboardInterrupt also stop the workers; otherwise
            # pool.join() below would be called on a still-running pool.
            pool.terminate()
            raise
        else:
            pool.close()
        finally:
            pool.join()
def get_concurrency_policy():
    """Selects the task serialization concurrency policy.

    The worker count comes from the LCM_SERIALIZERS_CONCURRENCY_FACTOR
    setting; when it is unset (falsy), the CPU count is used, falling
    back to a single worker when the CPU count cannot be determined.

    :return: the concurrency policy object
    """
    workers_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR
    if not workers_num:
        try:
            workers_num = multiprocessing.cpu_count()
        except NotImplementedError:
            workers_num = 1

    if workers_num <= 1:
        return SingleWorkerConcurrencyPolicy()
    return MultiProcessingConcurrencyPolicy(workers_num)
class TransactionSerializer(object):
|
||||
"""The deploy tasks serializer."""
|
||||
|
||||
|
@ -44,9 +132,10 @@ class TransactionSerializer(object):
|
|||
|
||||
def __init__(self, context, role_resolver):
|
||||
self.role_resolver = role_resolver
|
||||
self.factory = self.serializer_factory_class(context)
|
||||
self.context = context
|
||||
self.tasks_graph = {}
|
||||
self.tasks_dictionary = {}
|
||||
self.concurrency_policy = get_concurrency_policy()
|
||||
|
||||
@classmethod
|
||||
def serialize(cls, context, tasks, role_resolver):
|
||||
|
@ -95,9 +184,27 @@ class TransactionSerializer(object):
|
|||
:param tasks: the deployment tasks
|
||||
:return the mapping tasks per node
|
||||
"""
|
||||
serialized = self.concurrency_policy.execute(
|
||||
self.context,
|
||||
self.serializer_factory_class,
|
||||
self.expand_tasks(tasks)
|
||||
)
|
||||
|
||||
tasks_mapping = {}
|
||||
for node_and_task in serialized:
|
||||
node_id, task = node_and_task
|
||||
node_tasks = self.tasks_graph.setdefault(node_id, {})
|
||||
# de-duplication the tasks on node
|
||||
# since task can be added after expanding of group need to
|
||||
# overwrite task if existed task is skipped and new is not skipped.
|
||||
if self.need_update_task(node_tasks, task):
|
||||
node_tasks[task['id']] = task
|
||||
|
||||
# make sure that null node is present
|
||||
self.tasks_graph.setdefault(None, {})
|
||||
|
||||
def expand_tasks(self, tasks):
|
||||
groups = []
|
||||
tasks_mapping = {}
|
||||
|
||||
for task in tasks:
|
||||
if task.get('type') == consts.ORCHESTRATOR_TASK_TYPES.group:
|
||||
|
@ -105,7 +212,8 @@ class TransactionSerializer(object):
|
|||
else:
|
||||
self.ensure_task_based_deploy_allowed(task)
|
||||
tasks_mapping[task['id']] = task
|
||||
self.process_task(task, self.resolve_nodes(task))
|
||||
for node_id in self.resolve_nodes(task):
|
||||
yield node_id, task
|
||||
|
||||
for task in groups:
|
||||
node_ids = self.role_resolver.resolve(
|
||||
|
@ -115,39 +223,13 @@ class TransactionSerializer(object):
|
|||
try:
|
||||
sub_task = tasks_mapping[sub_task_id]
|
||||
except KeyError:
|
||||
raise errors.InvalidData(
|
||||
'Task %s cannot be resolved', sub_task_id
|
||||
)
|
||||
|
||||
msg = 'Task {0} cannot be resolved'.format(sub_task_id)
|
||||
logger.error(msg)
|
||||
raise errors.InvalidData(msg)
|
||||
# if group is not excluded, all task should be run as well
|
||||
# otherwise check each task individually
|
||||
self.process_task(sub_task, node_ids)
|
||||
|
||||
# make sure that null node is present
|
||||
self.tasks_graph.setdefault(None, {})
|
||||
|
||||
def process_task(self, task, node_ids):
|
||||
"""Processes one task one nodes of cluster.
|
||||
|
||||
:param task: the task instance
|
||||
:param node_ids: the list of nodes, where this tasks should run
|
||||
"""
|
||||
|
||||
logger.debug("applying task '%s' for nodes: %s", task['id'], node_ids)
|
||||
task_serializer = self.factory.create_serializer(task)
|
||||
for node_id in node_ids:
|
||||
try:
|
||||
task = task_serializer.serialize(node_id)
|
||||
except Exception:
|
||||
logger.exception("Failed to serialize task %s", task['id'])
|
||||
raise
|
||||
|
||||
node_tasks = self.tasks_graph.setdefault(node_id, {})
|
||||
# de-duplication the tasks on node
|
||||
# since task can be added after expand group need to
|
||||
# overwrite if existed task is skipped and new is not skipped.
|
||||
if self.need_update_task(node_tasks, task):
|
||||
node_tasks[task['id']] = task
|
||||
for node_id in node_ids:
|
||||
yield node_id, sub_task
|
||||
|
||||
def resolve_nodes(self, task):
|
||||
if task.get('type') == consts.ORCHESTRATOR_TASK_TYPES.stage:
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
# under the License.
|
||||
|
||||
import mock
|
||||
import multiprocessing.dummy
|
||||
|
||||
from nailgun import consts
|
||||
from nailgun.errors import errors
|
||||
|
@ -391,3 +392,15 @@ class TestTransactionSerializer(BaseUnitTest):
|
|||
{'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
|
||||
'version': '1.0.0', 'id': 'test'}
|
||||
)
|
||||
|
||||
# Re-runs the integration scenario with the multi-process policy forced
# on: the concurrency factor is patched to 2 so get_concurrency_policy()
# picks MultiProcessingConcurrencyPolicy, and the multiprocessing module
# is replaced with multiprocessing.dummy, whose "processes" are threads
# (keeps the test fast; presumably also avoids pickling test fixtures --
# TODO confirm).
@mock.patch(
    'nailgun.lcm.transaction_serializer.settings'
    '.LCM_SERIALIZERS_CONCURRENCY_FACTOR',
    new=2
)
@mock.patch(
    'nailgun.lcm.transaction_serializer.multiprocessing',
    new=multiprocessing.dummy
)
def test_multi_processing_serialization(self):
    self.test_serialize_integration()
|
|
@ -12,7 +12,6 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
import yaml
|
||||
from yaql.language import specs
|
||||
|
|
|
@ -61,6 +61,7 @@ APP_LOG: "${NAILGUN_LOGS}/app.log"
|
|||
RPC_CONSUMER_LOG_PATH: "${NAILGUN_LOGS}/receiverd.log"
|
||||
ASSASSIN_LOG_PATH: "${NAILGUN_LOGS}/assassind.log"
|
||||
STATS_LOGS_PATH: ${NAILGUN_LOGS}
|
||||
LCM_SERIALIZERS_CONCURRENCY_FACTOR: 1
|
||||
EOL
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue