Merge "Avoid race condition when setting up websocket"
This commit is contained in:
commit
90a9ae0b6d
|
@ -28,13 +28,13 @@ def register_or_update(clients, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.register_or_update',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.register_or_update',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
if 'message' in payload:
|
||||
print(payload['message'])
|
||||
|
@ -74,14 +74,14 @@ def provide(clients, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.provide',
|
||||
workflow_input={'node_uuids': workflow_input['node_uuids'],
|
||||
'queue_name': queue_name}
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.provide',
|
||||
workflow_input={'node_uuids': workflow_input['node_uuids'],
|
||||
'queue_name': queue_name}
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
if 'message' in payload:
|
||||
print(payload['message'])
|
||||
|
@ -105,17 +105,19 @@ def introspect(clients, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.introspect',
|
||||
workflow_input={'node_uuids': workflow_input['node_uuids'],
|
||||
'run_validations': workflow_input['run_validations'],
|
||||
'queue_name': queue_name}
|
||||
)
|
||||
|
||||
print("Waiting for introspection to finish...")
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.introspect',
|
||||
workflow_input={
|
||||
'node_uuids': workflow_input['node_uuids'],
|
||||
'run_validations': workflow_input['run_validations'],
|
||||
'queue_name': queue_name
|
||||
}
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
if 'message' in payload:
|
||||
print(payload['message'])
|
||||
|
@ -140,19 +142,21 @@ def introspect_manageable_nodes(clients, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.introspect_manageable_nodes',
|
||||
workflow_input={'run_validations': workflow_input['run_validations'],
|
||||
"queue_name": queue_name, }
|
||||
)
|
||||
|
||||
print("Waiting for introspection to finish...")
|
||||
|
||||
errors = []
|
||||
successful_node_uuids = set()
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.introspect_manageable_nodes',
|
||||
workflow_input={
|
||||
'run_validations': workflow_input['run_validations'],
|
||||
'queue_name': queue_name,
|
||||
}
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
if 'message' in payload:
|
||||
print(payload['message'])
|
||||
|
@ -192,13 +196,13 @@ def provide_manageable_nodes(clients, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.provide_manageable_nodes',
|
||||
workflow_input={"queue_name": queue_name, }
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.provide_manageable_nodes',
|
||||
workflow_input={"queue_name": queue_name, }
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
if 'message' in payload:
|
||||
print(payload['message'])
|
||||
|
@ -220,13 +224,13 @@ def configure(clients, **workflow_input):
|
|||
ooo_client = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.configure',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with ooo_client.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.configure',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
if 'message' in payload:
|
||||
print(payload['message'])
|
||||
|
@ -246,13 +250,13 @@ def configure_manageable_nodes(clients, **workflow_input):
|
|||
ooo_client = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.configure_manageable_nodes',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with ooo_client.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.configure_manageable_nodes',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
if 'message' in payload:
|
||||
print(payload['message'])
|
||||
|
@ -274,15 +278,15 @@ def create_raid_configuration(clients, **workflow_input):
|
|||
ooo_client = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.create_raid_configuration',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
print('Creating RAID configuration for given nodes, this may take time')
|
||||
|
||||
with ooo_client.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.baremetal.v1.create_raid_configuration',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
if 'message' in payload:
|
||||
print(payload['message'])
|
||||
|
|
|
@ -30,13 +30,13 @@ def deploy(clients, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.deployment.v1.deploy_plan',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.deployment.v1.deploy_plan',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
# The deploy workflow ends once the Heat create/update starts. This
|
||||
# means that is shouldn't take very long. Wait for six minutes for
|
||||
# messages from the workflow.
|
||||
|
|
|
@ -25,13 +25,13 @@ def update(clients, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.package_update.v1.package_update_plan',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.package_update.v1.package_update_plan',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
assert payload['status'] == "SUCCESS", pprint.pformat(payload)
|
||||
|
||||
|
@ -91,12 +91,12 @@ def clear_breakpoints(clients, **workflow_input):
|
|||
workflow_input['queue_name'] = str(uuid.uuid4())
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.package_update.v1.clear_breakpoints',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.package_update.v1.clear_breakpoints',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
assert payload['status'] == "SUCCESS", pprint.pformat(payload)
|
||||
|
|
|
@ -35,13 +35,13 @@ def get_overcloud_passwords(clients, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.plan_management.v1.get_passwords',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.plan_management.v1.get_passwords',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
# Getting the passwords is a quick operation, but to allow space for
|
||||
# delays or heavy loads, timeout after 60 seconds.
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||
|
|
|
@ -48,13 +48,13 @@ def create_default_plan(clients, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.plan_management.v1.create_default_deployment_plan',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.plan_management.v1.create_default_deployment_plan',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||
_WORKFLOW_TIMEOUT):
|
||||
if 'message' in payload:
|
||||
|
@ -72,12 +72,12 @@ def _create_update_deployment_plan(clients, workflow, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client, workflow,
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client, workflow,
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||
_WORKFLOW_TIMEOUT):
|
||||
if 'message' in payload:
|
||||
|
|
|
@ -26,13 +26,13 @@ def delete_node(clients, **workflow_input):
|
|||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.scale.v1.delete_node',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.scale.v1.delete_node',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||
360):
|
||||
if payload['status'] != "SUCCESS":
|
||||
|
|
|
@ -37,13 +37,13 @@ def delete_stack(clients, stack):
|
|||
|
||||
queue_name = workflow_input['queue_name']
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.stack.v1.delete_stack',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclient.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.stack.v1.delete_stack',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
if payload['status'] != "SUCCESS":
|
||||
raise InvalidConfiguration(payload['message'])
|
||||
|
|
Loading…
Reference in New Issue