From c9b9e650594fcc916d8688ff08f98f0c21ce7d2e Mon Sep 17 00:00:00 2001 From: "Brad P. Crochet" Date: Wed, 29 Mar 2017 22:25:34 -0400 Subject: [PATCH] Avoid race condition when setting up websocket There is a potential for a race condition if the websocket is not set up before the call is made to Mistral. The websocket setup can take sufficient time to set up, and if an error is thrown immediately from a workflow, it has the potential of returning a message, and having the websocket consume it before the websocket client is even set up. Also, I think this has the benefit of keeping the call to Mistral from even being made if there is something wrong with the websocket setup. Change-Id: Ib331037a7f5f4e59862d2b9646a83acdb18313eb --- tripleoclient/workflows/baremetal.py | 108 ++++++++++---------- tripleoclient/workflows/deployment.py | 12 +-- tripleoclient/workflows/package_update.py | 24 ++--- tripleoclient/workflows/parameters.py | 12 +-- tripleoclient/workflows/plan_management.py | 22 ++-- tripleoclient/workflows/scale.py | 12 +-- tripleoclient/workflows/stack_management.py | 12 +-- 7 files changed, 103 insertions(+), 99 deletions(-) diff --git a/tripleoclient/workflows/baremetal.py b/tripleoclient/workflows/baremetal.py index 47a5a31ac..4dfe664b5 100644 --- a/tripleoclient/workflows/baremetal.py +++ b/tripleoclient/workflows/baremetal.py @@ -28,13 +28,13 @@ def register_or_update(clients, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.baremetal.v1.register_or_update', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.baremetal.v1.register_or_update', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): if 'message' in payload: print(payload['message']) @@ -74,14 +74,14 @@ def provide(clients, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.baremetal.v1.provide', - workflow_input={'node_uuids': workflow_input['node_uuids'], - 'queue_name': queue_name} - ) - with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.baremetal.v1.provide', + workflow_input={'node_uuids': workflow_input['node_uuids'], + 'queue_name': queue_name} + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): if 'message' in payload: print(payload['message']) @@ -105,17 +105,19 @@ def introspect(clients, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.baremetal.v1.introspect', - workflow_input={'node_uuids': workflow_input['node_uuids'], - 'run_validations': workflow_input['run_validations'], - 'queue_name': queue_name} - ) - print("Waiting for introspection to finish...") with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.baremetal.v1.introspect', + workflow_input={ + 'node_uuids': workflow_input['node_uuids'], + 'run_validations': workflow_input['run_validations'], + 'queue_name': queue_name + } + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): if 'message' in payload: print(payload['message']) @@ -140,19 +142,21 @@ def introspect_manageable_nodes(clients, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.baremetal.v1.introspect_manageable_nodes', - workflow_input={'run_validations': workflow_input['run_validations'], - "queue_name": queue_name, } - ) - print("Waiting for introspection to finish...") errors = [] successful_node_uuids = set() with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.baremetal.v1.introspect_manageable_nodes', + workflow_input={ + 'run_validations': workflow_input['run_validations'], + 'queue_name': queue_name, + } + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): if 'message' in payload: print(payload['message']) @@ -192,13 +196,13 @@ def provide_manageable_nodes(clients, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.baremetal.v1.provide_manageable_nodes', - workflow_input={"queue_name": queue_name, } - ) - with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.baremetal.v1.provide_manageable_nodes', + workflow_input={"queue_name": queue_name, } + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): if 'message' in payload: print(payload['message']) @@ -220,13 +224,13 @@ def configure(clients, **workflow_input): ooo_client = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.baremetal.v1.configure', - workflow_input=workflow_input - ) - with ooo_client.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.baremetal.v1.configure', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): if 'message' in payload: print(payload['message']) @@ -246,13 +250,13 @@ def configure_manageable_nodes(clients, **workflow_input): ooo_client = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.baremetal.v1.configure_manageable_nodes', - workflow_input=workflow_input - ) - with ooo_client.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.baremetal.v1.configure_manageable_nodes', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): if 'message' in payload: print(payload['message']) @@ -274,15 +278,15 @@ def create_raid_configuration(clients, **workflow_input): ooo_client = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.baremetal.v1.create_raid_configuration', - workflow_input=workflow_input - ) - print('Creating RAID configuration for given nodes, this may take time') with ooo_client.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.baremetal.v1.create_raid_configuration', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): if 'message' in payload: print(payload['message']) diff --git a/tripleoclient/workflows/deployment.py b/tripleoclient/workflows/deployment.py index f0d0f99cf..53c0f2963 100644 --- a/tripleoclient/workflows/deployment.py +++ b/tripleoclient/workflows/deployment.py @@ -30,13 +30,13 @@ def deploy(clients, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.deployment.v1.deploy_plan', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.deployment.v1.deploy_plan', + workflow_input=workflow_input + ) + # The deploy workflow ends once the Heat create/update starts. This # means that is shouldn't take very long. Wait for six minutes for # messages from the workflow. diff --git a/tripleoclient/workflows/package_update.py b/tripleoclient/workflows/package_update.py index f224df0df..b131a8f8b 100644 --- a/tripleoclient/workflows/package_update.py +++ b/tripleoclient/workflows/package_update.py @@ -25,13 +25,13 @@ def update(clients, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.package_update.v1.package_update_plan', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.package_update.v1.package_update_plan', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): assert payload['status'] == "SUCCESS", pprint.pformat(payload) @@ -91,12 +91,12 @@ def clear_breakpoints(clients, **workflow_input): workflow_input['queue_name'] = str(uuid.uuid4()) queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.package_update.v1.clear_breakpoints', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.package_update.v1.clear_breakpoints', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): assert payload['status'] == "SUCCESS", pprint.pformat(payload) diff --git a/tripleoclient/workflows/parameters.py b/tripleoclient/workflows/parameters.py index 7423d4608..4a4d638f1 100644 --- a/tripleoclient/workflows/parameters.py +++ b/tripleoclient/workflows/parameters.py @@ -35,13 +35,13 @@ def get_overcloud_passwords(clients, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.plan_management.v1.get_passwords', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.plan_management.v1.get_passwords', + workflow_input=workflow_input + ) + # Getting the passwords is a quick operation, but to allow space for # delays or heavy loads, timeout after 60 seconds. for payload in base.wait_for_messages(workflow_client, ws, execution, diff --git a/tripleoclient/workflows/plan_management.py b/tripleoclient/workflows/plan_management.py index 9abc1d611..096967553 100644 --- a/tripleoclient/workflows/plan_management.py +++ b/tripleoclient/workflows/plan_management.py @@ -48,13 +48,13 @@ def create_default_plan(clients, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.plan_management.v1.create_default_deployment_plan', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.plan_management.v1.create_default_deployment_plan', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution, _WORKFLOW_TIMEOUT): if 'message' in payload: @@ -72,12 +72,12 @@ def _create_update_deployment_plan(clients, workflow, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, workflow, - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, workflow, + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution, _WORKFLOW_TIMEOUT): if 'message' in payload: diff --git a/tripleoclient/workflows/scale.py b/tripleoclient/workflows/scale.py index e08f859c7..32466d85e 100644 --- a/tripleoclient/workflows/scale.py +++ b/tripleoclient/workflows/scale.py @@ -26,13 +26,13 @@ def delete_node(clients, **workflow_input): tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.scale.v1.delete_node', - workflow_input=workflow_input - ) - with tripleoclients.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.scale.v1.delete_node', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution, 360): if payload['status'] != "SUCCESS": diff --git a/tripleoclient/workflows/stack_management.py b/tripleoclient/workflows/stack_management.py index 03d8ef4b3..bad8de426 100644 --- a/tripleoclient/workflows/stack_management.py +++ b/tripleoclient/workflows/stack_management.py @@ -37,13 +37,13 @@ def delete_stack(clients, stack): queue_name = workflow_input['queue_name'] - execution = base.start_workflow( - workflow_client, - 'tripleo.stack.v1.delete_stack', - workflow_input=workflow_input - ) - with tripleoclient.messaging_websocket(queue_name) as ws: + execution = base.start_workflow( + workflow_client, + 'tripleo.stack.v1.delete_stack', + workflow_input=workflow_input + ) + for payload in base.wait_for_messages(workflow_client, ws, execution): if payload['status'] != "SUCCESS": raise InvalidConfiguration(payload['message'])