diff --git a/etc/storyboard.conf.sample b/etc/storyboard.conf.sample index 6d3d57a3..a3f52772 100644 --- a/etc/storyboard.conf.sample +++ b/etc/storyboard.conf.sample @@ -46,6 +46,30 @@ lock_path = $state_path/lock # page_size_maximum = 500 # page_size_default = 20 +# Enable notifications. This feature drives deferred processing, reporting, +# and subscriptions. +# enable_notifications = True + +[notifications] + +# Host of the rabbitmq server. +# rabbit_host=localhost + +# The RabbitMQ login method +# rabbit_login_method = AMQPLAIN + +# The RabbitMQ userid. +# rabbit_userid = storyboard + +# The RabbitMQ password. +# rabbit_password = storyboard + +# The RabbitMQ broker port where a single node is used. +# rabbit_port = 5672 + +# The virtual host within which our queues and exchanges live. +# rabbit_virtual_host = / + [database] # This line MUST be changed to actually run storyboard # Example: diff --git a/requirements.txt b/requirements.txt index e5d67e15..41d1255f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ oauthlib>=0.6 oslo.config>=1.2.1 pecan>=0.4.5 oslo.db>=0.2.0 +pika>=0.9.14 python-openid PyYAML>=3.1.0 requests>=1.1 diff --git a/setup.cfg b/setup.cfg index cfc6a9f8..3a64f92b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,7 @@ data_files = [entry_points] console_scripts = storyboard-api = storyboard.api.app:start + storyboard-subscriber = storyboard.notifications.subscriber:subscribe storyboard-db-manage = storyboard.db.migration.cli:main [build_sphinx] diff --git a/storyboard/api/app.py b/storyboard/api/app.py index 58bfaefa..a2f4a0d7 100644 --- a/storyboard/api/app.py +++ b/storyboard/api/app.py @@ -22,14 +22,17 @@ from wsgiref import simple_server from storyboard.api.auth.token_storage import impls as storage_impls from storyboard.api.auth.token_storage import storage from storyboard.api import config as api_config +from storyboard.api.middleware import resource_hook from storyboard.api.middleware import token_middleware from storyboard.api.middleware import user_id_hook from storyboard.api.v1.search import impls as search_engine_impls from storyboard.api.v1.search import search_engine +from storyboard.notifications import connection_service from storyboard.openstack.common.gettextutils import _ # noqa from storyboard.openstack.common import log CONF = cfg.CONF + LOG = log.getLogger(__name__) API_OPTS = [ @@ -38,7 +41,10 @@ API_OPTS = [ help='API host'), cfg.IntOpt('bind_port', default=8080, - help='API port') + help='API port'), + cfg.BoolOpt('enable_notifications', + default=False, + help='Enable Notifications') ] CONF.register_opts(API_OPTS) @@ -62,6 +68,10 @@ def setup_app(pecan_config=None): ]) log.setup('storyboard') + hooks = [ + user_id_hook.UserIdHook() + ] + # Setup token storage token_storage_type = CONF.token_storage_type storage_cls = storage_impls.STORAGE_IMPLS[token_storage_type] @@ -72,10 +82,15 @@ def setup_app(pecan_config=None): search_engine_cls = search_engine_impls.ENGINE_IMPLS[search_engine_name] search_engine.set_engine(search_engine_cls()) + # Setup notifier + if CONF.enable_notifications: + connection_service.initialize() + hooks.append(resource_hook.ResourceHook()) + app = pecan.make_app( pecan_config.app.root, debug=CONF.debug, - hooks=[user_id_hook.UserIdHook()], + hooks=hooks, force_canonical=getattr(pecan_config.app, 'force_canonical', True), guess_content_type_from_ext=False ) diff --git a/storyboard/api/middleware/resource_hook.py b/storyboard/api/middleware/resource_hook.py new file mode 100644 index 00000000..63b52521 --- /dev/null +++ b/storyboard/api/middleware/resource_hook.py @@ -0,0 +1,31 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pecan import hooks + +from storyboard.notifications import publisher + + +class ResourceHook(hooks.PecanHook): + + def __init__(self): + super(ResourceHook, self).__init__() + + def after(self, state): + # Ignore get methods, we only care about changes. + if state.request.method == 'GET': + return + + publisher.publish(state) diff --git a/storyboard/db/api/timeline_events.py b/storyboard/db/api/timeline_events.py index 8ed2a253..d277f0ba 100644 --- a/storyboard/db/api/timeline_events.py +++ b/storyboard/db/api/timeline_events.py @@ -14,10 +14,15 @@ # limitations under the License. import json +from oslo.config import cfg +from pecan import request from storyboard.common import event_types from storyboard.db.api import base as api_base from storyboard.db import models +from storyboard.notifications import connection_service + +CONF = cfg.CONF def event_get(event_id): @@ -40,7 +45,30 @@ def events_get_count(**kwargs): def event_create(values): - return api_base.entity_create(models.TimeLineEvent, values) + new_event = api_base.entity_create(models.TimeLineEvent, values) + + if CONF.enable_notifications: + + payload_timeline_events = { + "user_id": request.current_user_id, + "method": "POST", + "resource": "timeline_event", + "event_id": new_event.id + } + payload_timeline_events = json.dumps(payload_timeline_events) + routing_key = "timeline_events" + + conn = connection_service.get_connection() + channel = conn.connection.channel() + conn.create_exchange(channel, 'storyboard', 'topic') + + channel.basic_publish(exchange='storyboard', + routing_key=routing_key, + body=payload_timeline_events) + + channel.close() + + return new_event def story_created_event(story_id, author_id): diff --git a/storyboard/notifications/__init__.py b/storyboard/notifications/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/storyboard/notifications/conf.py b/storyboard/notifications/conf.py new file mode 100644 index 00000000..9a4ffb8d --- /dev/null +++ b/storyboard/notifications/conf.py @@ -0,0 +1,34 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.config import cfg + +CONF = cfg.CONF + +NOTIFICATION_OPTS = [ + cfg.StrOpt("rabbit_host", default="localhost", + help="Host of the rabbitmq server."), + cfg.StrOpt("rabbit_login_method", default="AMQPLAIN", + help="The RabbitMQ login method."), + cfg.StrOpt("rabbit_userid", default="storyboard", + help="The RabbitMQ userid."), + cfg.StrOpt("rabbit_password", default="storyboard", + help="The RabbitMQ password."), + cfg.IntOpt("rabbit_port", default=5672, + help="The RabbitMQ broker port where a single node is used."), + cfg.StrOpt("rabbit_virtual_host", default="/", + help="The virtual host within which our queues and exchanges " + "live."), +] diff --git a/storyboard/notifications/connection_service.py b/storyboard/notifications/connection_service.py new file mode 100644 index 00000000..be99d550 --- /dev/null +++ b/storyboard/notifications/connection_service.py @@ -0,0 +1,62 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pika + +from oslo.config import cfg + +from storyboard.notifications.conf import NOTIFICATION_OPTS +from storyboard.openstack.common import log + +CONF = cfg.CONF +CONN = None + +LOG = log.getLogger(__name__) + + +class ConnectionService: + + def __init__(self, conf): + self.credentials = pika.PlainCredentials( + conf.rabbit_userid, + conf.rabbit_password) + + self.connection = pika.BlockingConnection(pika.ConnectionParameters( + conf.rabbit_host, + conf.rabbit_port, + conf.rabbit_virtual_host, + self.credentials)) + + def create_exchange(self, channel, exchange, type): + self.exchange = exchange + self.type = type + self.channel = channel + self.channel.exchange_declare(exchange=self.exchange, + type=self.type, durable=True) + + def close_connection(self): + self.connection.close() + + +def initialize(): + # Initialize the AMQP event publisher. + global CONN + CONF.register_opts(NOTIFICATION_OPTS, "notifications") + CONN = ConnectionService(CONF.notifications) + + +def get_connection(): + global CONN + return CONN diff --git a/storyboard/notifications/publisher.py b/storyboard/notifications/publisher.py new file mode 100644 index 00000000..9ac40fe7 --- /dev/null +++ b/storyboard/notifications/publisher.py @@ -0,0 +1,77 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import re + +from oslo.config import cfg + +from storyboard.notifications import connection_service +from storyboard.openstack.common import log + +CONF = cfg.CONF + +LOG = log.getLogger(__name__) + + +def publish(state): + + def parse(s): + url_pattern = re.match("^\/v1\/([a-z]+)\/?([0-9]+)?" + "\/?([a-z]+)?$", s) + if url_pattern and url_pattern.groups()[0] != "openid": + return url_pattern.groups() + else: + return + + request = state.request + req_method = request.method + req_user_id = request.current_user_id + req_path = request.path + req_resource_grp = parse(req_path) + + if req_resource_grp: + resource = req_resource_grp[0] + resource_id = req_resource_grp[1] + else: + return + + if not resource_id: + response_str = state.response.body + response = json.loads(response_str) + + if response: + resource_id = response.get('id') + else: + resource_id = None + + payload = { + "user_id": req_user_id, + "method": req_method, + "resource_name": resource, + "resource_id": resource_id, + } + + payload = json.dumps(payload) + routing_key = resource + conn = connection_service.get_connection() + channel = conn.connection.channel() + + conn.create_exchange(channel, 'storyboard', 'topic') + + channel.basic_publish(exchange='storyboard', + routing_key=routing_key, + body=payload) + channel.close() diff --git a/storyboard/notifications/subscriber.py b/storyboard/notifications/subscriber.py new file mode 100644 index 00000000..83d70521 --- /dev/null +++ b/storyboard/notifications/subscriber.py @@ -0,0 +1,48 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.config import cfg +from storyboard.notifications import connection_service +from storyboard.openstack.common import log + +CONF = cfg.CONF +LOG = log.getLogger(__name__) + + +def subscribe(): + + CONF(project='storyboard') + connection_service.initialize() + conn = connection_service.get_connection() + channel = conn.connection.channel() + conn.create_exchange(channel, 'storyboard', 'topic') + result = channel.queue_declare(exclusive=True) + queue_name = result.method.queue + binding_keys = ['projects', 'tasks', 'stories', 'timeline_events'] + + for binding_key in binding_keys: + channel.queue_bind(exchange='storyboard', + queue=queue_name, + routing_key=binding_key) + + def callback(ch, method, properties, body): + print(" [x] %r %r %r %r" + % (method.routing_key, body, ch, properties)) + + channel.basic_consume(callback, + queue=queue_name, + no_ack=True) + + channel.start_consuming()