Add event trigger REST API

This patch adds the Mistral changes to support the new event trigger
REST API.

Change-Id: I8190ce81d46cc8296db29f41442354cdfe1a5bbd
Implements: blueprint event-notification-trigger
Co-Authored-By: Lingxian Kong <anlin.kong@gmail.com>
This commit is contained in:
Lingxian Kong 2016-06-20 22:49:49 +12:00
parent 33e0ee5ea8
commit 4e1e358c8b
9 changed files with 577 additions and 3 deletions

View File

@ -54,5 +54,11 @@
"workflows:delete": "rule:admin_or_owner",
"workflows:get": "rule:admin_or_owner",
"workflows:list": "rule:admin_or_owner",
"workflows:update": "rule:admin_or_owner"
"workflows:update": "rule:admin_or_owner",
"event_triggers:create": "rule:admin_or_owner",
"event_triggers:delete": "rule:admin_or_owner",
"event_triggers:get": "rule:admin_or_owner",
"event_triggers:list": "rule:admin_or_owner",
"event_triggers:update": "rule:admin_or_owner",
}

View File

@ -0,0 +1,143 @@
# Copyright 2016 - IBM Corp.
# Copyright 2016 Catalyst IT Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from pecan import rest
import wsmeext.pecan as wsme_pecan
from mistral.api import access_control as acl
from mistral.api.controllers.v2 import resources
from mistral.api.controllers.v2 import types
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import triggers
from mistral.utils import rest_utils
LOG = logging.getLogger(__name__)
UPDATE_NOT_ALLOWED = ['exchange', 'topic', 'event']
CREATE_MANDATORY = set(['exchange', 'topic', 'event', 'workflow_id'])
class EventTriggersController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.EventTrigger, types.uuid)
def get(self, id):
"""Returns the specified event_trigger."""
acl.enforce('event_trigger:get', auth_ctx.ctx())
LOG.info('Fetch event trigger [id=%s]', id)
db_model = db_api.get_event_trigger(id)
return resources.EventTrigger.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.EventTrigger, body=resources.EventTrigger,
status_code=201)
def post(self, event_trigger):
"""Creates a new event trigger."""
acl.enforce('event_trigger:create', auth_ctx.ctx())
values = event_trigger.to_dict()
input_keys = [k for k in values if values[k]]
if CREATE_MANDATORY - set(input_keys):
raise exc.EventTriggerException(
"Params %s must be provided for creating event trigger." %
CREATE_MANDATORY
)
LOG.info('Create event trigger: %s', values)
db_model = triggers.create_event_trigger(
values.get('name', ''),
values.get('exchange'),
values.get('topic'),
values.get('event'),
values.get('workflow_id'),
workflow_input=values.get('workflow_input'),
workflow_params=values.get('workflow_params'),
)
return resources.EventTrigger.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(resources.EventTrigger, types.uuid,
body=resources.EventTrigger)
def put(self, id, event_trigger):
"""Updates an existing event trigger.
The exchange, topic and event can not be updated. The right way to
change them is to delete the event trigger first, then create a new
event trigger with new params.
"""
acl.enforce('event_trigger:update', auth_ctx.ctx())
values = event_trigger.to_dict()
for field in UPDATE_NOT_ALLOWED:
if values.get(field, None):
raise exc.EventTriggerException(
"Can not update fields %s of event trigger." %
UPDATE_NOT_ALLOWED
)
db_api.ensure_event_trigger_exists(id)
LOG.info('Update event trigger: [id=%s, values=%s]', id, values)
db_model = triggers.update_event_trigger(id, values)
return resources.EventTrigger.from_dict(db_model.to_dict())
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(None, types.uuid, status_code=204)
def delete(self, id):
"""Delete event trigger."""
acl.enforce('event_trigger:delete', auth_ctx.ctx())
LOG.info("Delete event trigger [id=%s]", id)
event_trigger = db_api.get_event_trigger(id)
triggers.delete_event_trigger(event_trigger.to_dict())
@wsme_pecan.wsexpose(resources.EventTriggers, types.uuid, int,
types.uniquelist, types.list, types.uniquelist,
types.jsontype)
def get_all(self, marker=None, limit=None, sort_keys='created_at',
sort_dirs='asc', fields='', **filters):
"""Return all event triggers."""
acl.enforce('event_trigger:list', auth_ctx.ctx())
LOG.info("Fetch event triggers. marker=%s, limit=%s, sort_keys=%s, "
"sort_dirs=%s, fields=%s, filters=%s", marker, limit,
sort_keys, sort_dirs, fields, filters)
return rest_utils.get_all(
resources.EventTriggers,
resources.EventTrigger,
db_api.get_event_triggers,
db_api.get_event_trigger,
resource_function=None,
marker=marker,
limit=limit,
sort_keys=sort_keys,
sort_dirs=sort_dirs,
fields=fields,
**filters
)

