Merge "Distributed serialization implementation"

This commit is contained in:
Jenkins 2017-03-24 19:51:59 +00:00 committed by Gerrit Code Review
commit 1c3ef48a2e
11 changed files with 769 additions and 14 deletions

View File

@ -532,3 +532,7 @@ DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci'
DEFAULT_MTU = 1500
SIZE_OF_VLAN_TAG = 4
SERIALIZATION_POLICY = Enum(
'distributed'
)

View File

@ -1114,6 +1114,55 @@
group: "security"
weight: 20
type: "radio"
serialization_policy:
value: "default"
values:
- data: "default"
label: "Default serialization"
description: "Run serialization on the master node only"
- data: "distributed"
label: "Distributed serialization"
description: "Run serialization on the master and environment nodes. Nodes for serialization are selected only form that environment for wich serialization is performing."
label: "Serialization policy"
group: "general"
weight: 30
type: "radio"
ds_use_discover:
group: "general"
label: "Use discovered nodes as workers for serialization"
type: "checkbox"
value: true
weight: 31
restrictions:
- condition: "settings:common.serialization_policy.value != 'distributed'"
action: "hide"
ds_use_provisioned:
group: "general"
label: "Use provisioned nodes as workers for serialization"
type: "checkbox"
value: true
weight: 32
restrictions:
- condition: "settings:common.serialization_policy.value != 'distributed'"
action: "hide"
ds_use_error:
group: "general"
label: "Use nodes in error state as workers for serialization"
type: "checkbox"
value: true
weight: 33
restrictions:
- condition: "settings:common.serialization_policy.value != 'distributed'"
action: "hide"
ds_use_ready:
group: "general"
label: "Use ready nodes as workers for serialization"
type: "checkbox"
value: false
weight: 34
restrictions:
- condition: "settings:common.serialization_policy.value != 'distributed'"
action: "hide"
public_network_assignment:
metadata:
weight: 10

View File

