Consume a zaqar queue for update to poll ansible result

Consume the update zaqar queue to get the ansible output in
real time.
The queue used is not the 'tripleo' queue because we need
to avoid being disrupted by other messages.
We need to claim all the zaqar messages in the queue to
get a consistent ansible output

Change-Id: I3682051eb719f8da9c744f7ec8be3a58f3db3f86
Closes-Bug: 1732497
This commit is contained in:
Mathieu Bultel 2017-11-18 14:56:24 +01:00
parent 2c8e76f647
commit bb52d24649
4 changed files with 28 additions and 5 deletions

View File

@ -47,3 +47,5 @@ DEFAULT_ENV_DIRECTORY = "~/.tripleo/environments"
# Filesystem locations for puppet modules and configuration.
TRIPLEO_PUPPET_MODULES = "/usr/share/openstack-puppet/modules/"
PUPPET_MODULES = "/etc/puppet/modules/"
PUPPET_BASE = "/etc/puppet/"
# Update Queue
# Name of the dedicated zaqar queue used to stream ansible output during
# minor updates; kept separate from the shared 'tripleo' queue so unrelated
# messages do not interleave with the ansible output.
UPDATE_QUEUE = 'update'

View File

@ -15,6 +15,7 @@
import mock
from tripleoclient import constants
from tripleoclient import exceptions
from tripleoclient.tests.v1.overcloud_update import fakes
from tripleoclient.v1 import overcloud_update
@ -116,4 +117,5 @@ class TestOvercloudUpdate(fakes.TestOvercloudUpdate):
nodes='Compute',
inventory_file=mock_open().read(),
playbook='fake-playbook.yaml',
ansible_queue_name=constants.UPDATE_QUEUE
)

View File

@ -140,9 +140,8 @@ class UpdateOvercloud(command.Command):
else:
raise exceptions.InvalidConfiguration(
"Inventory file %s can not be found." % inventory_file)
output = package_update.update_ansible(
package_update.update_ansible(
clients, nodes=nodes,
inventory_file=inventory,
playbook=playbook
)
print(output)
playbook=playbook,
ansible_queue_name=constants.UPDATE_QUEUE)

View File

@ -57,6 +57,8 @@ def update(clients, **workflow_input):
def update_ansible(clients, **workflow_input):
    """Start the update_nodes workflow and stream its ansible output.

    Starts the 'tripleo.package_update.v1.update_nodes' Mistral workflow
    and, while it is RUNNING, claims messages from a dedicated zaqar queue
    (named by ``workflow_input['ansible_queue_name']``) and prints each
    message's ansible output in real time.  Finally waits for the workflow
    result on the websocket and raises ``RuntimeError`` if the update did
    not end with status SUCCESS.

    :param clients: OpenStack clients object providing ``workflow_engine``,
        ``tripleoclient`` and ``messaging`` attributes.
    :param workflow_input: forwarded verbatim as the workflow input; must
        contain at least ``ansible_queue_name``.
    :raises RuntimeError: when the final payload status is not SUCCESS.
    """
    workflow_client = clients.workflow_engine
    tripleoclients = clients.tripleoclient
    # Dedicated queue for the ansible output -- deliberately NOT the shared
    # 'tripleo' queue, so unrelated messages do not pollute the stream.
    zaqar = clients.messaging
    queue = zaqar.queue(workflow_input['ansible_queue_name'])
    with tripleoclients.messaging_websocket() as ws:
        execution = base.start_workflow(
# NOTE(review): diff hunk boundary from the review view -- one or more
# original lines (the leading positional argument(s) of start_workflow,
# presumably workflow_client) are not visible here; confirm against the
# full file before editing this call.
@ -64,6 +66,24 @@ def update_ansible(clients, **workflow_input):
            'tripleo.package_update.v1.update_nodes',
            workflow_input=workflow_input
        )
        timeout = time.time() + 600
        # First we need to wait for the first item in the queue
        # NOTE(review): this is a CPU-spinning busy-wait ('pass' body, no
        # sleep), and 'time.time() == timeout' is a float equality that
        # will almost never be true -- so if no message ever arrives this
        # loops forever.  The intent is presumably
        # 'while total == 0 and time.time() < timeout', with a short sleep
        # in the body -- confirm and fix.
        while queue.stats['messages']['total'] == 0 or time.time() == timeout:
            pass
        # Then we can start to claim the queue
        while workflow_client.executions.get(execution.id).state == 'RUNNING':
            # Claim and drain every pending message so the printed ansible
            # output stays consistent (per the commit message rationale).
            claim = queue.claim(ttl=600, grace=600)
            for message in claim:
                pprint.pprint(message.body['payload']['message'].splitlines())
                message.delete()
        # clean the Queue
        queue.delete()
        for payload in base.wait_for_messages(workflow_client, ws, execution):
            # NOTE(review): 'assert' is stripped under 'python -O', and the
            # explicit status check below already raises on failure, so this
            # assert is redundant -- consider removing it.
            assert payload['status'] == "SUCCESS", pprint.pformat(payload)
            if payload.get('message'):
                print(payload)

    # 'payload' here is the last message seen in the loop above.
    if payload['status'] == 'SUCCESS':
        print('Success')
    else:
        raise RuntimeError('Minor update failed with: {}'.format(payload))