Start websocket client before workflows

When we start a workflow in the client, we need to create the websocket
connection beforehand. If the workflow is very quick (like
create_overcloudrc), it could be finished before we subscribe to the
Zaqar queue properly, and thus we wouldn't get the message.

Change-Id: I56c5f3a094094f7ba2158d8a434122ccb496f6b4
Closes-Bug: #1794418
This commit is contained in:
Thomas Herve 2018-09-26 11:32:08 +02:00
parent 4462be0aae
commit 2a26ef2cf1
3 changed files with 66 additions and 66 deletions

View File

@ -110,13 +110,13 @@ def create_overcloudrc(clients, **workflow_input):
workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient
execution = base.start_workflow(
workflow_client,
'tripleo.deployment.v1.create_overcloudrc',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket() as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.deployment.v1.create_overcloudrc',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution):
# the workflow will return the overcloudrc data, an error message
# or blank.
@ -281,13 +281,13 @@ def config_download_export(clients, **workflow_input):
workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient
execution = base.start_workflow(
workflow_client,
'tripleo.deployment.v1.config_download_export',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket() as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.deployment.v1.config_download_export',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution,
_WORKFLOW_TIMEOUT):
if 'message' in payload:
@ -323,13 +323,13 @@ def get_deployment_status(clients, **workflow_input):
workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient
execution = base.start_workflow(
workflow_client,
'tripleo.deployment.v1.get_deployment_status',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket() as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.deployment.v1.get_deployment_status',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution,
_WORKFLOW_TIMEOUT):
if 'message' in payload:
@ -347,13 +347,13 @@ def get_deployment_failures(clients, **workflow_input):
workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient
execution = base.start_workflow(
workflow_client,
'tripleo.deployment.v1.get_deployment_failures',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket() as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.deployment.v1.get_deployment_failures',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution,
_WORKFLOW_TIMEOUT):
if 'message' in payload:

View File

@ -95,13 +95,13 @@ def delete_deployment_plan(clients, **workflow_input):
workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient
execution = base.start_workflow(
workflow_client,
'tripleo.plan_management.v1.delete_deployment_plan',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket() as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.plan_management.v1.delete_deployment_plan',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution,
_WORKFLOW_TIMEOUT):
if 'message' in payload:
@ -311,13 +311,13 @@ def export_deployment_plan(clients, **workflow_input):
workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient
execution = base.start_workflow(
workflow_client,
'tripleo.plan_management.v1.export_deployment_plan',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket() as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.plan_management.v1.export_deployment_plan',
workflow_input=workflow_input
)
for payload in base.wait_for_messages(workflow_client, ws, execution,
_WORKFLOW_TIMEOUT):
if 'message' in payload:

View File

@ -83,23 +83,23 @@ def fetch_logs(clients, container, server_name, timeout=None,
workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient
execution = base.start_workflow(
workflow_client,
'tripleo.support.v1.fetch_logs',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket() as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.support.v1.fetch_logs',
workflow_input=workflow_input
)
websocket = tripleoclients.messaging_websocket()
messages = base.wait_for_messages(workflow_client,
websocket,
execution,
timeout)
messages = base.wait_for_messages(workflow_client,
ws,
execution,
timeout)
for message in messages:
if message['status'] != 'SUCCESS':
raise LogFetchError(message['message'])
if message['message']:
print('{}'.format(message['message']))
for message in messages:
if message['status'] != 'SUCCESS':
raise LogFetchError(message['message'])
if message['message']:
print('{}'.format(message['message']))
def delete_container(clients, container, timeout=None, concurrency=None):
@ -122,20 +122,20 @@ def delete_container(clients, container, timeout=None, concurrency=None):
workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient
execution = base.start_workflow(
workflow_client,
'tripleo.support.v1.delete_container',
workflow_input=workflow_input
)
with tripleoclients.messaging_websocket() as ws:
execution = base.start_workflow(
workflow_client,
'tripleo.support.v1.delete_container',
workflow_input=workflow_input
)
websocket = tripleoclients.messaging_websocket()
messages = base.wait_for_messages(workflow_client,
websocket,
execution,
timeout)
messages = base.wait_for_messages(workflow_client,
ws,
execution,
timeout)
for message in messages:
if message['status'] != 'SUCCESS':
raise ContainerDeleteFailed(message['message'])
if message['message']:
print('{}'.format(message['message']))
for message in messages:
if message['status'] != 'SUCCESS':
raise ContainerDeleteFailed(message['message'])
if message['message']:
print('{}'.format(message['message']))