diff --git a/tripleoclient/exceptions.py b/tripleoclient/exceptions.py index 8e7538cbf..768d4d5f9 100644 --- a/tripleoclient/exceptions.py +++ b/tripleoclient/exceptions.py @@ -22,7 +22,10 @@ class Timeout(Exception): class WorkflowServiceError(Exception): """The service type is unknown""" - pass + + +class WebSocketTimeout(Exception): + """Timed out waiting for messages on the websocket""" class NotFound(Exception): diff --git a/tripleoclient/plugin.py b/tripleoclient/plugin.py index 7a830284b..4f6b467b2 100644 --- a/tripleoclient/plugin.py +++ b/tripleoclient/plugin.py @@ -24,6 +24,8 @@ from osc_lib import utils from swiftclient import client as swift_client import websocket +from tripleoclient import exceptions + LOG = logging.getLogger(__name__) DEFAULT_TRIPLEOCLIENT_API_VERSION = '1' @@ -124,17 +126,29 @@ class WebsocketClient(object): def recv(self): return json.loads(self._ws.recv()) - def wait_for_message(self, execution_id): + def wait_for_message(self, execution_id, timeout=None): """Wait for a message for a mistral execution ID - This blocks until a message is received on the provided queue name - with the execution ID. + This method blocks until a message is received on the message queue + with the execution ID passed in. - TODO(d0ugal): Add a timeout/break for the case when a message is - never arrives. + A timeout can be provided in seconds, if no timeout is provided it + will block forever until a message is received. If no message is + received (for example, Zaqar is down) then it will block until manually + killed. """ + + if timeout is None: + LOG.warning("Waiting for messages on queue '{}' with no timeout." 
+ .format(self._queue_name)) + + self._ws.settimeout(timeout) + while True: - body = self.recv()['body'] + try: + body = self.recv()['body'] + except websocket.WebSocketTimeoutException: + raise exceptions.WebSocketTimeout() if body['payload']['execution']['id'] == execution_id: return body['payload'] diff --git a/tripleoclient/tests/fakes.py b/tripleoclient/tests/fakes.py index 5e92f5622..2395ee1ca 100644 --- a/tripleoclient/tests/fakes.py +++ b/tripleoclient/tests/fakes.py @@ -44,7 +44,7 @@ class FakeClientManager(object): class FakeWebSocket(object): - def wait_for_message(self, execution_id): + def wait_for_message(self, execution_id, timeout=None): return { 'status': 'SUCCESS' } diff --git a/tripleoclient/workflows/base.py b/tripleoclient/workflows/base.py index 09570fe1a..55d158a07 100644 --- a/tripleoclient/workflows/base.py +++ b/tripleoclient/workflows/base.py @@ -10,6 +10,11 @@ # License for the specific language governing permissions and limitations # under the License. import json +import logging + +from tripleoclient import exceptions + +LOG = logging.getLogger(__name__) def call_action(workflow_client, action, **input_): @@ -34,3 +39,45 @@ def start_workflow(workflow_client, identifier, workflow_input): identifier, execution.id)) return execution + + +def wait_for_message(mistral, websocket, execution, timeout=None): + """Wait for messages on a websocket. + + Given an instance of mistral client, a websocket and a Mistral execution + wait for messages on that websocket queue that match the execution ID until + the timeout is reached. + + If no timeout is provided, this method will block forever. + + If a timeout is reached, calls check_execution_status, which will look up + the execution on Mistral and log information about it.
+ """ + try: + return websocket.wait_for_message(execution.id, timeout=timeout) + except exceptions.WebSocketTimeout: + check_execution_status(mistral, execution.id) + raise + + +def check_execution_status(workflow_client, execution_id): + """Check the status of a workflow that timed out waiting for messages + + The status will be logged. + """ + + execution = workflow_client.executions.get(execution_id) + state = execution.state + + if state == 'RUNNING': + message = ("The WebSocket timed out before the Workflow completed.") + elif state == 'SUCCESS': + message = ("The Workflow finished successfully but no messages were " + "received before the WebSocket timed out.") + elif state == 'ERROR': + message = ("The Workflow errored and no messages were received.") + else: + message = "Unknown Execution state." + + LOG.error(("Timed out waiting for messages from Execution " + "(ID: {}, State: {}). {}").format(execution_id, state, message)) diff --git a/tripleoclient/workflows/parameters.py b/tripleoclient/workflows/parameters.py index 5b9c0b9b0..05948aa58 100644 --- a/tripleoclient/workflows/parameters.py +++ b/tripleoclient/workflows/parameters.py @@ -42,6 +42,8 @@ def get_overcloud_passwords(clients, **workflow_input): ) with tripleoclients.messaging_websocket(queue_name) as ws: - message = ws.wait_for_message(execution.id) - assert message['status'] == "SUCCESS" - return message['message'] + # Getting the passwords is a quick operation, but to allow space for + # delays or heavy loads, time out after 60 seconds.
+ payload = base.wait_for_message(workflow_client, ws, execution, 60) + assert payload['status'] == "SUCCESS" + return payload['message'] diff --git a/tripleoclient/workflows/plan_management.py b/tripleoclient/workflows/plan_management.py index b570cd997..64e85df83 100644 --- a/tripleoclient/workflows/plan_management.py +++ b/tripleoclient/workflows/plan_management.py @@ -19,6 +19,13 @@ from tripleoclient import exceptions from tripleoclient.workflows import base +# Plan management workflows should generally be quick. However, the creation +# of the default plan in instack has demonstrated that sometimes it can take +# several minutes. This timeout value of 6 minutes is the same as the timeout +# used in Instack. +_WORKFLOW_TIMEOUT = 360 # 6 * 60 seconds + + def _upload_templates(swift_client, container_name, tht_root, roles_file=None): """tarball up a given directory and upload it to Swift to be extracted""" @@ -47,7 +54,8 @@ def create_default_plan(clients, **workflow_input): ) with tripleoclients.messaging_websocket(queue_name) as ws: - payload = ws.wait_for_message(execution.id) + payload = base.wait_for_message(workflow_client, ws, execution, + _WORKFLOW_TIMEOUT) if payload['status'] == 'SUCCESS': print ("Default plan created") @@ -67,7 +75,8 @@ def _create_update_deployment_plan(clients, workflow, **workflow_input): ) with tripleoclients.messaging_websocket(queue_name) as ws: - return ws.wait_for_message(execution.id) + return base.wait_for_message(workflow_client, ws, execution, + _WORKFLOW_TIMEOUT) def create_deployment_plan(clients, **workflow_input):