Added subscriber and publisher modules

For each action, except for creation of a new project, two messages will
be sent to rabbitmq. One with the main resource; resource_id; user_id;
method and the other with the event_id; user_id; a faked method POST and
a faked resource TIMELINE_EVENT

Publisher creates an exchange called 'storyboard' and publishes the
messages to it. Subscriber creates the queues with different binding_keys
which bind themself to the storyboard exchange and start consuming the
messages from the exchange.

The consumed messages will be printed on the console for now which will
be modified later.

Do not merge until storyboard has Rabbitmq running.

Change-Id: Ic4697f79aaab82dadf1fb1ae66f414a90ae28dac
This commit is contained in:
Nikita Konovalov 2014-06-26 18:26:40 +04:00 committed by Michael Krotscheck
parent cc0b5cccd9
commit 9a5a5b59d6
11 changed files with 324 additions and 3 deletions

View File

@ -46,6 +46,30 @@ lock_path = $state_path/lock
# page_size_maximum = 500
# page_size_default = 20
# Enable notifications. This feature drives deferred processing, reporting,
# and subscriptions.
# enable_notifications = True
[notifications]
# Host of the rabbitmq server.
# rabbit_host=localhost
# The RabbitMQ login method
# rabbit_login_method = AMQPLAIN
# The RabbitMQ userid.
# rabbit_userid = storyboard
# The RabbitMQ password.
# rabbit_password = storyboard
# The RabbitMQ broker port where a single node is used.
# rabbit_port = 5672
# The virtual host within which our queues and exchanges live.
# rabbit_virtual_host = /
[database]
# This line MUST be changed to actually run storyboard
# Example:

View File

@ -7,6 +7,7 @@ oauthlib>=0.6
oslo.config>=1.2.1
pecan>=0.4.5
oslo.db>=0.2.0
pika>=0.9.14
python-openid
PyYAML>=3.1.0
requests>=1.1

View File

@ -32,6 +32,7 @@ data_files =
[entry_points]
console_scripts =
storyboard-api = storyboard.api.app:start
storyboard-subscriber = storyboard.notifications.subscriber:subscribe
storyboard-db-manage = storyboard.db.migration.cli:main
[build_sphinx]

View File

@ -22,14 +22,17 @@ from wsgiref import simple_server
from storyboard.api.auth.token_storage import impls as storage_impls
from storyboard.api.auth.token_storage import storage
from storyboard.api import config as api_config
from storyboard.api.middleware import resource_hook
from storyboard.api.middleware import token_middleware
from storyboard.api.middleware import user_id_hook
from storyboard.api.v1.search import impls as search_engine_impls
from storyboard.api.v1.search import search_engine
from storyboard.notifications import connection_service
from storyboard.openstack.common.gettextutils import _ # noqa
from storyboard.openstack.common import log
CONF = cfg.CONF
LOG = log.getLogger(__name__)
API_OPTS = [
@ -38,7 +41,10 @@ API_OPTS = [
help='API host'),
cfg.IntOpt('bind_port',
default=8080,
help='API port')
help='API port'),
cfg.BoolOpt('enable_notifications',
default=False,
help='Enable Notifications')
]
CONF.register_opts(API_OPTS)
@ -62,6 +68,10 @@ def setup_app(pecan_config=None):
])
log.setup('storyboard')
hooks = [
user_id_hook.UserIdHook()
]
# Setup token storage
token_storage_type = CONF.token_storage_type
storage_cls = storage_impls.STORAGE_IMPLS[token_storage_type]
@ -72,10 +82,15 @@ def setup_app(pecan_config=None):
search_engine_cls = search_engine_impls.ENGINE_IMPLS[search_engine_name]
search_engine.set_engine(search_engine_cls())
# Setup notifier
if CONF.enable_notifications:
connection_service.initialize()
hooks.append(resource_hook.ResourceHook())
app = pecan.make_app(
pecan_config.app.root,
debug=CONF.debug,
hooks=[user_id_hook.UserIdHook()],
hooks=hooks,
force_canonical=getattr(pecan_config.app, 'force_canonical', True),
guess_content_type_from_ext=False
)

View File

@ -0,0 +1,31 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pecan import hooks
from storyboard.notifications import publisher
class ResourceHook(hooks.PecanHook):
def __init__(self):
super(ResourceHook, self).__init__()
def after(self, state):
# Ignore get methods, we only care about changes.
if state.request.method == 'GET':
return
publisher.publish(state)

View File

