Get message from websocket instead from zaqarclient directly
Use the websocket api to get and print the real time ansible execution for minor update instead of using the zaqarclient call directly. This is safer and avoid brutal claim of the messages. Manual conflict resolution on: tripleoclient/workflows/package_update.py Closes-Bug: #1757487 Change-Id: I7e324b9e037197082c23a19b4e4b8832daaf5aee (cherry picked from commiteaa2e9ce75
) (cherry picked from commitf31795fb7e
)
This commit is contained in:
parent
76fd6d2b23
commit
158bcb2cd2
|
@ -15,4 +15,3 @@ six>=1.9.0 # MIT
|
|||
osc-lib>=1.7.0 # Apache-2.0
|
||||
websocket-client>=0.32.0 # LGPLv2+
|
||||
tripleo-common>=7.1.0 # Apache-2.0
|
||||
python-zaqarclient>=1.0.0 # Apache-2.0
|
||||
|
|
|
@ -69,7 +69,8 @@ def wait_for_messages(mistral, websocket, execution, timeout=None):
|
|||
# default to running and assume it is just an "in progress"
|
||||
# message from the workflow.
|
||||
# Workflows should end with SUCCESS or ERROR statuses.
|
||||
if payload.get('status', 'RUNNING') != "RUNNING":
|
||||
if payload.get('status', 'RUNNING') != "RUNNING" or \
|
||||
mistral.executions.get(execution.id).state != "RUNNING":
|
||||
raise StopIteration
|
||||
except exceptions.WebSocketTimeout:
|
||||
check_execution_status(mistral, execution.id)
|
||||
|
|
|
@ -20,7 +20,6 @@ from tripleoclient import exceptions
|
|||
from tripleoclient import utils
|
||||
|
||||
from tripleoclient.workflows import base
|
||||
from zaqarclient.transport import errors as zaqar_errors
|
||||
|
||||
|
||||
def update(clients, **workflow_input):
|
||||
|
@ -79,8 +78,7 @@ def update_ansible(clients, **workflow_input):
|
|||
workflow_client = clients.workflow_engine
|
||||
tripleoclients = clients.tripleoclient
|
||||
queue_name = workflow_input['queue_name']
|
||||
zaqar = clients.messaging
|
||||
queue = zaqar.queue(workflow_input['ansible_queue_name'])
|
||||
ansible_queue = workflow_input['ansible_queue_name']
|
||||
|
||||
with tripleoclients.messaging_websocket(queue_name) as ws:
|
||||
execution = base.start_workflow(
|
||||
|
@ -88,22 +86,17 @@ def update_ansible(clients, **workflow_input):
|
|||
'tripleo.package_update.v1.update_nodes',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
timeout = time.time() + 600
|
||||
# First we need to wait for the first item in the queue
|
||||
while queue.stats['messages']['total'] == 0 or time.time() == timeout:
|
||||
pass
|
||||
# Then we can start to claim the queue
|
||||
while workflow_client.executions.get(execution.id).state == 'RUNNING':
|
||||
try:
|
||||
claim = queue.claim(ttl=600, grace=600)
|
||||
for message in claim:
|
||||
pprint.pprint(
|
||||
message.body['payload']['message'].splitlines())
|
||||
message.delete()
|
||||
except zaqar_errors.ServiceUnavailableError:
|
||||
pass
|
||||
# clean the Queue
|
||||
queue.delete()
|
||||
|
||||
with tripleoclients.messaging_websocket(ansible_queue) as update_ws:
|
||||
for payload in base.wait_for_messages(workflow_client,
|
||||
update_ws,
|
||||
execution):
|
||||
# Need to sleep a little, to let the time for the execution
|
||||
# to get the right status in between. It avoid to fall in the
|
||||
# while True loop to get messages
|
||||
time.sleep(5)
|
||||
if payload.get('message'):
|
||||
pprint.pprint(payload['message'].splitlines())
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
if payload.get('message'):
|
||||
|
|
Loading…
Reference in New Issue