Consume a zaqar queue for update to poll ansible result
Consume the update zaqar queue to get the ansible output in real time. The queue used is not the 'tripleo' queue because we need to not be disrupt by other messages. We need to claim all the zaqar messages in the queue to get a consistent ansible output Change-Id: I3682051eb719f8da9c744f7ec8be3a58f3db3f86 Closes-Bug: 1732497
This commit is contained in:
parent
2c8e76f647
commit
bb52d24649
|
@ -47,3 +47,5 @@ DEFAULT_ENV_DIRECTORY = "~/.tripleo/environments"
|
|||
TRIPLEO_PUPPET_MODULES = "/usr/share/openstack-puppet/modules/"
|
||||
PUPPET_MODULES = "/etc/puppet/modules/"
|
||||
PUPPET_BASE = "/etc/puppet/"
|
||||
# Update Queue
|
||||
UPDATE_QUEUE = 'update'
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
import mock
|
||||
|
||||
from tripleoclient import constants
|
||||
from tripleoclient import exceptions
|
||||
from tripleoclient.tests.v1.overcloud_update import fakes
|
||||
from tripleoclient.v1 import overcloud_update
|
||||
|
@ -116,4 +117,5 @@ class TestOvercloudUpdate(fakes.TestOvercloudUpdate):
|
|||
nodes='Compute',
|
||||
inventory_file=mock_open().read(),
|
||||
playbook='fake-playbook.yaml',
|
||||
ansible_queue_name=constants.UPDATE_QUEUE
|
||||
)
|
||||
|
|
|
@ -140,9 +140,8 @@ class UpdateOvercloud(command.Command):
|
|||
else:
|
||||
raise exceptions.InvalidConfiguration(
|
||||
"Inventory file %s can not be found." % inventory_file)
|
||||
output = package_update.update_ansible(
|
||||
package_update.update_ansible(
|
||||
clients, nodes=nodes,
|
||||
inventory_file=inventory,
|
||||
playbook=playbook
|
||||
)
|
||||
print(output)
|
||||
playbook=playbook,
|
||||
ansible_queue_name=constants.UPDATE_QUEUE)
|
||||
|
|
|
@ -57,6 +57,8 @@ def update(clients, **workflow_input):
|
|||
def update_ansible(clients, **workflow_input):
|
||||
workflow_client = clients.workflow_engine
|
||||
tripleoclients = clients.tripleoclient
|
||||
zaqar = clients.messaging
|
||||
queue = zaqar.queue(workflow_input['ansible_queue_name'])
|
||||
|
||||
with tripleoclients.messaging_websocket() as ws:
|
||||
execution = base.start_workflow(
|
||||
|
@ -64,6 +66,24 @@ def update_ansible(clients, **workflow_input):
|
|||
'tripleo.package_update.v1.update_nodes',
|
||||
workflow_input=workflow_input
|
||||
)
|
||||
timeout = time.time() + 600
|
||||
# First we need to wait for the first item in the queue
|
||||
while queue.stats['messages']['total'] == 0 or time.time() == timeout:
|
||||
pass
|
||||
# Then we can start to claim the queue
|
||||
while workflow_client.executions.get(execution.id).state == 'RUNNING':
|
||||
claim = queue.claim(ttl=600, grace=600)
|
||||
for message in claim:
|
||||
pprint.pprint(message.body['payload']['message'].splitlines())
|
||||
message.delete()
|
||||
# clean the Queue
|
||||
queue.delete()
|
||||
|
||||
for payload in base.wait_for_messages(workflow_client, ws, execution):
|
||||
assert payload['status'] == "SUCCESS", pprint.pformat(payload)
|
||||
if payload.get('message'):
|
||||
print(payload)
|
||||
|
||||
if payload['status'] == 'SUCCESS':
|
||||
print('Success')
|
||||
else:
|
||||
raise RuntimeError('Minor update failed with: {}'.format(payload))
|
||||
|
|
Loading…
Reference in New Issue