@ -14,10 +14,15 @@
# limitations under the License.
import json
from oslo.config import cfg
from pecan import request
from storyboard.common import event_types
from storyboard.db.api import base as api_base
from storyboard.db import models
from storyboard.notifications import connection_service
CONF = cfg.CONF
def event_get(event_id):
@ -40,7 +45,30 @@ def events_get_count(**kwargs):
def event_create(values):
return api_base.entity_create(models.TimeLineEvent, values)
new_event = api_base.entity_create(models.TimeLineEvent, values)
if CONF.enable_notifications:
payload_timeline_events = {
"user_id": request.current_user_id,
"method": "POST",
"resource": "timeline_event",
"event_id": new_event.id
}
payload_timeline_events = json.dumps(payload_timeline_events)
routing_key = "timeline_events"
conn = connection_service.get_connection()
channel = conn.connection.channel()
conn.create_exchange(channel, 'storyboard', 'topic')
channel.basic_publish(exchange='storyboard',
routing_key=routing_key,
body=payload_timeline_events)
channel.close()
return new_event
def story_created_event(story_id, author_id):

View File

View File

@ -0,0 +1,34 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
CONF = cfg.CONF
NOTIFICATION_OPTS = [
cfg.StrOpt("rabbit_host", default="localhost",
help="Host of the rabbitmq server."),
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
help="The RabbitMQ login method."),
cfg.StrOpt("rabbit_userid", default="storyboard",
help="The RabbitMQ userid."),
cfg.StrOpt("rabbit_password", default="storyboard",
help="The RabbitMQ password."),
cfg.IntOpt("rabbit_port", default=5672,
help="The RabbitMQ broker port where a single node is used."),
cfg.StrOpt("rabbit_virtual_host", default="/",
help="The virtual host within which our queues and exchanges "
"live."),
]

View File

@ -0,0 +1,62 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pika
from oslo.config import cfg
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.openstack.common import log
CONF = cfg.CONF
CONN = None
LOG = log.getLogger(__name__)
class ConnectionService:
def __init__(self, conf):
self.credentials = pika.PlainCredentials(
conf.rabbit_userid,
conf.rabbit_password)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
conf.rabbit_host,
conf.rabbit_port,
conf.rabbit_virtual_host,
self.credentials))
def create_exchange(self, channel, exchange, type):
self.exchange = exchange
self.type = type
self.channel = channel
self.channel.exchange_declare(exchange=self.exchange,
type=self.type, durable=True)
def close_connection(self):
self.connection.close()
def initialize():
# Initialize the AMQP event publisher.
global CONN
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
CONN = ConnectionService(CONF.notifications)
def get_connection():
global CONN
return CONN

View File

@ -0,0 +1,77 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import re
from oslo.config import cfg
from storyboard.notifications import connection_service
from storyboard.openstack.common import log
CONF = cfg.CONF
LOG = log.getLogger(__name__)
def publish(state):
def parse(s):
url_pattern = re.match("^\/v1\/([a-z]+)\/?([0-9]+)?"
"\/?([a-z]+)?$", s)
if url_pattern and url_pattern.groups()[0] != "openid":
return url_pattern.groups()
else:
return
request = state.request
req_method = request.method
req_user_id = request.current_user_id
req_path = request.path
req_resource_grp = parse(req_path)
if req_resource_grp:
resource = req_resource_grp[0]
resource_id = req_resource_grp[1]
else:
return
if not resource_id:
response_str = state.response.body
response = json.loads(response_str)
if response:
resource_id = response.get('id')
else:
resource_id = None
payload = {
"user_id": req_user_id,
"method": req_method,
"resource_name": resource,
"resource_id": resource_id,
}
payload = json.dumps(payload)
routing_key = resource
conn = connection_service.get_connection()
channel = conn.connection.channel()
conn.create_exchange(channel, 'storyboard', 'topic')
channel.basic_publish(exchange='storyboard',
routing_key=routing_key,
body=payload)
channel.close()

View File

@ -0,0 +1,48 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from storyboard.notifications import connection_service
from storyboard.openstack.common import log
CONF = cfg.CONF
LOG = log.getLogger(__name__)
def subscribe():
CONF(project='storyboard')
connection_service.initialize()
conn = connection_service.get_connection()
channel = conn.connection.channel()
conn.create_exchange(channel, 'storyboard', 'topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = ['projects', 'tasks', 'stories', 'timeline_events']
for binding_key in binding_keys:
channel.queue_bind(exchange='storyboard',
queue=queue_name,
routing_key=binding_key)
def callback(ch, method, properties, body):
print(" [x] %r %r %r %r"
% (method.routing_key, body, ch, properties))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()