Multiprocess task serialization

Task serialization takes a long time when the environment contains
many nodes. To reduce serialization time, run the serialization
process in parallel; serialization time shrinks roughly linearly
as the worker pool grows. (A standalone sketch of the pattern
follows the commit metadata below.)

DocImpact
Change-Id: Id3753dbc6983256d410e69c98ab02b61ab6bfb7f
Partial-Bug: #1572103
Co-Authored-By: V. Kuklin <vkuklin@mirantis.com>
Co-Authored-By: B. Gaifullin <bgaifullin@mirantis.com>
(cherry picked from commit 02c4283fe7)
Stanislaw Bogatkin 2016-04-19 14:55:57 +03:00 committed by Bulat Gaifullin
parent e85d2bc955
commit 4efe81f589
4 changed files with 130 additions and 35 deletions
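The heart of the change is a classic worker-pool pattern: each worker
process is primed once with a serializer factory through the pool
initializer, and (node_id, task) pairs are then streamed through
imap_unordered so results arrive as soon as any worker finishes. Below
is a minimal, self-contained sketch of that pattern; _init_worker,
_serialize_one, and the 'netconfig' payload are illustrative
placeholders, not code from this change.

import multiprocessing


def _init_worker(factory):
    # Runs once in each worker process; keeps per-process state out of
    # the per-task pickling path.
    globals()['_factory'] = factory


def _serialize_one(node_and_task):
    node_id, task_id = node_and_task
    # Stand-in for the real per-node serialization work.
    return node_id, {'id': task_id, 'factory': globals()['_factory']}


if __name__ == '__main__':
    pairs = [(node_id, 'netconfig') for node_id in range(4)]
    pool = multiprocessing.Pool(
        processes=2, initializer=_init_worker, initargs=('factory',)
    )
    try:
        # imap_unordered yields each result as soon as it is ready,
        # in completion order rather than submission order.
        for node_id, serialized in pool.imap_unordered(_serialize_one, pairs):
            print(node_id, serialized)
    finally:
        pool.close()
        pool.join()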


@@ -15,6 +15,7 @@
# under the License.
from distutils.version import StrictVersion
import multiprocessing
import six
@@ -30,6 +31,93 @@ from nailgun.utils.role_resolver import NameMatchingPolicy
# but there is no chance to re-use TasksSerializer until bug
# https://bugs.launchpad.net/fuel/+bug/1562292 is fixed
def _serialize_task_for_node(factory, node_and_task):
node_id, task = node_and_task
logger.debug(
"applying task '%s' for node: %s", task['id'], node_id
)
try:
task_serializer = factory.create_serializer(task)
serialized = task_serializer.serialize(node_id)
return node_id, serialized
except Exception:
logger.exception(
"failed to serialize task '%s' for node: %s", task['id'], node_id
)
raise


def _initialize_worker(serializers_factory, context):
    globals()['__factory'] = serializers_factory(context)


def _serialize_task_for_node_in_worker(node_and_task):
    return _serialize_task_for_node(globals()['__factory'], node_and_task)


class SingleWorkerConcurrencyPolicy(object):
    def execute(self, context, serializers_factory, tasks):
        """Executes task serialization synchronously, task by task.

        :param context: the transaction context
        :param serializers_factory: the serializers factory
        :param tasks: the tasks to serialize
        :return: sequence of serialized tasks
        """
        factory = serializers_factory(context)
        return six.moves.map(
            lambda x: _serialize_task_for_node(factory, x),
            tasks
        )


class MultiProcessingConcurrencyPolicy(object):
    def __init__(self, workers_num):
        self.workers_num = workers_num

    def execute(self, context, serializers_factory, tasks):
        """Executes task serialization in parallel.

        :param context: the transaction context
        :param serializers_factory: the serializers factory
        :param tasks: the tasks to serialize
        :return: sequence of serialized tasks
        """
        pool = multiprocessing.Pool(
            processes=self.workers_num,
            initializer=_initialize_worker,
            initargs=(serializers_factory, context)
        )
        try:
            result = pool.imap_unordered(
                _serialize_task_for_node_in_worker, tasks
            )
            for r in result:
                yield r
        except Exception:
            pool.terminate()
            raise
        else:
            pool.close()
        finally:
            pool.join()


def get_concurrency_policy():
    cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR
    if not cpu_num:
        try:
            cpu_num = multiprocessing.cpu_count()
        except NotImplementedError:
            cpu_num = 1
    if cpu_num > 1:
        return MultiProcessingConcurrencyPolicy(cpu_num)
    return SingleWorkerConcurrencyPolicy()


class TransactionSerializer(object):
    """The deploy tasks serializer."""
@@ -44,9 +132,10 @@ class TransactionSerializer(object):
def __init__(self, context, role_resolver):
self.role_resolver = role_resolver
self.factory = self.serializer_factory_class(context)
self.context = context
self.tasks_graph = {}
self.tasks_dictionary = {}
self.concurrency_policy = get_concurrency_policy()

    @classmethod
    def serialize(cls, context, tasks, role_resolver):
@@ -95,9 +184,27 @@
        :param tasks: the deployment tasks
        :return: the mapping of tasks per node
        """
serialized = self.concurrency_policy.execute(
self.context,
self.serializer_factory_class,
self.expand_tasks(tasks)
)
for node_and_task in serialized:
node_id, task = node_and_task
node_tasks = self.tasks_graph.setdefault(node_id, {})
            # de-duplicate the tasks on the node:
            # since a task can be added again after a group is expanded,
            # overwrite the task if the existing one is skipped and the
            # new one is not.
if self.need_update_task(node_tasks, task):
node_tasks[task['id']] = task
# make sure that null node is present
self.tasks_graph.setdefault(None, {})

    def expand_tasks(self, tasks):
groups = []
tasks_mapping = {}
for task in tasks:
if task.get('type') == consts.ORCHESTRATOR_TASK_TYPES.group:
@@ -105,7 +212,8 @@
else:
self.ensure_task_based_deploy_allowed(task)
tasks_mapping[task['id']] = task
self.process_task(task, self.resolve_nodes(task))
for node_id in self.resolve_nodes(task):
yield node_id, task
for task in groups:
node_ids = self.role_resolver.resolve(
@@ -115,39 +223,13 @@
try:
sub_task = tasks_mapping[sub_task_id]
except KeyError:
raise errors.InvalidData(
'Task %s cannot be resolved', sub_task_id
)
msg = 'Task {0} cannot be resolved'.format(sub_task_id)
logger.error(msg)
raise errors.InvalidData(msg)
            # if the group is not excluded, all of its tasks should be
            # run as well, otherwise check each task individually
self.process_task(sub_task, node_ids)
# make sure that null node is present
self.tasks_graph.setdefault(None, {})

    def process_task(self, task, node_ids):
        """Processes one task on the nodes of the cluster.

        :param task: the task instance
:param node_ids: the list of nodes, where this tasks should run
"""
logger.debug("applying task '%s' for nodes: %s", task['id'], node_ids)
task_serializer = self.factory.create_serializer(task)
for node_id in node_ids:
try:
task = task_serializer.serialize(node_id)
except Exception:
logger.exception("Failed to serialize task %s", task['id'])
raise
node_tasks = self.tasks_graph.setdefault(node_id, {})
            # de-duplicate the tasks on the node:
            # since a task can be added after a group is expanded,
            # overwrite it if the existing task is skipped and the new
            # one is not.
if self.need_update_task(node_tasks, task):
node_tasks[task['id']] = task
for node_id in node_ids:
yield node_id, sub_task

    def resolve_nodes(self, task):
if task.get('type') == consts.ORCHESTRATOR_TASK_TYPES.stage:


@@ -15,6 +15,7 @@
# under the License.
import mock
import multiprocessing.dummy
from nailgun import consts
from nailgun.errors import errors
@@ -391,3 +392,15 @@ class TestTransactionSerializer(BaseUnitTest):
{'type': consts.ORCHESTRATOR_TASK_TYPES.puppet,
'version': '1.0.0', 'id': 'test'}
)

    @mock.patch(
        'nailgun.lcm.transaction_serializer.settings'
        '.LCM_SERIALIZERS_CONCURRENCY_FACTOR',
        new=2
    )
    @mock.patch(
        'nailgun.lcm.transaction_serializer.multiprocessing',
        new=multiprocessing.dummy
    )
    def test_multi_processing_serialization(self):
        self.test_serialize_integration()
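
The second patch above swaps multiprocessing for multiprocessing.dummy,
which provides the same Pool API backed by threads instead of processes,
so the parallel code path runs inside the test process and mocks remain
visible to the workers. A quick illustration of the equivalence (not
part of this change):

import multiprocessing.dummy

# multiprocessing.dummy.Pool is a thread pool exposing the same
# interface as multiprocessing.Pool (map, imap_unordered, initializer,
# terminate, ...); with threads there is no pickling, so even a lambda
# works as the task function.
pool = multiprocessing.dummy.Pool(processes=2)
print(pool.map(lambda x: x * x, range(5)))  # -> [0, 1, 4, 9, 16]
pool.close()
pool.join()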


@@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_serialization import jsonutils
import yaml
from yaql.language import specs


@@ -61,6 +61,7 @@ APP_LOG: "${NAILGUN_LOGS}/app.log"
RPC_CONSUMER_LOG_PATH: "${NAILGUN_LOGS}/receiverd.log"
ASSASSIN_LOG_PATH: "${NAILGUN_LOGS}/assassind.log"
STATS_LOGS_PATH: ${NAILGUN_LOGS}
LCM_SERIALIZERS_CONCURRENCY_FACTOR: 1
EOL
}
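
Per get_concurrency_policy() above, this default of 1 keeps
serialization in a single worker; a value greater than 1 enables a
process pool of that size, and an unset or zero factor falls back to
multiprocessing.cpu_count(). A hypothetical operator override (the
settings-file path is an assumption, not part of this change):

# assumed override location, e.g. /etc/nailgun/settings.yaml
LCM_SERIALIZERS_CONCURRENCY_FACTOR: 4  # pool of 4 worker processes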