From 158bcb2cd297d9f6e02bd033857d248d4fe2ab38 Mon Sep 17 00:00:00 2001 From: Mathieu Bultel Date: Wed, 6 Dec 2017 08:39:00 +0100 Subject: [PATCH] Get message from websocket instead from zaqarclient directly Use the websocket api to get and print the real time ansible execution for minor update instead of using the zaqarclient call directly. This is safer and avoid brutal claim of the messages. Manual conflict resolution on: tripleoclient/workflows/package_update.py Closes-Bug: #1757487 Change-Id: I7e324b9e037197082c23a19b4e4b8832daaf5aee (cherry picked from commit eaa2e9ce75dbc107d9624a2a7325d3d8ec212a0a) (cherry picked from commit f31795fb7e294295d73604bbb0ab9c22103472cf) --- requirements.txt | 1 - tripleoclient/workflows/base.py | 3 ++- tripleoclient/workflows/package_update.py | 31 +++++++++-------------- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/requirements.txt b/requirements.txt index 91306b4d0..79fe06382 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,3 @@ six>=1.9.0 # MIT osc-lib>=1.7.0 # Apache-2.0 websocket-client>=0.32.0 # LGPLv2+ tripleo-common>=7.1.0 # Apache-2.0 -python-zaqarclient>=1.0.0 # Apache-2.0 diff --git a/tripleoclient/workflows/base.py b/tripleoclient/workflows/base.py index 7592a69fc..3a1514f94 100644 --- a/tripleoclient/workflows/base.py +++ b/tripleoclient/workflows/base.py @@ -69,7 +69,8 @@ def wait_for_messages(mistral, websocket, execution, timeout=None): # default to running and assume it is just an "in progress" # message from the workflow. # Workflows should end with SUCCESS or ERROR statuses. - if payload.get('status', 'RUNNING') != "RUNNING": + if payload.get('status', 'RUNNING') != "RUNNING" or \ + mistral.executions.get(execution.id).state != "RUNNING": raise StopIteration except exceptions.WebSocketTimeout: check_execution_status(mistral, execution.id) diff --git a/tripleoclient/workflows/package_update.py b/tripleoclient/workflows/package_update.py index 00f46d185..724a020e7 100644 --- a/tripleoclient/workflows/package_update.py +++ b/tripleoclient/workflows/package_update.py @@ -20,7 +20,6 @@ from tripleoclient import exceptions from tripleoclient import utils from tripleoclient.workflows import base -from zaqarclient.transport import errors as zaqar_errors def update(clients, **workflow_input): @@ -79,8 +78,7 @@ def update_ansible(clients, **workflow_input): workflow_client = clients.workflow_engine tripleoclients = clients.tripleoclient queue_name = workflow_input['queue_name'] - zaqar = clients.messaging - queue = zaqar.queue(workflow_input['ansible_queue_name']) + ansible_queue = workflow_input['ansible_queue_name'] with tripleoclients.messaging_websocket(queue_name) as ws: execution = base.start_workflow( @@ -88,22 +86,17 @@ def update_ansible(clients, **workflow_input): 'tripleo.package_update.v1.update_nodes', workflow_input=workflow_input ) - timeout = time.time() + 600 - # First we need to wait for the first item in the queue - while queue.stats['messages']['total'] == 0 or time.time() == timeout: - pass - # Then we can start to claim the queue - while workflow_client.executions.get(execution.id).state == 'RUNNING': - try: - claim = queue.claim(ttl=600, grace=600) - for message in claim: - pprint.pprint( - message.body['payload']['message'].splitlines()) - message.delete() - except zaqar_errors.ServiceUnavailableError: - pass - # clean the Queue - queue.delete() + + with tripleoclients.messaging_websocket(ansible_queue) as update_ws: + for payload in base.wait_for_messages(workflow_client, + update_ws, + execution): + # Need to sleep a little, to let the time for the execution + # to get the right status in between. It avoid to fall in the + # while True loop to get messages + time.sleep(5) + if payload.get('message'): + pprint.pprint(payload['message'].splitlines()) for payload in base.wait_for_messages(workflow_client, ws, execution): if payload.get('message'):