Distributed serialization implementation
Distributed serialization is implemented with the Python `distributed` library. A scheduler manages jobs and a set of workers processes them. The scheduler is started on the master node together with a set of workers; workers are also started on all other nodes.

In the cluster settings we can select the serialization type and the node statuses that allow a node to serve as a serialization worker. By default, nodes with status 'ready' are excluded from the workers list. For data serialization we use only nodes from the cluster for which serialization is being performed.

Before the computation, fresh nailgun code is sent to the workers as a zip file and imported for job execution, so the workers always run up-to-date nailgun code. Each job processes a chunk of tasks on a worker, which significantly boosts performance; the chunk size is defined by the settings.LCM_DS_TASKS_PER_JOB parameter. To limit memory consumption on the master node, the settings.LCM_DS_NODE_LOAD_COEFF parameter is used to calculate the maximum number of jobs in the processing queue.

Synthetic tests of distributed serialization for 500 nodes with number of ifaces >= 5, performed on 40 cores (4 different machines), took 6-7 minutes on average.

Change-Id: Id8ff8fada2f1ab036775fc01c78d91befdda9ea2
Implements: blueprint distributed-serialization
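For orientation, the job flow described above can be sketched with the same `distributed` and `toolz` primitives the patch below uses. This is a minimal illustration only, not code from the commit; `serialize_chunk`, `run`, `tasks`, and `workers` are made-up names, and the two constants mirror the new settings:

import distributed
import toolz

TASKS_PER_JOB = 100     # cf. settings.LCM_DS_TASKS_PER_JOB
NODE_LOAD_COEFF = 2     # cf. settings.LCM_DS_NODE_LOAD_COEFF


def serialize_chunk(chunk):
    # Stand-in for the real per-chunk serialization performed on a worker.
    return [(node_id, task) for node_id, task in chunk]


def run(client, tasks, workers):
    # client is a distributed.Client connected to the scheduler on the master.
    max_jobs = len(workers) * NODE_LOAD_COEFF
    jobs = []
    for chunk in toolz.partition_all(TASKS_PER_JOB, tasks):
        jobs.append(client.submit(serialize_chunk, chunk, workers=workers))
        if len(jobs) >= max_jobs:
            # Back-pressure: drain finished jobs before submitting more.
            for job in distributed.as_completed(jobs):
                for result in job.result():
                    yield result
            jobs = []
    for job in distributed.as_completed(jobs):
        for result in job.result():
            yield result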
parent 36b90eacdc
commit 4fa861fa8a
@@ -532,3 +532,7 @@ DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci'
 DEFAULT_MTU = 1500
 
 SIZE_OF_VLAN_TAG = 4
+
+SERIALIZATION_POLICY = Enum(
+    'distributed'
+)
@@ -1114,6 +1114,55 @@
             group: "security"
             weight: 20
             type: "radio"
+        serialization_policy:
+            value: "default"
+            values:
+                - data: "default"
+                  label: "Default serialization"
+                  description: "Run serialization on the master node only"
+                - data: "distributed"
+                  label: "Distributed serialization"
+                  description: "Run serialization on the master and environment nodes. Nodes for serialization are selected only from the environment for which serialization is being performed."
+            label: "Serialization policy"
+            group: "general"
+            weight: 30
+            type: "radio"
+        ds_use_discover:
+            group: "general"
+            label: "Use discovered nodes as workers for serialization"
+            type: "checkbox"
+            value: true
+            weight: 31
+            restrictions:
+                - condition: "settings:common.serialization_policy.value != 'distributed'"
+                  action: "hide"
+        ds_use_provisioned:
+            group: "general"
+            label: "Use provisioned nodes as workers for serialization"
+            type: "checkbox"
+            value: true
+            weight: 32
+            restrictions:
+                - condition: "settings:common.serialization_policy.value != 'distributed'"
+                  action: "hide"
+        ds_use_error:
+            group: "general"
+            label: "Use nodes in error state as workers for serialization"
+            type: "checkbox"
+            value: true
+            weight: 33
+            restrictions:
+                - condition: "settings:common.serialization_policy.value != 'distributed'"
+                  action: "hide"
+        ds_use_ready:
+            group: "general"
+            label: "Use ready nodes as workers for serialization"
+            type: "checkbox"
+            value: false
+            weight: 34
+            restrictions:
+                - condition: "settings:common.serialization_policy.value != 'distributed'"
+                  action: "hide"
     public_network_assignment:
         metadata:
             weight: 10
@@ -110,6 +110,8 @@ class Context(object):
         return evaluate
 
     def get_formatter_context(self, node_id):
+        # TODO(akislitsky) remove formatter context from the
+        # tasks serialization workflow
         data = self._transaction.get_new_data(node_id)
         return {
            'CLUSTER_ID': data.get('cluster', {}).get('id'),
@@ -147,9 +149,14 @@ class DeploymentTaskSerializer(object):
         :return: the result
         """
 
-    def serialize(self, node_id):
-        """Serialize task in expected by orchestrator format.
+    def serialize(self, node_id, formatter_context=None):
+        """Serialize task in expected by orchestrator format
+
+        If serialization is performed on the remote worker
+        we should pass formatter_context parameter with values
+        from the master node settings
 
+        :param formatter_context: formatter context
         :param node_id: the target node_id
         """
@@ -157,10 +164,12 @@ class DeploymentTaskSerializer(object):
             "serialize task %s for node %s",
             self.task_template['id'], node_id
         )
+        formatter_context = formatter_context \
+            or self.context.get_formatter_context(node_id)
         task = utils.traverse(
             self.task_template,
             utils.text_format_safe,
-            self.context.get_formatter_context(node_id),
+            formatter_context,
             {
                 'yaql_exp': self.context.get_yaql_interpreter(
                     node_id, self.task_template['id'])
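As a hypothetical usage sketch of the new parameter (only `create_serializer` and `serialize` are APIs from this patch; `factory`, `task`, `node_id`, and `ctx` are placeholders): on the master the formatter context is still derived from node_id as before, while a remote worker passes in a context precomputed on the master:

serializer = factory.create_serializer(task)
serialized = serializer.serialize(node_id)                       # on the master
serialized = serializer.serialize(node_id, formatter_context=ctx)  # on a worker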
@@ -14,13 +14,21 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-from distutils.version import StrictVersion
+import datetime
 import multiprocessing
 import os
+from Queue import Queue
 import shutil
 import tempfile
 
+import distributed
+from distutils.version import StrictVersion
 import six
 import toolz
 
 from nailgun import consts
 from nailgun import errors
+from nailgun.lcm.task_serializer import Context
 from nailgun.lcm.task_serializer import TasksSerializersFactory
 from nailgun.logger import logger
 from nailgun.settings import settings
@@ -128,7 +136,308 @@ class MultiProcessingConcurrencyPolicy(object):
         pool.join()
 
 
-def get_concurrency_policy():
+def _distributed_serialize_tasks_for_node(formatter_contexts_idx,
+                                          node_and_tasks, scattered_data):
+    """Remote serialization call for DistributedProcessingPolicy
+
+    Code of the function is copied to the workers and executed there, thus
+    we are including all required imports inside the function.
+
+    :param formatter_contexts_idx: dict of formatter contexts with node_id
+        value as key
+    :param node_and_tasks: list of (node_id, task_data) tuples
+    :param scattered_data: future object that points to data copied to
+        the workers
+    :return: list of ((node_id, serialized), error) tuples
+    """
+    try:
+        factory = TasksSerializersFactory(scattered_data['context'])
+
+        # Restoring settings
+        settings.config = scattered_data['settings_config']
+        for k in formatter_contexts_idx:
+            formatter_contexts_idx[k]['SETTINGS'] = settings
+
+    except Exception as e:
+        logger.exception("Distributed serialization failed")
+        return [((None, None), e)]
+
+    result = []
+
+    for node_and_task in node_and_tasks:
+
+        node_id = None
+        try:
+            node_id, task = node_and_task
+
+            logger.debug("Starting distributed node %s task %s serialization",
+                         node_id, task['id'])
+
+            formatter_context = formatter_contexts_idx[node_id]
+
+            serializer = factory.create_serializer(task)
+            serialized = serializer.serialize(
+                node_id, formatter_context=formatter_context)
+
+            logger.debug("Distributed node %s task %s serialization "
+                         "result: %s", node_id, task['id'], serialized)
+
+            result.append(((node_id, serialized), None))
+        except Exception as e:
+            logger.exception("Distributed serialization failed")
+            result.append(((node_id, None), e))
+            break
+
+    logger.debug("Processed tasks count: %s", len(result))
+    return result
+
+
+class DistributedProcessingPolicy(object):
+
+    def __init__(self):
+        self.sent_jobs = Queue()
+        self.sent_jobs_count = 0
+
+    def _consume_jobs(self, chunk_size=None):
+        """Consumes jobs
+
+        If chunk_size is set, the function consumes the specified number
+        of finished jobs, or fewer if the sent_jobs queue becomes empty.
+        If chunk_size is None, the function consumes jobs until the
+        sent_jobs queue becomes empty.
+        Jobs with statuses Cancelled, Abandoned or Terminated will be
+        resent and their ids added to the sent_jobs queue.
+
+        :param chunk_size: size of the chunk to consume
+        :return: generator over job results
+        """
+        logger.debug("Consuming jobs started")
+
+        jobs_to_consume = []
+        while not self.sent_jobs.empty():
+            job = self.sent_jobs.get()
+            jobs_to_consume.append(job)
+
+            if chunk_size is not None:
+                chunk_size -= 1
+                if chunk_size <= 0:
+                    break
+
+        for ready_job in distributed.as_completed(jobs_to_consume):
+            results = ready_job.result()
+            self.sent_jobs_count -= 1
+
+            for result in results:
+                (node_id, serialized), exc = result
+                logger.debug("Got ready task for node %s, serialized: %s, "
+                             "error: %s", node_id, serialized, exc)
+                if exc is not None:
+                    raise exc
+                yield node_id, serialized
+
+        logger.debug("Consuming jobs finished")
+
+    def _get_formatter_context(self, task_context, formatter_contexts_idx,
+                               node_id):
+        try:
+            return formatter_contexts_idx[node_id]
+        except KeyError:
+            pass
+
+        logger.debug("Calculating formatter context for node %s", node_id)
+        formatter_context = task_context.get_formatter_context(
+            node_id)
+        # The settings file is already sent to the workers
+        formatter_context.pop('SETTINGS', None)
+        formatter_contexts_idx[node_id] = formatter_context
+
+        return formatter_context
+
+    def _upload_nailgun_code(self, job_cluster):
+        """Creates a zip of the current nailgun code and uploads it to workers
+
+        TODO(akislitsky): add workers scope when it is implemented
+        in the distributed library
+
+        :param job_cluster: distributed.Client
+        """
+        logger.debug("Compressing nailgun code")
+        file_dir = os.path.dirname(__file__)
+        nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..'))
+        archive = os.path.join(tempfile.gettempdir(), 'nailgun')
+        result = shutil.make_archive(archive, 'zip', nailgun_root_dir,
+                                     'nailgun')
+        logger.debug("Nailgun code saved to: %s", result)
+
+        logger.debug("Uploading nailgun archive %s to workers", result)
+        job_cluster.upload_file(result)
+
+    def _scatter_data(self, job_cluster, context, workers):
+        logger.debug("Scattering data to workers started")
+        shared_data = {'context': context, 'settings_config': settings.config}
+        scattered = job_cluster.scatter(shared_data, broadcast=True,
+                                        workers=workers)
+        # Wait until the data is scattered to the workers
+        distributed.wait(scattered.values())
+        logger.debug("Scattering data to workers finished")
+
+        return scattered
+
+    def _get_allowed_nodes_statuses(self, context):
+        """Extracts node statuses that allow distributed serialization"""
+        common = context.new.get('common', {})
+        cluster = common.get('cluster', {})
+        logger.debug("Getting allowed nodes statuses to use as serialization "
+                     "workers for cluster %s", cluster.get('id'))
+        check_fields = {
+            'ds_use_ready': consts.NODE_STATUSES.ready,
+            'ds_use_provisioned': consts.NODE_STATUSES.provisioned,
+            'ds_use_discover': consts.NODE_STATUSES.discover,
+            'ds_use_error': consts.NODE_STATUSES.error
+        }
+        statuses = set()
+        for field, node_status in check_fields.items():
+            if common.get(field):
+                statuses.add(node_status)
+
+        logger.debug("Allowed nodes statuses to use as serialization workers "
+                     "for cluster %s are: %s", cluster.get('id'), statuses)
+        return statuses
+
+    def _get_allowed_nodes_ips(self, context):
+        """Filters online nodes from the cluster by their status
+
+        In the cluster settings we select the node statuses allowed for
+        use in distributed serialization. Nodes are then filtered
+        according to the selected statuses.
+
+        :param context: TransactionContext
+        :return: set of allowed nodes ips
+        """
+        ips = set()
+        allowed_statuses = self._get_allowed_nodes_statuses(context)
+        for node in six.itervalues(context.new.get('nodes', {})):
+            if node.get('status') in allowed_statuses:
+                ips.add(node.get('ip'))
+        ips.add(settings.MASTER_IP)
+        return ips
+
+    def _get_allowed_workers(self, job_cluster, allowed_ips):
+        """Calculates workers addresses for distributed serialization
+
+        Only workers placed on the allowed nodes must be selected
+        for the serialization.
+
+        :param job_cluster: distributed.Client
+        :param allowed_ips: ips of nodes allowed for serialization
+        :return: list of workers addresses in the format 'ip:port'
+        """
+        logger.debug("Getting allowed workers")
+        workers = {}
+
+        # A worker has an address like tcp://ip:port
+        info = job_cluster.scheduler_info()
+        for worker_addr in six.iterkeys(info['workers']):
+            ip_port = worker_addr.split('//')[1]
+            ip = ip_port.split(':')[0]
+            if ip not in allowed_ips:
+                continue
+            try:
+                pool = workers[ip]
+                pool.add(ip_port)
+            except KeyError:
+                workers[ip] = set([ip_port])
+
+        return list(toolz.itertoolz.concat(six.itervalues(workers)))
+
+    def execute(self, context, _, tasks):
+        """Executes task serialization on distributed nodes
+
+        :param context: the transaction context
+        :param _: serializers factory
+        :param tasks: the tasks to serialize
+        :return: sequence of serialized tasks
+        """
+        logger.debug("Performing distributed tasks processing")
+        sched_address = '{0}:{1}'.format(settings.MASTER_IP,
+                                         settings.LCM_DS_JOB_SHEDULER_PORT)
+        job_cluster = distributed.Client(sched_address)
+
+        allowed_ips = self._get_allowed_nodes_ips(context)
+        workers = self._get_allowed_workers(job_cluster, allowed_ips)
+        logger.debug("Allowed workers list for serialization: %s", workers)
+        workers_ips = set([ip_port.split(':')[0] for ip_port in workers])
+        logger.debug("Allowed workers ips list for serialization: %s",
+                     workers_ips)
+
+        task_context = Context(context)
+        formatter_contexts_idx = {}
+        workers_num = len(workers)
+        max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF
+        logger.debug("Max jobs allowed in the queue: %s", max_jobs_in_queue)
+
+        start = datetime.datetime.utcnow()
+        tasks_count = 0
+
+        try:
+            self._upload_nailgun_code(job_cluster)
+            scattered = self._scatter_data(job_cluster, context, workers)
+
+            for tasks_chunk in toolz.partition_all(
+                    settings.LCM_DS_TASKS_PER_JOB, tasks):
+
+                formatter_contexts_for_tasks = {}
+
+                # Collecting the required contexts for the tasks
+                for task in tasks_chunk:
+                    node_id, task_data = task
+                    formatter_context = self._get_formatter_context(
+                        task_context, formatter_contexts_idx, node_id)
+                    if node_id not in formatter_contexts_for_tasks:
+                        formatter_contexts_for_tasks[node_id] = \
+                            formatter_context
+
+                logger.debug("Submitting job for tasks chunk: %s", tasks_chunk)
+                job = job_cluster.submit(
+                    _distributed_serialize_tasks_for_node,
+                    formatter_contexts_for_tasks,
+                    tasks_chunk,
+                    scattered,
+                    workers=workers_ips
+                )
+
+                self.sent_jobs.put(job)
+                self.sent_jobs_count += 1
+
+                # We limit the max number of jobs in the queue by the
+                # number of nodes used in the serialization
+                if self.sent_jobs_count >= max_jobs_in_queue:
+                    for result in self._consume_jobs(chunk_size=workers_num):
+                        tasks_count += 1
+                        yield result
+
+            # We have no tasks any more but still have unconsumed jobs
+            for result in self._consume_jobs():
+                tasks_count += 1
+                yield result
+        finally:
+            end = datetime.datetime.utcnow()
+            logger.debug("Distributed tasks processing finished. "
+                         "Total time: %s. Tasks processed: %s",
+                         end - start, tasks_count)
+            job_cluster.shutdown()
+
+
+def is_distributed_processing_enabled(context):
+    common = context.new.get('common', {})
+    return common.get('serialization_policy') == \
+        consts.SERIALIZATION_POLICY.distributed
+
+
+def get_processing_policy(context):
+    if is_distributed_processing_enabled(context):
+        return DistributedProcessingPolicy()
     cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR
     if not cpu_num:
         try:
@@ -162,7 +471,7 @@ class TransactionSerializer(object):
         # ids of nodes in this group and how many nodes in this group can fail
         # and deployment will not be interrupted
         self.fault_tolerance_groups = []
-        self.concurrency_policy = get_concurrency_policy()
+        self.processing_policy = get_processing_policy(context)
 
     @classmethod
     def serialize(cls, context, tasks, resolver):
@@ -216,7 +525,7 @@ class TransactionSerializer(object):
         :param tasks: the deployment tasks
         :return the mapping tasks per node
         """
-        serialized = self.concurrency_policy.execute(
+        serialized = self.processing_policy.execute(
             self.context,
             self.serializer_factory_class,
             self.expand_tasks(tasks)
@@ -221,6 +221,7 @@ class DeploymentMultinodeSerializer(object):
             'role': role,
             'vms_conf': node.vms_conf,
             'fail_if_error': role in self.critical_roles,
+            'ip': node.ip,
             # TODO(eli): need to remove, required for the fake thread only
             'online': node.online,
         }
@@ -38,7 +38,21 @@ class NailgunSettings(object):
         if test_config:
             settings_files.append(test_config)
 
-        self.config = {}
+        # If settings.yaml doesn't exist we should have a default
+        # config structure. Nailgun without settings is used
+        # when we distribute source code to the workers for
+        # distributed serialization
+        self.config = {
+            'VERSION': {},
+            'DATABASE': {
+                'engine': 'postgresql',
+                'name': '',
+                'host': '',
+                'port': '0',
+                'user': '',
+                'passwd': ''
+            }
+        }
         for sf in settings_files:
             try:
                 logger.debug("Trying to read config file %s" % sf)
@@ -47,9 +61,9 @@ class NailgunSettings(object):
                 logger.error("Error while reading config file %s: %s" %
                              (sf, str(e)))
 
-        self.config['VERSION']['api'] = self.config['API']
+        self.config['VERSION']['api'] = self.config.get('API')
         self.config['VERSION']['feature_groups'] = \
-            self.config['FEATURE_GROUPS']
+            self.config.get('FEATURE_GROUPS')
 
         fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE)
         if fuel_release:
@@ -61,7 +75,7 @@ class NailgunSettings(object):
             self.config['VERSION']['openstack_version'] = \
                 fuel_openstack_version
 
-        if int(self.config.get("DEVELOPMENT")):
+        if int(self.config.get("DEVELOPMENT", 0)):
             logger.info("DEVELOPMENT MODE ON:")
             here = os.path.abspath(
                 os.path.join(os.path.dirname(__file__), '..')
@@ -177,6 +177,15 @@ YAQL_MEMORY_QUOTA: 104857600
 
 LCM_CHECK_TASK_VERSION: False
 
+# Coefficient for calculating the max jobs queue length. When the number of
+# jobs reaches len(nodes) * load_coeff we stop generating and start consuming
+# jobs.
+LCM_DS_NODE_LOAD_COEFF: 2
+# Port of the dask-scheduler on the master node
+LCM_DS_JOB_SHEDULER_PORT: 8002
+# Size of the tasks chunk sent to a distributed worker
+LCM_DS_TASKS_PER_JOB: 100
+
 DPDK_MAX_CPUS_PER_NIC: 4
 
 TRUNCATE_LOG_ENTRIES: 100
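A worked example of the cap described in the comment above, with an assumed cluster of 40 workers and the default values shown: generation pauses once 40 * 2 = 80 jobs are queued, i.e. at most 80 * 100 = 8000 tasks in flight at a time:

workers_num = 40                                # assumed number of workers
max_jobs_in_queue = workers_num * 2             # LCM_DS_NODE_LOAD_COEFF -> 80
max_tasks_in_flight = max_jobs_in_queue * 100   # LCM_DS_TASKS_PER_JOB -> 8000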
@@ -56,6 +56,16 @@ class InstallationInfo(object):
                       'propagate_task_deploy', None),
         WhiteListRule(('common', 'security_groups', 'value'),
                       'security_groups', None),
+        WhiteListRule(('common', 'serialization_policy', 'value'),
+                      'serialization_policy', None),
+        WhiteListRule(('common', 'ds_use_discover', 'value'),
+                      'ds_use_discover', None),
+        WhiteListRule(('common', 'ds_use_provisioned', 'value'),
+                      'ds_use_provisioned', None),
+        WhiteListRule(('common', 'ds_use_ready', 'value'),
+                      'ds_use_ready', None),
+        WhiteListRule(('common', 'ds_use_error', 'value'),
+                      'ds_use_error', None),
         WhiteListRule(('corosync', 'verified', 'value'),
                       'corosync_verified', None),
@@ -187,6 +187,7 @@ class TestHandlers(BaseIntegrationTest):
                 'fail_if_error': is_critical,
                 'vms_conf': [],
                 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
+                'ip': node.ip,
 
                 'network_data': {
                     'eth1': {
@@ -603,6 +604,7 @@ class TestHandlers(BaseIntegrationTest):
             'online': node.online,
             'fail_if_error': is_critical,
             'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
+            'ip': node.ip,
             'priority': 100,
             'vms_conf': [],
             'network_scheme': {
@@ -1096,6 +1098,7 @@ class TestHandlers(BaseIntegrationTest):
             'fail_if_error': is_critical,
             'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
             'priority': 100,
+            'ip': node.ip,
             'vms_conf': [],
 
             'network_scheme': {
@@ -14,20 +14,24 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import copy
+import exceptions
 import mock
 import multiprocessing.dummy
 
 from nailgun import consts
 from nailgun import errors
 from nailgun import lcm
+from nailgun.lcm import TransactionContext
+from nailgun.settings import settings
 from nailgun.test.base import BaseTestCase
 from nailgun.utils.resolvers import TagResolver
 
-from nailgun.test.base import BaseUnitTest
 
-
-class TestTransactionSerializer(BaseUnitTest):
+class TestTransactionSerializer(BaseTestCase):
     @classmethod
     def setUpClass(cls):
         super(TestTransactionSerializer, cls).setUpClass()
         cls.tasks = [
             {
                 'id': 'task1', 'roles': ['controller'],
@@ -462,3 +466,344 @@ class TestTransactionSerializer(BaseUnitTest):
             9,
             lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10)
         )
+
+    def _get_context_for_distributed_serialization(self):
+        new = copy.deepcopy(self.context.new)
+        new['common']['serialization_policy'] = \
+            consts.SERIALIZATION_POLICY.distributed
+        return TransactionContext(new)
+
+    @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
+    @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
+    def test_distributed_serialization(self, as_completed, _):
+        context = self._get_context_for_distributed_serialization()
+
+        with mock.patch(
+                'nailgun.lcm.transaction_serializer.distributed.Client'
+        ) as job_cluster:
+            job = mock.Mock()
+            job.result.return_value = [
+                (('1', {"id": "task1", "type": "skipped"}), None)
+            ]
+
+            submit = mock.Mock()
+            submit.return_value = job
+
+            as_completed.return_value = [job]
+
+            job_cluster.return_value.submit = submit
+            job_cluster.return_value.scheduler_info.return_value = \
+                {'workers': {'tcp://worker': {}}}
+
+            lcm.TransactionSerializer.serialize(
+                context, self.tasks, self.resolver)
+            self.assertTrue(submit.called)
+            # 4 controller tasks + 1 compute + 1 cinder
+            self.assertEqual(6, submit.call_count)
+
+    @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
+    @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
+    @mock.patch('nailgun.lcm.transaction_serializer.'
+                'DistributedProcessingPolicy._get_formatter_context')
+    def test_distributed_serialization_workers_scope(self, formatter_context,
+                                                     as_completed, _):
+        context = self._get_context_for_distributed_serialization()
+
+        node_id = '1'
+        task = {
+            'id': 'task1', 'roles': ['controller'],
+            'type': 'puppet', 'version': '2.0.0'
+        }
+
+        with mock.patch(
+                'nailgun.lcm.transaction_serializer.distributed.Client'
+        ) as job_cluster:
+
+            # Mocking job processing
+            job = mock.Mock()
+            job.result.return_value = [((node_id, task), None)]
+
+            submit = mock.Mock()
+            submit.return_value = job
+
+            as_completed.return_value = [job]
+
+            scatter = mock.Mock()
+            job_cluster.return_value.scatter = scatter
+            job_cluster.return_value.scatter.return_value = {}
+            job_cluster.return_value.submit = submit
+
+            formatter_context.return_value = {node_id: {}}
+
+            # Configuring the available workers
+            job_cluster.return_value.scheduler_info.return_value = \
+                {
+                    'workers': {
+                        'tcp://{0}'.format(settings.MASTER_IP): {},
+                        'tcp://192.168.0.1:33334': {},
+                        'tcp://127.0.0.2:33335': {},
+                    }
+                }
+
+            # Performing serialization
+            lcm.TransactionSerializer.serialize(
+                context, [task], self.resolver
+            )
+
+            # Checking that data is scattered only to the expected workers
+            scatter.assert_called_once()
+            scatter.assert_called_with(
+                {'context': context, 'settings_config': settings.config},
+                broadcast=True,
+                workers=[settings.MASTER_IP]
+            )
+
+            # Checking that the job is submitted only to the expected workers
+            submit.assert_called_once()
+            serializer = lcm.transaction_serializer
+            submit.assert_called_with(
+                serializer._distributed_serialize_tasks_for_node,
+                {node_id: formatter_context()},
+                ((node_id, task),),
+                job_cluster().scatter(),
+                workers=set([settings.MASTER_IP])
+            )
+
+    def test_distributed_serialization_get_allowed_nodes_ips(self):
+        policy = lcm.transaction_serializer.DistributedProcessingPolicy()
+
+        context_data = {
+            'common': {
+                'serialization_policy':
+                    consts.SERIALIZATION_POLICY.distributed,
+                'ds_use_error': True,
+                'ds_use_provisioned': True,
+                'ds_use_discover': True,
+                'ds_use_ready': False
+            },
+            'nodes': {
+                '1': {'status': consts.NODE_STATUSES.error,
+                      'ip': '10.20.0.3'},
+                '2': {'status': consts.NODE_STATUSES.provisioned,
+                      'ip': '10.20.0.4'},
+                '3': {'status': consts.NODE_STATUSES.discover,
+                      'ip': '10.20.0.5'},
+                '4': {'status': consts.NODE_STATUSES.ready,
+                      'ip': '10.20.0.6'},
+            }
+        }
+
+        actual = policy._get_allowed_nodes_ips(
+            TransactionContext(context_data))
+        self.assertItemsEqual(
+            [settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'],
+            actual
+        )
+
+    def test_distributed_serialization_get_allowed_nodes_statuses(self):
+        policy = lcm.transaction_serializer.DistributedProcessingPolicy()
+        context_data = {}
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        self.assertItemsEqual([], actual)
+
+        context_data['common'] = {
+            'ds_use_discover': False,
+            'ds_use_provisioned': False,
+            'ds_use_error': False,
+            'ds_use_ready': False
+        }
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        self.assertItemsEqual([], actual)
+
+        context_data['common']['ds_use_discover'] = True
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        expected = [consts.NODE_STATUSES.discover]
+        self.assertItemsEqual(expected, actual)
+
+        context_data['common']['ds_use_provisioned'] = True
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        expected = [consts.NODE_STATUSES.discover,
+                    consts.NODE_STATUSES.provisioned]
+        self.assertItemsEqual(expected, actual)
+
+        context_data['common']['ds_use_error'] = True
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        expected = [consts.NODE_STATUSES.discover,
+                    consts.NODE_STATUSES.provisioned,
+                    consts.NODE_STATUSES.error]
+        self.assertItemsEqual(expected, actual)
+
+        context_data['common']['ds_use_ready'] = True
+        actual = policy._get_allowed_nodes_statuses(
+            TransactionContext(context_data))
+        expected = [consts.NODE_STATUSES.discover,
+                    consts.NODE_STATUSES.provisioned,
+                    consts.NODE_STATUSES.error,
+                    consts.NODE_STATUSES.ready]
+        self.assertItemsEqual(expected, actual)
+
+    def test_distributed_serialization_get_allowed_workers(self):
+        policy = lcm.transaction_serializer.DistributedProcessingPolicy()
+
+        with mock.patch(
+                'nailgun.lcm.transaction_serializer.distributed.Client'
+        ) as job_cluster:
+            job_cluster.scheduler_info.return_value = \
+                {'workers': {
+                    'tcp://10.20.0.2:1': {},
+                    'tcp://10.20.0.2:2': {},
+                    'tcp://10.20.0.3:1': {},
+                    'tcp://10.20.0.3:2': {},
+                    'tcp://10.20.0.3:3': {},
+                    'tcp://10.20.0.4:1': {},
+                    'tcp://10.20.0.5:1': {}
+                }}
+            allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5'])
+
+            expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1',
+                        '10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1']
+            actual = policy._get_allowed_workers(job_cluster, allowed_ips)
+            self.assertItemsEqual(expected, actual)
+
+    def test_distributed_serialization_serialize_task(self):
+        task = {
+            'id': 'task1', 'roles': ['controller'],
+            'type': 'puppet', 'version': '2.0.0',
+            'parameters': {
+                'master_ip': '{MN_IP}',
+                'host': {'yaql_exp': '$.public_ssl.hostname'},
+                'attr': {'yaql_exp': '$node.attributes.a_str'}
+            }
+        }
+
+        formatter_contexts_idx = {
+            '1': {'MN_IP': '10.0.0.1'},
+            '2': {}
+        }
+        scattered_data = {
+            'settings_config': settings.config,
+            'context': self.context
+        }
+
+        serializer = lcm.transaction_serializer
+        actual = serializer._distributed_serialize_tasks_for_node(
+            formatter_contexts_idx, [('1', task), ('2', task)], scattered_data)
+
+        expected = [
+            (
+                (
+                    '1',
+                    {
+                        'id': 'task1',
+                        'type': 'puppet',
+                        'parameters': {
+                            'cwd': '/',
+                            'master_ip': '10.0.0.1',
+                            'host': 'localhost',
+                            'attr': 'text1'
+                        },
+                        'fail_on_error': True
+                    }
+                ),
+                None
+            ),
+            (
+                (
+                    '2',
+                    {
+                        'id': 'task1',
+                        'type': 'puppet',
+                        'parameters': {
+                            'cwd': '/',
+                            'master_ip': '{MN_IP}',
+                            'host': 'localhost',
+                            'attr': 'text2'
+                        },
+                        'fail_on_error': True
+                    }
+                ),
+                None
+            )
+        ]
+
+        self.assertItemsEqual(expected, actual)
+
+    def test_distributed_serialization_serialize_task_failure(self):
+        task = {
+            'id': 'task1', 'roles': ['controller'],
+            'type': 'puppet', 'version': '2.0.0',
+            'parameters': {
+                'fake': {'yaql_exp': '$.some.fake_param'}
+            }
+        }
+
+        formatter_contexts_idx = {'2': {}}
+        scattered_data = {
+            'settings_config': settings.config,
+            'context': self.context
+        }
+
+        serializer = lcm.transaction_serializer
+        result = serializer._distributed_serialize_tasks_for_node(
+            formatter_contexts_idx, [('2', task)], scattered_data)
+        (_, __), err = result[0]
+        self.assertIsInstance(err, exceptions.KeyError)
+
+
+class TestConcurrencyPolicy(BaseTestCase):
+
+    @mock.patch(
+        'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
+        return_value=1
+    )
+    def test_one_cpu(self, cpu_count):
+        policy = lcm.transaction_serializer.get_processing_policy(
+            lcm.TransactionContext({}))
+        self.assertIsInstance(
+            policy,
+            lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
+        )
+        self.assertTrue(cpu_count.called)
+
+    @mock.patch(
+        'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
+        return_value=0
+    )
+    def test_zero_cpu(self, cpu_count):
+        policy = lcm.transaction_serializer.get_processing_policy(
+            lcm.TransactionContext({}))
+        self.assertIsInstance(
+            policy,
+            lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
+        )
+        self.assertTrue(cpu_count.called)
+
+    @mock.patch(
+        'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
+        side_effect=NotImplementedError
+    )
+    def test_cpu_count_not_implemented(self, cpu_count):
+        policy = lcm.transaction_serializer.get_processing_policy(
+            lcm.TransactionContext({}))
+        self.assertIsInstance(
+            policy,
+            lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
+        )
+        self.assertTrue(cpu_count.called)
+
+    def test_distributed_serialization_enabled_in_cluster(self):
+        context_data = {'common': {
+            'serialization_policy': consts.SERIALIZATION_POLICY.distributed
+        }}
+        policy = lcm.transaction_serializer.get_processing_policy(
+            lcm.TransactionContext(context_data))
+        self.assertIsInstance(
+            policy,
+            lcm.transaction_serializer.DistributedProcessingPolicy
+        )
@@ -47,3 +47,5 @@ stevedore>=1.5.0
 # See: https://bugs.launchpad.net/fuel/+bug/1519727
 setuptools<=18.5
 yaql>=1.0.0
+# Distributed nodes serialization
+distributed==1.16.0