Added subscriber and publisher modules

For each action, except for creation of a new project, two messages will
be sent to rabbitmq. One with the main resource; resource_id; user_id;
method and the other with the event_id; user_id; a faked method POST and
a faked resource TIMELINE_EVENT

Publisher creates an exchange called 'storyboard' and publishes the
messages to it. Subscriber creates the queues with different binding_keys
which bind themself to the storyboard exchange and start consuming the
messages from the exchange.

The consumed messages are used by CR 113016.

Co-Authored-By: Nikita Konovalov <nkonovalov@mirantis.com>

Change-Id: Ia573437302dc2d0b1a68d2343e83f9dd397fac04
This commit is contained in:
Aishwarya Thangappa 2014-08-14 13:44:41 -07:00
parent d576442b69
commit c8cbc9720d
11 changed files with 324 additions and 3 deletions

View File

@ -46,6 +46,30 @@ lock_path = $state_path/lock
# page_size_maximum = 500
# page_size_default = 20
# Enable notifications. This feature drives deferred processing, reporting,
# and subscriptions.
# enable_notifications = True
[notifications]
# Host of the rabbitmq server.
# rabbit_host=localhost
# The RabbitMQ login method
# rabbit_login_method = AMQPLAIN
# The RabbitMQ userid.
# rabbit_userid = storyboard
# The RabbitMQ password.
# rabbit_password = storyboard
# The RabbitMQ broker port where a single node is used.
# rabbit_port = 5672
# The virtual host within which our queues and exchanges live.
# rabbit_virtual_host = /
[database]
# This line MUST be changed to actually run storyboard
# Example:

View File

@ -7,6 +7,7 @@ oauthlib>=0.6
oslo.config>=1.2.1
pecan>=0.4.5
oslo.db>=0.2.0
pika>=0.9.14
python-openid
PyYAML>=3.1.0
requests>=1.1

View File

@ -32,6 +32,7 @@ data_files =
[entry_points]
console_scripts =
storyboard-api = storyboard.api.app:start
storyboard-subscriber = storyboard.notifications.subscriber:subscribe
storyboard-db-manage = storyboard.db.migration.cli:main
[build_sphinx]

View File

@ -22,14 +22,17 @@ from wsgiref import simple_server
from storyboard.api.auth.token_storage import impls as storage_impls
from storyboard.api.auth.token_storage import storage
from storyboard.api import config as api_config
from storyboard.api.middleware import resource_hook
from storyboard.api.middleware import token_middleware
from storyboard.api.middleware import user_id_hook
from storyboard.api.v1.search import impls as search_engine_impls
from storyboard.api.v1.search import search_engine
from storyboard.notifications import connection_service
from storyboard.openstack.common.gettextutils import _ # noqa
from storyboard.openstack.common import log
CONF = cfg.CONF
LOG = log.getLogger(__name__)
API_OPTS = [
@ -38,7 +41,10 @@ API_OPTS = [
help='API host'),
cfg.IntOpt('bind_port',
default=8080,
help='API port')
help='API port'),
cfg.BoolOpt('enable_notifications',
default=False,
help='Enable Notifications')
]
CONF.register_opts(API_OPTS)
@ -62,6 +68,10 @@ def setup_app(pecan_config=None):
])
log.setup('storyboard')
hooks = [
user_id_hook.UserIdHook()
]
# Setup token storage
token_storage_type = CONF.token_storage_type
storage_cls = storage_impls.STORAGE_IMPLS[token_storage_type]
@ -72,10 +82,15 @@ def setup_app(pecan_config=None):
search_engine_cls = search_engine_impls.ENGINE_IMPLS[search_engine_name]
search_engine.set_engine(search_engine_cls())
# Setup notifier
if CONF.enable_notifications:
connection_service.initialize()
hooks.append(resource_hook.ResourceHook())
app = pecan.make_app(
pecan_config.app.root,
debug=CONF.debug,
hooks=[user_id_hook.UserIdHook()],
hooks=hooks,
force_canonical=getattr(pecan_config.app, 'force_canonical', True),
guess_content_type_from_ext=False
)

View File

@ -0,0 +1,31 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pecan import hooks
from storyboard.notifications import publisher
class ResourceHook(hooks.PecanHook):
def __init__(self):
super(ResourceHook, self).__init__()
def after(self, state):
# Ignore get methods, we only care about changes.
if state.request.method == 'GET':
return
publisher.publish(state)

View File

