summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2017-03-24 19:51:59 +0000
committerGerrit Code Review <review@openstack.org>2017-03-24 19:51:59 +0000
commit1c3ef48a2eaa0a66e61e30212842230c4e64e8be (patch)
treeb4f113547432fec657b0ebfdb1c2f1d513b70b25
parent054ed4cf991fbd0433c87c497ba8758ecf7d4558 (diff)
parent4fa861fa8a857a50ec5f25073e6e0eb307db7404 (diff)
Merge "Distributed serialization implementation"
-rw-r--r--nailgun/nailgun/consts.py4
-rw-r--r--nailgun/nailgun/fixtures/openstack.yaml49
-rw-r--r--nailgun/nailgun/lcm/task_serializer.py15
-rw-r--r--nailgun/nailgun/lcm/transaction_serializer.py317
-rw-r--r--nailgun/nailgun/orchestrator/deployment_serializers.py1
-rw-r--r--nailgun/nailgun/settings.py22
-rw-r--r--nailgun/nailgun/settings.yaml9
-rw-r--r--nailgun/nailgun/statistics/fuel_statistics/installation_info.py10
-rw-r--r--nailgun/nailgun/test/integration/test_cluster_changes_handler.py3
-rw-r--r--nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py351
-rw-r--r--nailgun/requirements.txt2
11 files changed, 769 insertions, 14 deletions
diff --git a/nailgun/nailgun/consts.py b/nailgun/nailgun/consts.py
index 3d3b1ae..4c790cc 100644
--- a/nailgun/nailgun/consts.py
+++ b/nailgun/nailgun/consts.py
@@ -532,3 +532,7 @@ DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci'
532DEFAULT_MTU = 1500 532DEFAULT_MTU = 1500
533 533
534SIZE_OF_VLAN_TAG = 4 534SIZE_OF_VLAN_TAG = 4
535
536SERIALIZATION_POLICY = Enum(
537 'distributed'
538)
diff --git a/nailgun/nailgun/fixtures/openstack.yaml b/nailgun/nailgun/fixtures/openstack.yaml
index 8b4d763..5557710 100644
--- a/nailgun/nailgun/fixtures/openstack.yaml
+++ b/nailgun/nailgun/fixtures/openstack.yaml
@@ -1114,6 +1114,55 @@
1114 group: "security" 1114 group: "security"
1115 weight: 20 1115 weight: 20
1116 type: "radio" 1116 type: "radio"
1117 serialization_policy:
1118 value: "default"
1119 values:
1120 - data: "default"
1121 label: "Default serialization"
1122 description: "Run serialization on the master node only"
1123 - data: "distributed"
1124 label: "Distributed serialization"
1125 description: "Run serialization on the master and environment nodes. Nodes for serialization are selected only from the environment for which serialization is performed."
1126 label: "Serialization policy"
1127 group: "general"
1128 weight: 30
1129 type: "radio"
1130 ds_use_discover:
1131 group: "general"
1132 label: "Use discovered nodes as workers for serialization"
1133 type: "checkbox"
1134 value: true
1135 weight: 31
1136 restrictions:
1137 - condition: "settings:common.serialization_policy.value != 'distributed'"
1138 action: "hide"
1139 ds_use_provisioned:
1140 group: "general"
1141 label: "Use provisioned nodes as workers for serialization"
1142 type: "checkbox"
1143 value: true
1144 weight: 32
1145 restrictions:
1146 - condition: "settings:common.serialization_policy.value != 'distributed'"
1147 action: "hide"
1148 ds_use_error:
1149 group: "general"
1150 label: "Use nodes in error state as workers for serialization"
1151 type: "checkbox"
1152 value: true
1153 weight: 33
1154 restrictions:
1155 - condition: "settings:common.serialization_policy.value != 'distributed'"
1156 action: "hide"
1157 ds_use_ready:
1158 group: "general"
1159 label: "Use ready nodes as workers for serialization"
1160 type: "checkbox"
1161 value: false
1162 weight: 34
1163 restrictions:
1164 - condition: "settings:common.serialization_policy.value != 'distributed'"
1165 action: "hide"
1117 public_network_assignment: 1166 public_network_assignment:
1118 metadata: 1167 metadata:
1119 weight: 10 1168 weight: 10
diff --git a/nailgun/nailgun/lcm/task_serializer.py b/nailgun/nailgun/lcm/task_serializer.py
index c004115..0d70f36 100644
--- a/nailgun/nailgun/lcm/task_serializer.py
+++ b/nailgun/nailgun/lcm/task_serializer.py
@@ -110,6 +110,8 @@ class Context(object):
110 return evaluate 110 return evaluate
111 111
112 def get_formatter_context(self, node_id): 112 def get_formatter_context(self, node_id):
113 # TODO(akislitsky) remove formatter context from the
114 # tasks serialization workflow
113 data = self._transaction.get_new_data(node_id) 115 data = self._transaction.get_new_data(node_id)
114 return { 116 return {
115 'CLUSTER_ID': data.get('cluster', {}).get('id'), 117 'CLUSTER_ID': data.get('cluster', {}).get('id'),
@@ -147,9 +149,14 @@ class DeploymentTaskSerializer(object):
147 :return: the result 149 :return: the result
148 """ 150 """
149 151
150 def serialize(self, node_id): 152 def serialize(self, node_id, formatter_context=None):
151 """Serialize task in expected by orchestrator format. 153 """Serialize task in expected by orchestrator format
152 154
155 If serialization is performed on the remote worker
156 we should pass formatter_context parameter with values
157 from the master node settings
158
159 :param formatter_context: formatter context
153 :param node_id: the target node_id 160 :param node_id: the target node_id
154 """ 161 """
155 162
@@ -157,10 +164,12 @@ class DeploymentTaskSerializer(object):
157 "serialize task %s for node %s", 164 "serialize task %s for node %s",
158 self.task_template['id'], node_id 165 self.task_template['id'], node_id
159 ) 166 )
167 formatter_context = formatter_context \
168 or self.context.get_formatter_context(node_id)
160 task = utils.traverse( 169 task = utils.traverse(
161 self.task_template, 170 self.task_template,
162 utils.text_format_safe, 171 utils.text_format_safe,
163 self.context.get_formatter_context(node_id), 172 formatter_context,
164 { 173 {
165 'yaql_exp': self.context.get_yaql_interpreter( 174 'yaql_exp': self.context.get_yaql_interpreter(
166 node_id, self.task_template['id']) 175 node_id, self.task_template['id'])
diff --git a/nailgun/nailgun/lcm/transaction_serializer.py b/nailgun/nailgun/lcm/transaction_serializer.py
index c02146b..32953c1 100644
--- a/nailgun/nailgun/lcm/transaction_serializer.py
+++ b/nailgun/nailgun/lcm/transaction_serializer.py
@@ -14,13 +14,21 @@
14# License for the specific language governing permissions and limitations 14# License for the specific language governing permissions and limitations
15# under the License. 15# under the License.
16 16
17from distutils.version import StrictVersion 17import datetime
18import multiprocessing 18import multiprocessing
19import os
20from Queue import Queue
21import shutil
22import tempfile
19 23
24import distributed
25from distutils.version import StrictVersion
20import six 26import six
27import toolz
21 28
22from nailgun import consts 29from nailgun import consts
23from nailgun import errors 30from nailgun import errors
31from nailgun.lcm.task_serializer import Context
24from nailgun.lcm.task_serializer import TasksSerializersFactory 32from nailgun.lcm.task_serializer import TasksSerializersFactory
25from nailgun.logger import logger 33from nailgun.logger import logger
26from nailgun.settings import settings 34from nailgun.settings import settings
@@ -128,7 +136,308 @@ class MultiProcessingConcurrencyPolicy(object):
128 pool.join() 136 pool.join()
129 137
130 138
131def get_concurrency_policy(): 139def _distributed_serialize_tasks_for_node(formatter_contexts_idx,
140 node_and_tasks, scattered_data):
141 """Remote serialization call for DistributedProcessingPolicy
142
143 Code of the function is copied to the workers and executed there, thus
144 we are including all required imports inside the function.
145
146 :param formatter_contexts_idx: dict of formatter contexts with node_id
147 value as key
148 :param node_and_tasks: list of node_id, task_data tuples
149 :param scattered_data: future object that points to data copied to
150 workers
151 :return: [(node_id, serialized), error]
152 """
153
154 try:
155 factory = TasksSerializersFactory(scattered_data['context'])
156
157 # Restoring settings
158 settings.config = scattered_data['settings_config']
159 for k in formatter_contexts_idx:
160 formatter_contexts_idx[k]['SETTINGS'] = settings
161
162 except Exception as e:
163 logger.exception("Distributed serialization failed")
164 return [((None, None), e)]
165
166 result = []
167
168 for node_and_task in node_and_tasks:
169
170 node_id = None
171 try:
172 node_id, task = node_and_task
173
174 logger.debug("Starting distributed node %s task %s serialization",
175 node_id, task['id'])
176
177 formatter_context = formatter_contexts_idx[node_id]
178
179 serializer = factory.create_serializer(task)
180 serialized = serializer.serialize(
181 node_id, formatter_context=formatter_context)
182
183 logger.debug("Distributed node %s task %s serialization "
184 "result: %s", node_id, task['id'], serialized)
185
186 result.append(((node_id, serialized), None))
187 except Exception as e:
188 logger.exception("Distributed serialization failed")
189 result.append(((node_id, None), e))
190 break
191
192 logger.debug("Processed tasks count: %s", len(result))
193 return result
194
195
196class DistributedProcessingPolicy(object):
197
198 def __init__(self):
199 self.sent_jobs = Queue()
200 self.sent_jobs_count = 0
201
202 def _consume_jobs(self, chunk_size=None):
203 """Consumes jobs
204
205 If chunk_size is set the function consumes the specified number of
206 finished jobs or fewer if the sent_jobs queue becomes empty.
207 If chunk_size is None the function consumes jobs until the
208 sent_jobs queue becomes empty.
209 Jobs with statuses Cancelled, Abandoned, Terminated will be
210 resent and their ids added to the sent_jobs queue
211
212 :param chunk_size: size of consuming chunk
213 :return: generator on job results
214 """
215 logger.debug("Consuming jobs started")
216
217 jobs_to_consume = []
218 while not self.sent_jobs.empty():
219 job = self.sent_jobs.get()
220 jobs_to_consume.append(job)
221
222 if chunk_size is not None:
223 chunk_size -= 1
224 if chunk_size <= 0:
225 break
226
227 for ready_job in distributed.as_completed(jobs_to_consume):
228 results = ready_job.result()
229 self.sent_jobs_count -= 1
230
231 for result in results:
232 (node_id, serialized), exc = result
233 logger.debug("Got ready task for node %s, serialized: %s, "
234 "error: %s", node_id, serialized, exc)
235 if exc is not None:
236 raise exc
237 yield node_id, serialized
238
239 logger.debug("Consuming jobs finished")
240
241 def _get_formatter_context(self, task_context, formatter_contexts_idx,
242 node_id):
243 try:
244 return formatter_contexts_idx[node_id]
245 except KeyError:
246 pass
247
248 logger.debug("Calculating formatter context for node %s", node_id)
249 formatter_context = task_context.get_formatter_context(
250 node_id)
251 # Settings file is already sent to the workers
252 formatter_context.pop('SETTINGS', None)
253 formatter_contexts_idx[node_id] = formatter_context
254
255 return formatter_context
256
257 def _upload_nailgun_code(self, job_cluster):
258 """Creates zip of current nailgun code and uploads it to workers
259
260 TODO(akislitsky): add workers scope when it will be implemented
261 in the distributed library
262
263 :param job_cluster: distributed.Client
264 """
265 logger.debug("Compressing nailgun code")
266 file_dir = os.path.dirname(__file__)
267 nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..'))
268 archive = os.path.join(tempfile.gettempdir(), 'nailgun')
269 result = shutil.make_archive(archive, 'zip', nailgun_root_dir,
270 'nailgun')
271 logger.debug("Nailgun code saved to: %s", result)
272
273 logger.debug("Uploading nailgun archive %s to workers", result)
274 job_cluster.upload_file(result)
275
276 def _scatter_data(self, job_cluster, context, workers):
277 logger.debug("Scattering data to workers started")
278 shared_data = {'context': context, 'settings_config': settings.config}
279 scattered = job_cluster.scatter(shared_data, broadcast=True,
280 workers=workers)
281 # Wait until the data is scattered to the workers
282 distributed.wait(scattered.values())
283 logger.debug("Scattering data to workers finished")
284
285 return scattered
286
287 def _get_allowed_nodes_statuses(self, context):
288 """Extracts node statuses that allows distributed serialization"""
289 common = context.new.get('common', {})
290 cluster = common.get('cluster', {})
291 logger.debug("Getting allowed nodes statuses to use as serialization "
292 "workers for cluster %s", cluster.get('id'))
293 check_fields = {
294 'ds_use_ready': consts.NODE_STATUSES.ready,
295 'ds_use_provisioned': consts.NODE_STATUSES.provisioned,
296 'ds_use_discover': consts.NODE_STATUSES.discover,
297 'ds_use_error': consts.NODE_STATUSES.error
298 }
299 statuses = set()
300 for field, node_status in check_fields.items():
301 if common.get(field):
302 statuses.add(node_status)
303
304 logger.debug("Allowed nodes statuses to use as serialization workers "
305 "for cluster %s are: %s", cluster.get('id'), statuses)
306 return statuses
307
308 def _get_allowed_nodes_ips(self, context):
309 """Filters online nodes from cluster by their status
310
311 In the cluster settings we select nodes statuses allowed for
312 using in the distributed serialization. Accordingly to selected
313 statuses nodes are going to be filtered.
314
315 :param context: TransactionContext
316 :return: set of allowed nodes ips
317 """
318 ips = set()
319 allowed_statuses = self._get_allowed_nodes_statuses(context)
320 for node in six.itervalues(context.new.get('nodes', {})):
321 if node.get('status') in allowed_statuses:
322 ips.add(node.get('ip'))
323 ips.add(settings.MASTER_IP)
324 return ips
325
326 def _get_allowed_workers(self, job_cluster, allowed_ips):
327 """Calculates workers addresses for distributed serialization
328
329 Only workers that placed on the allowed nodes must be selected
330 for the serialization.
331
332 :param job_cluster: distributed.Client
333 :param allowed_ips: allowed for serialization nodes ips
334 :return: list of workers addresses in format 'ip:port'
335 """
336 logger.debug("Getting allowed workers")
337 workers = {}
338
339 # Worker has address like tcp://ip:port
340 info = job_cluster.scheduler_info()
341 for worker_addr in six.iterkeys(info['workers']):
342 ip_port = worker_addr.split('//')[1]
343 ip = ip_port.split(':')[0]
344 if ip not in allowed_ips:
345 continue
346 try:
347 pool = workers[ip]
348 pool.add(ip_port)
349 except KeyError:
350 workers[ip] = set([ip_port])
351
352 return list(toolz.itertoolz.concat(six.itervalues(workers)))
353
354 def execute(self, context, _, tasks):
355 """Executes task serialization on distributed nodes
356
357 :param context: the transaction context
358 :param _: serializers factory
359 :param tasks: the tasks to serialize
360 :return sequence of serialized tasks
361 """
362 logger.debug("Performing distributed tasks processing")
363 sched_address = '{0}:{1}'.format(settings.MASTER_IP,
364 settings.LCM_DS_JOB_SHEDULER_PORT)
365 job_cluster = distributed.Client(sched_address)
366
367 allowed_ips = self._get_allowed_nodes_ips(context)
368 workers = self._get_allowed_workers(job_cluster, allowed_ips)
369 logger.debug("Allowed workers list for serialization: %s", workers)
370 workers_ips = set([ip_port.split(':')[0] for ip_port in workers])
371 logger.debug("Allowed workers ips list for serialization: %s",
372 workers_ips)
373
374 task_context = Context(context)
375 formatter_contexts_idx = {}
376 workers_num = len(workers)
377 max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF
378 logger.debug("Max jobs allowed in queue: %s", max_jobs_in_queue)
379
380 start = datetime.datetime.utcnow()
381 tasks_count = 0
382
383 try:
384 self._upload_nailgun_code(job_cluster)
385 scattered = self._scatter_data(job_cluster, context, workers)
386
387 for tasks_chunk in toolz.partition_all(
388 settings.LCM_DS_TASKS_PER_JOB, tasks):
389
390 formatter_contexts_for_tasks = {}
391
392 # Collecting required contexts for tasks
393 for task in tasks_chunk:
394 node_id, task_data = task
395 formatter_context = self._get_formatter_context(
396 task_context, formatter_contexts_idx, node_id)
397 if node_id not in formatter_contexts_for_tasks:
398 formatter_contexts_for_tasks[node_id] = \
399 formatter_context
400
401 logger.debug("Submitting job for tasks chunk: %s", tasks_chunk)
402 job = job_cluster.submit(
403 _distributed_serialize_tasks_for_node,
404 formatter_contexts_for_tasks,
405 tasks_chunk,
406 scattered,
407 workers=workers_ips
408 )
409
410 self.sent_jobs.put(job)
411 self.sent_jobs_count += 1
412
413 # We limit the max number of jobs in the queue by the number
414 # of workers which are used in the serialization
415 if self.sent_jobs_count >= max_jobs_in_queue:
416 for result in self._consume_jobs(chunk_size=workers_num):
417 tasks_count += 1
418 yield result
419
420 # We have no tasks any more but have unconsumed jobs
421 for result in self._consume_jobs():
422 tasks_count += 1
423 yield result
424 finally:
425 end = datetime.datetime.utcnow()
426 logger.debug("Distributed tasks processing finished. "
427 "Total time: %s. Tasks processed: %s",
428 end - start, tasks_count)
429 job_cluster.shutdown()
430
431
432def is_distributed_processing_enabled(context):
433 common = context.new.get('common', {})
434 return common.get('serialization_policy') == \
435 consts.SERIALIZATION_POLICY.distributed
436
437
438def get_processing_policy(context):
439 if is_distributed_processing_enabled(context):
440 return DistributedProcessingPolicy()
132 cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR 441 cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR
133 if not cpu_num: 442 if not cpu_num:
134 try: 443 try:
@@ -162,7 +471,7 @@ class TransactionSerializer(object):
162 # ids of nodes in this group and how many nodes in this group can fail 471 # ids of nodes in this group and how many nodes in this group can fail
163 # and deployment will not be interrupted 472 # and deployment will not be interrupted
164 self.fault_tolerance_groups = [] 473 self.fault_tolerance_groups = []
165 self.concurrency_policy = get_concurrency_policy() 474 self.processing_policy = get_processing_policy(context)
166 475
167 @classmethod 476 @classmethod
168 def serialize(cls, context, tasks, resolver): 477 def serialize(cls, context, tasks, resolver):
@@ -216,7 +525,7 @@ class TransactionSerializer(object):
216 :param tasks: the deployment tasks 525 :param tasks: the deployment tasks
217 :return the mapping tasks per node 526 :return the mapping tasks per node
218 """ 527 """
219 serialized = self.concurrency_policy.execute( 528 serialized = self.processing_policy.execute(
220 self.context, 529 self.context,
221 self.serializer_factory_class, 530 self.serializer_factory_class,
222 self.expand_tasks(tasks) 531 self.expand_tasks(tasks)
diff --git a/nailgun/nailgun/orchestrator/deployment_serializers.py b/nailgun/nailgun/orchestrator/deployment_serializers.py
index d9bf3b6..eb40f5a 100644
--- a/nailgun/nailgun/orchestrator/deployment_serializers.py
+++ b/nailgun/nailgun/orchestrator/deployment_serializers.py
@@ -221,6 +221,7 @@ class DeploymentMultinodeSerializer(object):
221 'role': role, 221 'role': role,
222 'vms_conf': node.vms_conf, 222 'vms_conf': node.vms_conf,
223 'fail_if_error': role in self.critical_roles, 223 'fail_if_error': role in self.critical_roles,
224 'ip': node.ip,
224 # TODO(eli): need to remove, requried for the fake thread only 225 # TODO(eli): need to remove, requried for the fake thread only
225 'online': node.online, 226 'online': node.online,
226 } 227 }
diff --git a/nailgun/nailgun/settings.py b/nailgun/nailgun/settings.py
index eee08c9..1f9222b 100644
--- a/nailgun/nailgun/settings.py
+++ b/nailgun/nailgun/settings.py
@@ -38,7 +38,21 @@ class NailgunSettings(object):
38 if test_config: 38 if test_config:
39 settings_files.append(test_config) 39 settings_files.append(test_config)
40 40
41 self.config = {} 41 # If settings.yaml doesn't exist we should have default
42 # config structure. Nailgun without settings is used
43 # when we distribute source code to the workers for
44 # distributed serialization
45 self.config = {
46 'VERSION': {},
47 'DATABASE': {
48 'engine': 'postgresql',
49 'name': '',
50 'host': '',
51 'port': '0',
52 'user': '',
53 'passwd': ''
54 }
55 }
42 for sf in settings_files: 56 for sf in settings_files:
43 try: 57 try:
44 logger.debug("Trying to read config file %s" % sf) 58 logger.debug("Trying to read config file %s" % sf)
@@ -47,9 +61,9 @@ class NailgunSettings(object):
47 logger.error("Error while reading config file %s: %s" % 61 logger.error("Error while reading config file %s: %s" %
48 (sf, str(e))) 62 (sf, str(e)))
49 63
50 self.config['VERSION']['api'] = self.config['API'] 64 self.config['VERSION']['api'] = self.config.get('API')
51 self.config['VERSION']['feature_groups'] = \ 65 self.config['VERSION']['feature_groups'] = \
52 self.config['FEATURE_GROUPS'] 66 self.config.get('FEATURE_GROUPS')
53 67
54 fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE) 68 fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE)
55 if fuel_release: 69 if fuel_release:
@@ -61,7 +75,7 @@ class NailgunSettings(object):
61 self.config['VERSION']['openstack_version'] = \ 75 self.config['VERSION']['openstack_version'] = \
62 fuel_openstack_version 76 fuel_openstack_version
63 77
64 if int(self.config.get("DEVELOPMENT")): 78 if int(self.config.get("DEVELOPMENT", 0)):
65 logger.info("DEVELOPMENT MODE ON:") 79 logger.info("DEVELOPMENT MODE ON:")
66 here = os.path.abspath( 80 here = os.path.abspath(
67 os.path.join(os.path.dirname(__file__), '..') 81 os.path.join(os.path.dirname(__file__), '..')
diff --git a/nailgun/nailgun/settings.yaml b/nailgun/nailgun/settings.yaml
index 92b3518..c2faaa8 100644
--- a/nailgun/nailgun/settings.yaml
+++ b/nailgun/nailgun/settings.yaml
@@ -177,6 +177,15 @@ YAQL_MEMORY_QUOTA: 104857600
177 177
178LCM_CHECK_TASK_VERSION: False 178LCM_CHECK_TASK_VERSION: False
179 179
180# Coefficient for calculating the max jobs queue length. If the number of jobs
181# reaches len(nodes) * load_coef we stop generating and start consuming jobs.
182LCM_DS_NODE_LOAD_COEFF: 2
183# Port of dask-scheduler on the master node
184LCM_DS_JOB_SHEDULER_PORT: 8002
185# Size of the tasks chunk sent to a distributed worker
186LCM_DS_TASKS_PER_JOB: 100
187
188
180DPDK_MAX_CPUS_PER_NIC: 4 189DPDK_MAX_CPUS_PER_NIC: 4
181 190
182TRUNCATE_LOG_ENTRIES: 100 191TRUNCATE_LOG_ENTRIES: 100
diff --git a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py
index 482d8c5..d024cdd 100644
--- a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py
+++ b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py
@@ -56,6 +56,16 @@ class InstallationInfo(object):
56 'propagate_task_deploy', None), 56 'propagate_task_deploy', None),
57 WhiteListRule(('common', 'security_groups', 'value'), 57 WhiteListRule(('common', 'security_groups', 'value'),
58 'security_groups', None), 58 'security_groups', None),
59 WhiteListRule(('common', 'serialization_policy', 'value'),
60 'serialization_policy', None),
61 WhiteListRule(('common', 'ds_use_discover', 'value'),
62 'ds_use_discover', None),
63 WhiteListRule(('common', 'ds_use_provisioned', 'value'),
64 'ds_use_provisioned', None),
65 WhiteListRule(('common', 'ds_use_ready', 'value'),
66 'ds_use_ready', None),
67 WhiteListRule(('common', 'ds_use_error', 'value'),
68 'ds_use_error', None),
59 WhiteListRule(('corosync', 'verified', 'value'), 69 WhiteListRule(('corosync', 'verified', 'value'),
60 'corosync_verified', None), 70 'corosync_verified', None),
61 71
diff --git a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py
index a8661ab..b8f4e98 100644
--- a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py
+++ b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py
@@ -187,6 +187,7 @@ class TestHandlers(BaseIntegrationTest):
187 'fail_if_error': is_critical, 187 'fail_if_error': is_critical,
188 'vms_conf': [], 188 'vms_conf': [],
189 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), 189 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
190 'ip': node.ip,
190 191
191 'network_data': { 192 'network_data': {
192 'eth1': { 193 'eth1': {
@@ -603,6 +604,7 @@ class TestHandlers(BaseIntegrationTest):
603 'online': node.online, 604 'online': node.online,
604 'fail_if_error': is_critical, 605 'fail_if_error': is_critical,
605 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), 606 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
607 'ip': node.ip,
606 'priority': 100, 608 'priority': 100,
607 'vms_conf': [], 609 'vms_conf': [],
608 'network_scheme': { 610 'network_scheme': {
@@ -1096,6 +1098,7 @@ class TestHandlers(BaseIntegrationTest):
1096 'fail_if_error': is_critical, 1098 'fail_if_error': is_critical,
1097 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), 1099 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN),
1098 'priority': 100, 1100 'priority': 100,
1101 'ip': node.ip,
1099 'vms_conf': [], 1102 'vms_conf': [],
1100 1103
1101 'network_scheme': { 1104 'network_scheme': {
diff --git a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
index 6450e19..9751d55 100644
--- a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
+++ b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py
@@ -14,20 +14,24 @@
14# License for the specific language governing permissions and limitations 14# License for the specific language governing permissions and limitations
15# under the License. 15# under the License.
16 16
17import copy
18import exceptions
17import mock 19import mock
18import multiprocessing.dummy 20import multiprocessing.dummy
19 21
20from nailgun import consts 22from nailgun import consts
21from nailgun import errors 23from nailgun import errors
22from nailgun import lcm 24from nailgun import lcm
25from nailgun.lcm import TransactionContext
26from nailgun.settings import settings
27from nailgun.test.base import BaseTestCase
23from nailgun.utils.resolvers import TagResolver 28from nailgun.utils.resolvers import TagResolver
24 29
25from nailgun.test.base import BaseUnitTest
26 30
27 31class TestTransactionSerializer(BaseTestCase):
28class TestTransactionSerializer(BaseUnitTest):
29 @classmethod 32 @classmethod
30 def setUpClass(cls): 33 def setUpClass(cls):
34 super(TestTransactionSerializer, cls).setUpClass()
31 cls.tasks = [ 35 cls.tasks = [
32 { 36 {
33 'id': 'task1', 'roles': ['controller'], 37 'id': 'task1', 'roles': ['controller'],
@@ -462,3 +466,344 @@ class TestTransactionSerializer(BaseUnitTest):
462 9, 466 9,
463 lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10) 467 lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10)
464 ) 468 )
469
470 def _get_context_for_distributed_serialization(self):
471 new = copy.deepcopy(self.context.new)
472 new['common']['serialization_policy'] = \
473 consts.SERIALIZATION_POLICY.distributed
474 return TransactionContext(new)
475
476 @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
477 @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
478 def test_distributed_serialization(self, _, as_completed):
479 context = self._get_context_for_distributed_serialization()
480
481 with mock.patch(
482 'nailgun.lcm.transaction_serializer.distributed.Client'
483 ) as job_cluster:
484 job = mock.Mock()
485 job.result.return_value = [
486 (('1', {"id": "task1", "type": "skipped"}), None)
487 ]
488
489 submit = mock.Mock()
490 submit.return_value = job
491
492 as_completed.return_value = [job]
493
494 job_cluster.return_value.submit = submit
495 job_cluster.return_value.scheduler_info.return_value = \
496 {'workers': {'tcp://worker': {}}}
497
498 lcm.TransactionSerializer.serialize(
499 context, self.tasks, self.resolver)
500 self.assertTrue(submit.called)
501 # 4 controller task + 1 compute + 1 cinder
502 self.assertTrue(6, submit.call_count)
503
504 @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait')
505 @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed')
506 @mock.patch('nailgun.lcm.transaction_serializer.'
507 'DistributedProcessingPolicy._get_formatter_context')
508 def test_distributed_serialization_workers_scope(self, formatter_context,
509 as_completed, _):
510 context = self._get_context_for_distributed_serialization()
511
512 node_id = '1'
513 task = {
514 'id': 'task1', 'roles': ['controller'],
515 'type': 'puppet', 'version': '2.0.0'
516 }
517
518 with mock.patch(
519 'nailgun.lcm.transaction_serializer.distributed.Client'
520 ) as job_cluster:
521
522 # Mocking job processing
523 job = mock.Mock()
524 job.result.return_value = [((node_id, task), None)]
525
526 submit = mock.Mock()
527 submit.return_value = job
528
529 as_completed.return_value = [job]
530
531 scatter = mock.Mock()
532 job_cluster.return_value.scatter = scatter
533
534 job_cluster.return_value.scatter.return_value = {}
535 job_cluster.return_value.submit = submit
536
537 formatter_context.return_value = {node_id: {}}
538
539 # Configuring available workers
540 job_cluster.return_value.scheduler_info.return_value = \
541 {
542 'workers': {
543 'tcp://{0}'.format(settings.MASTER_IP): {},
544 'tcp://192.168.0.1:33334': {},
545 'tcp://127.0.0.2:33335': {},
546 }
547 }
548
549 # Performing serialization
550 lcm.TransactionSerializer.serialize(
551 context, [task], self.resolver
552 )
553
554 # Checking data is scattered only to expected workers
555 scatter.assert_called_once()
556 scatter.assert_called_with(
557 {'context': context, 'settings_config': settings.config},
558 broadcast=True,
559 workers=[settings.MASTER_IP]
560 )
561
562 # Checking submit job only to expected workers
563 submit.assert_called_once()
564 serializer = lcm.transaction_serializer
565 submit.assert_called_with(
566 serializer._distributed_serialize_tasks_for_node,
567 {node_id: formatter_context()},
568 ((node_id, task),),
569 job_cluster().scatter(),
570 workers=set([settings.MASTER_IP])
571 )
572
573 def test_distributed_serialization_get_allowed_nodes_ips(self):
574 policy = lcm.transaction_serializer.DistributedProcessingPolicy()
575
576 context_data = {
577 'common': {
578 'serialization_policy':
579 consts.SERIALIZATION_POLICY.distributed,
580 'ds_use_error': True,
581 'ds_use_provisioned': True,
582 'ds_use_discover': True,
583 'ds_use_ready': False
584 },
585 'nodes': {
586 '1': {'status': consts.NODE_STATUSES.error,
587 'ip': '10.20.0.3'},
588 '2': {'status': consts.NODE_STATUSES.provisioned,
589 'ip': '10.20.0.4'},
590 '3': {'status': consts.NODE_STATUSES.discover,
591 'ip': '10.20.0.5'},
592 '4': {'status': consts.NODE_STATUSES.ready,
593 'ip': '10.20.0.6'},
594 }
595 }
596
597 actual = policy._get_allowed_nodes_ips(
598 TransactionContext(context_data))
599 self.assertItemsEqual(
600 [settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'],
601 actual
602 )
603
604 def test_distributed_serialization_get_allowed_nodes_statuses(self):
605 policy = lcm.transaction_serializer.DistributedProcessingPolicy()
606 context_data = {}
607 actual = policy._get_allowed_nodes_statuses(
608 TransactionContext(context_data))
609 self.assertItemsEqual([], actual)
610
611 context_data['common'] = {
612 'ds_use_discover': False,
613 'ds_use_provisioned': False,
614 'ds_use_error': False,
615 'ds_use_ready': False
616 }
617 actual = policy._get_allowed_nodes_statuses(
618 TransactionContext(context_data))
619 self.assertItemsEqual([], actual)
620
621 context_data['common']['ds_use_discover'] = True
622 actual = policy._get_allowed_nodes_statuses(
623 TransactionContext(context_data))
624 expected = [consts.NODE_STATUSES.discover]
625 self.assertItemsEqual(expected, actual)
626
627 context_data['common']['ds_use_provisioned'] = True
628 actual = policy._get_allowed_nodes_statuses(
629 TransactionContext(context_data))
630 expected = [consts.NODE_STATUSES.discover,
631 consts.NODE_STATUSES.provisioned]
632 self.assertItemsEqual(expected, actual)
633
634 context_data['common']['ds_use_error'] = True
635 actual = policy._get_allowed_nodes_statuses(
636 TransactionContext(context_data))
637 expected = [consts.NODE_STATUSES.discover,
638 consts.NODE_STATUSES.provisioned,
639 consts.NODE_STATUSES.error]
640 self.assertItemsEqual(expected, actual)
641
642 context_data['common']['ds_use_ready'] = True
643 actual = policy._get_allowed_nodes_statuses(
644 TransactionContext(context_data))
645 expected = [consts.NODE_STATUSES.discover,
646 consts.NODE_STATUSES.provisioned,
647 consts.NODE_STATUSES.error,
648 consts.NODE_STATUSES.ready]
649 self.assertItemsEqual(expected, actual)
650
651 def test_distributed_serialization_get_allowed_workers(self):
652 policy = lcm.transaction_serializer.DistributedProcessingPolicy()
653
654 with mock.patch(
655 'nailgun.lcm.transaction_serializer.distributed.Client'
656 ) as job_cluster:
657 job_cluster.scheduler_info.return_value = \
658 {'workers': {
659 'tcp://10.20.0.2:1': {},
660 'tcp://10.20.0.2:2': {},
661 'tcp://10.20.0.3:1': {},
662 'tcp://10.20.0.3:2': {},
663 'tcp://10.20.0.3:3': {},
664 'tcp://10.20.0.4:1': {},
665 'tcp://10.20.0.5:1': {}
666 }}
667 allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5'])
668
669 expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1',
670 '10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1']
671 actual = policy._get_allowed_workers(job_cluster, allowed_ips)
672 self.assertItemsEqual(expected, actual)
673
674 def test_distributed_serialization_serialize_task(self):
675 task = {
676 'id': 'task1', 'roles': ['controller'],
677 'type': 'puppet', 'version': '2.0.0',
678 'parameters': {
679 'master_ip': '{MN_IP}',
680 'host': {'yaql_exp': '$.public_ssl.hostname'},
681 'attr': {'yaql_exp': '$node.attributes.a_str'}
682 }
683 }
684
685 formatter_contexts_idx = {
686 '1': {'MN_IP': '10.0.0.1'},
687 '2': {}
688 }
689 scattered_data = {
690 'settings_config': settings.config,
691 'context': self.context
692 }
693
694 serializer = lcm.transaction_serializer
695 actual = serializer._distributed_serialize_tasks_for_node(
696 formatter_contexts_idx, [('1', task), ('2', task)], scattered_data)
697
698 expected = [
699 (
700 (
701 '1',
702 {
703 'id': 'task1',
704 'type': 'puppet',
705 'parameters': {
706 'cwd': '/',
707 'master_ip': '10.0.0.1',
708 'host': 'localhost',
709 'attr': 'text1'
710 },
711 'fail_on_error': True
712 }
713 ),
714 None
715 ),
716 (
717 (
718 '2',
719 {
720 'id': 'task1',
721 'type': 'puppet',
722 'parameters': {
723 'cwd': '/',
724 'master_ip': '{MN_IP}',
725 'host': 'localhost',
726 'attr': 'text2'
727 },
728 'fail_on_error': True
729 }
730 ),
731 None
732 )
733 ]
734
735 self.assertItemsEqual(expected, actual)
736
737 def test_distributed_serialization_serialize_task_failure(self):
738 task = {
739 'id': 'task1', 'roles': ['controller'],
740 'type': 'puppet', 'version': '2.0.0',
741 'parameters': {
742 'fake': {'yaql_exp': '$.some.fake_param'}
743 }
744 }
745
746 formatter_contexts_idx = {'2': {}}
747 scattered_data = {
748 'settings_config': settings.config,
749 'context': self.context
750 }
751
752 serializer = lcm.transaction_serializer
753 result = serializer._distributed_serialize_tasks_for_node(
754 formatter_contexts_idx, [('2', task)], scattered_data)
755 (_, __), err = result[0]
756 self.assertIsInstance(err, exceptions.KeyError)
757
758
759class TestConcurrencyPolicy(BaseTestCase):
760
761 @mock.patch(
762 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
763 return_value=1
764 )
765 def test_one_cpu(self, cpu_count):
766 policy = lcm.transaction_serializer.get_processing_policy(
767 lcm.TransactionContext({}))
768 self.assertIsInstance(
769 policy,
770 lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
771 )
772 self.assertTrue(cpu_count.is_called)
773
774 @mock.patch(
775 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
776 return_value=0
777 )
778 def test_zero_cpu(self, cpu_count):
779 policy = lcm.transaction_serializer.get_processing_policy(
780 lcm.TransactionContext({}))
781 self.assertIsInstance(
782 policy,
783 lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
784 )
785 self.assertTrue(cpu_count.is_called)
786
787 @mock.patch(
788 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count',
789 side_effect=NotImplementedError
790 )
791 def test_cpu_count_not_implemented(self, cpu_count):
792 policy = lcm.transaction_serializer.get_processing_policy(
793 lcm.TransactionContext({}))
794 self.assertIsInstance(
795 policy,
796 lcm.transaction_serializer.SingleWorkerConcurrencyPolicy
797 )
798 self.assertTrue(cpu_count.is_called)
799
800 def test_distributed_serialization_enabled_in_cluster(self):
801 context_data = {'common': {
802 'serialization_policy': consts.SERIALIZATION_POLICY.distributed
803 }}
804 policy = lcm.transaction_serializer.get_processing_policy(
805 lcm.TransactionContext(context_data))
806 self.assertIsInstance(
807 policy,
808 lcm.transaction_serializer.DistributedProcessingPolicy
809 )
diff --git a/nailgun/requirements.txt b/nailgun/requirements.txt
index 96bed25..c702e8a 100644
--- a/nailgun/requirements.txt
+++ b/nailgun/requirements.txt
@@ -47,3 +47,5 @@ stevedore>=1.5.0
47# See: https://bugs.launchpad.net/fuel/+bug/1519727 47# See: https://bugs.launchpad.net/fuel/+bug/1519727
48setuptools<=18.5 48setuptools<=18.5
49yaql>=1.0.0 49yaql>=1.0.0
50# Distributed nodes serialization
51distributed==1.16.0