Consume a zaqar queue for update to poll ansible result

Consume the update zaqar queue to get the ansible output in
real time.
The queue used is not the 'tripleo' queue because we need
to not be disrupt by other messages.
We need to claim all the zaqar messages in the queue to
get a consistent ansible output

Cherry-Pick from bb52d24649
Change-Id: I3682051eb719f8da9c744f7ec8be3a58f3db3f86
Closes-Bug: 1732497
This commit is contained in:
Mathieu Bultel 2017-11-18 14:56:24 +01:00
parent a4878d0c97
commit 15db65eabc
4 changed files with 30 additions and 5 deletions

View File

@ -44,3 +44,5 @@ PLAN_ENVIRONMENT = 'plan-environment.yaml'
TRIPLEO_PUPPET_MODULES = "/usr/share/openstack-puppet/modules/"
PUPPET_MODULES = "/etc/puppet/modules/"
PUPPET_BASE = "/etc/puppet/"
# Update Queue
UPDATE_QUEUE = 'update'

View File

@ -16,6 +16,7 @@
import mock
import uuid
from tripleoclient import constants
from tripleoclient import exceptions
from tripleoclient.tests.v1.overcloud_update import fakes
from tripleoclient.v1 import overcloud_update
@ -117,4 +118,6 @@ class TestOvercloudUpdate(fakes.TestOvercloudUpdate):
nodes='Compute',
inventory_file=mock_open().read(),
playbook='fake-playbook.yaml',
queue_name=str(uuid.uuid4()))
queue_name=str(uuid.uuid4()),
ansible_queue_name=constants.UPDATE_QUEUE
)

View File

@ -141,9 +141,9 @@ class UpdateOvercloud(command.Command):
else:
raise exceptions.InvalidConfiguration(
"Inventory file %s can not be found." % inventory_file)
output = package_update.update_ansible(
package_update.update_ansible(
clients, nodes=nodes,
inventory_file=inventory,
playbook=playbook,
queue_name=str(uuid.uuid4()))
print(output)
queue_name=str(uuid.uuid4()),
ansible_queue_name=constants.UPDATE_QUEUE)

View File

@ -58,6 +58,8 @@ def update_ansible(clients, **workflow_input):
workflow_client = clients.workflow_engine
tripleoclients = clients.tripleoclient
queue_name = workflow_input['queue_name']
zaqar = clients.messaging
queue = zaqar.queue(workflow_input['ansible_queue_name'])
with tripleoclients.messaging_websocket(queue_name) as ws:
execution = base.start_workflow(
@ -65,6 +67,24 @@ def update_ansible(clients, **workflow_input):
'tripleo.package_update.v1.update_nodes',
workflow_input=workflow_input
)
timeout = time.time() + 600
# First we need to wait for the first item in the queue
while queue.stats['messages']['total'] == 0 or time.time() == timeout:
pass
# Then we can start to claim the queue
while workflow_client.executions.get(execution.id).state == 'RUNNING':
claim = queue.claim(ttl=600, grace=600)
for message in claim:
pprint.pprint(message.body['payload']['message'].splitlines())
message.delete()
# clean the Queue
queue.delete()
for payload in base.wait_for_messages(workflow_client, ws, execution):
assert payload['status'] == "SUCCESS", pprint.pformat(payload)
if payload.get('message'):
print(payload)
if payload['status'] == 'SUCCESS':
print('Success')
else:
raise RuntimeError('Minor update failed with: {}'.format(payload))