@ -14,10 +14,15 @@
# limitations under the License.
import json
from oslo.config import cfg
from pecan import request
from storyboard.common import event_types
from storyboard.db.api import base as api_base
from storyboard.db import models
from storyboard.notifications import connection_service
CONF = cfg.CONF
def event_get(event_id):
@ -40,7 +45,30 @@ def events_get_count(**kwargs):
def event_create(values):
return api_base.entity_create(models.TimeLineEvent, values)
new_event = api_base.entity_create(models.TimeLineEvent, values)
if CONF.enable_notifications:
payload_timeline_events = {
"user_id": request.current_user_id,
"method": "POST",
"resource": "timeline_event",
"event_id": new_event.id
}
payload_timeline_events = json.dumps(payload_timeline_events)
routing_key = "timeline_events"
conn = connection_service.get_connection()
channel = conn.connection.channel()
conn.create_exchange(channel, 'storyboard', 'topic')
channel.basic_publish(exchange='storyboard',
routing_key=routing_key,
body=payload_timeline_events)
channel.close()
return new_event
def story_created_event(story_id, author_id):

View File

View File

@ -0,0 +1,34 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
CONF = cfg.CONF
NOTIFICATION_OPTS = [
cfg.StrOpt("rabbit_host", default="localhost",
help="Host of the rabbitmq server."),
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
help="The RabbitMQ login method."),
cfg.StrOpt("rabbit_userid", default="storyboard",
help="The RabbitMQ userid."),
cfg.StrOpt("rabbit_password", default="storyboard",
help="The RabbitMQ password."),
cfg.IntOpt("rabbit_port", default=5672,
help="The RabbitMQ broker port where a single node is used."),
cfg.StrOpt("rabbit_virtual_host", default="/",
help="The virtual host within which our queues and exchanges "
"live."),
]

View File

@ -0,0 +1,62 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pika
from oslo.config import cfg
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.openstack.common import log
CONF = cfg.CONF
CONN = None
LOG = log.getLogger(__name__)
class ConnectionService:
def __init__(self, conf):
self.credentials = pika.PlainCredentials(
conf.rabbit_userid,
conf.rabbit_password)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
conf.rabbit_host,
conf.rabbit_port,
conf.rabbit_virtual_host,
self.credentials))
def create_exchange(self, channel, exchange, type):
self.exchange = exchange
self.type = type
self.channel = channel
self.channel.exchange_declare(exchange=self.exchange,
type=self.type, durable=True)
def close_connection(self):
self.connection.close()
def initialize():
# Initialize the AMQP event publisher.
global CONN
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
CONN = ConnectionService(CONF.notifications)
def get_connection():
global CONN
return CONN

View File

@ -0,0 +1,77 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import re
from oslo.config import cfg
from storyboard.notifications import connection_service
from storyboard.openstack.common import log
CONF = cfg.CONF
LOG = log.getLogger(__name__)
def publish(state):
def parse(s):
url_pattern = re.match("^\/v1\/([a-z]+)\/?([0-9]+)?"
"\/?([a-z]+)?$", s)
if url_pattern and url_pattern.groups()[0] != "openid":
return url_pattern.groups()
else:
return
request = state.request
req_method = request.method
req_user_id = request.current_user_id
req_path = request.path
req_resource_grp = parse(req_path)
if req_resource_grp:
resource = req_resource_grp[0]
resource_id = req_resource_grp[1]
else:
return
if not resource_id:
response_str = state.response.body
response = json.loads(response_str)
if response:
resource_id = response.get('id')
else:
resource_id = None
payload = {
"user_id": req_user_id,
"method": req_method,
"resource_name": resource,
"resource_id": resource_id,
}
payload = json.dumps(payload)
routing_key = resource
conn = connection_service.get_connection()
channel = conn.connection.channel()
conn.create_exchange(channel, 'storyboard', 'topic')
channel.basic_publish(exchange='storyboard',
routing_key=routing_key,
body=payload)
channel.close()

View File

@ -0,0 +1,48 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from storyboard.notifications import connection_service
from storyboard.openstack.common import log
CONF = cfg.CONF
LOG = log.getLogger(__name__)
def subscribe():
CONF(project='storyboard')
connection_service.initialize()
conn = connection_service.get_connection()
channel = conn.connection.channel()
conn.create_exchange(channel, 'storyboard', 'topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = ['projects', 'tasks', 'stories', 'timeline_events']
for binding_key in binding_keys:
channel.queue_bind(exchange='storyboard',
queue=queue_name,
routing_key=binding_key)
def callback(ch, method, properties, body):
print(" [x] %r %r %r %r"
% (method.routing_key, body, ch, properties))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()