Start websocket client before workflows
When we start a workflow in the client, we need to create the websocket
connection beforehand. If the workflow is very quick (like
create_overcloudrc), it could be finished before we subscribe to the
Zaqar queue properly, and thus we wouldn't get the message.
Change-Id: I56c5f3a094094f7ba2158d8a434122ccb496f6b4
Closes-Bug: #1794418
(cherry picked from commit 2a26ef2cf1
)
This commit is contained in:
parent
607d4835af
commit
dae0133410
|
@ -110,13 +110,13 @@ def create_overcloudrc(clients, **workflow_input):
|
|||
workflow_client = clients.workflow_engine
|
||||
tripleoclients = clients.tripleoclient
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.deployment.v1.create_overcloudrc',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket() as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.deployment.v1.create_overcloudrc',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
# the workflow will return the overcloudrc data, an error message
|
||||
# or blank.
|
||||
|
@ -276,13 +276,13 @@ def config_download_export(clients, **workflow_input):
|
|||
workflow_client = clients.workflow_engine
|
||||
tripleoclients = clients.tripleoclient
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.deployment.v1.config_download_export',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket() as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.deployment.v1.config_download_export',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||
_WORKFLOW_TIMEOUT):
|
||||
if 'message' in payload:
|
||||
|
@ -318,13 +318,13 @@ def get_deployment_status(clients, **workflow_input):
|
|||
workflow_client = clients.workflow_engine
|
||||
tripleoclients = clients.tripleoclient
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.deployment.v1.get_deployment_status',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket() as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.deployment.v1.get_deployment_status',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||
_WORKFLOW_TIMEOUT):
|
||||
if 'message' in payload:
|
||||
|
@ -342,13 +342,13 @@ def get_deployment_failures(clients, **workflow_input):
|
|||
workflow_client = clients.workflow_engine
|
||||
tripleoclients = clients.tripleoclient
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.deployment.v1.get_deployment_failures',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket() as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.deployment.v1.get_deployment_failures',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||
_WORKFLOW_TIMEOUT):
|
||||
if 'message' in payload:
|
||||
|
|
|
@ -92,13 +92,13 @@ def delete_deployment_plan(clients, **workflow_input):
|
|||
workflow_client = clients.workflow_engine
|
||||
tripleoclients = clients.tripleoclient
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.plan_management.v1.delete_deployment_plan',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket() as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.plan_management.v1.delete_deployment_plan',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||
_WORKFLOW_TIMEOUT):
|
||||
if 'message' in payload:
|
||||
|
@ -308,13 +308,13 @@ def export_deployment_plan(clients, **workflow_input):
|
|||
workflow_client = clients.workflow_engine
|
||||
tripleoclients = clients.tripleoclient
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.plan_management.v1.export_deployment_plan',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
with tripleoclients.messaging_websocket() as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.plan_management.v1.export_deployment_plan',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution,
|
||||
_WORKFLOW_TIMEOUT):
|
||||
if 'message' in payload:
|
||||
|
|
|
@ -83,23 +83,23 @@ def fetch_logs(clients, container, server_name, timeout=None,
|
|||
workflow_client = clients.workflow_engine
|
||||
tripleoclients = clients.tripleoclient
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.support.v1.fetch_logs',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
with tripleoclients.messaging_websocket() as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.support.v1.fetch_logs',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
websocket = tripleoclients.messaging_websocket()
|
||||
messages = base.wait_for_messages(workflow_client,
|
||||
websocket,
|
||||
execution,
|
||||
timeout)
|
||||
messages = base.wait_for_messages(workflow_client,
|
||||
ws,
|
||||
execution,
|
||||
timeout)
|
||||
|
||||
for message in messages:
|
||||
if message['status'] != 'SUCCESS':
|
||||
raise LogFetchError(message['message'])
|
||||
if message['message']:
|
||||
print('{}'.format(message['message']))
|
||||
for message in messages:
|
||||
if message['status'] != 'SUCCESS':
|
||||
raise LogFetchError(message['message'])
|
||||
if message['message']:
|
||||
print('{}'.format(message['message']))
|
||||
|
||||
|
||||
def delete_container(clients, container, timeout=None, concurrency=None):
|
||||
|
@ -122,20 +122,20 @@ def delete_container(clients, container, timeout=None, concurrency=None):
|
|||
workflow_client = clients.workflow_engine
|
||||
tripleoclients = clients.tripleoclient
|
||||
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.support.v1.delete_container',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
with tripleoclients.messaging_websocket() as ws:
|
||||
execution = base.start_workflow(
|
||||
workflow_client,
|
||||
'tripleo.support.v1.delete_container',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
|
||||
websocket = tripleoclients.messaging_websocket()
|
||||
messages = base.wait_for_messages(workflow_client,
|
||||
websocket,
|
||||
execution,
|
||||
timeout)
|
||||
messages = base.wait_for_messages(workflow_client,
|
||||
ws,
|
||||
execution,
|
||||
timeout)
|
||||
|
||||
for message in messages:
|
||||
if message['status'] != 'SUCCESS':
|
||||
raise ContainerDeleteFailed(message['message'])
|
||||
if message['message']:
|
||||
print('{}'.format(message['message']))
|
||||
for message in messages:
|
||||
if message['status'] != 'SUCCESS':
|
||||
raise ContainerDeleteFailed(message['message'])
|
||||
if message['message']:
|
||||
print('{}'.format(message['message']))
|
||||
|
|
Loading…
Reference in New Issue