Get message from websocket instead from zaqarclient directly

Use the websocket api to get and print the real time
ansible execution for minor update instead of using the
zaqarclient call directly.
This is safer and avoid brutal claim of the messages.

Manual conflict resolution on:
tripleoclient/workflows/package_update.py

Closes-Bug: #1757487
Change-Id: I7e324b9e037197082c23a19b4e4b8832daaf5aee
(cherry picked from commit eaa2e9ce75)
(cherry picked from commit f31795fb7e)
This commit is contained in:
Mathieu Bultel 2017-12-06 08:39:00 +01:00 committed by Jiri Stransky
parent 76fd6d2b23
commit 158bcb2cd2
3 changed files with 14 additions and 21 deletions

View File

@ -15,4 +15,3 @@ six>=1.9.0 # MIT
osc-lib>=1.7.0 # Apache-2.0
websocket-client>=0.32.0 # LGPLv2+
tripleo-common>=7.1.0 # Apache-2.0
python-zaqarclient>=1.0.0 # Apache-2.0

View File

@ -69,7 +69,8 @@ def wait_for_messages(mistral, websocket, execution, timeout=None):
# default to running and assume it is just an "in progress"
# message from the workflow.
# Workflows should end with SUCCESS or ERROR statuses.
if payload.get('status', 'RUNNING') != "RUNNING":
if payload.get('status', 'RUNNING') != "RUNNING" or \
mistral.executions.get(execution.id).state != "RUNNING":
raise StopIteration
except exceptions.WebSocketTimeout:
check_execution_status(mistral, execution.id)

View File

@ -20,7 +20,6 @@ from tripleoclient import exceptions
from tripleoclient import utils
from tripleoclient.workflows import base
from zaqarclient.transport import errors as zaqar_errors
def update(clients, **workflow_input):
@ -79,8 +78,7 @@ def update_ansible(clients, **workflow_input):
workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name']
zaqar = clients.messaging
queue = zaqar.queue(workflow_input['ansible_queue_name'])
ansible_queue = workflow_input['ansible_queue_name']
with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
@ -88,22 +86,17 @@ def update_ansible(clients, **workflow_input):
'tripleo.package_update.v1.update_nodes',
workflow_input=workflow_input
)
timeout = time.time() + 600
# First we need to wait for the first item in the queue
while queue.stats['messages']['total'] == 0 or time.time() == timeout:
pass
# Then we can start to claim the queue
while workflow_client.executions.get(execution.id).state == 'RUNNING':
try:
claim = queue.claim(ttl=600, grace=600)
for message in claim:
pprint.pprint(
message.body['payload']['message'].splitlines())
message.delete()
except zaqar_errors.ServiceUnavailableError:
pass
# clean the Queue
queue.delete()
with tripleoclients.messaging_websocket(ansible_queue) as update_ws:
for payload in base.wait_for_messages(workflow_client,
update_ws,
execution):
# Need to sleep a little, to let the time for the execution
# to get the right status in between. It avoid to fall in the
# while True loop to get messages
time.sleep(5)
if payload.get('message'):
pprint.pprint(payload['message'].splitlines())
for payload in base.wait_for_messages(workflow_client, ws, execution):
if payload.get('message'):