@ -110,6 +110,8 @@ class Context(object):
return evaluate
def get_formatter_context(self, node_id):
# TODO(akislitsky) remove formatter context from the
# tasks serialization workflow
data = self._transaction.get_new_data(node_id)
return {
'CLUSTER_ID': data.get('cluster', {}).get('id'),
@ -147,9 +149,14 @@ class DeploymentTaskSerializer(object):
:return: the result
"""
def serialize(self, node_id):
"""Serialize task in expected by orchestrator format.
def serialize(self, node_id, formatter_context=None):
"""Serialize task in expected by orchestrator format
If serialization is performed on the remote worker
we should pass formatter_context parameter with values
from the master node settings
:param formatter_context: formatter context
:param node_id: the target node_id
"""
@ -157,10 +164,12 @@ class DeploymentTaskSerializer(object):
"serialize task %s for node %s",
self.task_template['id'], node_id
)
formatter_context = formatter_context \
or self.context.get_formatter_context(node_id)
task = utils.traverse(
self.task_template,
utils.text_format_safe,
self.context.get_formatter_context(node_id),
formatter_context,
{
'yaql_exp': self.context.get_yaql_interpreter(
node_id, self.task_template['id'])

View File

@ -14,13 +14,21 @@
# License for the specific language governing permissions and limitations
# under the License.
from distutils.version import StrictVersion
import datetime
import multiprocessing
import os
from Queue import Queue
import shutil
import tempfile
import distributed
from distutils.version import StrictVersion
import six
import toolz
from nailgun import consts
from nailgun import errors
from nailgun.lcm.task_serializer import Context
from nailgun.lcm.task_serializer import TasksSerializersFactory
from nailgun.logger import logger
from nailgun.settings import settings
@ -128,7 +136,308 @@ class MultiProcessingConcurrencyPolicy(object):
pool.join()
def get_concurrency_policy():
def _distributed_serialize_tasks_for_node(formatter_contexts_idx,
node_and_tasks, scattered_data):
"""Remote serialization call for DistributedProcessingPolicy
Code of the function is copied to the workers and executed there, thus
we are including all required imports inside the function.
:param formatter_contexts_idx: dict of formatter contexts with node_id
value as key
:param node_and_tasks: list of node_id, task_data tuples
:param scattered_data: feature object, that points to data copied to
workers
:return: [(node_id, serialized), error]
"""
try:
factory = TasksSerializersFactory(scattered_data['context'])
# Restoring settings
settings.config = scattered_data['settings_config']
for k in formatter_contexts_idx:
formatter_contexts_idx[k]['SETTINGS'] = settings
except Exception as e:
logger.exception("Distributed serialization failed")
return [((None, None), e)]
result = []
for node_and_task in node_and_tasks:
node_id = None
try:
node_id, task = node_and_task
logger.debug("Starting distributed node %s task %s serialization",
node_id, task['id'])
formatter_context = formatter_contexts_idx[node_id]
serializer = factory.create_serializer(task)
serialized = serializer.serialize(
node_id, formatter_context=formatter_context)
logger.debug("Distributed node %s task %s serialization "
"result: %s", node_id, task['id'], serialized)
result.append(((node_id, serialized), None))
except Exception as e:
logger.exception("Distributed serialization failed")
result.append(((node_id, None), e))
break
logger.debug("Processed tasks count: %s", len(result))
return result
class DistributedProcessingPolicy(object):
def __init__(self):
self.sent_jobs = Queue()
self.sent_jobs_count = 0
def _consume_jobs(self, chunk_size=None):
"""Consumes jobs
If chunk_size is set function consumes specified number of
Finished tasks or less if sent_jobs_ids queue became empty.
If chunk_size is None function consumes jobs until
sent_jobs_ids queue became empty.
Jobs with statuses Cancelled, Abandoned, Terminated will be
resent and their ids added to sent_jobs_ids queue
:param chunk_size: size of consuming chunk
:return: generator on job results
"""
logger.debug("Consuming jobs started")
jobs_to_consume = []
while not self.sent_jobs.empty():
job = self.sent_jobs.get()
jobs_to_consume.append(job)
if chunk_size is not None:
chunk_size -= 1
if chunk_size <= 0:
break
for ready_job in distributed.as_completed(jobs_to_consume):
results = ready_job.result()
self.sent_jobs_count -= 1
for result in results:
(node_id, serialized), exc = result
logger.debug("Got ready task for node %s, serialized: %s, "
"error: %s", node_id, serialized, exc)
if exc is not None:
raise exc
yield node_id, serialized
logger.debug("Consuming jobs finished")
def _get_formatter_context(self, task_context, formatter_contexts_idx,
node_id):
try:
return formatter_contexts_idx[node_id]
except KeyError:
pass
logger.debug("Calculating formatter context for node %s", node_id)
formatter_context = task_context.get_formatter_context(
node_id)
# Settings file is already sent to the workers
formatter_context.pop('SETTINGS', None)
formatter_contexts_idx[node_id] = formatter_context
return formatter_context
def _upload_nailgun_code(self, job_cluster):
"""Creates zip of current nailgun code and uploads it to workers
TODO(akislitsky): add workers scope when it will be implemented
in the distributed library
:param job_cluster: distributed.Client
"""
logger.debug("Compressing nailgun code")
file_dir = os.path.dirname(__file__)
nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..'))
archive = os.path.join(tempfile.gettempdir(), 'nailgun')
result = shutil.make_archive(archive, 'zip', nailgun_root_dir,
'nailgun')
logger.debug("Nailgun code saved to: %s", result)
logger.debug("Uploading nailgun archive %s to workers", result)
job_cluster.upload_file(result)
def _scatter_data(self, job_cluster, context, workers):
logger.debug("Scattering data to workers started")
shared_data = {'context': context, 'settings_config': settings.config}
scattered = job_cluster.scatter(shared_data, broadcast=True,
workers=workers)
# Waiting data is scattered to workers
distributed.wait(scattered.values())
logger.debug("Scattering data to workers finished")
return scattered
def _get_allowed_nodes_statuses(self, context):
"""Extracts node statuses that allows distributed serialization"""
common = context.new.get('common', {})
cluster = common.get('cluster', {})
logger.debug("Getting allowed nodes statuses to use as serialization "
"workers for cluster %s", cluster.get('id'))
check_fields = {
'ds_use_ready': consts.NODE_STATUSES.ready,
'ds_use_provisioned': consts.NODE_STATUSES.provisioned,
'ds_use_discover': consts.NODE_STATUSES.discover,
'ds_use_error': consts.NODE_STATUSES.error
}
statuses = set()
for field, node_status in check_fields.items():
if common.get(field):
statuses.add(node_status)
logger.debug("Allowed nodes statuses to use as serialization workers "
"for cluster %s are: %s", cluster.get('id'), statuses)
return statuses
def _get_allowed_nodes_ips(self, context):
"""Filters online nodes from cluster by their status
In the cluster settings we select nodes statuses allowed for
using in the distributed serialization. Accordingly to selected
statuses nodes are going to be filtered.
:param context: TransactionContext
:return: set of allowed nodes ips
"""
ips = set()
allowed_statuses = self._get_allowed_nodes_statuses(context)
for node in six.itervalues(context.new.get('nodes', {})):
if node.get('status') in allowed_statuses:
ips.add(node.get('ip'))
ips.add(settings.MASTER_IP)
return ips
def _get_allowed_workers(self, job_cluster, allowed_ips):
"""Calculates workers addresses for distributed serialization
Only workers that placed on the allowed nodes must be selected
for the serialization.
:param job_cluster: distributed.Client
:param allowed_ips: allowed for serialization nodes ips
:return: list of workers addresses in format 'ip:port'
"""
logger.debug("Getting allowed workers")
workers = {}
# Worker has address like tcp://ip:port
info = job_cluster.scheduler_info()
for worker_addr in six.iterkeys(info['workers']):
ip_port = worker_addr.split('//')[1]
ip = ip_port.split(':')[0]
if ip not in allowed_ips:
continue
try:
pool = workers[ip]
pool.add(ip_port)
except KeyError:
workers[ip] = set([ip_port])
return list(toolz.itertoolz.concat(six.itervalues(workers)))
def execute(self, context, _, tasks):
"""Executes task serialization on distributed nodes
:param context: the transaction context
:param _: serializers factory
:param tasks: the tasks to serialize
:return sequence of serialized tasks
"""
logger.debug("Performing distributed tasks processing")
sched_address = '{0}:{1}'.format(settings.MASTER_IP,
settings.LCM_DS_JOB_SHEDULER_PORT)
job_cluster = distributed.Client(sched_address)
allowed_ips = self._get_allowed_nodes_ips(context)
workers = self._get_allowed_workers(job_cluster, allowed_ips)
logger.debug("Allowed workers list for serialization: %s", workers)
workers_ips = set([ip_port.split(':')[0] for ip_port in workers])
logger.debug("Allowed workers ips list for serialization: %s",
workers_ips)
task_context = Context(context)
formatter_contexts_idx = {}
workers_num = len(workers)
max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF
logger.debug("Max jobs allowed in queue: %s", max_jobs_in_queue)
start = datetime.datetime.utcnow()
tasks_count = 0
try:
self._upload_nailgun_code(job_cluster)
scattered = self._scatter_data(job_cluster, context, workers)
for tasks_chunk in toolz.partition_all(
settings.LCM_DS_TASKS_PER_JOB, tasks):
formatter_contexts_for_tasks = {}
# Collecting required contexts for tasks
for task in tasks_chunk:
node_id, task_data = task
formatter_context = self._get_formatter_context(
task_context, formatter_contexts_idx, node_id)
if node_id not in formatter_contexts_for_tasks:
formatter_contexts_for_tasks[node_id] = \
formatter_context
logger.debug("Submitting job for tasks chunk: %s", tasks_chunk)
job = job_cluster.submit(
_distributed_serialize_tasks_for_node,
formatter_contexts_for_tasks,
tasks_chunk,
scattered,
workers=workers_ips
)
self.sent_jobs.put(job)
self.sent_jobs_count += 1
# We are limit the max number of tasks by the number of nodes
# which are used in the serialization
if self.sent_jobs_count >= max_jobs_in_queue:
for result in self._consume_jobs(chunk_size=workers_num):
tasks_count += 1
yield result
# We have no tasks any more but have unconsumed jobs
for result in self._consume_jobs():
tasks_count += 1
yield result
finally:
end = datetime.datetime.utcnow()
logger.debug("Distributed tasks processing finished. "
"Total time: %s. Tasks processed: %s",
end - start, tasks_count)
job_cluster.shutdown()
def is_distributed_processing_enabled(context):
common = context.new.get('common', {})
return common.get('serialization_policy') == \
consts.SERIALIZATION_POLICY.distributed
def get_processing_policy(context):
if is_distributed_processing_enabled(context):
return DistributedProcessingPolicy()
cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR
if not cpu_num:
try:
@ -162,7 +471,7 @@ class TransactionSerializer(object):
# ids of nodes in this group and how many nodes in this group can fail
# and deployment will not be interrupted
self.fault_tolerance_groups = []
self.concurrency_policy = get_concurrency_policy()
self.processing_policy = get_processing_policy(context)
@classmethod
def serialize(cls, context, tasks, resolver):
@ -216,7 +525,7 @@ class TransactionSerializer(object):
:param tasks: the deployment tasks
:return the mapping tasks per node
"""
serialized = self.concurrency_policy.execute(
serialized = self.processing_policy.execute(
self.context,
self.serializer_factory_class,
self.expand_tasks(tasks)

View File

@ -221,6 +221,7 @@ class DeploymentMultinodeSerializer(object):
'role': role,
'vms_conf': node.vms_conf,
'fail_if_error': role in self.critical_roles,
'ip': node.ip,
# TODO(eli): need to remove, requried for the fake thread only
'online': node.online,
}

View File

@ -38,7 +38,21 @@ class NailgunSettings(object):
if test_config:
settings_files.append(test_config)
self.config = {}
# If settings.yaml doesn't exist we should have default
# config structure. Nailgun without settings is used
# when we distribute source code to the workers for
# distributed serialization
self.config = {
'VERSION': {},
'DATABASE': {
'engine': 'postgresql',
'name': '',
'host': '',
'port': '0',
'user': '',
'passwd': ''
}
}
for sf in settings_files:
try:
logger.debug("Trying to read config file %s" % sf)
@ -47,9 +61,9 @@ class NailgunSettings(object):
logger.error("Error while reading config file %s: %s" %
(sf, str(e)))
self.config['VERSION']['api'] = self.config['API']
self.config['VERSION']['api'] = self.config.get('API')
self.config['VERSION']['feature_groups'] = \
self.config['FEATURE_GROUPS']
self.config.get('FEATURE_GROUPS')
fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE)
if fuel_release:
@ -61,7 +75,7 @@ class NailgunSettings(object):
self.config['VERSION']['openstack_version'] = \
fuel_openstack_version
if int(self.config.get("DEVELOPMENT")):
if int(self.config.get("DEVELOPMENT", 0)):
logger.info("DEVELOPMENT MODE ON:")
here = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..')

View File

@ -177,6 +177,15 @@ YAQL_MEMORY_QUOTA: 104857600
LCM_CHECK_TASK_VERSION: False
# Coefficient for calculation max jobs queue length. If jobs number reaches the
# len(nodes) * load_coef we stop generate and start consume of jobs.
LCM_DS_NODE_LOAD_COEFF: 2
# Port of dask-scheduler on the master node
LCM_DS_JOB_SHEDULER_PORT: 8002
# Size of tasks chunk sending to the distributed worker
LCM_DS_TASKS_PER_JOB: 100
DPDK_MAX_CPUS_PER_NIC: 4
TRUNCATE_LOG_ENTRIES: 100

View File

@ -56,6 +56,16 @@ class InstallationInfo(object):
'propagate_task_deploy', None),
WhiteListRule(('common', 'security_groups', 'value'),
'security_groups', None),
WhiteListRule(('common', 'serialization_policy', 'value'),
'serialization_policy', None),
WhiteListRule(('common', 'ds_use_discover', 'value'),
'ds_use_discover', None),
WhiteListRule(('common', 'ds_use_provisioned', 'value'),
'ds_use_provisioned', None),
WhiteListRule(('common', 'ds_use_ready', 'value'),
'ds_use_ready', None),
WhiteListRule(('common', 'ds_use_error', 'value'),
'ds_use_error', None),
WhiteListRule(('corosync', 'verified', 'value'),
'corosync_verified', None),

View File

@ -187,6 +187,7 @@ class TestHandlers(BaseIntegrationTest):
'fail_if_error': is_critical,
'vms_conf': [],
'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
'ip': node.ip,
'network_data': {
'eth1': {
@ -603,6 +604,7 @@ class TestHandlers(BaseIntegrationTest):
'online': node.online,
'fail_if_error': is_critical,
'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
'ip': node.ip,
'priority': 100,
'vms_conf': [],
'network_scheme': {
@ -1096,6 +1098,7 @@ class TestHandlers(BaseIntegrationTest):
'fail_if_error': is_critical,
'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
'priority': 100,
'ip': node.ip,
'vms_conf': [],
'network_scheme': {

View File

@ -14,20 +14,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import exceptions
import mock
import multiprocessing.dummy
from nailgun import consts
from nailgun import errors
from nailgun import lcm
from nailgun.lcm import TransactionContext
from nailgun.settings import settings
from nailgun.test.base import BaseTestCase
from nailgun.utils.resolvers import TagResolver
from nailgun.test.base import BaseUnitTest
class TestTransactionSerializer(BaseUnitTest):
class TestTransactionSerializer(BaseTestCase):
@classmethod
def setUpClass(cls):
super(TestTransactionSerializer, cls).setUpClass()
cls.tasks = [
{
'id': 'task1', 'roles': ['controller'],
@ -462,3 +466,344 @@ class TestTransactionSerializer(BaseUnitTest):
9,
lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10)
)
def _get_context_for_distributed_serialization(self):
new = copy.deepcopy(self.context.new)
new['common']['serialization_policy'] = \
consts.SERIALIZATION_POLICY.distributed
return TransactionContext(new)
@mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
@mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
def test_distributed_serialization(self, _, as_completed):
context = self._get_context_for_distributed_serialization()
with mock.patch(
'nailgun.lcm.transaction_serializer.distributed.Client'
) as job_cluster:
job = mock.Mock()
job.result.return_value = [
(('1', {"id": "task1", "type": "skipped"}), None)
]
submit = mock.Mock()
submit.return_value = job
as_completed.return_value = [job]
job_cluster.return_value.submit = submit
job_cluster.return_value.scheduler_info.return_value = \
{'workers': {'tcp://worker': {}}}
lcm.TransactionSerializer.serialize(
context, self.tasks, self.resolver)
self.assertTrue(submit.called)
# 4 controller task + 1 compute + 1 cinder
self.assertTrue(6, submit.call_count)
@mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
@mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
@mock.patch('nailgun.lcm.transaction_serializer.'
'DistributedProcessingPolicy._get_formatter_context')
def test_distributed_serialization_workers_scope(self, formatter_context,
as_completed, _):
context = self._get_context_for_distributed_serialization()
node_id = '1'
task = {
'id': 'task1', 'roles': ['controller'],
'type': 'puppet', 'version': '2.0.0'
}
with mock.patch(
'nailgun.lcm.transaction_serializer.distributed.Client'
) as job_cluster:
# Mocking job processing
job = mock.Mock()
job.result.return_value = [((node_id, task), None)]
submit = mock.Mock()
submit.return_value = job
as_completed.return_value = [job]
scatter = mock.Mock()
job_cluster.return_value.scatter = scatter
job_cluster.return_value.scatter.return_value = {}
job_cluster.return_value.submit = submit
formatter_context.return_value = {node_id: {}}
# Configuring available workers
job_cluster.return_value.scheduler_info.return_value = \
{
'workers': {
'tcp://{0}'.format(settings.MASTER_IP): {},
'tcp://192.168.0.1:33334': {},
'tcp://127.0.0.2:33335': {},
}
}
# Performing serialization
lcm.TransactionSerializer.serialize(
context, [task], self.resolver
)
# Checking data is scattered only to expected workers
scatter.assert_called_once()
scatter.assert_called_with(
{'context': context, 'settings_config': settings.config},
broadcast=True,
workers=[settings.MASTER_IP]
)
# Checking submit job only to expected workers
submit.assert_called_once()
serializer = lcm.transaction_serializer
submit.assert_called_with(
serializer._distributed_serialize_tasks_for_node,
{node_id: formatter_context()},
((node_id, task),),
job_cluster().scatter(),
workers=set([settings.MASTER_IP])
)
def test_distributed_serialization_get_allowed_nodes_ips(self):
policy = lcm.transaction_serializer.DistributedProcessingPolicy()
context_data = {
'common': {
'serialization_policy':
consts.SERIALIZATION_POLICY.distributed,
'ds_use_error': True,
'ds_use_provisioned': True,
'ds_use_discover': True,
'ds_use_ready': False
},
'nodes': {
'1': {'status': consts.NODE_STATUSES.error,
'ip': '10.20.0.3'},
'2': {'status': consts.NODE_STATUSES.provisioned,
'ip': '10.20.0.4'},
'3': {'status': consts.NODE_STATUSES.discover,
'ip': '10.20.0.5'},
'4': {'status': consts.NODE_STATUSES.ready,
'ip': '10.20.0.6'},
}
}
actual = policy._get_allowed_nodes_ips(
TransactionContext(context_data))
self.assertItemsEqual(
[settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'],
actual
)
def test_distributed_serialization_get_allowed_nodes_statuses(self):
policy = lcm.transaction_serializer.DistributedProcessingPolicy()
context_data = {}
actual = policy._get_allowed_nodes_statuses(
TransactionContext(context_data))
self.assertItemsEqual([], actual)
context_data['common'] = {
'ds_use_discover': False,
'ds_use_provisioned': False,
'ds_use_error': False,
'ds_use_ready': False
}
actual = policy._get_allowed_nodes_statuses(
TransactionContext(context_data))
self.assertItemsEqual([], actual)
context_data['common']['ds_use_discover'] = True
actual = policy._get_allowed_nodes_statuses(
TransactionContext(context_data))
expected = [consts.NODE_STATUSES.discover]
self.assertItemsEqual(expected, actual)
context_data['common']['ds_use_provisioned'] = True
actual = policy._get_allowed_nodes_statuses(
TransactionContext(context_data))
expected = [consts.NODE_STATUSES.discover,
consts.NODE_STATUSES.provisioned]
self.assertItemsEqual(expected, actual)
context_data['common']['ds_use_error'] = True
actual = policy._get_allowed_nodes_statuses(
TransactionContext(context_data))
expected = [consts.NODE_STATUSES.discover,
consts.NODE_STATUSES.provisioned,
consts.NODE_STATUSES.error]
self.assertItemsEqual(expected, actual)
context_data['common']['ds_use_ready'] = True
actual = policy._get_allowed_nodes_statuses(
TransactionContext(context_data))
expected = [consts.NODE_STATUSES.discover,
consts.NODE_STATUSES.provisioned,
consts.NODE_STATUSES.error,
consts.NODE_STATUSES.ready]
self.assertItemsEqual(expected, actual)
def test_distributed_serialization_get_allowed_workers(self):
policy = lcm.transaction_serializer.DistributedProcessingPolicy()
with mock.patch(
'nailgun.lcm.transaction_serializer.distributed.Client'
) as job_cluster:
job_cluster.scheduler_info.return_value = \
{'workers': {
'tcp://10.20.0.2:1': {},
'tcp://10.20.0.2:2': {},
'tcp://10.20.0.3:1': {},
'tcp://10.20.0.3:2': {},
'tcp://10.20.0.3:3': {},
'tcp://10.20.0.4:1': {},
'tcp://10.20.0.5:1': {}
}}
allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5'])
expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1',
'10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1']
actual = policy._get_allowed_workers(job_cluster, allowed_ips)
self.assertItemsEqual(expected, actual)
def test_distributed_serialization_serialize_task(self):
task = {
'id': 'task1', 'roles': ['controller'],
'type': 'puppet', 'version': '2.0.0',
'parameters': {
'master_ip': '{MN_IP}',
'host': {'yaql_exp': '$.public_ssl.hostname'},
'attr': {'yaql_exp': '$node.attributes.a_str'}
}
}
formatter_contexts_idx = {
'1': {'MN_IP': '10.0.0.1'},
'2': {}
}
scattered_data = {
'settings_config': settings.config,
'context': self.context
}
serializer = lcm.transaction_serializer
actual = serializer._distributed_serialize_tasks_for_node(
formatter_contexts_idx, [('1', task), ('2', task)], scattered_data)
expected = [
(
(
'1',
{
'id': 'task1',
'type': 'puppet',
'parameters': {
'cwd': '/',
'master_ip': '10.0.0.1',
'host': 'localhost',
'attr': 'text1'
},
'fail_on_error': True
}
),
None
),
(
(
'2',
{
'id': 'task1',
'type': 'puppet',
'parameters': {
'cwd': '/',
'master_ip': '{MN_IP}',
'host': 'localhost',
'attr': 'text2'
},
'fail_on_error': True
}
),
None
)
]
self.assertItemsEqual(expected, actual)
def test_distributed_serialization_serialize_task_failure(self):
task = {
'id': 'task1', 'roles': ['controller'],
'type': 'puppet', 'version': '2.0.0',
'parameters': {
'fake': {'yaql_exp': '$.some.fake_param'}
}
}
formatter_contexts_idx = {'2': {}}
scattered_data = {
'settings_config': settings.config,
'context': self.context
}
serializer = lcm.transaction_serializer
result = serializer._distributed_serialize_tasks_for_node(
formatter_contexts_idx, [('2', task)], scattered_data)
(_, __), err = result[0]
self.assertIsInstance(err, exceptions.KeyError)
class TestConcurrencyPolicy(BaseTestCase):
@mock.patch(
'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
return_value=1
)
def test_one_cpu(self, cpu_count):
policy = lcm.transaction_serializer.get_processing_policy(
lcm.TransactionContext({}))
self.assertIsInstance(
policy,
lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
)
self.assertTrue(cpu_count.is_called)
@mock.patch(
'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
return_value=0
)
def test_zero_cpu(self, cpu_count):
policy = lcm.transaction_serializer.get_processing_policy(
lcm.TransactionContext({}))
self.assertIsInstance(
policy,
lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
)
self.assertTrue(cpu_count.is_called)
@mock.patch(
'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
side_effect=NotImplementedError
)
def test_cpu_count_not_implemented(self, cpu_count):
policy = lcm.transaction_serializer.get_processing_policy(
lcm.TransactionContext({}))
self.assertIsInstance(
policy,
lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
)
self.assertTrue(cpu_count.is_called)
def test_distributed_serialization_enabled_in_cluster(self):
context_data = {'common': {
'serialization_policy': consts.SERIALIZATION_POLICY.distributed
}}
policy = lcm.transaction_serializer.get_processing_policy(
lcm.TransactionContext(context_data))
self.assertIsInstance(
policy,
lcm.transaction_serializer.DistributedProcessingPolicy
)

View File

@ -47,3 +47,5 @@ stevedore>=1.5.0
# See: https://bugs.launchpad.net/fuel/+bug/1519727
setuptools<=18.5
yaql>=1.0.0
# Distributed nodes serialization
distributed==1.16.0