diff options
author | Jenkins <jenkins@review.openstack.org> | 2017-03-24 19:51:59 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2017-03-24 19:51:59 +0000 |
commit | 1c3ef48a2eaa0a66e61e30212842230c4e64e8be (patch) | |
tree | b4f113547432fec657b0ebfdb1c2f1d513b70b25 | |
parent | 054ed4cf991fbd0433c87c497ba8758ecf7d4558 (diff) | |
parent | 4fa861fa8a857a50ec5f25073e6e0eb307db7404 (diff) |
Merge "Distributed serialization implementation"
-rw-r--r-- | nailgun/nailgun/consts.py | 4 | ||||
-rw-r--r-- | nailgun/nailgun/fixtures/openstack.yaml | 49 | ||||
-rw-r--r-- | nailgun/nailgun/lcm/task_serializer.py | 15 | ||||
-rw-r--r-- | nailgun/nailgun/lcm/transaction_serializer.py | 317 | ||||
-rw-r--r-- | nailgun/nailgun/orchestrator/deployment_serializers.py | 1 | ||||
-rw-r--r-- | nailgun/nailgun/settings.py | 22 | ||||
-rw-r--r-- | nailgun/nailgun/settings.yaml | 9 | ||||
-rw-r--r-- | nailgun/nailgun/statistics/fuel_statistics/installation_info.py | 10 | ||||
-rw-r--r-- | nailgun/nailgun/test/integration/test_cluster_changes_handler.py | 3 | ||||
-rw-r--r-- | nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py | 351 | ||||
-rw-r--r-- | nailgun/requirements.txt | 2 |
11 files changed, 769 insertions, 14 deletions
diff --git a/nailgun/nailgun/consts.py b/nailgun/nailgun/consts.py index 3d3b1ae..4c790cc 100644 --- a/nailgun/nailgun/consts.py +++ b/nailgun/nailgun/consts.py | |||
@@ -532,3 +532,7 @@ DPDK_DRIVER_IN_SRIOV_CASE = 'vfio-pci' | |||
532 | DEFAULT_MTU = 1500 | 532 | DEFAULT_MTU = 1500 |
533 | 533 | ||
534 | SIZE_OF_VLAN_TAG = 4 | 534 | SIZE_OF_VLAN_TAG = 4 |
535 | |||
536 | SERIALIZATION_POLICY = Enum( | ||
537 | 'distributed' | ||
538 | ) | ||
diff --git a/nailgun/nailgun/fixtures/openstack.yaml b/nailgun/nailgun/fixtures/openstack.yaml index 8b4d763..5557710 100644 --- a/nailgun/nailgun/fixtures/openstack.yaml +++ b/nailgun/nailgun/fixtures/openstack.yaml | |||
@@ -1114,6 +1114,55 @@ | |||
1114 | group: "security" | 1114 | group: "security" |
1115 | weight: 20 | 1115 | weight: 20 |
1116 | type: "radio" | 1116 | type: "radio" |
1117 | serialization_policy: | ||
1118 | value: "default" | ||
1119 | values: | ||
1120 | - data: "default" | ||
1121 | label: "Default serialization" | ||
1122 | description: "Run serialization on the master node only" | ||
1123 | - data: "distributed" | ||
1124 | label: "Distributed serialization" | ||
1125 | description: "Run serialization on the master and environment nodes. Nodes for serialization are selected only from the environment for which serialization is being performed." | ||
1126 | label: "Serialization policy" | ||
1127 | group: "general" | ||
1128 | weight: 30 | ||
1129 | type: "radio" | ||
1130 | ds_use_discover: | ||
1131 | group: "general" | ||
1132 | label: "Use discovered nodes as workers for serialization" | ||
1133 | type: "checkbox" | ||
1134 | value: true | ||
1135 | weight: 31 | ||
1136 | restrictions: | ||
1137 | - condition: "settings:common.serialization_policy.value != 'distributed'" | ||
1138 | action: "hide" | ||
1139 | ds_use_provisioned: | ||
1140 | group: "general" | ||
1141 | label: "Use provisioned nodes as workers for serialization" | ||
1142 | type: "checkbox" | ||
1143 | value: true | ||
1144 | weight: 32 | ||
1145 | restrictions: | ||
1146 | - condition: "settings:common.serialization_policy.value != 'distributed'" | ||
1147 | action: "hide" | ||
1148 | ds_use_error: | ||
1149 | group: "general" | ||
1150 | label: "Use nodes in error state as workers for serialization" | ||
1151 | type: "checkbox" | ||
1152 | value: true | ||
1153 | weight: 33 | ||
1154 | restrictions: | ||
1155 | - condition: "settings:common.serialization_policy.value != 'distributed'" | ||
1156 | action: "hide" | ||
1157 | ds_use_ready: | ||
1158 | group: "general" | ||
1159 | label: "Use ready nodes as workers for serialization" | ||
1160 | type: "checkbox" | ||
1161 | value: false | ||
1162 | weight: 34 | ||
1163 | restrictions: | ||
1164 | - condition: "settings:common.serialization_policy.value != 'distributed'" | ||
1165 | action: "hide" | ||
1117 | public_network_assignment: | 1166 | public_network_assignment: |
1118 | metadata: | 1167 | metadata: |
1119 | weight: 10 | 1168 | weight: 10 |
diff --git a/nailgun/nailgun/lcm/task_serializer.py b/nailgun/nailgun/lcm/task_serializer.py index c004115..0d70f36 100644 --- a/nailgun/nailgun/lcm/task_serializer.py +++ b/nailgun/nailgun/lcm/task_serializer.py | |||
@@ -110,6 +110,8 @@ class Context(object): | |||
110 | return evaluate | 110 | return evaluate |
111 | 111 | ||
112 | def get_formatter_context(self, node_id): | 112 | def get_formatter_context(self, node_id): |
113 | # TODO(akislitsky) remove formatter context from the | ||
114 | # tasks serialization workflow | ||
113 | data = self._transaction.get_new_data(node_id) | 115 | data = self._transaction.get_new_data(node_id) |
114 | return { | 116 | return { |
115 | 'CLUSTER_ID': data.get('cluster', {}).get('id'), | 117 | 'CLUSTER_ID': data.get('cluster', {}).get('id'), |
@@ -147,9 +149,14 @@ class DeploymentTaskSerializer(object): | |||
147 | :return: the result | 149 | :return: the result |
148 | """ | 150 | """ |
149 | 151 | ||
150 | def serialize(self, node_id): | 152 | def serialize(self, node_id, formatter_context=None): |
151 | """Serialize task in expected by orchestrator format. | 153 | """Serialize task in expected by orchestrator format |
152 | 154 | ||
155 | If serialization is performed on the remote worker | ||
156 | we should pass formatter_context parameter with values | ||
157 | from the master node settings | ||
158 | |||
159 | :param formatter_context: formatter context | ||
153 | :param node_id: the target node_id | 160 | :param node_id: the target node_id |
154 | """ | 161 | """ |
155 | 162 | ||
@@ -157,10 +164,12 @@ class DeploymentTaskSerializer(object): | |||
157 | "serialize task %s for node %s", | 164 | "serialize task %s for node %s", |
158 | self.task_template['id'], node_id | 165 | self.task_template['id'], node_id |
159 | ) | 166 | ) |
167 | formatter_context = formatter_context \ | ||
168 | or self.context.get_formatter_context(node_id) | ||
160 | task = utils.traverse( | 169 | task = utils.traverse( |
161 | self.task_template, | 170 | self.task_template, |
162 | utils.text_format_safe, | 171 | utils.text_format_safe, |
163 | self.context.get_formatter_context(node_id), | 172 | formatter_context, |
164 | { | 173 | { |
165 | 'yaql_exp': self.context.get_yaql_interpreter( | 174 | 'yaql_exp': self.context.get_yaql_interpreter( |
166 | node_id, self.task_template['id']) | 175 | node_id, self.task_template['id']) |
diff --git a/nailgun/nailgun/lcm/transaction_serializer.py b/nailgun/nailgun/lcm/transaction_serializer.py index c02146b..32953c1 100644 --- a/nailgun/nailgun/lcm/transaction_serializer.py +++ b/nailgun/nailgun/lcm/transaction_serializer.py | |||
@@ -14,13 +14,21 @@ | |||
14 | # License for the specific language governing permissions and limitations | 14 | # License for the specific language governing permissions and limitations |
15 | # under the License. | 15 | # under the License. |
16 | 16 | ||
17 | from distutils.version import StrictVersion | 17 | import datetime |
18 | import multiprocessing | 18 | import multiprocessing |
19 | import os | ||
20 | from Queue import Queue | ||
21 | import shutil | ||
22 | import tempfile | ||
19 | 23 | ||
24 | import distributed | ||
25 | from distutils.version import StrictVersion | ||
20 | import six | 26 | import six |
27 | import toolz | ||
21 | 28 | ||
22 | from nailgun import consts | 29 | from nailgun import consts |
23 | from nailgun import errors | 30 | from nailgun import errors |
31 | from nailgun.lcm.task_serializer import Context | ||
24 | from nailgun.lcm.task_serializer import TasksSerializersFactory | 32 | from nailgun.lcm.task_serializer import TasksSerializersFactory |
25 | from nailgun.logger import logger | 33 | from nailgun.logger import logger |
26 | from nailgun.settings import settings | 34 | from nailgun.settings import settings |
@@ -128,7 +136,308 @@ class MultiProcessingConcurrencyPolicy(object): | |||
128 | pool.join() | 136 | pool.join() |
129 | 137 | ||
130 | 138 | ||
131 | def get_concurrency_policy(): | 139 | def _distributed_serialize_tasks_for_node(formatter_contexts_idx, |
140 | node_and_tasks, scattered_data): | ||
141 | """Remote serialization call for DistributedProcessingPolicy | ||
142 | |||
143 | Code of the function is copied to the workers and executed there, thus | ||
144 | we are including all required imports inside the function. | ||
145 | |||
146 | :param formatter_contexts_idx: dict of formatter contexts with node_id | ||
147 | value as key | ||
148 | :param node_and_tasks: list of node_id, task_data tuples | ||
149 | :param scattered_data: future object that points to data copied to | ||
150 | workers | ||
151 | :return: [(node_id, serialized), error] | ||
152 | """ | ||
153 | |||
154 | try: | ||
155 | factory = TasksSerializersFactory(scattered_data['context']) | ||
156 | |||
157 | # Restoring settings | ||
158 | settings.config = scattered_data['settings_config'] | ||
159 | for k in formatter_contexts_idx: | ||
160 | formatter_contexts_idx[k]['SETTINGS'] = settings | ||
161 | |||
162 | except Exception as e: | ||
163 | logger.exception("Distributed serialization failed") | ||
164 | return [((None, None), e)] | ||
165 | |||
166 | result = [] | ||
167 | |||
168 | for node_and_task in node_and_tasks: | ||
169 | |||
170 | node_id = None | ||
171 | try: | ||
172 | node_id, task = node_and_task | ||
173 | |||
174 | logger.debug("Starting distributed node %s task %s serialization", | ||
175 | node_id, task['id']) | ||
176 | |||
177 | formatter_context = formatter_contexts_idx[node_id] | ||
178 | |||
179 | serializer = factory.create_serializer(task) | ||
180 | serialized = serializer.serialize( | ||
181 | node_id, formatter_context=formatter_context) | ||
182 | |||
183 | logger.debug("Distributed node %s task %s serialization " | ||
184 | "result: %s", node_id, task['id'], serialized) | ||
185 | |||
186 | result.append(((node_id, serialized), None)) | ||
187 | except Exception as e: | ||
188 | logger.exception("Distributed serialization failed") | ||
189 | result.append(((node_id, None), e)) | ||
190 | break | ||
191 | |||
192 | logger.debug("Processed tasks count: %s", len(result)) | ||
193 | return result | ||
194 | |||
195 | |||
196 | class DistributedProcessingPolicy(object): | ||
197 | |||
198 | def __init__(self): | ||
199 | self.sent_jobs = Queue() | ||
200 | self.sent_jobs_count = 0 | ||
201 | |||
202 | def _consume_jobs(self, chunk_size=None): | ||
203 | """Consumes jobs | ||
204 | |||
205 | If chunk_size is set the function consumes the specified number | ||
206 | of finished jobs or fewer if the sent_jobs queue becomes empty. | ||
207 | If chunk_size is None the function consumes jobs until the | ||
208 | sent_jobs queue becomes empty. | ||
209 | Jobs with statuses Cancelled, Abandoned, Terminated will be | ||
210 | resent and added to the sent_jobs queue | ||
211 | |||
212 | :param chunk_size: size of consuming chunk | ||
213 | :return: generator on job results | ||
214 | """ | ||
215 | logger.debug("Consuming jobs started") | ||
216 | |||
217 | jobs_to_consume = [] | ||
218 | while not self.sent_jobs.empty(): | ||
219 | job = self.sent_jobs.get() | ||
220 | jobs_to_consume.append(job) | ||
221 | |||
222 | if chunk_size is not None: | ||
223 | chunk_size -= 1 | ||
224 | if chunk_size <= 0: | ||
225 | break | ||
226 | |||
227 | for ready_job in distributed.as_completed(jobs_to_consume): | ||
228 | results = ready_job.result() | ||
229 | self.sent_jobs_count -= 1 | ||
230 | |||
231 | for result in results: | ||
232 | (node_id, serialized), exc = result | ||
233 | logger.debug("Got ready task for node %s, serialized: %s, " | ||
234 | "error: %s", node_id, serialized, exc) | ||
235 | if exc is not None: | ||
236 | raise exc | ||
237 | yield node_id, serialized | ||
238 | |||
239 | logger.debug("Consuming jobs finished") | ||
240 | |||
241 | def _get_formatter_context(self, task_context, formatter_contexts_idx, | ||
242 | node_id): | ||
243 | try: | ||
244 | return formatter_contexts_idx[node_id] | ||
245 | except KeyError: | ||
246 | pass | ||
247 | |||
248 | logger.debug("Calculating formatter context for node %s", node_id) | ||
249 | formatter_context = task_context.get_formatter_context( | ||
250 | node_id) | ||
251 | # Settings file is already sent to the workers | ||
252 | formatter_context.pop('SETTINGS', None) | ||
253 | formatter_contexts_idx[node_id] = formatter_context | ||
254 | |||
255 | return formatter_context | ||
256 | |||
257 | def _upload_nailgun_code(self, job_cluster): | ||
258 | """Creates zip of current nailgun code and uploads it to workers | ||
259 | |||
260 | TODO(akislitsky): add workers scope when it will be implemented | ||
261 | in the distributed library | ||
262 | |||
263 | :param job_cluster: distributed.Client | ||
264 | """ | ||
265 | logger.debug("Compressing nailgun code") | ||
266 | file_dir = os.path.dirname(__file__) | ||
267 | nailgun_root_dir = os.path.realpath(os.path.join(file_dir, '..', '..')) | ||
268 | archive = os.path.join(tempfile.gettempdir(), 'nailgun') | ||
269 | result = shutil.make_archive(archive, 'zip', nailgun_root_dir, | ||
270 | 'nailgun') | ||
271 | logger.debug("Nailgun code saved to: %s", result) | ||
272 | |||
273 | logger.debug("Uploading nailgun archive %s to workers", result) | ||
274 | job_cluster.upload_file(result) | ||
275 | |||
276 | def _scatter_data(self, job_cluster, context, workers): | ||
277 | logger.debug("Scattering data to workers started") | ||
278 | shared_data = {'context': context, 'settings_config': settings.config} | ||
279 | scattered = job_cluster.scatter(shared_data, broadcast=True, | ||
280 | workers=workers) | ||
281 | # Waiting data is scattered to workers | ||
282 | distributed.wait(scattered.values()) | ||
283 | logger.debug("Scattering data to workers finished") | ||
284 | |||
285 | return scattered | ||
286 | |||
287 | def _get_allowed_nodes_statuses(self, context): | ||
288 | """Extracts node statuses that allows distributed serialization""" | ||
289 | common = context.new.get('common', {}) | ||
290 | cluster = common.get('cluster', {}) | ||
291 | logger.debug("Getting allowed nodes statuses to use as serialization " | ||
292 | "workers for cluster %s", cluster.get('id')) | ||
293 | check_fields = { | ||
294 | 'ds_use_ready': consts.NODE_STATUSES.ready, | ||
295 | 'ds_use_provisioned': consts.NODE_STATUSES.provisioned, | ||
296 | 'ds_use_discover': consts.NODE_STATUSES.discover, | ||
297 | 'ds_use_error': consts.NODE_STATUSES.error | ||
298 | } | ||
299 | statuses = set() | ||
300 | for field, node_status in check_fields.items(): | ||
301 | if common.get(field): | ||
302 | statuses.add(node_status) | ||
303 | |||
304 | logger.debug("Allowed nodes statuses to use as serialization workers " | ||
305 | "for cluster %s are: %s", cluster.get('id'), statuses) | ||
306 | return statuses | ||
307 | |||
308 | def _get_allowed_nodes_ips(self, context): | ||
309 | """Filters online nodes from cluster by their status | ||
310 | |||
311 | In the cluster settings we select nodes statuses allowed for | ||
312 | using in the distributed serialization. Accordingly to selected | ||
313 | statuses nodes are going to be filtered. | ||
314 | |||
315 | :param context: TransactionContext | ||
316 | :return: set of allowed nodes ips | ||
317 | """ | ||
318 | ips = set() | ||
319 | allowed_statuses = self._get_allowed_nodes_statuses(context) | ||
320 | for node in six.itervalues(context.new.get('nodes', {})): | ||
321 | if node.get('status') in allowed_statuses: | ||
322 | ips.add(node.get('ip')) | ||
323 | ips.add(settings.MASTER_IP) | ||
324 | return ips | ||
325 | |||
326 | def _get_allowed_workers(self, job_cluster, allowed_ips): | ||
327 | """Calculates workers addresses for distributed serialization | ||
328 | |||
329 | Only workers that placed on the allowed nodes must be selected | ||
330 | for the serialization. | ||
331 | |||
332 | :param job_cluster: distributed.Client | ||
333 | :param allowed_ips: allowed for serialization nodes ips | ||
334 | :return: list of workers addresses in format 'ip:port' | ||
335 | """ | ||
336 | logger.debug("Getting allowed workers") | ||
337 | workers = {} | ||
338 | |||
339 | # Worker has address like tcp://ip:port | ||
340 | info = job_cluster.scheduler_info() | ||
341 | for worker_addr in six.iterkeys(info['workers']): | ||
342 | ip_port = worker_addr.split('//')[1] | ||
343 | ip = ip_port.split(':')[0] | ||
344 | if ip not in allowed_ips: | ||
345 | continue | ||
346 | try: | ||
347 | pool = workers[ip] | ||
348 | pool.add(ip_port) | ||
349 | except KeyError: | ||
350 | workers[ip] = set([ip_port]) | ||
351 | |||
352 | return list(toolz.itertoolz.concat(six.itervalues(workers))) | ||
353 | |||
354 | def execute(self, context, _, tasks): | ||
355 | """Executes task serialization on distributed nodes | ||
356 | |||
357 | :param context: the transaction context | ||
358 | :param _: serializers factory | ||
359 | :param tasks: the tasks to serialize | ||
360 | :return sequence of serialized tasks | ||
361 | """ | ||
362 | logger.debug("Performing distributed tasks processing") | ||
363 | sched_address = '{0}:{1}'.format(settings.MASTER_IP, | ||
364 | settings.LCM_DS_JOB_SHEDULER_PORT) | ||
365 | job_cluster = distributed.Client(sched_address) | ||
366 | |||
367 | allowed_ips = self._get_allowed_nodes_ips(context) | ||
368 | workers = self._get_allowed_workers(job_cluster, allowed_ips) | ||
369 | logger.debug("Allowed workers list for serialization: %s", workers) | ||
370 | workers_ips = set([ip_port.split(':')[0] for ip_port in workers]) | ||
371 | logger.debug("Allowed workers ips list for serialization: %s", | ||
372 | workers_ips) | ||
373 | |||
374 | task_context = Context(context) | ||
375 | formatter_contexts_idx = {} | ||
376 | workers_num = len(workers) | ||
377 | max_jobs_in_queue = workers_num * settings.LCM_DS_NODE_LOAD_COEFF | ||
378 | logger.debug("Max jobs allowed in queue: %s", max_jobs_in_queue) | ||
379 | |||
380 | start = datetime.datetime.utcnow() | ||
381 | tasks_count = 0 | ||
382 | |||
383 | try: | ||
384 | self._upload_nailgun_code(job_cluster) | ||
385 | scattered = self._scatter_data(job_cluster, context, workers) | ||
386 | |||
387 | for tasks_chunk in toolz.partition_all( | ||
388 | settings.LCM_DS_TASKS_PER_JOB, tasks): | ||
389 | |||
390 | formatter_contexts_for_tasks = {} | ||
391 | |||
392 | # Collecting required contexts for tasks | ||
393 | for task in tasks_chunk: | ||
394 | node_id, task_data = task | ||
395 | formatter_context = self._get_formatter_context( | ||
396 | task_context, formatter_contexts_idx, node_id) | ||
397 | if node_id not in formatter_contexts_for_tasks: | ||
398 | formatter_contexts_for_tasks[node_id] = \ | ||
399 | formatter_context | ||
400 | |||
401 | logger.debug("Submitting job for tasks chunk: %s", tasks_chunk) | ||
402 | job = job_cluster.submit( | ||
403 | _distributed_serialize_tasks_for_node, | ||
404 | formatter_contexts_for_tasks, | ||
405 | tasks_chunk, | ||
406 | scattered, | ||
407 | workers=workers_ips | ||
408 | ) | ||
409 | |||
410 | self.sent_jobs.put(job) | ||
411 | self.sent_jobs_count += 1 | ||
412 | |||
413 | # We limit the max number of jobs in the queue by the number | ||
414 | # of nodes which are used in the serialization | ||
415 | if self.sent_jobs_count >= max_jobs_in_queue: | ||
416 | for result in self._consume_jobs(chunk_size=workers_num): | ||
417 | tasks_count += 1 | ||
418 | yield result | ||
419 | |||
420 | # We have no tasks any more but have unconsumed jobs | ||
421 | for result in self._consume_jobs(): | ||
422 | tasks_count += 1 | ||
423 | yield result | ||
424 | finally: | ||
425 | end = datetime.datetime.utcnow() | ||
426 | logger.debug("Distributed tasks processing finished. " | ||
427 | "Total time: %s. Tasks processed: %s", | ||
428 | end - start, tasks_count) | ||
429 | job_cluster.shutdown() | ||
430 | |||
431 | |||
432 | def is_distributed_processing_enabled(context): | ||
433 | common = context.new.get('common', {}) | ||
434 | return common.get('serialization_policy') == \ | ||
435 | consts.SERIALIZATION_POLICY.distributed | ||
436 | |||
437 | |||
438 | def get_processing_policy(context): | ||
439 | if is_distributed_processing_enabled(context): | ||
440 | return DistributedProcessingPolicy() | ||
132 | cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR | 441 | cpu_num = settings.LCM_SERIALIZERS_CONCURRENCY_FACTOR |
133 | if not cpu_num: | 442 | if not cpu_num: |
134 | try: | 443 | try: |
@@ -162,7 +471,7 @@ class TransactionSerializer(object): | |||
162 | # ids of nodes in this group and how many nodes in this group can fail | 471 | # ids of nodes in this group and how many nodes in this group can fail |
163 | # and deployment will not be interrupted | 472 | # and deployment will not be interrupted |
164 | self.fault_tolerance_groups = [] | 473 | self.fault_tolerance_groups = [] |
165 | self.concurrency_policy = get_concurrency_policy() | 474 | self.processing_policy = get_processing_policy(context) |
166 | 475 | ||
167 | @classmethod | 476 | @classmethod |
168 | def serialize(cls, context, tasks, resolver): | 477 | def serialize(cls, context, tasks, resolver): |
@@ -216,7 +525,7 @@ class TransactionSerializer(object): | |||
216 | :param tasks: the deployment tasks | 525 | :param tasks: the deployment tasks |
217 | :return the mapping tasks per node | 526 | :return the mapping tasks per node |
218 | """ | 527 | """ |
219 | serialized = self.concurrency_policy.execute( | 528 | serialized = self.processing_policy.execute( |
220 | self.context, | 529 | self.context, |
221 | self.serializer_factory_class, | 530 | self.serializer_factory_class, |
222 | self.expand_tasks(tasks) | 531 | self.expand_tasks(tasks) |
diff --git a/nailgun/nailgun/orchestrator/deployment_serializers.py b/nailgun/nailgun/orchestrator/deployment_serializers.py index d9bf3b6..eb40f5a 100644 --- a/nailgun/nailgun/orchestrator/deployment_serializers.py +++ b/nailgun/nailgun/orchestrator/deployment_serializers.py | |||
@@ -221,6 +221,7 @@ class DeploymentMultinodeSerializer(object): | |||
221 | 'role': role, | 221 | 'role': role, |
222 | 'vms_conf': node.vms_conf, | 222 | 'vms_conf': node.vms_conf, |
223 | 'fail_if_error': role in self.critical_roles, | 223 | 'fail_if_error': role in self.critical_roles, |
224 | 'ip': node.ip, | ||
224 | # TODO(eli): need to remove, requried for the fake thread only | 225 | # TODO(eli): need to remove, requried for the fake thread only |
225 | 'online': node.online, | 226 | 'online': node.online, |
226 | } | 227 | } |
diff --git a/nailgun/nailgun/settings.py b/nailgun/nailgun/settings.py index eee08c9..1f9222b 100644 --- a/nailgun/nailgun/settings.py +++ b/nailgun/nailgun/settings.py | |||
@@ -38,7 +38,21 @@ class NailgunSettings(object): | |||
38 | if test_config: | 38 | if test_config: |
39 | settings_files.append(test_config) | 39 | settings_files.append(test_config) |
40 | 40 | ||
41 | self.config = {} | 41 | # If settings.yaml doesn't exist we should have default |
42 | # config structure. Nailgun without settings is used | ||
43 | # when we distribute source code to the workers for | ||
44 | # distributed serialization | ||
45 | self.config = { | ||
46 | 'VERSION': {}, | ||
47 | 'DATABASE': { | ||
48 | 'engine': 'postgresql', | ||
49 | 'name': '', | ||
50 | 'host': '', | ||
51 | 'port': '0', | ||
52 | 'user': '', | ||
53 | 'passwd': '' | ||
54 | } | ||
55 | } | ||
42 | for sf in settings_files: | 56 | for sf in settings_files: |
43 | try: | 57 | try: |
44 | logger.debug("Trying to read config file %s" % sf) | 58 | logger.debug("Trying to read config file %s" % sf) |
@@ -47,9 +61,9 @@ class NailgunSettings(object): | |||
47 | logger.error("Error while reading config file %s: %s" % | 61 | logger.error("Error while reading config file %s: %s" % |
48 | (sf, str(e))) | 62 | (sf, str(e))) |
49 | 63 | ||
50 | self.config['VERSION']['api'] = self.config['API'] | 64 | self.config['VERSION']['api'] = self.config.get('API') |
51 | self.config['VERSION']['feature_groups'] = \ | 65 | self.config['VERSION']['feature_groups'] = \ |
52 | self.config['FEATURE_GROUPS'] | 66 | self.config.get('FEATURE_GROUPS') |
53 | 67 | ||
54 | fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE) | 68 | fuel_release = self.get_file_content(consts.FUEL_RELEASE_FILE) |
55 | if fuel_release: | 69 | if fuel_release: |
@@ -61,7 +75,7 @@ class NailgunSettings(object): | |||
61 | self.config['VERSION']['openstack_version'] = \ | 75 | self.config['VERSION']['openstack_version'] = \ |
62 | fuel_openstack_version | 76 | fuel_openstack_version |
63 | 77 | ||
64 | if int(self.config.get("DEVELOPMENT")): | 78 | if int(self.config.get("DEVELOPMENT", 0)): |
65 | logger.info("DEVELOPMENT MODE ON:") | 79 | logger.info("DEVELOPMENT MODE ON:") |
66 | here = os.path.abspath( | 80 | here = os.path.abspath( |
67 | os.path.join(os.path.dirname(__file__), '..') | 81 | os.path.join(os.path.dirname(__file__), '..') |
diff --git a/nailgun/nailgun/settings.yaml b/nailgun/nailgun/settings.yaml index 92b3518..c2faaa8 100644 --- a/nailgun/nailgun/settings.yaml +++ b/nailgun/nailgun/settings.yaml | |||
@@ -177,6 +177,15 @@ YAQL_MEMORY_QUOTA: 104857600 | |||
177 | 177 | ||
178 | LCM_CHECK_TASK_VERSION: False | 178 | LCM_CHECK_TASK_VERSION: False |
179 | 179 | ||
180 | # Coefficient for calculating the max jobs queue length. If the number of jobs | ||
181 | # reaches len(nodes) * load_coef we stop generating and start consuming jobs. | ||
182 | LCM_DS_NODE_LOAD_COEFF: 2 | ||
183 | # Port of dask-scheduler on the master node | ||
184 | LCM_DS_JOB_SHEDULER_PORT: 8002 | ||
185 | # Size of the tasks chunk sent to a distributed worker | ||
186 | LCM_DS_TASKS_PER_JOB: 100 | ||
187 | |||
188 | |||
180 | DPDK_MAX_CPUS_PER_NIC: 4 | 189 | DPDK_MAX_CPUS_PER_NIC: 4 |
181 | 190 | ||
182 | TRUNCATE_LOG_ENTRIES: 100 | 191 | TRUNCATE_LOG_ENTRIES: 100 |
diff --git a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py index 482d8c5..d024cdd 100644 --- a/nailgun/nailgun/statistics/fuel_statistics/installation_info.py +++ b/nailgun/nailgun/statistics/fuel_statistics/installation_info.py | |||
@@ -56,6 +56,16 @@ class InstallationInfo(object): | |||
56 | 'propagate_task_deploy', None), | 56 | 'propagate_task_deploy', None), |
57 | WhiteListRule(('common', 'security_groups', 'value'), | 57 | WhiteListRule(('common', 'security_groups', 'value'), |
58 | 'security_groups', None), | 58 | 'security_groups', None), |
59 | WhiteListRule(('common', 'serialization_policy', 'value'), | ||
60 | 'serialization_policy', None), | ||
61 | WhiteListRule(('common', 'ds_use_discover', 'value'), | ||
62 | 'ds_use_discover', None), | ||
63 | WhiteListRule(('common', 'ds_use_provisioned', 'value'), | ||
64 | 'ds_use_provisioned', None), | ||
65 | WhiteListRule(('common', 'ds_use_ready', 'value'), | ||
66 | 'ds_use_ready', None), | ||
67 | WhiteListRule(('common', 'ds_use_error', 'value'), | ||
68 | 'ds_use_error', None), | ||
59 | WhiteListRule(('corosync', 'verified', 'value'), | 69 | WhiteListRule(('corosync', 'verified', 'value'), |
60 | 'corosync_verified', None), | 70 | 'corosync_verified', None), |
61 | 71 | ||
diff --git a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py index a8661ab..b8f4e98 100644 --- a/nailgun/nailgun/test/integration/test_cluster_changes_handler.py +++ b/nailgun/nailgun/test/integration/test_cluster_changes_handler.py | |||
@@ -187,6 +187,7 @@ class TestHandlers(BaseIntegrationTest): | |||
187 | 'fail_if_error': is_critical, | 187 | 'fail_if_error': is_critical, |
188 | 'vms_conf': [], | 188 | 'vms_conf': [], |
189 | 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), | 189 | 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), |
190 | 'ip': node.ip, | ||
190 | 191 | ||
191 | 'network_data': { | 192 | 'network_data': { |
192 | 'eth1': { | 193 | 'eth1': { |
@@ -603,6 +604,7 @@ class TestHandlers(BaseIntegrationTest): | |||
603 | 'online': node.online, | 604 | 'online': node.online, |
604 | 'fail_if_error': is_critical, | 605 | 'fail_if_error': is_critical, |
605 | 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), | 606 | 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), |
607 | 'ip': node.ip, | ||
606 | 'priority': 100, | 608 | 'priority': 100, |
607 | 'vms_conf': [], | 609 | 'vms_conf': [], |
608 | 'network_scheme': { | 610 | 'network_scheme': { |
@@ -1096,6 +1098,7 @@ class TestHandlers(BaseIntegrationTest): | |||
1096 | 'fail_if_error': is_critical, | 1098 | 'fail_if_error': is_critical, |
1097 | 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), | 1099 | 'fqdn': 'node-%d.%s' % (node.id, settings.DNS_DOMAIN), |
1098 | 'priority': 100, | 1100 | 'priority': 100, |
1101 | 'ip': node.ip, | ||
1099 | 'vms_conf': [], | 1102 | 'vms_conf': [], |
1100 | 1103 | ||
1101 | 'network_scheme': { | 1104 | 'network_scheme': { |
diff --git a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py index 6450e19..9751d55 100644 --- a/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py +++ b/nailgun/nailgun/test/unit/test_lcm_transaction_serializer.py | |||
@@ -14,20 +14,24 @@ | |||
14 | # License for the specific language governing permissions and limitations | 14 | # License for the specific language governing permissions and limitations |
15 | # under the License. | 15 | # under the License. |
16 | 16 | ||
17 | import copy | ||
18 | import exceptions | ||
17 | import mock | 19 | import mock |
18 | import multiprocessing.dummy | 20 | import multiprocessing.dummy |
19 | 21 | ||
20 | from nailgun import consts | 22 | from nailgun import consts |
21 | from nailgun import errors | 23 | from nailgun import errors |
22 | from nailgun import lcm | 24 | from nailgun import lcm |
25 | from nailgun.lcm import TransactionContext | ||
26 | from nailgun.settings import settings | ||
27 | from nailgun.test.base import BaseTestCase | ||
23 | from nailgun.utils.resolvers import TagResolver | 28 | from nailgun.utils.resolvers import TagResolver |
24 | 29 | ||
25 | from nailgun.test.base import BaseUnitTest | ||
26 | 30 | ||
27 | 31 | class TestTransactionSerializer(BaseTestCase): | |
28 | class TestTransactionSerializer(BaseUnitTest): | ||
29 | @classmethod | 32 | @classmethod |
30 | def setUpClass(cls): | 33 | def setUpClass(cls): |
34 | super(TestTransactionSerializer, cls).setUpClass() | ||
31 | cls.tasks = [ | 35 | cls.tasks = [ |
32 | { | 36 | { |
33 | 'id': 'task1', 'roles': ['controller'], | 37 | 'id': 'task1', 'roles': ['controller'], |
@@ -462,3 +466,344 @@ class TestTransactionSerializer(BaseUnitTest): | |||
462 | 9, | 466 | 9, |
463 | lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10) | 467 | lcm.TransactionSerializer.calculate_fault_tolerance('-1 ', 10) |
464 | ) | 468 | ) |
469 | |||
470 | def _get_context_for_distributed_serialization(self): | ||
471 | new = copy.deepcopy(self.context.new) | ||
472 | new['common']['serialization_policy'] = \ | ||
473 | consts.SERIALIZATION_POLICY.distributed | ||
474 | return TransactionContext(new) | ||
475 | |||
476 | @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait') | ||
477 | @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed') | ||
478 | def test_distributed_serialization(self, _, as_completed): | ||
479 | context = self._get_context_for_distributed_serialization() | ||
480 | |||
481 | with mock.patch( | ||
482 | 'nailgun.lcm.transaction_serializer.distributed.Client' | ||
483 | ) as job_cluster: | ||
484 | job = mock.Mock() | ||
485 | job.result.return_value = [ | ||
486 | (('1', {"id": "task1", "type": "skipped"}), None) | ||
487 | ] | ||
488 | |||
489 | submit = mock.Mock() | ||
490 | submit.return_value = job | ||
491 | |||
492 | as_completed.return_value = [job] | ||
493 | |||
494 | job_cluster.return_value.submit = submit | ||
495 | job_cluster.return_value.scheduler_info.return_value = \ | ||
496 | {'workers': {'tcp://worker': {}}} | ||
497 | |||
498 | lcm.TransactionSerializer.serialize( | ||
499 | context, self.tasks, self.resolver) | ||
500 | self.assertTrue(submit.called) | ||
501 | # 4 controller task + 1 compute + 1 cinder | ||
502 | self.assertTrue(6, submit.call_count) | ||
503 | |||
504 | @mock.patch('nailgun.lcm.transaction_serializer.distributed.wait') | ||
505 | @mock.patch('nailgun.lcm.transaction_serializer.distributed.as_completed') | ||
506 | @mock.patch('nailgun.lcm.transaction_serializer.' | ||
507 | 'DistributedProcessingPolicy._get_formatter_context') | ||
508 | def test_distributed_serialization_workers_scope(self, formatter_context, | ||
509 | as_completed, _): | ||
510 | context = self._get_context_for_distributed_serialization() | ||
511 | |||
512 | node_id = '1' | ||
513 | task = { | ||
514 | 'id': 'task1', 'roles': ['controller'], | ||
515 | 'type': 'puppet', 'version': '2.0.0' | ||
516 | } | ||
517 | |||
518 | with mock.patch( | ||
519 | 'nailgun.lcm.transaction_serializer.distributed.Client' | ||
520 | ) as job_cluster: | ||
521 | |||
522 | # Mocking job processing | ||
523 | job = mock.Mock() | ||
524 | job.result.return_value = [((node_id, task), None)] | ||
525 | |||
526 | submit = mock.Mock() | ||
527 | submit.return_value = job | ||
528 | |||
529 | as_completed.return_value = [job] | ||
530 | |||
531 | scatter = mock.Mock() | ||
532 | job_cluster.return_value.scatter = scatter | ||
533 | |||
534 | job_cluster.return_value.scatter.return_value = {} | ||
535 | job_cluster.return_value.submit = submit | ||
536 | |||
537 | formatter_context.return_value = {node_id: {}} | ||
538 | |||
539 | # Configuring available workers | ||
540 | job_cluster.return_value.scheduler_info.return_value = \ | ||
541 | { | ||
542 | 'workers': { | ||
543 | 'tcp://{0}'.format(settings.MASTER_IP): {}, | ||
544 | 'tcp://192.168.0.1:33334': {}, | ||
545 | 'tcp://127.0.0.2:33335': {}, | ||
546 | } | ||
547 | } | ||
548 | |||
549 | # Performing serialization | ||
550 | lcm.TransactionSerializer.serialize( | ||
551 | context, [task], self.resolver | ||
552 | ) | ||
553 | |||
554 | # Checking data is scattered only to expected workers | ||
555 | scatter.assert_called_once() | ||
556 | scatter.assert_called_with( | ||
557 | {'context': context, 'settings_config': settings.config}, | ||
558 | broadcast=True, | ||
559 | workers=[settings.MASTER_IP] | ||
560 | ) | ||
561 | |||
562 | # Checking submit job only to expected workers | ||
563 | submit.assert_called_once() | ||
564 | serializer = lcm.transaction_serializer | ||
565 | submit.assert_called_with( | ||
566 | serializer._distributed_serialize_tasks_for_node, | ||
567 | {node_id: formatter_context()}, | ||
568 | ((node_id, task),), | ||
569 | job_cluster().scatter(), | ||
570 | workers=set([settings.MASTER_IP]) | ||
571 | ) | ||
572 | |||
573 | def test_distributed_serialization_get_allowed_nodes_ips(self): | ||
574 | policy = lcm.transaction_serializer.DistributedProcessingPolicy() | ||
575 | |||
576 | context_data = { | ||
577 | 'common': { | ||
578 | 'serialization_policy': | ||
579 | consts.SERIALIZATION_POLICY.distributed, | ||
580 | 'ds_use_error': True, | ||
581 | 'ds_use_provisioned': True, | ||
582 | 'ds_use_discover': True, | ||
583 | 'ds_use_ready': False | ||
584 | }, | ||
585 | 'nodes': { | ||
586 | '1': {'status': consts.NODE_STATUSES.error, | ||
587 | 'ip': '10.20.0.3'}, | ||
588 | '2': {'status': consts.NODE_STATUSES.provisioned, | ||
589 | 'ip': '10.20.0.4'}, | ||
590 | '3': {'status': consts.NODE_STATUSES.discover, | ||
591 | 'ip': '10.20.0.5'}, | ||
592 | '4': {'status': consts.NODE_STATUSES.ready, | ||
593 | 'ip': '10.20.0.6'}, | ||
594 | } | ||
595 | } | ||
596 | |||
597 | actual = policy._get_allowed_nodes_ips( | ||
598 | TransactionContext(context_data)) | ||
599 | self.assertItemsEqual( | ||
600 | [settings.MASTER_IP, '10.20.0.3', '10.20.0.4', '10.20.0.5'], | ||
601 | actual | ||
602 | ) | ||
603 | |||
604 | def test_distributed_serialization_get_allowed_nodes_statuses(self): | ||
605 | policy = lcm.transaction_serializer.DistributedProcessingPolicy() | ||
606 | context_data = {} | ||
607 | actual = policy._get_allowed_nodes_statuses( | ||
608 | TransactionContext(context_data)) | ||
609 | self.assertItemsEqual([], actual) | ||
610 | |||
611 | context_data['common'] = { | ||
612 | 'ds_use_discover': False, | ||
613 | 'ds_use_provisioned': False, | ||
614 | 'ds_use_error': False, | ||
615 | 'ds_use_ready': False | ||
616 | } | ||
617 | actual = policy._get_allowed_nodes_statuses( | ||
618 | TransactionContext(context_data)) | ||
619 | self.assertItemsEqual([], actual) | ||
620 | |||
621 | context_data['common']['ds_use_discover'] = True | ||
622 | actual = policy._get_allowed_nodes_statuses( | ||
623 | TransactionContext(context_data)) | ||
624 | expected = [consts.NODE_STATUSES.discover] | ||
625 | self.assertItemsEqual(expected, actual) | ||
626 | |||
627 | context_data['common']['ds_use_provisioned'] = True | ||
628 | actual = policy._get_allowed_nodes_statuses( | ||
629 | TransactionContext(context_data)) | ||
630 | expected = [consts.NODE_STATUSES.discover, | ||
631 | consts.NODE_STATUSES.provisioned] | ||
632 | self.assertItemsEqual(expected, actual) | ||
633 | |||
634 | context_data['common']['ds_use_error'] = True | ||
635 | actual = policy._get_allowed_nodes_statuses( | ||
636 | TransactionContext(context_data)) | ||
637 | expected = [consts.NODE_STATUSES.discover, | ||
638 | consts.NODE_STATUSES.provisioned, | ||
639 | consts.NODE_STATUSES.error] | ||
640 | self.assertItemsEqual(expected, actual) | ||
641 | |||
642 | context_data['common']['ds_use_ready'] = True | ||
643 | actual = policy._get_allowed_nodes_statuses( | ||
644 | TransactionContext(context_data)) | ||
645 | expected = [consts.NODE_STATUSES.discover, | ||
646 | consts.NODE_STATUSES.provisioned, | ||
647 | consts.NODE_STATUSES.error, | ||
648 | consts.NODE_STATUSES.ready] | ||
649 | self.assertItemsEqual(expected, actual) | ||
650 | |||
651 | def test_distributed_serialization_get_allowed_workers(self): | ||
652 | policy = lcm.transaction_serializer.DistributedProcessingPolicy() | ||
653 | |||
654 | with mock.patch( | ||
655 | 'nailgun.lcm.transaction_serializer.distributed.Client' | ||
656 | ) as job_cluster: | ||
657 | job_cluster.scheduler_info.return_value = \ | ||
658 | {'workers': { | ||
659 | 'tcp://10.20.0.2:1': {}, | ||
660 | 'tcp://10.20.0.2:2': {}, | ||
661 | 'tcp://10.20.0.3:1': {}, | ||
662 | 'tcp://10.20.0.3:2': {}, | ||
663 | 'tcp://10.20.0.3:3': {}, | ||
664 | 'tcp://10.20.0.4:1': {}, | ||
665 | 'tcp://10.20.0.5:1': {} | ||
666 | }} | ||
667 | allowed_ips = set(['10.20.0.2', '10.20.0.3', '10.20.0.5']) | ||
668 | |||
669 | expected = ['10.20.0.2:1', '10.20.0.2:2', '10.20.0.3:1', | ||
670 | '10.20.0.3:2', '10.20.0.3:3', '10.20.0.5:1'] | ||
671 | actual = policy._get_allowed_workers(job_cluster, allowed_ips) | ||
672 | self.assertItemsEqual(expected, actual) | ||
673 | |||
674 | def test_distributed_serialization_serialize_task(self): | ||
675 | task = { | ||
676 | 'id': 'task1', 'roles': ['controller'], | ||
677 | 'type': 'puppet', 'version': '2.0.0', | ||
678 | 'parameters': { | ||
679 | 'master_ip': '{MN_IP}', | ||
680 | 'host': {'yaql_exp': '$.public_ssl.hostname'}, | ||
681 | 'attr': {'yaql_exp': '$node.attributes.a_str'} | ||
682 | } | ||
683 | } | ||
684 | |||
685 | formatter_contexts_idx = { | ||
686 | '1': {'MN_IP': '10.0.0.1'}, | ||
687 | '2': {} | ||
688 | } | ||
689 | scattered_data = { | ||
690 | 'settings_config': settings.config, | ||
691 | 'context': self.context | ||
692 | } | ||
693 | |||
694 | serializer = lcm.transaction_serializer | ||
695 | actual = serializer._distributed_serialize_tasks_for_node( | ||
696 | formatter_contexts_idx, [('1', task), ('2', task)], scattered_data) | ||
697 | |||
698 | expected = [ | ||
699 | ( | ||
700 | ( | ||
701 | '1', | ||
702 | { | ||
703 | 'id': 'task1', | ||
704 | 'type': 'puppet', | ||
705 | 'parameters': { | ||
706 | 'cwd': '/', | ||
707 | 'master_ip': '10.0.0.1', | ||
708 | 'host': 'localhost', | ||
709 | 'attr': 'text1' | ||
710 | }, | ||
711 | 'fail_on_error': True | ||
712 | } | ||
713 | ), | ||
714 | None | ||
715 | ), | ||
716 | ( | ||
717 | ( | ||
718 | '2', | ||
719 | { | ||
720 | 'id': 'task1', | ||
721 | 'type': 'puppet', | ||
722 | 'parameters': { | ||
723 | 'cwd': '/', | ||
724 | 'master_ip': '{MN_IP}', | ||
725 | 'host': 'localhost', | ||
726 | 'attr': 'text2' | ||
727 | }, | ||
728 | 'fail_on_error': True | ||
729 | } | ||
730 | ), | ||
731 | None | ||
732 | ) | ||
733 | ] | ||
734 | |||
735 | self.assertItemsEqual(expected, actual) | ||
736 | |||
737 | def test_distributed_serialization_serialize_task_failure(self): | ||
738 | task = { | ||
739 | 'id': 'task1', 'roles': ['controller'], | ||
740 | 'type': 'puppet', 'version': '2.0.0', | ||
741 | 'parameters': { | ||
742 | 'fake': {'yaql_exp': '$.some.fake_param'} | ||
743 | } | ||
744 | } | ||
745 | |||
746 | formatter_contexts_idx = {'2': {}} | ||
747 | scattered_data = { | ||
748 | 'settings_config': settings.config, | ||
749 | 'context': self.context | ||
750 | } | ||
751 | |||
752 | serializer = lcm.transaction_serializer | ||
753 | result = serializer._distributed_serialize_tasks_for_node( | ||
754 | formatter_contexts_idx, [('2', task)], scattered_data) | ||
755 | (_, __), err = result[0] | ||
756 | self.assertIsInstance(err, exceptions.KeyError) | ||
757 | |||
758 | |||
759 | class TestConcurrencyPolicy(BaseTestCase): | ||
760 | |||
761 | @mock.patch( | ||
762 | 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', | ||
763 | return_value=1 | ||
764 | ) | ||
765 | def test_one_cpu(self, cpu_count): | ||
766 | policy = lcm.transaction_serializer.get_processing_policy( | ||
767 | lcm.TransactionContext({})) | ||
768 | self.assertIsInstance( | ||
769 | policy, | ||
770 | lcm.transaction_serializer.SingleWorkerConcurrencyPolicy | ||
771 | ) | ||
772 | self.assertTrue(cpu_count.is_called) | ||
773 | |||
774 | @mock.patch( | ||
775 | 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', | ||
776 | return_value=0 | ||
777 | ) | ||
778 | def test_zero_cpu(self, cpu_count): | ||
779 | policy = lcm.transaction_serializer.get_processing_policy( | ||
780 | lcm.TransactionContext({})) | ||
781 | self.assertIsInstance( | ||
782 | policy, | ||
783 | lcm.transaction_serializer.SingleWorkerConcurrencyPolicy | ||
784 | ) | ||
785 | self.assertTrue(cpu_count.is_called) | ||
786 | |||
787 | @mock.patch( | ||
788 | 'nailgun.lcm.transaction_serializer.multiprocessing.cpu_count', | ||
789 | side_effect=NotImplementedError | ||
790 | ) | ||
791 | def test_cpu_count_not_implemented(self, cpu_count): | ||
792 | policy = lcm.transaction_serializer.get_processing_policy( | ||
793 | lcm.TransactionContext({})) | ||
794 | self.assertIsInstance( | ||
795 | policy, | ||
796 | lcm.transaction_serializer.SingleWorkerConcurrencyPolicy | ||
797 | ) | ||
798 | self.assertTrue(cpu_count.is_called) | ||
799 | |||
800 | def test_distributed_serialization_enabled_in_cluster(self): | ||
801 | context_data = {'common': { | ||
802 | 'serialization_policy': consts.SERIALIZATION_POLICY.distributed | ||
803 | }} | ||
804 | policy = lcm.transaction_serializer.get_processing_policy( | ||
805 | lcm.TransactionContext(context_data)) | ||
806 | self.assertIsInstance( | ||
807 | policy, | ||
808 | lcm.transaction_serializer.DistributedProcessingPolicy | ||
809 | ) | ||
diff --git a/nailgun/requirements.txt b/nailgun/requirements.txt index 96bed25..c702e8a 100644 --- a/nailgun/requirements.txt +++ b/nailgun/requirements.txt | |||
@@ -47,3 +47,5 @@ stevedore>=1.5.0 | |||
47 | # See: https://bugs.launchpad.net/fuel/+bug/1519727 | 47 | # See: https://bugs.launchpad.net/fuel/+bug/1519727 |
48 | setuptools<=18.5 | 48 | setuptools<=18.5 |
49 | yaql>=1.0.0 | 49 | yaql>=1.0.0 |
50 | # Distributed nodes serialization | ||
51 | distributed==1.16.0 | ||