View File

@ -540,3 +540,56 @@ class Services(resource.Resource):
@classmethod
def sample(cls):
return cls(services=[Service.sample()])
class EventTrigger(resource.Resource):
"""EventTrigger resource."""
id = wsme.wsattr(wtypes.text, readonly=True)
created_at = wsme.wsattr(wtypes.text, readonly=True)
updated_at = wsme.wsattr(wtypes.text, readonly=True)
project_id = wsme.wsattr(wtypes.text, readonly=True)
name = wtypes.text
workflow_id = types.uuid
workflow_input = types.jsontype
workflow_params = types.jsontype
exchange = wtypes.text
topic = wtypes.text
event = wtypes.text
scope = SCOPE_TYPES
@classmethod
def sample(cls):
return cls(id='123e4567-e89b-12d3-a456-426655441414',
created_at='1970-01-01T00:00:00.000000',
updated_at='1970-01-01T00:00:00.000000',
project_id='project',
name='expiration_event_trigger',
workflow_id='123e4567-e89b-12d3-a456-426655441414',
workflow_input={},
workflow_params={},
exchange='nova',
topic='notifications',
event='compute.instance.create.end')
class EventTriggers(resource.ResourceList):
"""A collection of event triggers."""
event_triggers = [EventTrigger]
def __init__(self, **kwargs):
self._type = 'event_triggers'
super(EventTriggers, self).__init__(**kwargs)
@classmethod
def sample(cls):
triggers_sample = cls()
triggers_sample.event_triggers = [EventTrigger.sample()]
triggers_sample.next = ("http://localhost:8989/v2/event_triggers?"
"sort_keys=id,name&"
"sort_dirs=asc,desc&limit=10&"
"marker=123e4567-e89b-12d3-a456-426655440000")
return triggers_sample

View File

@ -22,6 +22,7 @@ from mistral.api.controllers.v2 import action
from mistral.api.controllers.v2 import action_execution
from mistral.api.controllers.v2 import cron_trigger
from mistral.api.controllers.v2 import environment
from mistral.api.controllers.v2 import event_trigger
from mistral.api.controllers.v2 import execution
from mistral.api.controllers.v2 import service
from mistral.api.controllers.v2 import task
@ -54,6 +55,7 @@ class Controller(object):
environments = environment.EnvironmentController()
action_executions = action_execution.ActionExecutionsController()
services = service.ServicesController()
event_triggers = event_trigger.EventTriggersController()
@wsme_pecan.wsexpose(RootResource)
def index(self):

View File

