diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index f7b14c00f..acfbc7f98 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -2,6 +2,7 @@ # Copyright 2015 - StackStorm, Inc. # Copyright 2015 Huawei Technologies Co., Ltd. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -258,8 +259,8 @@ class ExecutionsController(rest.RestController): result_exec_dict.get('workflow_namespace', ''), exec_id, result_exec_dict.get('input'), - result_exec_dict.get('description', ''), - **result_exec_dict.get('params') or {} + description=result_exec_dict.get('description', ''), + **result_exec_dict.get('params', {}) ) return resources.Execution.from_dict(result) diff --git a/mistral/api/controllers/v2/resources.py b/mistral/api/controllers/v2/resources.py index e1547e246..f4efe58c4 100644 --- a/mistral/api/controllers/v2/resources.py +++ b/mistral/api/controllers/v2/resources.py @@ -1,4 +1,5 @@ # Copyright 2013 - Mirantis, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -284,18 +285,39 @@ class Execution(resource.Resource): @classmethod def sample(cls): - return cls(id='123e4567-e89b-12d3-a456-426655440000', - workflow_name='flow', - workflow_namespace='some_namespace', - workflow_id='123e4567-e89b-12d3-a456-426655441111', - description='this is the first execution.', - project_id='40a908dbddfe48ad80a87fb30fa70a03', - state='SUCCESS', - input={}, - output={}, - params={'env': {'k1': 'abc', 'k2': 123}}, - created_at='1970-01-01T00:00:00.000000', - updated_at='1970-01-01T00:00:00.000000') + return cls( + id='123e4567-e89b-12d3-a456-426655440000', + workflow_name='flow', + workflow_namespace='some_namespace', + workflow_id='123e4567-e89b-12d3-a456-426655441111', + description='this is the first execution.', + project_id='40a908dbddfe48ad80a87fb30fa70a03', + state='SUCCESS', + input={}, + output={}, + params={ + 'env': {'k1': 'abc', 'k2': 123}, + 'notify': [ + { + 'type': 'webhook', + 'url': 'http://endpoint/of/webhook', + 'headers': { + 'Content-Type': 'application/json', + 'X-Auth-Token': '123456789' + } + }, + { + 'type': 'queue', + 'topic': 'failover_queue', + 'backend': 'rabbitmq', + 'host': '127.0.0.1', + 'port': 5432 + } + ] + }, + created_at='1970-01-01T00:00:00.000000', + updated_at='1970-01-01T00:00:00.000000' + ) class Executions(resource.ResourceList): diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index 88926a8dc..a3ac55e5c 100644 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -1,5 +1,6 @@ #!/usr/bin/env python # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -46,6 +47,7 @@ from mistral import config from mistral.engine import engine_server from mistral.event_engine import event_engine_server from mistral.executors import executor_server +from mistral.notifiers import notification_server from mistral.rpc import base as rpc from mistral import version @@ -93,6 +95,10 @@ def launch_event_engine(): launch_thread(event_engine_server.get_oslo_service()) +def launch_notifier(): + launch_thread(notification_server.get_oslo_service()) + + def launch_api(): server = api_service.WSGIService('mistral_api') launch_process(server, workers=server.workers) @@ -118,7 +124,8 @@ LAUNCH_OPTIONS = { 'api': launch_api, 'engine': launch_engine, 'executor': launch_executor, - 'event-engine': launch_event_engine + 'event-engine': launch_event_engine, + 'notifier': launch_notifier } diff --git a/mistral/config.py b/mistral/config.py index 0c1108684..c3e54eea6 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -1,5 +1,6 @@ # Copyright 2013 - Mirantis, Inc. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -300,6 +301,37 @@ event_engine_opts = [ ), ] +notifier_opts = [ + cfg.StrOpt( + 'type', + choices=['local', 'remote'], + default='remote', + help=( + 'Type of notifier. Use local to run the notifier within the ' + 'engine server. Use remote if the notifier is launched as ' + 'a separate server to process events.' + ) + ), + cfg.StrOpt( + 'host', + default='0.0.0.0', + help=_('Name of the notifier node. This can be an opaque ' + 'identifier. It is not necessarily a hostname, ' + 'FQDN, or IP address.') + ), + cfg.StrOpt( + 'topic', + default='mistral_notifier', + help=_('The message topic that the notifier server listens on.') + ), + cfg.ListOpt( + 'notify', + item_type=eval, + bounds=True, + help=_('List of publishers to publish notification.') + ) +] + execution_expiration_policy_opts = [ cfg.IntOpt( 'evaluation_interval', @@ -425,6 +457,7 @@ EXECUTOR_GROUP = 'executor' SCHEDULER_GROUP = 'scheduler' CRON_TRIGGER_GROUP = 'cron_trigger' EVENT_ENGINE_GROUP = 'event_engine' +NOTIFIER_GROUP = 'notifier' PECAN_GROUP = 'pecan' COORDINATION_GROUP = 'coordination' EXECUTION_EXPIRATION_POLICY_GROUP = 'execution_expiration_policy' @@ -450,6 +483,7 @@ CONF.register_opts( group=EXECUTION_EXPIRATION_POLICY_GROUP ) CONF.register_opts(event_engine_opts, group=EVENT_ENGINE_GROUP) +CONF.register_opts(notifier_opts, group=NOTIFIER_GROUP) CONF.register_opts(pecan_opts, group=PECAN_GROUP) CONF.register_opts(coordination_opts, group=COORDINATION_GROUP) CONF.register_opts(profiler_opts, group=PROFILER_GROUP) @@ -494,6 +528,7 @@ def list_opts(): (EVENT_ENGINE_GROUP, event_engine_opts), (SCHEDULER_GROUP, scheduler_opts), (CRON_TRIGGER_GROUP, cron_trigger_opts), + (NOTIFIER_GROUP, notifier_opts), (PECAN_GROUP, pecan_opts), (COORDINATION_GROUP, coordination_opts), (EXECUTION_EXPIRATION_POLICY_GROUP, execution_expiration_policy_opts), diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index ad4bd55bd..3378a6d71 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -1,5 +1,6 @@ # Copyright 2016 - Nokia Networks. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -558,6 +559,9 @@ class WorkflowAction(Action): wf_params['env'] = parent_wf_ex.params['env'] wf_params['evaluate_env'] = parent_wf_ex.params.get('evaluate_env') + if 'notify' in parent_wf_ex.params: + wf_params['notify'] = parent_wf_ex.params['notify'] + for k, v in list(input_dict.items()): if k not in wf_spec.get_input(): wf_params[k] = v diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 85583e01d..6ec53e07d 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -1,6 +1,7 @@ # Copyright 2013 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from oslo_config import cfg from osprofiler import profiler from mistral.db import utils as db_utils @@ -42,6 +44,12 @@ class DefaultEngine(base.Engine): if wf_namespace: params['namespace'] = wf_namespace + if cfg.CONF.notifier.notify: + if 'notify' not in params or not params['notify']: + params['notify'] = [] + + params['notify'].extend(cfg.CONF.notifier.notify) + try: with db_api.transaction(): wf_ex = wf_handler.start_workflow( diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 6a1d5b55e..faad0fb63 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -1,5 +1,6 @@ # Copyright 2016 - Nokia Networks. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +16,7 @@ import abc import copy +from oslo_config import cfg from oslo_log import log as logging from osprofiler import profiler import six @@ -25,6 +27,8 @@ from mistral.engine import dispatcher from mistral.engine import policies from mistral import exceptions as exc from mistral import expressions as expr +from mistral.notifiers import base as notif +from mistral.notifiers import notification_events as events from mistral import utils from mistral.utils import wf_trace from mistral.workflow import base as wf_base @@ -57,6 +61,23 @@ class Task(object): self.created = False self.state_changed = False + def notify(self, old_task_state, new_task_state): + publishers = self.wf_ex.params.get('notify') + + if not publishers and not isinstance(publishers, list): + return + + notifier = notif.get_notifier(cfg.CONF.notifier.type) + event = events.identify_task_event(old_task_state, new_task_state) + + notifier.notify( + self.task_ex.id, + self.task_ex.to_dict(), + event, + self.task_ex.updated_at, + publishers + ) + def is_completed(self): return self.task_ex and states.is_completed(self.task_ex.state) @@ -177,8 +198,15 @@ class Task(object): assert self.task_ex + # Record the current task state. + old_task_state = self.task_ex.state + # Ignore if task already completed. if self.is_completed(): + # Publish task event again so subscribers know + # task completed state is being processed again. + self.notify(old_task_state, self.task_ex.state) + return # If we were unable to change the task state it means that it was @@ -205,6 +233,9 @@ class Task(object): # If workflow is paused we shouldn't schedule new commands # and mark task as processed. if states.is_paused(self.wf_ex.state): + # Publish task event even if the workflow is paused. + self.notify(old_task_state, self.task_ex.state) + return wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) @@ -216,6 +247,9 @@ class Task(object): # upon its completion. self.task_ex.processed = True + # Publish task event. + self.notify(old_task_state, self.task_ex.state) + dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) @profiler.trace('task-update') @@ -230,8 +264,15 @@ class Task(object): assert self.task_ex + # Record the current task state. + old_task_state = self.task_ex.state + # Ignore if task already completed. if states.is_completed(self.task_ex.state): + # Publish task event again so subscribers know + # task completed state is being processed again. + self.notify(old_task_state, self.task_ex.state) + return # Update only if state transition is valid. @@ -247,6 +288,9 @@ class Task(object): self.set_state(state, state_info) + # Publish event. + self.notify(old_task_state, self.task_ex.state) + def _before_task_start(self): policies_spec = self.task_spec.get_policies() @@ -340,6 +384,9 @@ class RegularTask(Task): self._create_task_execution() + # Publish event. + self.notify(None, self.task_ex.state) + LOG.debug( 'Starting task [workflow=%s, task=%s, init_state=%s]', self.wf_ex.name, @@ -367,8 +414,14 @@ class RegularTask(Task): 'Rerunning succeeded tasks is not supported.' ) + # Record the current task state. + old_task_state = self.task_ex.state + self.set_state(states.RUNNING, None, processed=False) + # Publish event. + self.notify(old_task_state, self.task_ex.state) + self._update_inbound_context() self._update_triggered_by() self._reset_actions() diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index f988434fe..1585e69c0 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -1,5 +1,6 @@ # Copyright 2016 - Nokia Networks. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,6 +27,8 @@ from mistral.engine import dispatcher from mistral.engine import utils as engine_utils from mistral import exceptions as exc from mistral.lang import parser as spec_parser +from mistral.notifiers import base as notif +from mistral.notifiers import notification_events as events from mistral.services import triggers from mistral.services import workflows as wf_service from mistral import utils @@ -61,6 +64,22 @@ class Workflow(object): else: self.wf_spec = None + def notify(self, event): + publishers = self.wf_ex.params.get('notify') + + if not publishers and not isinstance(publishers, list): + return + + notifier = notif.get_notifier(cfg.CONF.notifier.type) + + notifier.notify( + self.wf_ex.id, + self.wf_ex.to_dict(), + event, + self.wf_ex.updated_at, + publishers + ) + @profiler.trace('workflow-start') def start(self, wf_def, wf_ex_id, input_dict, desc='', params=None): """Start workflow. @@ -100,6 +119,9 @@ class Workflow(object): self.set_state(states.RUNNING) + # Publish event as soon as state is set to running. + self.notify(events.WORKFLOW_LAUNCHED) + wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) dispatcher.dispatch_workflow_commands( @@ -113,7 +135,6 @@ class Workflow(object): :param state: New workflow state. :param msg: Additional explaining message. """ - assert self.wf_ex if state == states.SUCCESS: @@ -137,6 +158,9 @@ class Workflow(object): # Set the state of this workflow to paused. self.set_state(states.PAUSED, state_info=msg) + # Publish event. + self.notify(events.WORKFLOW_PAUSED) + # If workflow execution is a subworkflow, # schedule update to the task execution. if self.wf_ex.task_execution_id: @@ -144,8 +168,6 @@ class Workflow(object): from mistral.engine import task_handler task_handler.schedule_on_action_update(self.wf_ex) - return - def resume(self, env=None): """Resume workflow. @@ -158,6 +180,9 @@ class Workflow(object): self.set_state(states.RUNNING) + # Publish event. + self.notify(events.WORKFLOW_RESUMED) + wf_ctrl = wf_base.get_controller(self.wf_ex) # Calculate commands to process next. @@ -403,6 +428,9 @@ class Workflow(object): # Set workflow execution to success until after output is evaluated. self.set_state(states.SUCCESS, msg) + # Publish event. + self.notify(events.WORKFLOW_SUCCEEDED) + if self.wf_ex.task_execution_id: self._send_result_to_parent_workflow() @@ -448,6 +476,9 @@ class Workflow(object): self.wf_ex.output = merge_dicts({'result': msg}, output_on_error) + # Publish event. + self.notify(events.WORKFLOW_FAILED) + if self.wf_ex.task_execution_id: self._send_result_to_parent_workflow() @@ -466,6 +497,9 @@ class Workflow(object): self.wf_ex.output = {'result': msg} + # Publish event. + self.notify(events.WORKFLOW_CANCELLED) + if self.wf_ex.task_execution_id: self._send_result_to_parent_workflow() diff --git a/mistral/notifiers/__init__.py b/mistral/notifiers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/notifiers/base.py b/mistral/notifiers/base.py new file mode 100644 index 000000000..b72dc1f05 --- /dev/null +++ b/mistral/notifiers/base.py @@ -0,0 +1,81 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import six + +from oslo_log import log as logging +from stevedore import driver + + +LOG = logging.getLogger(__name__) + +_NOTIFIERS = {} +_NOTIFICATION_PUBLISHERS = {} + + +def cleanup(): + global _NOTIFIERS + global _NOTIFICATION_PUBLISHERS + + _NOTIFIERS = {} + _NOTIFICATION_PUBLISHERS = {} + + +def get_notifier(notifier_name): + global _NOTIFIERS + + if not _NOTIFIERS.get(notifier_name): + mgr = driver.DriverManager( + 'mistral.notifiers', + notifier_name, + invoke_on_load=True + ) + + _NOTIFIERS[notifier_name] = mgr.driver + + return _NOTIFIERS[notifier_name] + + +def get_notification_publisher(publisher_name): + global _NOTIFICATION_PUBLISHERS + + if not _NOTIFICATION_PUBLISHERS.get(publisher_name): + mgr = driver.DriverManager( + 'mistral.notification.publishers', + publisher_name, + invoke_on_load=True + ) + + _NOTIFICATION_PUBLISHERS[publisher_name] = mgr.driver + + return _NOTIFICATION_PUBLISHERS[publisher_name] + + +@six.add_metaclass(abc.ABCMeta) +class Notifier(object): + """Notifier interface.""" + + @abc.abstractmethod + def notify(self, ex_id, data, event, timestamp, **kwargs): + raise NotImplementedError() + + +@six.add_metaclass(abc.ABCMeta) +class NotificationPublisher(object): + """Notifier plugin interface.""" + + @abc.abstractmethod + def publish(self, ex_id, data, event, timestamp, **kwargs): + raise NotImplementedError() diff --git a/mistral/notifiers/default_notifier.py b/mistral/notifiers/default_notifier.py new file mode 100644 index 000000000..cf871d3a2 --- /dev/null +++ b/mistral/notifiers/default_notifier.py @@ -0,0 +1,44 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +from oslo_log import log as logging + +from mistral.notifiers import base + + +LOG = logging.getLogger(__name__) + + +class DefaultNotifier(base.Notifier): + """Local notifier that process notification request.""" + + def notify(self, ex_id, data, event, timestamp, publishers): + for entry in publishers: + params = copy.deepcopy(entry) + publisher_name = params.pop('type', None) + + if not publisher_name: + LOG.error('Notification publisher type is not specified.') + continue + + try: + publisher = base.get_notification_publisher(publisher_name) + publisher.publish(ex_id, data, event, timestamp, **params) + except Exception: + LOG.exception( + 'Unable to process event for publisher "%s".', + publisher_name + ) diff --git a/mistral/notifiers/notification_events.py b/mistral/notifiers/notification_events.py new file mode 100644 index 000000000..5afdd7430 --- /dev/null +++ b/mistral/notifiers/notification_events.py @@ -0,0 +1,82 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.workflow import states + + +WORKFLOW_LAUNCHED = 'WORKFLOW_LAUNCHED' +WORKFLOW_SUCCEEDED = 'WORKFLOW_SUCCEEDED' +WORKFLOW_FAILED = 'WORKFLOW_FAILED' +WORKFLOW_CANCELLED = 'WORKFLOW_CANCELLED' +WORKFLOW_PAUSED = 'WORKFLOW_PAUSED' +WORKFLOW_RESUMED = 'WORKFLOW_RESUMED' + +WORKFLOWS = [ + WORKFLOW_LAUNCHED, + WORKFLOW_SUCCEEDED, + WORKFLOW_FAILED, + WORKFLOW_CANCELLED, + WORKFLOW_PAUSED, + WORKFLOW_RESUMED +] + +TASK_LAUNCHED = 'TASK_LAUNCHED' +TASK_SUCCEEDED = 'TASK_SUCCEEDED' +TASK_FAILED = 'TASK_FAILED' +TASK_CANCELLED = 'TASK_CANCELLED' +TASK_PAUSED = 'TASK_PAUSED' +TASK_RESUMED = 'TASK_RESUMED' + +TASKS = [ + TASK_LAUNCHED, + TASK_SUCCEEDED, + TASK_FAILED, + TASK_CANCELLED, + TASK_PAUSED, + TASK_RESUMED +] + +EVENTS = WORKFLOWS + TASKS + +TASK_STATE_TRANSITION_MAP = { + states.RUNNING: { + 'ANY': TASK_LAUNCHED, + 'IDLE': TASK_RESUMED, + 'PAUSED': TASK_RESUMED, + 'WAITING': TASK_RESUMED + }, + states.SUCCESS: {'ANY': TASK_SUCCEEDED}, + states.ERROR: {'ANY': TASK_FAILED}, + states.CANCELLED: {'ANY': TASK_CANCELLED}, + states.PAUSED: {'ANY': TASK_PAUSED} +} + + +def identify_task_event(old_task_state, new_task_state): + event_options = ( + TASK_STATE_TRANSITION_MAP[new_task_state] + if new_task_state in TASK_STATE_TRANSITION_MAP + else {} + ) + + if not event_options: + return None + + event = ( + event_options[old_task_state] + if old_task_state and old_task_state in event_options + else event_options['ANY'] + ) + + return event diff --git a/mistral/notifiers/notification_server.py b/mistral/notifiers/notification_server.py new file mode 100644 index 000000000..f7e5e2447 --- /dev/null +++ b/mistral/notifiers/notification_server.py @@ -0,0 +1,93 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral import config as cfg +from mistral.notifiers import default_notifier as notif +from mistral.rpc import base as rpc +from mistral.service import base as service_base +from mistral import utils +from mistral.utils import profiler as profiler_utils + +LOG = logging.getLogger(__name__) + + +class NotificationServer(service_base.MistralService): + + def __init__(self, notifier, setup_profiler=True): + super(NotificationServer, self).__init__( + 'notifier_group', + setup_profiler + ) + + self.notifier = notifier + self._rpc_server = None + + def start(self): + super(NotificationServer, self).start() + + if self._setup_profiler: + profiler_utils.setup('mistral-notifier', cfg.CONF.notifier.host) + + # Initialize and start RPC server. + + self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.notifier) + self._rpc_server.register_endpoint(self) + + self._rpc_server.run(executor='threading') + + self._notify_started('Notification server started.') + + def stop(self, graceful=False): + super(NotificationServer, self).stop(graceful) + + if self._rpc_server: + self._rpc_server.stop(graceful) + + def notify(self, rpc_ctx, ex_id, data, event, timestamp, publishers): + """Receives calls over RPC to notify on notification server. + + :param rpc_ctx: RPC request context dictionary. + :param ex_id: Workflow, task, or action execution id. + :param data: Dictionary to include in the notification message. + :param event: Event being notified on. + :param timestamp: Datetime when this event occurred. + :param publishers: The list of publishers to send the notification. + """ + + LOG.info( + "Received RPC request 'notify'[ex_id=%s, event=%s, " + "timestamp=%s, data=%s, publishers=%s]", + ex_id, + event, + timestamp, + data, + utils.cut(publishers) + ) + + self.notifier.notify( + ex_id, + data, + event, + timestamp, + publishers + ) + + +def get_oslo_service(setup_profiler=True): + return NotificationServer( + notif.DefaultNotifier(), + setup_profiler=setup_profiler + ) diff --git a/mistral/notifiers/publishers/__init__.py b/mistral/notifiers/publishers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/notifiers/publishers/noop.py b/mistral/notifiers/publishers/noop.py new file mode 100644 index 000000000..799313b51 --- /dev/null +++ b/mistral/notifiers/publishers/noop.py @@ -0,0 +1,31 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral.notifiers import base + + +LOG = logging.getLogger(__name__) + + +class NoopPublisher(base.NotificationPublisher): + + def publish(self, ex_id, data, event, timestamp, **kwargs): + LOG.info( + 'The event %s for %s is published by the ' + 'noop notification publisher.', + event, + ex_id + ) diff --git a/mistral/notifiers/publishers/webhook.py b/mistral/notifiers/publishers/webhook.py new file mode 100644 index 000000000..0067433d4 --- /dev/null +++ b/mistral/notifiers/publishers/webhook.py @@ -0,0 +1,36 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import requests +from six.moves import http_client + +from oslo_log import log as logging + +from mistral.notifiers import base + + +LOG = logging.getLogger(__name__) + + +class WebhookPublisher(base.NotificationPublisher): + + def publish(self, ex_id, data, event, timestamp, **kwargs): + url = kwargs.get('url') + headers = kwargs.get('headers', {}) + + resp = requests.post(url, data=json.dumps(data), headers=headers) + + if resp.status_code not in [http_client.OK, http_client.CREATED]: + raise Exception(resp.text) diff --git a/mistral/notifiers/remote_notifier.py b/mistral/notifiers/remote_notifier.py new file mode 100644 index 000000000..8ea49eb61 --- /dev/null +++ b/mistral/notifiers/remote_notifier.py @@ -0,0 +1,30 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging + +from mistral.rpc import base as rpc_base +from mistral.rpc import clients as rpc_clients + + +LOG = logging.getLogger(__name__) + + +class RemoteNotifier(rpc_clients.NotifierClient): + """Notifier that passes notification request to a remote notifier.""" + + def __init__(self): + self.topic = cfg.CONF.notifier.topic + self._client = rpc_base.get_rpc_client_driver()(cfg.CONF.notifier) diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index 865936c0a..d69f68697 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -1,6 +1,7 @@ # Copyright 2014 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. # Copyright 2017 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,6 +16,7 @@ # limitations under the License. from oslo_config import cfg +from oslo_log import log as logging from osprofiler import profiler import threading @@ -22,9 +24,12 @@ from mistral import context as auth_ctx from mistral.engine import base as eng from mistral.event_engine import base as evt_eng from mistral.executors import base as exe +from mistral.notifiers import base as notif from mistral.rpc import base +LOG = logging.getLogger(__name__) + _ENGINE_CLIENT = None _ENGINE_CLIENT_LOCK = threading.Lock() @@ -34,6 +39,9 @@ _EXECUTOR_CLIENT_LOCK = threading.Lock() _EVENT_ENGINE_CLIENT = None _EVENT_ENGINE_CLIENT_LOCK = threading.Lock() +_NOTIFIER_CLIENT = None +_NOTIFIER_CLIENT_LOCK = threading.Lock() + def cleanup(): """Clean all the RPC clients. @@ -46,15 +54,17 @@ def cleanup(): global _ENGINE_CLIENT global _EXECUTOR_CLIENT global _EVENT_ENGINE_CLIENT + global _NOTIFIER_CLIENT _ENGINE_CLIENT = None _EXECUTOR_CLIENT = None _EVENT_ENGINE_CLIENT = None + _NOTIFIER_CLIENT = None def get_engine_client(): global _ENGINE_CLIENT - global _EVENT_ENGINE_CLIENT_LOCK + global _ENGINE_CLIENT_LOCK with _ENGINE_CLIENT_LOCK: if not _ENGINE_CLIENT: @@ -85,6 +95,17 @@ def get_event_engine_client(): return _EVENT_ENGINE_CLIENT +def get_notifier_client(): + global _NOTIFIER_CLIENT + global _NOTIFIER_CLIENT_LOCK + + with _NOTIFIER_CLIENT_LOCK: + if not _NOTIFIER_CLIENT: + _NOTIFIER_CLIENT = NotifierClient(cfg.CONF.notifier) + + return _NOTIFIER_CLIENT + + class EngineClient(eng.Engine): """RPC Engine client.""" @@ -379,3 +400,25 @@ class EventEngineClient(evt_eng.EventEngine): 'update_event_trigger', trigger=trigger, ) + + +class NotifierClient(notif.Notifier): + """RPC Notifier client.""" + + def __init__(self, rpc_conf_dict): + """Constructs an RPC client for the Notifier service.""" + self._client = base.get_rpc_client_driver()(rpc_conf_dict) + + def notify(self, ex_id, data, event, timestamp, publishers): + try: + return self._client.async_call( + auth_ctx.ctx(), + 'notify', + ex_id=ex_id, + data=data, + event=event, + timestamp=timestamp, + publishers=publishers + ) + except Exception: + LOG.exception('Unable to send notification.') diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index 973b4c557..788154994 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -2,6 +2,7 @@ # Copyright 2015 - StackStorm, Inc. # Copyright 2015 Huawei Technologies Co., Ltd. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -505,13 +506,15 @@ class TestExecutionsController(base.APITest): load_wf_ex_func.assert_not_called() + kwargs = json.loads(expected_json['params']) + kwargs['description'] = expected_json['description'] + start_wf_func.assert_called_once_with( expected_json['workflow_id'], '', None, json.loads(expected_json['input']), - expected_json['description'], - **json.loads(expected_json['params']) + **kwargs ) @mock.patch.object(rpc_clients.EngineClient, 'start_workflow') @@ -540,13 +543,15 @@ class TestExecutionsController(base.APITest): load_wf_ex_func.assert_called_once_with(expected_json['id']) + kwargs = json.loads(expected_json['params']) + kwargs['description'] = expected_json['description'] + start_wf_func.assert_called_once_with( expected_json['workflow_id'], '', expected_json['id'], json.loads(expected_json['input']), - expected_json['description'], - **json.loads(expected_json['params']) + **kwargs ) @mock.patch.object(rpc_clients.EngineClient, 'start_workflow') @@ -600,7 +605,7 @@ class TestExecutionsController(base.APITest): '', exec_dict['id'], json.loads(exec_dict['input']), - exec_dict['description'], + description=exec_dict['description'], **json.loads(exec_dict['params']) ) @@ -629,7 +634,7 @@ class TestExecutionsController(base.APITest): '', '', json.loads(exec_dict['input']), - exec_dict['description'], + description=exec_dict['description'], **json.loads(exec_dict['params']) ) @@ -659,7 +664,7 @@ class TestExecutionsController(base.APITest): '', exec_dict['id'], json.loads(exec_dict['input']), - exec_dict['description'], + description=exec_dict['description'], **json.loads(exec_dict['params']) ) diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index a27329329..59f7e4ab7 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -1,6 +1,7 @@ # Copyright 2014 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. # Copyright 2016 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,6 +25,7 @@ from mistral.db.v2 import api as db_api from mistral.engine import engine_server from mistral.executors import base as exe from mistral.executors import executor_server +from mistral.notifiers import notification_server as notif_server from mistral.rpc import base as rpc_base from mistral.rpc import clients as rpc_clients from mistral.tests.unit import base @@ -76,6 +78,18 @@ class EngineTestCase(base.DbTestCase): self.threads.append(eventlet.spawn(launch_service, exe_svc)) self.addCleanup(exe_svc.stop, True) + # Start remote notifier. + if cfg.CONF.notifier.type == 'remote': + LOG.info("Starting remote notifier threads...") + + self.notifier_client = rpc_clients.get_notifier_client() + + notif_svc = notif_server.get_oslo_service(setup_profiler=False) + + self.notifier = notif_svc.notifier + self.threads.append(eventlet.spawn(launch_service, notif_svc)) + self.addCleanup(notif_svc.stop, True) + # Start engine. LOG.info("Starting engine threads...") @@ -95,6 +109,9 @@ class EngineTestCase(base.DbTestCase): if cfg.CONF.executor.type == 'remote': exe_svc.wait_started() + if cfg.CONF.notifier.type == 'remote': + notif_svc.wait_started() + eng_svc.wait_started() def kill_threads(self): diff --git a/mistral/tests/unit/executors/test_plugins.py b/mistral/tests/unit/executors/test_server_plugins.py similarity index 72% rename from mistral/tests/unit/executors/test_plugins.py rename to mistral/tests/unit/executors/test_server_plugins.py index bed64790c..e33257289 100644 --- a/mistral/tests/unit/executors/test_plugins.py +++ b/mistral/tests/unit/executors/test_server_plugins.py @@ -1,4 +1,5 @@ # Copyright 2017 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,10 +14,11 @@ # limitations under the License. from oslo_log import log as logging +from stevedore import exception as sd_exc from mistral.executors import base as exe -from mistral.executors import default_executor as d_exe -from mistral.executors import remote_executor as r_exe +from mistral.executors import default_executor as d +from mistral.executors import remote_executor as r from mistral.tests.unit.executors import base @@ -32,9 +34,12 @@ class PluginTestCase(base.ExecutorTestCase): def test_get_local_executor(self): executor = exe.get_executor('local') - self.assertIsInstance(executor, d_exe.DefaultExecutor) + self.assertIsInstance(executor, d.DefaultExecutor) def test_get_remote_executor(self): executor = exe.get_executor('remote') - self.assertIsInstance(executor, r_exe.RemoteExecutor) + self.assertIsInstance(executor, r.RemoteExecutor) + + def test_get_bad_executor(self): + self.assertRaises(sd_exc.NoMatches, exe.get_executor, 'foobar') diff --git a/mistral/tests/unit/notifiers/__init__.py b/mistral/tests/unit/notifiers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/tests/unit/notifiers/base.py b/mistral/tests/unit/notifiers/base.py new file mode 100644 index 000000000..46db73c40 --- /dev/null +++ b/mistral/tests/unit/notifiers/base.py @@ -0,0 +1,47 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging + +from mistral.tests.unit.engine import base as engine_test_base + + +LOG = logging.getLogger(__name__) + + +class NotifierTestCase(engine_test_base.EngineTestCase): + + def await_workflow_success(self, wf_ex_id, post_delay=1): + # Override the original wait method to add a delay to allow enough + # time for the notification events to get processed. + super(NotifierTestCase, self).await_workflow_success(wf_ex_id) + self._sleep(post_delay) + + def await_workflow_error(self, wf_ex_id, post_delay=1): + # Override the original wait method to add a delay to allow enough + # time for the notification events to get processed. + super(NotifierTestCase, self).await_workflow_error(wf_ex_id) + self._sleep(post_delay) + + def await_workflow_paused(self, wf_ex_id, post_delay=1): + # Override the original wait method to add a delay to allow enough + # time for the notification events to get processed. + super(NotifierTestCase, self).await_workflow_paused(wf_ex_id) + self._sleep(post_delay) + + def await_workflow_cancelled(self, wf_ex_id, post_delay=1): + # Override the original wait method to add a delay to allow enough + # time for the notification events to get processed. + super(NotifierTestCase, self).await_workflow_cancelled(wf_ex_id) + self._sleep(post_delay) diff --git a/mistral/tests/unit/notifiers/test_notifier_servers.py b/mistral/tests/unit/notifiers/test_notifier_servers.py new file mode 100644 index 000000000..0872afa56 --- /dev/null +++ b/mistral/tests/unit/notifiers/test_notifier_servers.py @@ -0,0 +1,221 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from oslo_config import cfg +from stevedore import exception as sd_exc + +from mistral.db.v2 import api as db_api +from mistral.notifiers import base as notif +from mistral.notifiers import default_notifier as d_notif +from mistral.notifiers import notification_events as events +from mistral.notifiers import remote_notifier as r_notif +from mistral.services import workflows as wf_svc +from mistral.tests.unit.notifiers import base +from mistral.workflow import states + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + +EVENT_LOGS = [] + + +def publisher_process(ex_id, data, event, timestamp, **kwargs): + EVENT_LOGS.append((ex_id, event)) + + +def notifier_process(ex_id, data, event, timestamp, publishers): + EVENT_LOGS.append((ex_id, event)) + + +class ServerPluginTestCase(base.NotifierTestCase): + + def tearDown(self): + notif.cleanup() + super(ServerPluginTestCase, self).tearDown() + + def test_get_bad_notifier(self): + self.assertRaises(sd_exc.NoMatches, notif.get_notifier, 'foobar') + + +@mock.patch.object( + r_notif.RemoteNotifier, + 'notify', + mock.MagicMock(return_value=None) +) +class LocalNotifServerTestCase(base.NotifierTestCase): + + @classmethod + def setUpClass(cls): + super(LocalNotifServerTestCase, cls).setUpClass() + cfg.CONF.set_default('type', 'local', group='notifier') + + @classmethod + def tearDownClass(cls): + cfg.CONF.set_default('type', 'remote', group='notifier') + super(LocalNotifServerTestCase, cls).tearDownClass() + + def setUp(self): + super(LocalNotifServerTestCase, self).setUp() + self.publisher = notif.get_notification_publisher('webhook') + self.publisher.publish = mock.MagicMock(side_effect=publisher_process) + self.publisher.publish.reset_mock() + del EVENT_LOGS[:] + + def tearDown(self): + notif.cleanup() + super(LocalNotifServerTestCase, self).tearDown() + + def test_get_notifier(self): + notifier = notif.get_notifier(cfg.CONF.notifier.type) + + self.assertEqual('local', cfg.CONF.notifier.type) + self.assertIsInstance(notifier, d_notif.DefaultNotifier) + + def test_notify(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notif_options = [{'type': 'webhook'}] + + wf_ex = self.engine.start_workflow( + 'wf', + '', + wf_input={}, + notify=notif_options + ) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertFalse(r_notif.RemoteNotifier.notify.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + +@mock.patch.object( + r_notif.RemoteNotifier, + 'notify', + mock.MagicMock(side_effect=notifier_process) +) +class RemoteNotifServerTestCase(base.NotifierTestCase): + + @classmethod + def setUpClass(cls): + super(RemoteNotifServerTestCase, cls).setUpClass() + cfg.CONF.set_default('type', 'remote', group='notifier') + + def setUp(self): + super(RemoteNotifServerTestCase, self).setUp() + del EVENT_LOGS[:] + + def tearDown(self): + notif.cleanup() + super(RemoteNotifServerTestCase, self).tearDown() + + def test_get_notifier(self): + notifier = notif.get_notifier(cfg.CONF.notifier.type) + + self.assertEqual('remote', cfg.CONF.notifier.type) + self.assertIsInstance(notifier, r_notif.RemoteNotifier) + + def test_notify(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notif_options = [{'type': 'foobar'}] + + wf_ex = self.engine.start_workflow( + 'wf', + '', + wf_input={}, + notify=notif_options + ) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(r_notif.RemoteNotifier.notify.called) + self.assertListEqual(expected_order, EVENT_LOGS) diff --git a/mistral/tests/unit/notifiers/test_notify.py b/mistral/tests/unit/notifiers/test_notify.py new file mode 100644 index 000000000..c75b70c84 --- /dev/null +++ b/mistral/tests/unit/notifiers/test_notify.py @@ -0,0 +1,1036 @@ +# Copyright 2018 - Extreme Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import mock + +from oslo_config import cfg + +from mistral.db.v2 import api as db_api +from mistral.notifiers import base as notif +from mistral.notifiers import notification_events as events +from mistral.services import workbooks as wb_svc +from mistral.services import workflows as wf_svc +from mistral.tests.unit.notifiers import base +from mistral.workflow import states +from mistral_lib import actions as ml_actions + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + +EVENT_LOGS = [] + + +def log_event(ex_id, data, event, timestamp, **kwargs): + EVENT_LOGS.append((ex_id, event)) + + +class NotifyEventsTest(base.NotifierTestCase): + + def setUp(self): + super(NotifyEventsTest, self).setUp() + + self.publishers = { + 'wbhk': notif.get_notification_publisher('webhook'), + 'noop': notif.get_notification_publisher('noop') + } + + self.publishers['wbhk'].publish = mock.MagicMock(side_effect=log_event) + self.publishers['wbhk'].publish.reset_mock() + self.publishers['noop'].publish = mock.MagicMock(side_effect=log_event) + self.publishers['noop'].publish.reset_mock() + + del EVENT_LOGS[:] + + def tearDown(self): + cfg.CONF.set_default('notify', None, group='notifier') + super(NotifyEventsTest, self).tearDown() + + def test_notify_all_explicit(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [ + { + 'type': 'webhook', + 'events': events.EVENTS + } + ] + + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertEqual(6, len(EVENT_LOGS)) + self.assertIn((wf_ex.id, events.WORKFLOW_LAUNCHED), EVENT_LOGS) + self.assertIn((t1_ex.id, events.TASK_LAUNCHED), EVENT_LOGS) + self.assertIn((t1_ex.id, events.TASK_SUCCEEDED), EVENT_LOGS) + self.assertIn((t2_ex.id, events.TASK_LAUNCHED), EVENT_LOGS) + self.assertIn((t2_ex.id, events.TASK_SUCCEEDED), EVENT_LOGS) + self.assertIn((wf_ex.id, events.WORKFLOW_SUCCEEDED), EVENT_LOGS) + + def test_notify_all_implicit(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertEqual(6, len(EVENT_LOGS)) + self.assertIn((wf_ex.id, events.WORKFLOW_LAUNCHED), EVENT_LOGS) + self.assertIn((t1_ex.id, events.TASK_LAUNCHED), EVENT_LOGS) + self.assertIn((t1_ex.id, events.TASK_SUCCEEDED), EVENT_LOGS) + self.assertIn((t2_ex.id, events.TASK_LAUNCHED), EVENT_LOGS) + self.assertIn((t2_ex.id, events.TASK_SUCCEEDED), EVENT_LOGS) + self.assertIn((wf_ex.id, events.WORKFLOW_SUCCEEDED), EVENT_LOGS) + + def test_notify_order(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [ + {'type': 'webhook'} + ] + + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_multiple(self): + self.assertFalse(self.publishers['wbhk'].publish.called) + self.assertFalse(self.publishers['noop'].publish.called) + + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [ + {'type': 'webhook'}, + {'type': 'noop'} + ] + + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertTrue(self.publishers['noop'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_from_cfg(self): + self.assertFalse(self.publishers['wbhk'].publish.called) + self.assertFalse(self.publishers['noop'].publish.called) + + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [ + {'type': 'webhook'}, + {'type': 'noop'} + ] + + cfg.CONF.set_default( + 'notify', + json.dumps(notify_options), + group='notifier' + ) + + wf_ex = self.engine.start_workflow('wf', '') + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertTrue(self.publishers['noop'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_from_cfg_and_params(self): + self.assertFalse(self.publishers['wbhk'].publish.called) + self.assertFalse(self.publishers['noop'].publish.called) + + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + cfg.CONF.set_default( + 'notify', + json.dumps([{'type': 'noop'}]), + group='notifier' + ) + + params = {'notify': [{'type': 'webhook'}]} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertTrue(self.publishers['noop'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_workbook_notify(self): + wb_def = """ + version: '2.0' + name: wb + workflows: + wf1: + tasks: + t1: + workflow: wf2 + on-success: + - t2 + t2: + action: std.noop + wf2: + tasks: + t1: + action: std.noop + """ + + wb_svc.create_workbook_v2(wb_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf1_ex = self.engine.start_workflow('wb.wf1', '', **params) + + self.await_workflow_success(wf1_ex.id) + + with db_api.transaction(): + wf1_ex = db_api.get_workflow_execution(wf1_ex.id) + wf1_task_exs = wf1_ex.task_executions + + wf1_t1_ex = self._assert_single_item(wf1_task_exs, name='t1') + wf1_t2_ex = self._assert_single_item(wf1_task_exs, name='t2') + + wf1_t1_act_exs = db_api.get_workflow_executions( + task_execution_id=wf1_t1_ex.id + ) + + wf2_ex = wf1_t1_act_exs[0] + wf2_task_exs = wf2_ex.task_executions + + wf2_t1_ex = self._assert_single_item(wf2_task_exs, name='t1') + + self.assertEqual(states.SUCCESS, wf1_ex.state) + self.assertIsNone(wf1_ex.state_info) + self.assertEqual(2, len(wf1_task_exs)) + + self.assertEqual(states.SUCCESS, wf1_t1_ex.state) + self.assertIsNone(wf1_t1_ex.state_info) + self.assertEqual(states.SUCCESS, wf1_t2_ex.state) + self.assertIsNone(wf1_t2_ex.state_info) + + self.assertEqual(1, len(wf1_t1_act_exs)) + + self.assertEqual(states.SUCCESS, wf2_ex.state) + self.assertIsNone(wf2_ex.state_info) + self.assertEqual(1, len(wf2_task_exs)) + + self.assertEqual(states.SUCCESS, wf2_t1_ex.state) + self.assertIsNone(wf2_t1_ex.state_info) + + expected_order = [ + (wf1_ex.id, events.WORKFLOW_LAUNCHED), + (wf1_t1_ex.id, events.TASK_LAUNCHED), + (wf2_ex.id, events.WORKFLOW_LAUNCHED), + (wf2_t1_ex.id, events.TASK_LAUNCHED), + (wf2_t1_ex.id, events.TASK_SUCCEEDED), + (wf2_ex.id, events.WORKFLOW_SUCCEEDED), + (wf1_t1_ex.id, events.TASK_SUCCEEDED), + (wf1_t2_ex.id, events.TASK_LAUNCHED), + (wf1_t2_ex.id, events.TASK_SUCCEEDED), + (wf1_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_task_error(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-success: + - t2 + t2: + action: std.fail + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.ERROR, t2_ex.state) + self.assertIsNotNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_FAILED), + (wf_ex.id, events.WORKFLOW_FAILED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_task_transition_fail(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.noop + on-complete: + - fail + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(1, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_FAILED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_with_items_task(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + with-items: i in <% list(range(0, 3)) %> + action: std.noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_success(wf_ex.id) + self._sleep(1) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_pause_resume(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.async_noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_running(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + # Pause the workflow. + self.engine.pause_workflow(wf_ex.id) + self.await_workflow_paused(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + # Workflow is paused but the task is still running as expected. + self.assertEqual(states.PAUSED, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (wf_ex.id, events.WORKFLOW_PAUSED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + # Complete action execution of task 1. + self.engine.on_action_complete( + t1_act_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_paused(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.PAUSED, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(1, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (wf_ex.id, events.WORKFLOW_PAUSED), + (t1_ex.id, events.TASK_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + # Resume the workflow. + self.engine.resume_workflow(wf_ex.id) + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (wf_ex.id, events.WORKFLOW_PAUSED), + (t1_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_RESUMED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_pause_resume_task(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.async_noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_running(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + # Pause the action execution of task 1. + self.engine.on_action_update(t1_act_exs[0].id, states.PAUSED) + self.await_workflow_paused(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.PAUSED, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.PAUSED, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.PAUSED, t1_act_exs[0].state) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_PAUSED), + (wf_ex.id, events.WORKFLOW_PAUSED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + # Resume the action execution of task 1. + self.engine.on_action_update(t1_act_exs[0].id, states.RUNNING) + self.await_task_running(t1_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + # Complete action execution of task 1. + self.engine.on_action_complete( + t1_act_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + # Wait for the workflow execution to complete. + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + t2_ex = self._assert_single_item(task_exs, name='t2') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + self.assertEqual(states.SUCCESS, t2_ex.state) + self.assertIsNone(t2_ex.state_info) + + # TASK_RESUMED comes before WORKFLOW_RESUMED because + # this test resumed the workflow with on_action_update. + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_PAUSED), + (wf_ex.id, events.WORKFLOW_PAUSED), + (t1_ex.id, events.TASK_RESUMED), + (wf_ex.id, events.WORKFLOW_RESUMED), + (t1_ex.id, events.TASK_SUCCEEDED), + (t2_ex.id, events.TASK_LAUNCHED), + (t2_ex.id, events.TASK_SUCCEEDED), + (wf_ex.id, events.WORKFLOW_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_cancel(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.async_noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_running(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + # Cancel the workflow. + self.engine.stop_workflow(wf_ex.id, states.CANCELLED) + self.await_workflow_cancelled(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + # Workflow is cancelled but the task is still running as expected. + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (wf_ex.id, events.WORKFLOW_CANCELLED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + # Complete action execution of task 1. + self.engine.on_action_complete( + t1_act_exs[0].id, + ml_actions.Result(data={'result': 'foobar'}) + ) + + self.await_workflow_cancelled(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(1, len(task_exs)) + + t1_ex = self._assert_single_item(task_exs, name='t1') + + self.assertEqual(states.SUCCESS, t1_ex.state) + self.assertIsNone(t1_ex.state_info) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (wf_ex.id, events.WORKFLOW_CANCELLED), + (t1_ex.id, events.TASK_SUCCEEDED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) + + def test_notify_cancel_task(self): + wf_def = """ + version: '2.0' + wf: + tasks: + t1: + action: std.async_noop + on-success: + - t2 + t2: + action: std.noop + """ + + wf_svc.create_workflows(wf_def) + + notify_options = [{'type': 'webhook'}] + params = {'notify': notify_options} + + wf_ex = self.engine.start_workflow('wf', '', **params) + + self.await_workflow_running(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.RUNNING, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.RUNNING, t1_act_exs[0].state) + + # Cancel the action execution of task 1. + self.engine.on_action_update(t1_act_exs[0].id, states.CANCELLED) + self.await_workflow_cancelled(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_exs = wf_ex.task_executions + + t1_ex = self._assert_single_item(task_exs, name='t1') + t1_act_exs = db_api.get_action_executions(task_execution_id=t1_ex.id) + + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertEqual(1, len(task_exs)) + self.assertEqual(states.CANCELLED, t1_ex.state) + self.assertEqual(1, len(t1_act_exs)) + self.assertEqual(states.CANCELLED, t1_act_exs[0].state) + + expected_order = [ + (wf_ex.id, events.WORKFLOW_LAUNCHED), + (t1_ex.id, events.TASK_LAUNCHED), + (t1_ex.id, events.TASK_CANCELLED), + (wf_ex.id, events.WORKFLOW_CANCELLED) + ] + + self.assertTrue(self.publishers['wbhk'].publish.called) + self.assertListEqual(expected_order, EVENT_LOGS) diff --git a/mistral/tests/unit/test_launcher.py b/mistral/tests/unit/test_launcher.py index f194a8c85..0d9711761 100644 --- a/mistral/tests/unit/test_launcher.py +++ b/mistral/tests/unit/test_launcher.py @@ -1,4 +1,5 @@ # Copyright 2017 - Brocade Communications Systems, Inc. +# Copyright 2018 - Extreme Networks, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,7 +48,7 @@ class ServiceLauncherTest(base.DbTestCase): api_workers = api_server.workers self._await(lambda: len(svr_proc_mgr.children.keys()) == api_workers) - self._await(lambda: len(svr_thrd_mgr.services.services) == 3) + self._await(lambda: len(svr_thrd_mgr.services.services) == 4) def test_launch_process(self): eventlet.spawn(launch.launch_any, ['api']) diff --git a/setup.cfg b/setup.cfg index 56b1adf3a..caab0062c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -75,6 +75,14 @@ mistral.executors = local = mistral.executors.default_executor:DefaultExecutor remote = mistral.executors.remote_executor:RemoteExecutor +mistral.notifiers = + local = mistral.notifiers.default_notifier:DefaultNotifier + remote = mistral.notifiers.remote_notifier:RemoteNotifier + +mistral.notification.publishers = + webhook = mistral.notifiers.publishers.webhook:WebhookPublisher + noop = mistral.notifiers.publishers.noop:NoopPublisher + mistral.expression.functions = # json_pp was deprecated in Queens and will be removed in the S cycle json_pp = mistral.utils.expression_utils:json_pp_