From 3f48e24dc4948970105020140d8290788cca86c1 Mon Sep 17 00:00:00 2001 From: Winson Chan Date: Wed, 24 May 2017 22:03:31 +0000 Subject: [PATCH] Implement notification of execution events Introduce execution events and notification server and plugins for publishing these events for consumers. Event notification is defined per workflow execution and can be configured to notify on all the events or only for specific events. Change-Id: I9820bdc4792a374dad9ad5310f84cd7aaddab8ca Implements: blueprint mistral-execution-event-subscription --- mistral/api/controllers/v2/execution.py | 5 +- mistral/api/controllers/v2/resources.py | 46 +- mistral/cmd/launch.py | 9 +- mistral/config.py | 35 + mistral/engine/actions.py | 4 + mistral/engine/default_engine.py | 8 + mistral/engine/tasks.py | 53 + mistral/engine/workflows.py | 40 +- mistral/notifiers/__init__.py | 0 mistral/notifiers/base.py | 81 ++ mistral/notifiers/default_notifier.py | 44 + mistral/notifiers/notification_events.py | 82 ++ mistral/notifiers/notification_server.py | 93 ++ mistral/notifiers/publishers/__init__.py | 0 mistral/notifiers/publishers/noop.py | 31 + mistral/notifiers/publishers/webhook.py | 36 + mistral/notifiers/remote_notifier.py | 30 + mistral/rpc/clients.py | 45 +- mistral/tests/unit/api/v2/test_executions.py | 19 +- mistral/tests/unit/engine/base.py | 17 + ...test_plugins.py => test_server_plugins.py} | 13 +- mistral/tests/unit/notifiers/__init__.py | 0 mistral/tests/unit/notifiers/base.py | 47 + .../unit/notifiers/test_notifier_servers.py | 221 ++++ mistral/tests/unit/notifiers/test_notify.py | 1036 +++++++++++++++++ mistral/tests/unit/test_launcher.py | 3 +- setup.cfg | 8 + 27 files changed, 1975 insertions(+), 31 deletions(-) create mode 100644 mistral/notifiers/__init__.py create mode 100644 mistral/notifiers/base.py create mode 100644 mistral/notifiers/default_notifier.py create mode 100644 mistral/notifiers/notification_events.py create mode 100644 mistral/notifiers/notification_server.py create mode 100644 mistral/notifiers/publishers/__init__.py create mode 100644 mistral/notifiers/publishers/noop.py create mode 100644 mistral/notifiers/publishers/webhook.py create mode 100644 mistral/notifiers/remote_notifier.py rename mistral/tests/unit/executors/{test_plugins.py => test_server_plugins.py} (72%) create mode 100644 mistral/tests/unit/notifiers/__init__.py create mode 100644 mistral/tests/unit/notifiers/base.py create mode 100644 mistral/tests/unit/notifiers/test_notifier_servers.py create mode 100644 mistral/tests/unit/notifiers/test_notify.py diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index f7b14c00f..acfbc7f98 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -2,6 +2,7 @@ # Copyright 2015 - StackStorm, Inc. # Copyright 2015 Huawei Technologies Co., Ltd. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -258,8 +259,8 @@ class ExecutionsController(rest.RestController): result_exec_dict.get('workflow_namespace', ''), exec_id, result_exec_dict.get('input'), - result_exec_dict.get('description', ''), - **result_exec_dict.get('params') or {} + description=result_exec_dict.get('description', ''), + **result_exec_dict.get('params', {}) ) return resources.Execution.from_dict(result) diff --git a/mistral/api/controllers/v2/resources.py b/mistral/api/controllers/v2/resources.py index e1547e246..f4efe58c4 100644 --- a/mistral/api/controllers/v2/resources.py +++ b/mistral/api/controllers/v2/resources.py @@ -1,4 +1,5 @@ # Copyright 2013 - Mirantis, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -284,18 +285,39 @@ class Execution(resource.Resource): @classmethod def sample(cls): - return cls(id='123e4567-e89b-12d3-a456-426655440000', - workflow_name='flow', - workflow_namespace='some_namespace', - workflow_id='123e4567-e89b-12d3-a456-426655441111', - description='this is the first execution.', - project_id='40a908dbddfe48ad80a87fb30fa70a03', - state='SUCCESS', - input={}, - output={}, - params={'env': {'k1': 'abc', 'k2': 123}}, - created_at='1970-01-01T00:00:00.000000', - updated_at='1970-01-01T00:00:00.000000') + return cls( + id='123e4567-e89b-12d3-a456-426655440000', + workflow_name='flow', + workflow_namespace='some_namespace', + workflow_id='123e4567-e89b-12d3-a456-426655441111', + description='this is the first execution.', + project_id='40a908dbddfe48ad80a87fb30fa70a03', + state='SUCCESS', + input={}, + output={}, + params={ + 'env': {'k1': 'abc', 'k2': 123}, + 'notify': [ + { + 'type': 'webhook', + 'url': 'http://endpoint/of/webhook', + 'headers': { + 'Content-Type': 'application/json', + 'X-Auth-Token': '123456789' + } + }, + { + 'type': 'queue', + 'topic': 'failover_queue', + 'backend': 'rabbitmq', + 'host': '127.0.0.1', + 'port': 5432 + } + ] + }, + created_at='1970-01-01T00:00:00.000000', + updated_at='1970-01-01T00:00:00.000000' + ) class Executions(resource.ResourceList): diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index 88926a8dc..a3ac55e5c 100644 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -1,5 +1,6 @@ #!/usr/bin/env python # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -46,6 +47,7 @@ from mistral import config from mistral.engine import engine_server from mistral.event_engine import event_engine_server from mistral.executors import executor_server +from mistral.notifiers import notification_server from mistral.rpc import base as rpc from mistral import version @@ -93,6 +95,10 @@ def launch_event_engine(): launch_thread(event_engine_server.get_oslo_service()) +def launch_notifier(): + launch_thread(notification_server.get_oslo_service()) + + def launch_api(): server = api_service.WSGIService('mistral_api') launch_process(server, workers=server.workers) @@ -118,7 +124,8 @@ LAUNCH_OPTIONS = { 'api': launch_api, 'engine': launch_engine, 'executor': launch_executor, - 'event-engine': launch_event_engine + 'event-engine': launch_event_engine, + 'notifier': launch_notifier } diff --git a/mistral/config.py b/mistral/config.py index 0c1108684..c3e54eea6 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -1,5 +1,6 @@ # Copyright 2013 - Mirantis, Inc. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -300,6 +301,37 @@ event_engine_opts = [ ), ] +notifier_opts = [ + cfg.StrOpt( + 'type', + choices=['local', 'remote'], + default='remote', + help=( + 'Type of notifier. Use local to run the notifier within the ' + 'engine server. Use remote if the notifier is launched as ' + 'a separate server to process events.' + ) + ), + cfg.StrOpt( + 'host', + default='0.0.0.0', + help=_('Name of the notifier node. This can be an opaque ' + 'identifier. It is not necessarily a hostname, ' + 'FQDN, or IP address.') + ), + cfg.StrOpt( + 'topic', + default='mistral_notifier', + help=_('The message topic that the notifier server listens on.') + ), + cfg.ListOpt( + 'notify', + item_type=eval, + bounds=True, + help=_('List of publishers to publish notification.') + ) +] + execution_expiration_policy_opts = [ cfg.IntOpt( 'evaluation_interval', @@ -425,6 +457,7 @@ EXECUTOR_GROUP = 'executor' SCHEDULER_GROUP = 'scheduler' CRON_TRIGGER_GROUP = 'cron_trigger' EVENT_ENGINE_GROUP = 'event_engine' +NOTIFIER_GROUP = 'notifier' PECAN_GROUP = 'pecan' COORDINATION_GROUP = 'coordination' EXECUTION_EXPIRATION_POLICY_GROUP = 'execution_expiration_policy' @@ -450,6 +483,7 @@ CONF.register_opts( group=EXECUTION_EXPIRATION_POLICY_GROUP ) CONF.register_opts(event_engine_opts, group=EVENT_ENGINE_GROUP) +CONF.register_opts(notifier_opts, group=NOTIFIER_GROUP) CONF.register_opts(pecan_opts, group=PECAN_GROUP) CONF.register_opts(coordination_opts, group=COORDINATION_GROUP) CONF.register_opts(profiler_opts, group=PROFILER_GROUP) @@ -494,6 +528,7 @@ def list_opts(): (EVENT_ENGINE_GROUP, event_engine_opts), (SCHEDULER_GROUP, scheduler_opts), (CRON_TRIGGER_GROUP, cron_trigger_opts), + (NOTIFIER_GROUP, notifier_opts), (PECAN_GROUP, pecan_opts), (COORDINATION_GROUP, coordination_opts), (EXECUTION_EXPIRATION_POLICY_GROUP, execution_expiration_policy_opts), diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index ad4bd55bd..3378a6d71 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -1,5 +1,6 @@ # Copyright 2016 - Nokia Networks. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -558,6 +559,9 @@ class WorkflowAction(Action): wf_params['env'] = parent_wf_ex.params['env'] wf_params['evaluate_env'] = parent_wf_ex.params.get('evaluate_env') + if 'notify' in parent_wf_ex.params: + wf_params['notify'] = parent_wf_ex.params['notify'] + for k, v in list(input_dict.items()): if k not in wf_spec.get_input(): wf_params[k] = v diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 85583e01d..6ec53e07d 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -1,6 +1,7 @@ # Copyright 2013 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo_config import cfg from osprofiler import profiler from mistral.db import utils as db_utils @@ -42,6 +44,12 @@ class DefaultEngine(base.Engine): if wf_namespace: params['namespace'] = wf_namespace + if cfg.CONF.notifier.notify: + if 'notify' not in params or not params['notify']: + params['notify'] = [] + + params['notify'].extend(cfg.CONF.notifier.notify) + try: with db_api.transaction(): wf_ex = wf_handler.start_workflow( diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 6a1d5b55e..faad0fb63 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -1,5 +1,6 @@ # Copyright 2016 - Nokia Networks. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +16,7 @@ import abc import copy +from oslo_config import cfg from oslo_log import log as logging from osprofiler import profiler import six @@ -25,6 +27,8 @@ from mistral.engine import dispatcher from mistral.engine import policies from mistral import exceptions as exc from mistral import expressions as expr +from mistral.notifiers import base as notif +from mistral.notifiers import notification_events as events from mistral import utils from mistral.utils import wf_trace from mistral.workflow import base as wf_base @@ -57,6 +61,23 @@ class Task(object): self.created = False self.state_changed = False + def notify(self, old_task_state, new_task_state): + publishers = self.wf_ex.params.get('notify') + + if not publishers and not isinstance(publishers, list): + return + + notifier = notif.get_notifier(cfg.CONF.notifier.type) + event = events.identify_task_event(old_task_state, new_task_state) + + notifier.notify( + self.task_ex.id, + self.task_ex.to_dict(), + event, + self.task_ex.updated_at, + publishers + ) + def is_completed(self): return self.task_ex and states.is_completed(self.task_ex.state) @@ -177,8 +198,15 @@ class Task(object): assert self.task_ex + # Record the current task state. + old_task_state = self.task_ex.state + # Ignore if task already completed. if self.is_completed(): + # Publish task event again so subscribers know + # task completed state is being processed again. + self.notify(old_task_state, self.task_ex.state) + return # If we were unable to change the task state it means that it was @@ -205,6 +233,9 @@ class Task(object): # If workflow is paused we shouldn't schedule new commands # and mark task as processed. if states.is_paused(self.wf_ex.state): + # Publish task event even if the workflow is paused. + self.notify(old_task_state, self.task_ex.state) + return wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) @@ -216,6 +247,9 @@ class Task(object): # upon its completion. self.task_ex.processed = True + # Publish task event. + self.notify(old_task_state, self.task_ex.state) + dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) @profiler.trace('task-update') @@ -230,8 +264,15 @@ class Task(object): assert self.task_ex + # Record the current task state. + old_task_state = self.task_ex.state + # Ignore if task already completed. if states.is_completed(self.task_ex.state): + # Publish task event again so subscribers know + # task completed state is being processed again. + self.notify(old_task_state, self.task_ex.state) + return # Update only if state transition is valid. @@ -247,6 +288,9 @@ class Task(object): self.set_state(state, state_info) + # Publish event. + self.notify(old_task_state, self.task_ex.state) + def _before_task_start(self): policies_spec = self.task_spec.get_policies() @@ -340,6 +384,9 @@ class RegularTask(Task): self._create_task_execution() + # Publish event. + self.notify(None, self.task_ex.state) + LOG.debug( 'Starting task [workflow=%s, task=%s, init_state=%s]', self.wf_ex.name, @@ -367,8 +414,14 @@ class RegularTask(Task): 'Rerunning succeeded tasks is not supported.' ) + # Record the current task state. + old_task_state = self.task_ex.state + self.set_state(states.RUNNING, None, processed=False) + # Publish event. + self.notify(old_task_state, self.task_ex.state) + self._update_inbound_context() self._update_triggered_by() self._reset_actions() diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index f988434fe..1585e69c0 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -1,5 +1,6 @@ # Copyright 2016 - Nokia Networks. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,6 +27,8 @@ from mistral.engine import dispatcher from mistral.engine import utils as engine_utils from mistral import exceptions as exc from mistral.lang import parser as spec_parser +from mistral.notifiers import base as notif +from mistral.notifiers import notification_events as events from mistral.services import triggers from mistral.services import workflows as wf_service from mistral import utils @@ -61,6 +64,22 @@ class Workflow(object): else: self.wf_spec = None + def notify(self, event): + publishers = self.wf_ex.params.get('notify') + + if not publishers and not isinstance(publishers, list): + return + + notifier = notif.get_notifier(cfg.CONF.notifier.type) + + notifier.notify( + self.wf_ex.id, + self.wf_ex.to_dict(), + event, + self.wf_ex.updated_at, + publishers + ) + @profiler.trace('workflow-start') def start(self, wf_def, wf_ex_id, input_dict, desc='', params=None): """Start workflow. @@ -100,6 +119,9 @@ class Workflow(object): self.set_state(states.RUNNING) + # Publish event as soon as state is set to running. + self.notify(events.WORKFLOW_LAUNCHED) + wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) dispatcher.dispatch_workflow_commands( @@ -113,7 +135,6 @@ class Workflow(object): :param state: New workflow state. :param msg: Additional explaining message. """ - assert self.wf_ex if state == states.SUCCESS: @@ -137,6 +158,9 @@ class Workflow(object): # Set the state of this workflow to paused. self.set_state(states.PAUSED, state_info=msg) + # Publish event. + self.notify(events.WORKFLOW_PAUSED) + # If workflow execution is a subworkflow, # schedule update to the task execution. if self.wf_ex.task_execution_id: @@ -144,8 +168,6 @@ class Workflow(object): from mistral.engine import task_handler task_handler.schedule_on_action_update(self.wf_ex) - return - def resume(self, env=None): """Resume workflow. @@ -158,6 +180,9 @@ class Workflow(object): self.set_state(states.RUNNING) + # Publish event. + self.notify(events.WORKFLOW_RESUMED) + wf_ctrl = wf_base.get_controller(self.wf_ex) # Calculate commands to process next. @@ -403,6 +428,9 @@ class Workflow(object): # Set workflow execution to success until after output is evaluated. self.set_state(states.SUCCESS, msg) + # Publish event. + self.notify(events.WORKFLOW_SUCCEEDED) + if self.wf_ex.task_execution_id: self._send_result_to_parent_workflow() @@ -448,6 +476,9 @@ class Workflow(object): self.wf_ex.output = merge_dicts({'result': msg}, output_on_error) + # Publish event. + self.notify(events.WORKFLOW_FAILED) + if self.wf_ex.task_execution_id: self._send_result_to_parent_workflow() @@ -466,6 +497,9 @@ class Workflow(object): self.wf_ex.output = {'result': msg} + # Publish event. + self.notify(events.WORKFLOW_CANCELLED) + if self.wf_ex.task_execution_id: self._send_result_to_parent_workflow() diff --git a/mistral/notifiers/__init__.py b/mistral/notifiers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/notifiers/base.py b/mistral/notifiers/base.py new file mode 100644 index 000000000..b72dc1f05 --- /dev/null +++ b/mistral/notifiers/base.py @@ -0,0 +1,81 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six + +from oslo_log import log as logging +from stevedore import driver + + +LOG = logging.getLogger(__name__) + +_NOTIFIERS = {} +_NOTIFICATION_PUBLISHERS = {} + + +def cleanup(): + global _NOTIFIERS + global _NOTIFICATION_PUBLISHERS + + _NOTIFIERS = {} + _NOTIFICATION_PUBLISHERS = {} + + +def get_notifier(notifier_name): + global _NOTIFIERS + + if not _NOTIFIERS.get(notifier_name): + mgr = driver.DriverManager( + 'mistral.notifiers', + notifier_name, + invoke_on_load=True + ) + + _NOTIFIERS[notifier_name] = mgr.driver + + return _NOTIFIERS[notifier_name] + + +def get_notification_publisher(publisher_name): + global _NOTIFICATION_PUBLISHERS + + if not _NOTIFICATION_PUBLISHERS.get(publisher_name): + mgr = driver.DriverManager( + 'mistral.notification.publishers', + publisher_name, + invoke_on_load=True + ) + + _NOTIFICATION_PUBLISHERS[publisher_name] = mgr.driver + + return _NOTIFICATION_PUBLISHERS[publisher_name] + + +@six.add_metaclass(abc.ABCMeta) +class Notifier(object): + """Notifier interface.""" + + @abc.abstractmethod + def notify(self, ex_id, data, event, timestamp, **kwargs): + raise NotImplementedError() + + +@six.add_metaclass(abc.ABCMeta) +class NotificationPublisher(object): + """Notifier plugin interface.""" + + @abc.abstractmethod + def publish(self, ex_id, data, event, timestamp, **kwargs): + raise NotImplementedError() diff --git a/mistral/notifiers/default_notifier.py b/mistral/notifiers/default_notifier.py new file mode 100644 index 000000000..cf871d3a2 --- /dev/null +++ b/mistral/notifiers/default_notifier.py @@ -0,0 +1,44 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +from oslo_log import log as logging + +from mistral.notifiers import base + + +LOG = logging.getLogger(__name__) + + +class DefaultNotifier(base.Notifier): + """Local notifier that process notification request.""" + + def notify(self, ex_id, data, event, timestamp, publishers): + for entry in publishers: + params = copy.deepcopy(entry) + publisher_name = params.pop('type', None) + + if not publisher_name: + LOG.error('Notification publisher type is not specified.') + continue + + try: + publisher = base.get_notification_publisher(publisher_name) + publisher.publish(ex_id, data, event, timestamp, **params) + except Exception: + LOG.exception( + 'Unable to process event for publisher "%s".', + publisher_name + ) diff --git a/mistral/notifiers/notification_events.py b/mistral/notifiers/notification_events.py new file mode 100644 index 000000000..5afdd7430 --- /dev/null +++ b/mistral/notifiers/notification_events.py @@ -0,0 +1,82 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.workflow import states + + +WORKFLOW_LAUNCHED = 'WORKFLOW_LAUNCHED' +WORKFLOW_SUCCEEDED = 'WORKFLOW_SUCCEEDED' +WORKFLOW_FAILED = 'WORKFLOW_FAILED' +WORKFLOW_CANCELLED = 'WORKFLOW_CANCELLED' +WORKFLOW_PAUSED = 'WORKFLOW_PAUSED' +WORKFLOW_RESUMED = 'WORKFLOW_RESUMED' + +WORKFLOWS = [ + WORKFLOW_LAUNCHED, + WORKFLOW_SUCCEEDED, + WORKFLOW_FAILED, + WORKFLOW_CANCELLED, + WORKFLOW_PAUSED, + WORKFLOW_RESUMED +] + +TASK_LAUNCHED = 'TASK_LAUNCHED' +TASK_SUCCEEDED = 'TASK_SUCCEEDED' +TASK_FAILED = 'TASK_FAILED' +TASK_CANCELLED = 'TASK_CANCELLED' +TASK_PAUSED = 'TASK_PAUSED' +TASK_RESUMED = 'TASK_RESUMED' + +TASKS = [ + TASK_LAUNCHED, + TASK_SUCCEEDED, + TASK_FAILED, + TASK_CANCELLED, + TASK_PAUSED, + TASK_RESUMED +] + +EVENTS = WORKFLOWS + TASKS + +TASK_STATE_TRANSITION_MAP = { + states.RUNNING: { + 'ANY': TASK_LAUNCHED, + 'IDLE': TASK_RESUMED, + 'PAUSED': TASK_RESUMED, + 'WAITING': TASK_RESUMED + }, + states.SUCCESS: {'ANY': TASK_SUCCEEDED}, + states.ERROR: {'ANY': TASK_FAILED}, + states.CANCELLED: {'ANY': TASK_CANCELLED}, + states.PAUSED: {'ANY': TASK_PAUSED} +} + + +def identify_task_event(old_task_state, new_task_state): + event_options = ( + TASK_STATE_TRANSITION_MAP[new_task_state] + if new_task_state in TASK_STATE_TRANSITION_MAP + else {} + ) + + if not event_options: + return None + + event = ( + event_options[old_task_state] + if old_task_state and old_task_state in event_options + else event_options['ANY'] + ) + + return event diff --git a/mistral/notifiers/notification_server.py b/mistral/notifiers/notification_server.py new file mode 100644 index 000000000..f7e5e2447 --- /dev/null +++ b/mistral/notifiers/notification_server.py @@ -0,0 +1,93 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral import config as cfg +from mistral.notifiers import default_notifier as notif +from mistral.rpc import base as rpc +from mistral.service import base as service_base +from mistral import utils +from mistral.utils import profiler as profiler_utils + +LOG = logging.getLogger(__name__) + + +class NotificationServer(service_base.MistralService): + + def __init__(self, notifier, setup_profiler=True): + super(NotificationServer, self).__init__( + 'notifier_group', + setup_profiler + ) + + self.notifier = notifier + self._rpc_server = None + + def start(self): + super(NotificationServer, self).start() + + if self._setup_profiler: + profiler_utils.setup('mistral-notifier', cfg.CONF.notifier.host) + + # Initialize and start RPC server. + + self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.notifier) + self._rpc_server.register_endpoint(self) + + self._rpc_server.run(executor='threading') + + self._notify_started('Notification server started.') + + def stop(self, graceful=False): + super(NotificationServer, self).stop(graceful) + + if self._rpc_server: + self._rpc_server.stop(graceful) + + def notify(self, rpc_ctx, ex_id, data, event, timestamp, publishers): + """Receives calls over RPC to notify on notification server. + + :param rpc_ctx: RPC request context dictionary. + :param ex_id: Workflow, task, or action execution id. + :param data: Dictionary to include in the notification message. + :param event: Event being notified on. + :param timestamp: Datetime when this event occurred. + :param publishers: The list of publishers to send the notification. + """ + + LOG.info( + "Received RPC request 'notify'[ex_id=%s, event=%s, " + "timestamp=%s, data=%s, publishers=%s]", + ex_id, + event, + timestamp, + data, + utils.cut(publishers) + ) + + self.notifier.notify( + ex_id, + data, + event, + timestamp, + publishers + ) + + +def get_oslo_service(setup_profiler=True): + return NotificationServer( + notif.DefaultNotifier(), + setup_profiler=setup_profiler + ) diff --git a/mistral/notifiers/publishers/__init__.py b/mistral/notifiers/publishers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/notifiers/publishers/noop.py b/mistral/notifiers/publishers/noop.py new file mode 100644 index 000000000..799313b51 --- /dev/null +++ b/mistral/notifiers/publishers/noop.py @@ -0,0 +1,31 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral.notifiers import base + + +LOG = logging.getLogger(__name__) + + +class NoopPublisher(base.NotificationPublisher): + + def publish(self, ex_id, data, event, timestamp, **kwargs): + LOG.info( + 'The event %s for %s is published by the ' + 'noop notification publisher.', + event, + ex_id + ) diff --git a/mistral/notifiers/publishers/webhook.py b/mistral/notifiers/publishers/webhook.py new file mode 100644 index 000000000..0067433d4 --- /dev/null +++ b/mistral/notifiers/publishers/webhook.py @@ -0,0 +1,36 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import requests +from six.moves import http_client + +from oslo_log import log as logging + +from mistral.notifiers import base + + +LOG = logging.getLogger(__name__) + + +class WebhookPublisher(base.NotificationPublisher): + + def publish(self, ex_id, data, event, timestamp, **kwargs): + url = kwargs.get('url') + headers = kwargs.get('headers', {}) + + resp = requests.post(url, data=json.dumps(data), headers=headers) + + if resp.status_code not in [http_client.OK, http_client.CREATED]: + raise Exception(resp.text) diff --git a/mistral/notifiers/remote_notifier.py b/mistral/notifiers/remote_notifier.py new file mode 100644 index 000000000..8ea49eb61 --- /dev/null +++ b/mistral/notifiers/remote_notifier.py @@ -0,0 +1,30 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging + +from mistral.rpc import base as rpc_base +from mistral.rpc import clients as rpc_clients + + +LOG = logging.getLogger(__name__) + + +class RemoteNotifier(rpc_clients.NotifierClient): + """Notifier that passes notification request to a remote notifier.""" + + def __init__(self): + self.topic = cfg.CONF.notifier.topic + self._client = rpc_base.get_rpc_client_driver()(cfg.CONF.notifier) diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index 865936c0a..d69f68697 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -1,6 +1,7 @@ # Copyright 2014 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. # Copyright 2017 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +16,7 @@ # limitations under the License. from oslo_config import cfg +from oslo_log import log as logging from osprofiler import profiler import threading @@ -22,9 +24,12 @@ from mistral import context as auth_ctx from mistral.engine import base as eng from mistral.event_engine import base as evt_eng from mistral.executors import base as exe +from mistral.notifiers import base as notif from mistral.rpc import base +LOG = logging.getLogger(__name__) + _ENGINE_CLIENT = None _ENGINE_CLIENT_LOCK = threading.Lock() @@ -34,6 +39,9 @@ _EXECUTOR_CLIENT_LOCK = threading.Lock() _EVENT_ENGINE_CLIENT = None _EVENT_ENGINE_CLIENT_LOCK = threading.Lock() +_NOTIFIER_CLIENT = None +_NOTIFIER_CLIENT_LOCK = threading.Lock() + def cleanup(): """Clean all the RPC clients. @@ -46,15 +54,17 @@ def cleanup(): global _ENGINE_CLIENT global _EXECUTOR_CLIENT global _EVENT_ENGINE_CLIENT + global _NOTIFIER_CLIENT _ENGINE_CLIENT = None _EXECUTOR_CLIENT = None _EVENT_ENGINE_CLIENT = None + _NOTIFIER_CLIENT = None def get_engine_client(): global _ENGINE_CLIENT - global _EVENT_ENGINE_CLIENT_LOCK + global _ENGINE_CLIENT_LOCK with _ENGINE_CLIENT_LOCK: if not _ENGINE_CLIENT: @@ -85,6 +95,17 @@ def get_event_engine_client(): return _EVENT_ENGINE_CLIENT +def get_notifier_client(): + global _NOTIFIER_CLIENT + global _NOTIFIER_CLIENT_LOCK + + with _NOTIFIER_CLIENT_LOCK: + if not _NOTIFIER_CLIENT: + _NOTIFIER_CLIENT = NotifierClient(cfg.CONF.notifier) + + return _NOTIFIER_CLIENT + + class EngineClient(eng.Engine): """RPC Engine client.""" @@ -379,3 +400,25 @@ class EventEngineClient(evt_eng.EventEngine): 'update_event_trigger', trigger=trigger, ) + + +class NotifierClient(notif.Notifier): + """RPC Notifier client.""" + + def __init__(self, rpc_conf_dict): + """Constructs an RPC client for the Notifier service.""" + self._client = base.get_rpc_client_driver()(rpc_conf_dict) + + def notify(self, ex_id, data, event, timestamp, publishers): + try: + return self._client.async_call( + auth_ctx.ctx(), + 'notify', + ex_id=ex_id, + data=data, + event=event, + timestamp=timestamp, + publishers=publishers + ) + except Exception: + LOG.exception('Unable to send notification.') diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index 973b4c557..788154994 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -2,6 +2,7 @@ # Copyright 2015 - StackStorm, Inc. # Copyright 2015 Huawei Technologies Co., Ltd. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -505,13 +506,15 @@ class TestExecutionsController(base.APITest): load_wf_ex_func.assert_not_called() + kwargs = json.loads(expected_json['params']) + kwargs['description'] = expected_json['description'] + start_wf_func.assert_called_once_with( expected_json['workflow_id'], '', None, json.loads(expected_json['input']), - expected_json['description'], - **json.loads(expected_json['params']) + **kwargs ) @mock.patch.object(rpc_clients.EngineClient, 'start_workflow') @@ -540,13 +543,15 @@ class TestExecutionsController(base.APITest): load_wf_ex_func.assert_called_once_with(expected_json['id']) + kwargs = json.loads(expected_json['params']) + kwargs['description'] = expected_json['description'] + start_wf_func.assert_called_once_with( expected_json['workflow_id'], '', expected_json['id'], json.loads(expected_json['input']), - expected_json['description'], - **json.loads(expected_json['params']) + **kwargs ) @mock.patch.object(rpc_clients.EngineClient, 'start_workflow') @@ -600,7 +605,7 @@ class TestExecutionsController(base.APITest): '', exec_dict['id'], json.loads(exec_dict['input']), - exec_dict['description'], + description=exec_dict['description'], **json.loads(exec_dict['params']) ) @@ -629,7 +634,7 @@ class TestExecutionsController(base.APITest): '', '', json.loads(exec_dict['input']), - exec_dict['description'], + description=exec_dict['description'], **json.loads(exec_dict['params']) ) @@ -659,7 +664,7 @@ class TestExecutionsController(base.APITest): '', exec_dict['id'], json.loads(exec_dict['input']), - exec_dict['description'], + description=exec_dict['description'], **json.loads(exec_dict['params']) ) diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index a27329329..59f7e4ab7 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -1,6 +1,7 @@ # Copyright 2014 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,6 +25,7 @@ from mistral.db.v2 import api as db_api from mistral.engine import engine_server from mistral.executors import base as exe from mistral.executors import executor_server +from mistral.notifiers import notification_server as notif_server from mistral.rpc import base as rpc_base from mistral.rpc import clients as rpc_clients from mistral.tests.unit import base @@ -76,6 +78,18 @@ class EngineTestCase(base.DbTestCase): self.threads.append(eventlet.spawn(launch_service, exe_svc)) self.addCleanup(exe_svc.stop, True) + # Start remote notifier. + if cfg.CONF.notifier.type == 'remote': + LOG.info("Starting remote notifier threads...") + + self.notifier_client = rpc_clients.get_notifier_client() + + notif_svc = notif_server.get_oslo_service(setup_profiler=False) + + self.notifier = notif_svc.notifier + self.threads.append(eventlet.spawn(launch_service, notif_svc)) + self.addCleanup(notif_svc.stop, True) + # Start engine. LOG.info("Starting engine threads...") @@ -95,6 +109,9 @@ class EngineTestCase(base.DbTestCase): if cfg.CONF.executor.type == 'remote': exe_svc.wait_started() + if cfg.CONF.notifier.type == 'remote': + notif_svc.wait_started() + eng_svc.wait_started() def kill_threads(self): diff --git a/mistral/tests/unit/executors/test_plugins.py b/mistral/tests/unit/executors/test_server_plugins.py similarity index 72% rename from mistral/tests/unit/executors/test_plugins.py rename to mistral/tests/unit/executors/test_server_plugins.py index bed64790c..e33257289 100644 --- a/mistral/tests/unit/executors/test_plugins.py +++ b/mistral/tests/unit/executors/test_server_plugins.py @@ -1,4 +1,5 @@ # Copyright 2017 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,10 +14,11 @@ # limitations under the License. from oslo_log import log as logging +from stevedore import exception as sd_exc from mistral.executors import base as exe -from mistral.executors import default_executor as d_exe -from mistral.executors import remote_executor as r_exe +from mistral.executors import default_executor as d +from mistral.executors import remote_executor as r from mistral.tests.unit.executors import base @@ -32,9 +34,12 @@ class PluginTestCase(base.ExecutorTestCase): def test_get_local_executor(self): executor = exe.get_executor('local') - self.assertIsInstance(executor, d_exe.DefaultExecutor) + self.assertIsInstance(executor, d.DefaultExecutor) def test_get_remote_executor(self): executor = exe.get_executor('remote') - self.assertIsInstance(executor, r_exe.RemoteExecutor) + self.assertIsInstance(executor, r.RemoteExecutor) + + def test_get_bad_executor(self): + self.assertRaises(sd_exc.NoMatches, exe.get_executor, 'foobar') diff --git a/mistral/tests/unit/notifiers/__init__.py b/mistral/tests/unit/notifiers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/tests/unit/notifiers/base.py b/mistral/tests/unit/notifiers/base.py new file mode 100644 index 000000000..46db73c40 --- /dev/null +++ b/mistral/tests/unit/notifiers/base.py @@ -0,0 +1,47 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral.tests.unit.engine import base as engine_test_base + + +LOG = logging.getLogger(__name__) + + +class NotifierTestCase(engine_test_base.EngineTestCase): + + def await_workflow_success(self, wf_ex_id, post_delay=1): + # Override the original wait method to add a delay to allow enough + # time for the notification events to get processed. + super(NotifierTestCase, self).await_workflow_success(wf_ex_id) + self._sleep(post_delay) + + def await_workflow_error(self, wf_ex_id, post_delay=1): + # Override the original wait method to add a delay to allow enough + # time for the notification events to get processed. + super(NotifierTestCase, self).await_workflow_error(wf_ex_id) + self._sleep(post_delay) + + def await_workflow_paused(self, wf_ex_id, post_delay=1): + # Override the original wait method to add a delay to allow enough + # time for the notification events to get processed. + super(NotifierTestCase, self).await_workflow_paused(wf_ex_id) + self._sleep(post_delay) + + def await_workflow_cancelled(self, wf_ex_id, post_delay=1): + # Override the original wait method to add a delay to allow enough + # time for the notification events to get processed. + super(NotifierTestCase, self).await_workflow_cancelled(wf_ex_id) + self._sleep(post_delay) diff --git a/mistral/tests/unit/notifiers/test_notifier_servers.py b/mistral/tests/unit/notifiers/test_notifier_servers.py new file mode 100644 index 000000000..0872afa56 --- /dev/null +++ b/mistral/tests/unit/notifiers/test_notifier_servers.py @@ -0,0 +1,221 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from oslo_config import cfg +from stevedore import exception as sd_exc + +from mistral.db.v2 import api as db_api +from mistral.notifiers import base as notif +from mistral.notifiers import default_notifier as d_notif +from mistral.notifiers import notification_events as events +from mistral.notifiers import remote_notifier as r_notif +from mistral.services import workflows as wf_svc +from mistral.tests.unit.notifiers import base +from mistral.workflow import states + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + +EVENT_LOGS = [] + + +def publisher_process(ex_id, data, event, timestamp, **kwargs): + EVENT_LOGS.append((ex_id, event)) + + +def notifier_process(ex_id, data, event, timestamp, publishers): + EVENT_LOGS.append((ex_id, event)) + + +class ServerPluginTestCase(base.NotifierTestCase): + + def tearDown(self): + notif.cleanup() + super(ServerPluginTestCase, self).tearDown() + + def test_get_bad_notifier(self): + self.assertRaises(sd_exc.NoMatches, notif.get_notifier, 'foobar') + + +@mock.patch.object( + r_notif.RemoteNotifier, + 'notify', + mock.MagicMock(return_value=None) +) +class LocalNotifServerTestCase(base.NotifierTestCase): + + @classmethod + def setUpClass(cls): + super(LocalNotifServerTestCase, cls).setUpClass() + cfg.CONF.set_default('type', 'local', group='notifier') + + @classmethod + def tearDownClass(cls): + cfg.CONF.set_default('type', 'remote', group='notifier') + super(LocalNotifServerTestCase, cls).tearDownClass() + + def setUp(self): + super(LocalNotifServerTestCase, self).setUp() + self.publisher = notif.get_notification_publisher('webhook') + self.publisher.publish = mock.MagicMock(side_effect=publisher_process) + self.publisher.publish.reset_mock() + del EVENT_LOGS[:] + + def tearDown(self): + notif.cleanup() + super(LocalNotifServerTestCase, self).tearDown() + + def test_get_notifier(self): + notifier = notif.get_notifier(cfg.CONF.notifier.type) + + self.assertEqual('local', cfg.CONF.notifier.type) + self.assertIsInstance(notifier, d_notif.DefaultNotifier) + + def test_notify(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notif_options = [{'type': 'webhook'}] + + wf_ex = self.engine.start_workflow( + 'wf', + '', + wf_input={}, + notify=notif_options + ) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertFalse(r_notif.RemoteNotifier.notify.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + +@mock.patch.object( + r_notif.RemoteNotifier, + 'notify', + mock.MagicMock(side_effect=notifier_process) +) +class RemoteNotifServerTestCase(base.NotifierTestCase): + + @classmethod + def setUpClass(cls): + super(RemoteNotifServerTestCase, cls).setUpClass() + cfg.CONF.set_default('type', 'remote', group='notifier') + + def setUp(self): + super(RemoteNotifServerTestCase, self).setUp() + del EVENT_LOGS[:] + + def tearDown(self): + notif.cleanup() + super(RemoteNotifServerTestCase, self).tearDown() + + def test_get_notifier(self): + notifier = notif.get_notifier(cfg.CONF.notifier.type) + + self.assertEqual('remote', cfg.CONF.notifier.type) + self.assertIsInstance(notifier, r_notif.RemoteNotifier) + + def test_notify(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notif_options = [{'type': 'foobar'}] + + wf_ex = self.engine.start_workflow( + 'wf', + '', + wf_input={}, + notify=notif_options + ) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(r_notif.RemoteNotifier.notify.called) + self.assertListEqual(expected_order, EVENT_LOGS) diff --git a/mistral/tests/unit/notifiers/test_notify.py b/mistral/tests/unit/notifiers/test_notify.py new file mode 100644 index 000000000..c75b70c84 --- /dev/null +++ b/mistral/tests/unit/notifiers/test_notify.py @@ -0,0 +1,1036 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import mock + +from oslo_config import cfg + +from mistral.db.v2 import api as db_api +from mistral.notifiers import base as notif +from mistral.notifiers import notification_events as events +from mistral.services import workbooks as wb_svc +from mistral.services import workflows as wf_svc +from mistral.tests.unit.notifiers import base +from mistral.workflow import states +from mistral_lib import actions as ml_actions + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + +EVENT_LOGS = [] + + +def log_event(ex_id, data, event, timestamp, **kwargs): + EVENT_LOGS.append((ex_id, event)) + + +class NotifyEventsTest(base.NotifierTestCase): + + def setUp(self): + super(NotifyEventsTest, self).setUp() + + self.publishers = { + 'wbhk': notif.get_notification_publisher('webhook'), + 'noop': notif.get_notification_publisher('noop') + } + + self.publishers['wbhk'].publish = mock.MagicMock(side_effect=log_event) + self.publishers['wbhk'].publish.reset_mock() + self.publishers['noop'].publish = mock.MagicMock(side_effect=log_event) + self.publishers['noop'].publish.reset_mock() + + del EVENT_LOGS[:] + + def tearDown(self): + cfg.CONF.set_default('notify', None, group='notifier') + super(NotifyEventsTest, self).tearDown() + + def test_notify_all_explicit(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [ + { + 'type': 'webhook', + 'events': events.EVENTS + } + ] + + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertEqual(6, len(EVENT_LOGS)) + self.assertIn((wf_ex.id, events.WORKFLOW_LAUNCHED), EVENT_LOGS) + self.assertIn((t1_ex.id, events.TASK_LAUNCHED), EVENT_LOGS) + self.assertIn((t1_ex.id, events.TASK_SUCCEEDED), EVENT_LOGS) + self.assertIn((t2_ex.id, events.TASK_LAUNCHED), EVENT_LOGS) + self.assertIn((t2_ex.id, events.TASK_SUCCEEDED), EVENT_LOGS) + self.assertIn((wf_ex.id, events.WORKFLOW_SUCCEEDED), EVENT_LOGS) + + def test_notify_all_implicit(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertEqual(6, len(EVENT_LOGS)) + self.assertIn((wf_ex.id, events.WORKFLOW_LAUNCHED), EVENT_LOGS) + self.assertIn((t1_ex.id, events.TASK_LAUNCHED), EVENT_LOGS) + self.assertIn((t1_ex.id, events.TASK_SUCCEEDED), EVENT_LOGS) + self.assertIn((t2_ex.id, events.TASK_LAUNCHED), EVENT_LOGS) + self.assertIn((t2_ex.id, events.TASK_SUCCEEDED), EVENT_LOGS) + self.assertIn((wf_ex.id, events.WORKFLOW_SUCCEEDED), EVENT_LOGS) + + def test_notify_order(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [ + {'type': 'webhook'} + ] + + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_multiple(self): + self.assertFalse(self.publishers['wbhk'].publish.called) + self.assertFalse(self.publishers['noop'].publish.called) + + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [ + {'type': 'webhook'}, + {'type': 'noop'} + ] + + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertTrue(self.publishers['noop'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_from_cfg(self): + self.assertFalse(self.publishers['wbhk'].publish.called) + self.assertFalse(self.publishers['noop'].publish.called) + + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [ + {'type': 'webhook'}, + {'type': 'noop'} + ] + + cfg.CONF.set_default( + 'notify', + json.dumps(notify_options), + group='notifier' + ) + + wf_ex = self.engine.start_workflow('wf', '') + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertTrue(self.publishers['noop'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_from_cfg_and_params(self): + self.assertFalse(self.publishers['wbhk'].publish.called) + self.assertFalse(self.publishers['noop'].publish.called) + + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + cfg.CONF.set_default( + 'notify', + json.dumps([{'type': 'noop'}]), + group='notifier' + ) + + params = {'notify': [{'type': 'webhook'}]} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertTrue(self.publishers['noop'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_workbook_notify(self): + wb_def = """ + version: '2.0' + name: wb + workflows: + wf1: + tasks: + t1: + workflow: wf2 + on-success: + - t2 + t2: + action: std.noop + wf2: + tasks: + t1: + action: std.noop + """ + + wb_svc.create_workbook_v2(wb_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf1_ex = self.engine.start_workflow('wb.wf1', '', **params) + + self.await_workflow_success(wf1_ex.id) + + with db_api.transaction(): + wf1_ex = db_api.get_workflow_execution(wf1_ex.id) + wf1_task_exs = wf1_ex.task_executions + + wf1_t1_ex = self._assert_single_item(wf1_task_exs, name='t1') + wf1_t2_ex = self._assert_single_item(wf1_task_exs, name='t2') + + wf1_t1_act_exs = db_api.get_workflow_executions( + task_execution_id=wf1_t1_ex.id + ) + + wf2_ex = wf1_t1_act_exs[0] + wf2_task_exs = wf2_ex.task_executions + + wf2_t1_ex = self._assert_single_item(wf2_task_exs, name='t1') + + self.assertEqual(states.SUCCESS, wf1_ex.state) + self.assertIsNone(wf1_ex.state_info) + self.assertEqual(2, len(wf1_task_exs)) + + self.assertEqual(states.SUCCESS, wf1_t1_ex.state) + self.assertIsNone(wf1_t1_ex.state_info) + self.assertEqual(states.SUCCESS, wf1_t2_ex.state) + self.assertIsNone(wf1_t2_ex.state_info) + + self.assertEqual(1, len(wf1_t1_act_exs)) + + self.assertEqual(states.SUCCESS, wf2_ex.state) + self.assertIsNone(wf2_ex.state_info) + self.assertEqual(1, len(wf2_task_exs)) + + self.assertEqual(states.SUCCESS, wf2_t1_ex.state) + self.assertIsNone(wf2_t1_ex.state_info) + + expected_order = [ + (wf1_ex.id, events.WORKFLOW_LAUNCHED), + (wf1_t1_ex.id, events.TASK_LAUNCHED), + (wf2_ex.id, events.WORKFLOW_LAUNCHED), + (wf2_t1_ex.id, events.TASK_LAUNCHED), + (wf2_t1_ex.id, events.TASK_SUCCEEDED), + (wf2_ex.id, events.WORKFLOW_SUCCEEDED), + (wf1_t1_ex.id, events.TASK_SUCCEEDED), + (wf1_t2_ex.id, events.TASK_LAUNCHED), + (wf1_t2_ex.id, events.TASK_SUCCEEDED), + (wf1_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_task_error(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.fail + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.ERROR, t2_ex.state) + self.assertIsNotNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_FAILED), + (wf_ex.id, events.WORKFLOW_FAILED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_task_transition_fail(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-complete: + - fail + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(1, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_FAILED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_with_items_task(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + with-items: i in <% list(range(0, 3)) %> + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + self._sleep(1) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_pause_resume(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.async_noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_running(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + # Pause the workflow. + self.engine.pause_workflow(wf_ex.id) + self.await_workflow_paused(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + # Workflow is paused but the task is still running as expected. + self.assertEqual(states.PAUSED, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (wf_ex.id, events.WORKFLOW_PAUSED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + # Complete action execution of task 1. + self.engine.on_action_complete( + t1_act_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_paused(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.PAUSED, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(1, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (wf_ex.id, events.WORKFLOW_PAUSED), + (t1_ex.id, events.TASK_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + # Resume the workflow. + self.engine.resume_workflow(wf_ex.id) + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (wf_ex.id, events.WORKFLOW_PAUSED), + (t1_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_RESUMED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_pause_resume_task(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.async_noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_running(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + # Pause the action execution of task 1. + self.engine.on_action_update(t1_act_exs[0].id, states.PAUSED) + self.await_workflow_paused(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.PAUSED, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.PAUSED, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.PAUSED, t1_act_exs[0].state) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_PAUSED), + (wf_ex.id, events.WORKFLOW_PAUSED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + # Resume the action execution of task 1. + self.engine.on_action_update(t1_act_exs[0].id, states.RUNNING) + self.await_task_running(t1_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + # Complete action execution of task 1. + self.engine.on_action_complete( + t1_act_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + # Wait for the workflow execution to complete. + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + # TASK_RESUMED comes before WORKFLOW_RESUMED because + # this test resumed the workflow with on_action_update. + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_PAUSED), + (wf_ex.id, events.WORKFLOW_PAUSED), + (t1_ex.id, events.TASK_RESUMED), + (wf_ex.id, events.WORKFLOW_RESUMED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_cancel(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.async_noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_running(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + # Cancel the workflow. + self.engine.stop_workflow(wf_ex.id, states.CANCELLED) + self.await_workflow_cancelled(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + # Workflow is cancelled but the task is still running as expected. + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (wf_ex.id, events.WORKFLOW_CANCELLED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + # Complete action execution of task 1. + self.engine.on_action_complete( + t1_act_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_cancelled(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(1, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (wf_ex.id, events.WORKFLOW_CANCELLED), + (t1_ex.id, events.TASK_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_cancel_task(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.async_noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_running(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + # Cancel the action execution of task 1. + self.engine.on_action_update(t1_act_exs[0].id, states.CANCELLED) + self.await_workflow_cancelled(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.CANCELLED, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.CANCELLED, t1_act_exs[0].state) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_CANCELLED), + (wf_ex.id, events.WORKFLOW_CANCELLED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) diff --git a/mistral/tests/unit/test_launcher.py b/mistral/tests/unit/test_launcher.py index f194a8c85..0d9711761 100644 --- a/mistral/tests/unit/test_launcher.py +++ b/mistral/tests/unit/test_launcher.py @@ -1,4 +1,5 @@ # Copyright 2017 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,7 +48,7 @@ class ServiceLauncherTest(base.DbTestCase): api_workers = api_server.workers self._await(lambda: len(svr_proc_mgr.children.keys()) == api_workers) - self._await(lambda: len(svr_thrd_mgr.services.services) == 3) + self._await(lambda: len(svr_thrd_mgr.services.services) == 4) def test_launch_process(self): eventlet.spawn(launch.launch_any, ['api']) diff --git a/setup.cfg b/setup.cfg index 56b1adf3a..caab0062c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -75,6 +75,14 @@ mistral.executors = local = mistral.executors.default_executor:DefaultExecutor remote = mistral.executors.remote_executor:RemoteExecutor +mistral.notifiers = + local = mistral.notifiers.default_notifier:DefaultNotifier + remote = mistral.notifiers.remote_notifier:RemoteNotifier + +mistral.notification.publishers = + webhook = mistral.notifiers.publishers.webhook:WebhookPublisher + noop = mistral.notifiers.publishers.noop:NoopPublisher + mistral.expression.functions = # json_pp was deprecated in Queens and will be removed in the S cycle json_pp = mistral.utils.expression_utils:json_pp_