@ -146,6 +146,19 @@ class Executor(object):
raise NotImplementedError()
@six.add_metaclass(abc.ABCMeta)
class EventEngine(object):
"""Action event trigger interface."""
@abc.abstractmethod
def create_event_trigger(self, trigger, events):
raise NotImplementedError()
@abc.abstractmethod
def delete_event_trigger(self, trigger, events):
raise NotImplementedError()
@six.add_metaclass(abc.ABCMeta)
class TaskPolicy(object):
"""Task policy.

View File

@ -35,6 +35,7 @@ _TRANSPORT = None
_ENGINE_CLIENT = None
_EXECUTOR_CLIENT = None
_EVENT_ENGINE_CLIENT = None
def cleanup():
@ -43,10 +44,12 @@ def cleanup():
global _TRANSPORT
global _ENGINE_CLIENT
global _EXECUTOR_CLIENT
global _EVENT_ENGINE_CLIENT
_TRANSPORT = None
_ENGINE_CLIENT = None
_EXECUTOR_CLIENT = None
_EVENT_ENGINE_CLIENT = None
def get_transport():
@ -80,6 +83,17 @@ def get_executor_client():
return _EXECUTOR_CLIENT
def get_event_engine_client():
global _EVENT_ENGINE_CLIENT
if not _EVENT_ENGINE_CLIENT:
_EVENT_ENGINE_CLIENT = EventEngineClient(
rpc_utils.get_rpc_info_from_oslo(cfg.CONF.event_engine)
)
return _EVENT_ENGINE_CLIENT
def get_rpc_server_driver():
rpc_impl = cfg.CONF.rpc_implementation
@ -570,7 +584,62 @@ class ExecutorClient(base.Executor):
class EventEngineServer(object):
"""RPC Event Engine server."""
"""RPC EventEngine server."""
def __init__(self, event_engine):
self.event_engine = event_engine
self._event_engine = event_engine
def create_event_trigger(self, rpc_ctx, trigger, events):
LOG.info(
"Received RPC request 'create_event_trigger'[rpc_ctx=%s,"
" trigger=%s, events=%s", rpc_ctx, trigger, events
)
return self._event_engine.create_event_trigger(trigger, events)
def delete_event_trigger(self, rpc_ctx, trigger, events):
LOG.info(
"Received RPC request 'delete_event_trigger'[rpc_ctx=%s,"
" trigger=%s, events=%s", rpc_ctx, trigger, events
)
return self._event_engine.delete_event_trigger(trigger, events)
def update_event_trigger(self, rpc_ctx, trigger):
LOG.info(
"Received RPC request 'update_event_trigger'[rpc_ctx=%s,"
" trigger=%s", rpc_ctx, trigger
)
return self._event_engine.update_event_trigger(trigger)
class EventEngineClient(base.EventEngine):
"""RPC EventEngine client."""
def __init__(self, rpc_conf_dict):
"""Constructs an RPC client for the EventEngine service."""
self._client = get_rpc_client_driver()(rpc_conf_dict)
def create_event_trigger(self, trigger, events):
return self._client.sync_call(
auth_ctx.ctx(),
'create_event_trigger',
trigger=trigger,
events=events
)
def delete_event_trigger(self, trigger, events):
return self._client.sync_call(
auth_ctx.ctx(),
'delete_event_trigger',
trigger=trigger,
events=events
)
def update_event_trigger(self, trigger):
return self._client.sync_call(
auth_ctx.ctx(),
'update_event_trigger',
trigger=trigger,
)

View File

@ -153,6 +153,10 @@ class WorkflowException(MistralException):
http_code = 400
class EventTriggerException(MistralException):
http_code = 400
class InputException(MistralException):
http_code = 400

View File

@ -17,6 +17,7 @@ import datetime
import six
from mistral.db.v2 import api as db_api
from mistral.engine.rpc_backend import rpc
from mistral.engine import utils as eng_utils
from mistral import exceptions as exc
from mistral.services import security
@ -112,3 +113,75 @@ def create_cron_trigger(name, workflow_name, workflow_input,
trig = db_api.create_cron_trigger(values)
return trig
def create_event_trigger(name, exchange, topic, event, workflow_id,
workflow_input=None, workflow_params=None):
with db_api.transaction():
wf_def = db_api.get_workflow_definition_by_id(workflow_id)
eng_utils.validate_input(
wf_def,
workflow_input or {},
parser.get_workflow_spec_by_definition_id(
wf_def.id,
wf_def.updated_at
)
)
values = {
'name': name,
'workflow_id': workflow_id,
'workflow_input': workflow_input or {},
'workflow_params': workflow_params or {},
'exchange': exchange,
'topic': topic,
'event': event,
}
security.add_trust_id(values)
trig = db_api.create_event_trigger(values)
trigs = db_api.get_event_triggers(insecure=True, exchange=exchange,
topic=topic)
events = [t.event for t in trigs]
# NOTE(kong): Send RPC message within the db transaction, rollback if
# any error occurs.
rpc.get_event_engine_client().create_event_trigger(
trig.to_dict(),
events
)
return trig
def delete_event_trigger(event_trigger):
with db_api.transaction():
db_api.delete_event_trigger(event_trigger['id'])
trigs = db_api.get_event_triggers(
insecure=True,
exchange=event_trigger['exchange'],
topic=event_trigger['topic']
)
events = set([t.event for t in trigs])
# NOTE(kong): Send RPC message within the db transaction, rollback if
# any error occurs.
rpc.get_event_engine_client().delete_event_trigger(
event_trigger,
list(events)
)
def update_event_trigger(id, values):
with db_api.transaction():
trig = db_api.update_event_trigger(id, values)
# NOTE(kong): Send RPC message within the db transaction, rollback if
# any error occurs.
rpc.get_event_engine_client().update_event_trigger(trig.to_dict())
return trig

View File

@ -0,0 +1,211 @@
# Copyright 2016 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import json
import mock
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.tests.unit.api import base
WF = models.WorkflowDefinition(
spec={
'version': '2.0',
'name': 'my_wf',
'tasks': {
'task1': {
'action': 'std.noop'
}
}
}
)
WF.update({'id': '123e4567-e89b-12d3-a456-426655440000', 'name': 'my_wf'})
TRIGGER = {
'id': '09cc56a9-d15e-4494-a6e2-c4ec8bdaacae',
'name': 'my_event_trigger',
'workflow_id': '123e4567-e89b-12d3-a456-426655440000',
'workflow_input': '{}',
'workflow_params': '{}',
'scope': 'private',
'exchange': 'openstack',
'topic': 'notification',
'event': 'compute.instance.create.start'
}
trigger_values = copy.deepcopy(TRIGGER)
trigger_values['workflow_input'] = json.loads(
trigger_values['workflow_input'])
trigger_values['workflow_params'] = json.loads(
trigger_values['workflow_params'])
TRIGGER_DB = models.EventTrigger()
TRIGGER_DB.update(trigger_values)
MOCK_WF = mock.MagicMock(return_value=WF)
MOCK_TRIGGER = mock.MagicMock(return_value=TRIGGER_DB)
MOCK_TRIGGERS = mock.MagicMock(return_value=[TRIGGER_DB])
MOCK_NONE = mock.MagicMock(return_value=None)
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
class TestEventTriggerController(base.APITest):
@mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER)
def test_get(self):
resp = self.app.get(
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae'
)
self.assertEqual(200, resp.status_int)
self.assertDictEqual(TRIGGER, resp.json)
@mock.patch.object(db_api, "get_event_trigger", MOCK_NOT_FOUND)
def test_get_not_found(self):
resp = self.app.get(
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae',
expect_errors=True
)
self.assertEqual(404, resp.status_int)
@mock.patch.object(db_api, "get_workflow_definition_by_id", MOCK_WF)
@mock.patch.object(db_api, "get_workflow_definition", MOCK_WF)
@mock.patch.object(db_api, "create_event_trigger", MOCK_TRIGGER)
@mock.patch.object(db_api, "get_event_triggers", MOCK_TRIGGERS)
@mock.patch('mistral.engine.rpc_backend.rpc.get_event_engine_client')
def test_post(self, mock_rpc_client):
client = mock.Mock()
mock_rpc_client.return_value = client
CREATE_TRIGGER = copy.deepcopy(TRIGGER)
CREATE_TRIGGER.pop('id')
resp = self.app.post_json('/v2/event_triggers', CREATE_TRIGGER)
self.assertEqual(201, resp.status_int)
self.assertEqual(1, client.create_event_trigger.call_count)
self.assertDictEqual(
TRIGGER_DB.to_dict(),
client.create_event_trigger.call_args[0][0]
)
self.assertListEqual(
['compute.instance.create.start'],
client.create_event_trigger.call_args[0][1]
)
def test_post_no_workflow_id(self):
CREATE_TRIGGER = copy.deepcopy(TRIGGER)
CREATE_TRIGGER.pop('id')
CREATE_TRIGGER.pop('workflow_id')
resp = self.app.post_json(
'/v2/event_triggers',
CREATE_TRIGGER,
expect_errors=True
)
self.assertEqual(400, resp.status_int)
@mock.patch.object(db_api, "get_workflow_definition_by_id", MOCK_NOT_FOUND)
def test_post_workflow_not_found(self):
CREATE_TRIGGER = copy.deepcopy(TRIGGER)
CREATE_TRIGGER.pop('id')
resp = self.app.post_json(
'/v2/event_triggers',
CREATE_TRIGGER,
expect_errors=True
)
self.assertEqual(404, resp.status_int)
@mock.patch.object(db_api, 'ensure_event_trigger_exists', MOCK_NONE)
@mock.patch('mistral.engine.rpc_backend.rpc.get_event_engine_client')
@mock.patch('mistral.db.v2.api.update_event_trigger')
def test_put(self, mock_update, mock_rpc_client):
client = mock.Mock()
mock_rpc_client.return_value = client
UPDATED_TRIGGER = models.EventTrigger()
UPDATED_TRIGGER.update(trigger_values)
UPDATED_TRIGGER.update({'name': 'new_name'})
mock_update.return_value = UPDATED_TRIGGER
resp = self.app.put_json(
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae',
{'name': 'new_name'}
)
self.assertEqual(200, resp.status_int)
self.assertEqual(1, client.update_event_trigger.call_count)
self.assertDictEqual(
UPDATED_TRIGGER.to_dict(),
client.update_event_trigger.call_args[0][0]
)
def test_put_field_not_allowed(self):
resp = self.app.put_json(
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae',
{'exchange': 'new_exchange'},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
@mock.patch('mistral.engine.rpc_backend.rpc.get_event_engine_client')
@mock.patch.object(db_api, "get_event_trigger", MOCK_TRIGGER)
@mock.patch.object(db_api, "get_event_triggers",
mock.MagicMock(return_value=[]))
@mock.patch.object(db_api, "delete_event_trigger", MOCK_NONE)
def test_delete(self, mock_rpc_client):
client = mock.Mock()
mock_rpc_client.return_value = client
resp = self.app.delete(
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae'
)
self.assertEqual(204, resp.status_int)
self.assertEqual(1, client.delete_event_trigger.call_count)
self.assertDictEqual(
TRIGGER_DB.to_dict(),
client.delete_event_trigger.call_args[0][0]
)
self.assertListEqual(
[],
client.delete_event_trigger.call_args[0][1]
)
@mock.patch.object(db_api, "get_event_trigger", MOCK_NOT_FOUND)
def test_delete_not_found(self):
resp = self.app.delete(
'/v2/event_triggers/09cc56a9-d15e-4494-a6e2-c4ec8bdaacae',
expect_errors=True
)
self.assertEqual(404, resp.status_int)
@mock.patch.object(db_api, "get_event_triggers", MOCK_TRIGGERS)
def test_get_all(self):
resp = self.app.get('/v2/event_triggers')
self.assertEqual(200, resp.status_int)
self.assertEqual(1, len(resp.json['event_triggers']))
self.assertDictEqual(TRIGGER, resp.json['event_triggers'][0])