Added robust message publisher and subscriber

AMQP message broadcasting did not reconnect when the server
disconnected for any reason (e.g. a restart). This change reworks the
ConnectionService class into a generic, self-healing connection
manager that can be extended. The publisher and subscriber logic is
now built on top of ConnectionService to take advantage of its
connection management.

- New self-healing, lazily initializing connection service.
- Publisher extends connection service.
- Subscriber extends connection service.
- ResourceHook was moved into notifications as the NotificationHook.
- Configuration options for explicit exchange and queue naming added.

Change-Id: Ib57c56a38574a0c70db9066625aef75ff8891c93
This commit is contained in:
Michael Krotscheck 2014-09-04 13:37:08 -07:00
parent 8d86f7eac5
commit 6df6a6037f
9 changed files with 404 additions and 144 deletions

View File

@ -70,6 +70,15 @@ lock_path = $state_path/lock
# The virtual host within which our queues and exchanges live.
# rabbit_virtual_host = /
# Application name that binds to rabbit.
# rabbit_application_name=storyboard
# The name of the topic exchange to which storyboard will broadcast its events.
# rabbit_exchange_name=storyboard
# The name of the queue that will be created for API events.
# rabbit_event_queue_name=storyboard_events
[database]
# This line MUST be changed to actually run storyboard
# Example:

View File

@ -22,12 +22,11 @@ from wsgiref import simple_server
from storyboard.api.auth.token_storage import impls as storage_impls
from storyboard.api.auth.token_storage import storage
from storyboard.api import config as api_config
from storyboard.api.middleware import resource_hook
from storyboard.api.middleware import token_middleware
from storyboard.api.middleware import user_id_hook
from storyboard.api.v1.search import impls as search_engine_impls
from storyboard.api.v1.search import search_engine
from storyboard.notifications import connection_service
from storyboard.notifications.notification_hook import NotificationHook
from storyboard.openstack.common.gettextutils import _ # noqa
from storyboard.openstack.common import log
@ -84,8 +83,7 @@ def setup_app(pecan_config=None):
# Setup notifier
if CONF.enable_notifications:
connection_service.initialize()
hooks.append(resource_hook.ResourceHook())
hooks.append(NotificationHook())
app = pecan.make_app(
pecan_config.app.root,

View File

@ -1,31 +0,0 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pecan import hooks
from storyboard.notifications import publisher
class ResourceHook(hooks.PecanHook):
    """Pecan hook that hands every non-GET request to the publisher."""

    def __init__(self):
        super(ResourceHook, self).__init__()

    def after(self, state):
        """Forward the finished request to the publisher unless it was a GET.

        :param state: The pecan request state passed to the hook.
        """
        # Reads do not change anything, so only other methods are published.
        if state.request.method != 'GET':
            publisher.process(state)

View File

@ -54,7 +54,7 @@ def event_create(values):
"resource": "timeline_events",
"event_id": new_event.id
}
publish(payload, "timeline_events")
publish("timeline_events", payload)
return new_event

View File

@ -18,6 +18,15 @@ from oslo.config import cfg
CONF = cfg.CONF
NOTIFICATION_OPTS = [
cfg.StrOpt("rabbit_exchange_name", default="storyboard",
help="The name of the topic exchange which storyboard will "
"use to broadcast its events."),
cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events",
help="The name of the queue that will be created for "
"API events."),
cfg.StrOpt("rabbit_application_name", default="storyboard",
help="The rabbit application identifier for storyboard's "
"connection."),
cfg.StrOpt("rabbit_host", default="localhost",
help="Host of the rabbitmq server."),
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",

View File

@ -13,50 +13,140 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from threading import Timer
import pika
from oslo.config import cfg
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.openstack.common import log
CONF = cfg.CONF
CONN = None
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class ConnectionService:
class ConnectionService(object):
"""A generic amqp connection agent that handles unexpected
interactions with RabbitMQ such as channel and connection closures,
by reconnecting on failure.
"""
def __init__(self, conf):
self.credentials = pika.PlainCredentials(
"""Setup the connection instance based on our configuration.
:param conf A configuration object.
"""
self._connection = None
self._channel = None
self._open = False
self.started = False
self._timer = None
self._closing = False
self._open_hooks = set()
self._exchange_name = conf.rabbit_exchange_name
self._application_id = conf.rabbit_application_name
self._properties = pika.BasicProperties(
app_id='storyboard', content_type='application/json')
self._connection_credentials = pika.PlainCredentials(
conf.rabbit_userid,
conf.rabbit_password)
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
self._connection_parameters = pika.ConnectionParameters(
conf.rabbit_host,
conf.rabbit_port,
conf.rabbit_virtual_host,
self.credentials))
self._connection_credentials)
def create_exchange(self, channel, exchange, type):
self.exchange = exchange
self.type = type
self.channel = channel
self.channel.exchange_declare(exchange=self.exchange,
type=self.type, durable=True)
def _connect(self):
"""This method connects to RabbitMQ, establishes a channel, declares
the storyboard exchange if it doesn't yet exist, and executes any
post-connection hooks that an extending class may have registered.
"""
def close_connection(self):
self.connection.close()
# If the closing flag is set, just exit.
if self._closing:
return
# If a timer is set, kill it.
if self._timer:
LOG.debug('Clearing timer...')
self._timer.cancel()
self._timer = None
def initialize():
# Initialize the AMQP event publisher.
global CONN
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
CONN = ConnectionService(CONF.notifications)
# Create the connection
LOG.info('Connecting to %s', self._connection_parameters.host)
self._connection = pika.BlockingConnection(self._connection_parameters)
# Create a channel
LOG.debug('Creating a new channel')
self._channel = self._connection.channel()
self._channel.confirm_delivery()
def get_connection():
global CONN
return CONN
# Declare the exchange
LOG.debug('Declaring exchange %s', self._exchange_name)
self._channel.exchange_declare(exchange=self._exchange_name,
exchange_type='topic',
durable=True,
auto_delete=False)
# Set the open flag and execute any connection hooks.
self._open = True
self._execute_open_hooks()
def _reconnect(self):
"""Reconnect to rabbit.
"""
# Sanity check - if we're closing, do nothing.
if self._closing:
return
# If a timer is already there, assume it's doing its thing...
if self._timer:
return
LOG.debug('Scheduling reconnect in 5 seconds...')
self._timer = Timer(5, self._connect)
self._timer.start()
def _close(self):
"""This method closes the connection to RabbitMQ."""
LOG.info('Closing connection')
self._open = False
if self._channel:
self._channel.close()
self._channel = None
if self._connection:
self._connection.close()
self._connection = None
self._closing = False
LOG.debug('Connection Closed')
def _execute_open_hooks(self):
"""Executes all hooks that have been registered to run on open.
"""
for hook in self._open_hooks:
hook()
def start(self):
"""Start the publisher, opening a connection to RabbitMQ. This method
must be explicitly invoked, otherwise any messages will simply be
cached for later broadcast.
"""
# Create the connection.
self.started = True
self._closing = False
self._connect()
def stop(self):
"""Stop the publisher by closing the channel and the connection.
"""
self.started = False
self._closing = True
self._close()
def add_open_hook(self, hook):
"""Add a method that will be executed whenever a connection is
established.
"""
self._open_hooks.add(hook)

View File

@ -0,0 +1,82 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import re
from pecan import hooks
from storyboard.notifications.publisher import publish
class NotificationHook(hooks.PecanHook):
    """Pecan hook that publishes a notification for modifying API requests.

    After every non-GET request whose path matches
    ``/v1/<resource>[/<id>[/<sub_resource>[/<sub_id>]]]`` (except the
    ``openid`` resource), a payload describing the request is passed to
    ``publish`` with the resource name as the topic.
    """

    # Compiled once instead of on every request. Raw strings keep the
    # pattern text identical while avoiding invalid "\/" string escapes.
    _URL_PATTERN = re.compile(r"^\/v1\/([a-z_]+)\/?([0-9]+)?"
                              r"\/?([a-z]+)?\/?([0-9]+)?$")

    def __init__(self):
        super(NotificationHook, self).__init__()

    def after(self, state):
        """Publish a notification describing the request just handled.

        :param state: The pecan request state passed to the hook.
        """
        # Ignore get methods, we only care about changes.
        if state.request.method == 'GET':
            return

        request = state.request
        req_method = request.method
        req_user_id = request.current_user_id

        groups = self._parse(request.path)
        if not groups:
            return

        resource, resource_id, _sub_resource, sub_resource_id = groups

        # No id in the URL: fall back to the "id" field of the response
        # body, if there is one.
        if not resource_id:
            resource_id = self._resource_id_from_response(state.response)

        payload = {
            "user_id": req_user_id,
            "method": req_method,
            "resource": resource,
            "resource_id": resource_id
        }

        # Only present for nested paths that carry a second id.
        if sub_resource_id:
            payload["sub_resource_id"] = sub_resource_id

        publish(resource, payload)

    def _resource_id_from_response(self, response):
        """Extract the ``id`` field from a JSON response body.

        :param response: The response object whose ``body`` is inspected.
        :return: The value of ``id`` in the decoded body, or None when the
            body is empty, is not valid JSON, or is not a JSON object.
        """
        try:
            decoded = json.loads(response.body)
        except (TypeError, ValueError):
            # An empty or non-JSON body must not make the hook raise.
            return None

        if isinstance(decoded, dict):
            return decoded.get('id')
        return None

    def _parse(self, s):
        """Split an API path into its resource components.

        :param s: The request path.
        :return: A 4-tuple of regex groups (resource, resource id,
            sub-resource, sub-resource id; unmatched groups are None), or
            None if the path does not match or the resource is "openid".
        """
        url_pattern = self._URL_PATTERN.match(s)
        if url_pattern and url_pattern.groups()[0] != "openid":
            return url_pattern.groups()
        return None

View File

@ -14,85 +14,134 @@
# limitations under the License.
import json
import re
from oslo.config import cfg
from pika.exceptions import ConnectionClosed
from storyboard.notifications import connection_service
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.notifications.connection_service import ConnectionService
from storyboard.openstack.common import log
CONF = cfg.CONF
LOG = log.getLogger(__name__)
PUBLISHER = None
def parse(s):
url_pattern = re.match("^\/v1\/([a-z_]+)\/?([0-9]+)?"
"\/?([a-z]+)?\/?([0-9]+)?$", s)
if url_pattern and url_pattern.groups()[0] != "openid":
return url_pattern.groups()
else:
return
class Publisher(ConnectionService):
    """A message publisher built on top of ConnectionService.

    Messages that cannot be sent (the publisher is closing, is not yet
    open, or the connection dropped during the send) are kept in an
    in-memory pending list. An open hook re-sends them whenever a
    connection is established.
    """

    def __init__(self, conf):
        """Setup the publisher instance based on our configuration.

        :param conf A configuration object.
        """
        super(Publisher, self).__init__(conf)

        self._pending = list()

        self.add_open_hook(self._publish_pending)

    def _publish_pending(self):
        """Publishes any pending messages that were broadcast while the
        publisher was connecting.
        """
        # Iterate over a shallow copy: _publish() removes entries from
        # self._pending as they are sent.
        for payload in list(self._pending):
            self._publish(payload)

    def _store_pending(self, payload):
        """Remember a payload for later delivery, without duplicates.

        :param Payload payload: The payload that could not be sent.
        """
        if payload not in self._pending:
            self._pending.append(payload)

    def _publish(self, payload):
        """Publishes a payload to the configured exchange. If it encounters
        a failure, will store the payload for later.

        :param Payload payload: The payload to send.
        :return: True if the message was handed to the channel, False if it
            was stored for later delivery instead.
        """
        LOG.debug("Sending message to %s [%s]", self._exchange_name,
                  payload.topic)

        # First check, are we closing? No reconnect is scheduled here.
        if self._closing:
            LOG.warning("Cannot send message, publisher is closing.")
            self._store_pending(payload)
            return False

        # Second check, are we open?
        if not self._open:
            LOG.debug("Cannot send message, publisher is connecting.")
            self._store_pending(payload)
            self._reconnect()
            return False

        # Third check, are we in a sane state? This should never happen,
        # but just in case...
        if not self._connection or not self._channel:
            LOG.error("Cannot send message, publisher is an unexpected state.")
            self._store_pending(payload)
            self._reconnect()
            return False

        # Try to send a message. If we fail, schedule a reconnect and store
        # the message.
        try:
            self._channel.basic_publish(
                exchange=self._exchange_name,
                routing_key=payload.topic,
                body=json.dumps(payload.payload, ensure_ascii=False),
                properties=self._properties)
        except ConnectionClosed as cc:
            LOG.warning("Attempted to send message on closed connection.")
            LOG.debug(cc)
            self._open = False
            self._store_pending(payload)
            self._reconnect()
            return False

        # Sent: drop it from the pending list if it was a retry.
        if payload in self._pending:
            self._pending.remove(payload)
        return True

    def publish_message(self, topic, payload):
        """Publishes a message to RabbitMQ.

        :param topic: The routing key (topic) to publish on.
        :param payload: The JSON-serializable message body.
        """
        self._publish(Payload(topic, payload))
def publish(payload, resource):
payload = json.dumps(payload)
routing_key = resource
conn = connection_service.get_connection()
channel = conn.connection.channel()
class Payload(object):
def __init__(self, topic, payload):
"""Setup the example publisher object, passing in the URL we will use
to connect to RabbitMQ.
conn.create_exchange(channel, 'storyboard', 'topic')
:param topic string The exchange topic to broadcast on.
:param payload string The message payload to send.
"""
channel.basic_publish(exchange='storyboard',
routing_key=routing_key,
body=payload)
channel.close()
self.topic = topic
self.payload = payload
def process(state):
def publish(topic, payload):
"""Send a message with a given topic and payload to the storyboard
exchange. The message will be automatically JSON encoded.
request = state.request
req_method = request.method
req_user_id = request.current_user_id
req_path = request.path
req_resource_grp = parse(req_path)
:param topic: The RabbitMQ topic.
:param payload: The JSON-serializable payload.
:return:
"""
global PUBLISHER
if req_resource_grp:
if not PUBLISHER:
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
PUBLISHER = Publisher(CONF.notifications)
PUBLISHER.start()
resource = req_resource_grp[0]
if req_resource_grp[1]:
resource_id = req_resource_grp[1]
# When a resource is created..
else:
response_str = state.response.body
response = json.loads(response_str)
if response:
resource_id = response.get('id')
else:
resource_id = None
# when adding/removing projects to project_groups..
if req_resource_grp[3]:
sub_resource_id = req_resource_grp[3]
payload = {
"user_id": req_user_id,
"method": req_method,
"resource": resource,
"resource_id": resource_id,
"sub_resource_id": sub_resource_id
}
else:
payload = {
"user_id": req_user_id,
"method": req_method,
"resource": resource,
"resource_id": resource_id
}
publish(payload, resource)
else:
return
PUBLISHER.publish_message(topic, payload)

View File

@ -14,17 +14,21 @@
# limitations under the License.
import ast
import time
from oslo.config import cfg
from pika.exceptions import ConnectionClosed
from storyboard.db.api import timeline_events
from storyboard.notifications import connection_service
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.notifications.connection_service import ConnectionService
from storyboard.notifications.subscriptions_handler import handle_deletions
from storyboard.notifications.subscriptions_handler import handle_resources
from storyboard.notifications.subscriptions_handler import \
handle_timeline_events
from storyboard.openstack.common import log
CONF = cfg.CONF
LOG = log.getLogger(__name__)
@ -32,25 +36,19 @@ LOG = log.getLogger(__name__)
def subscribe():
log.setup('storyboard')
CONF(project='storyboard')
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
connection_service.initialize()
subscriber = Subscriber(CONF.notifications)
subscriber.start()
conn = connection_service.get_connection()
channel = conn.connection.channel()
while subscriber.started:
(method, properties, body) = subscriber.get()
conn.create_exchange(channel, 'storyboard', 'topic')
if not method or not properties:
LOG.debug("No messages available, sleeping for 5 seconds.")
time.sleep(5)
continue
result = channel.queue_declare(queue='subscription_queue', durable=True)
queue_name = result.method.queue
binding_keys = ['tasks', 'stories', 'projects', 'project_groups',
'timeline_events']
for binding_key in binding_keys:
channel.queue_bind(exchange='storyboard',
queue=queue_name,
routing_key=binding_key)
def callback(ch, method, properties, body):
body_dict = ast.literal_eval(body)
if 'event_id' in body_dict:
event_id = body_dict['event_id']
@ -73,8 +71,64 @@ def subscribe():
if 'sub_resource_id' not in body_dict:
handle_deletions(resource_name, resource_id)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
# Handle the message
subscriber.ack(method.delivery_tag)
channel.start_consuming()
class Subscriber(ConnectionService):
    """A polling message subscriber built on top of ConnectionService.

    Each time a connection is opened, the subscriber declares its durable
    queue and binds it to the exchange for every key in
    ``self._binding_keys``. Messages are fetched one at a time with
    ``get()`` and must be acknowledged with ``ack()``.
    """

    def __init__(self, conf):
        """Setup the subscriber instance based on our configuration.

        :param conf A configuration object.
        """
        super(Subscriber, self).__init__(conf)

        self._queue_name = conf.rabbit_event_queue_name
        self._binding_keys = ['tasks', 'stories', 'projects', 'project_groups',
                              'timeline_events']
        self.add_open_hook(self._declare_queue)

    def _declare_queue(self):
        """Declare the subscription queue against our exchange.
        """
        self._channel.queue_declare(queue=self._queue_name,
                                    durable=True)

        # Set up the queue bindings.
        for binding_key in self._binding_keys:
            self._channel.queue_bind(exchange=self._exchange_name,
                                     queue=self._queue_name,
                                     routing_key=binding_key)

    def ack(self, delivery_tag):
        """Acknowledge receipt and processing of the message.

        If the connection is not usable the acknowledgement is skipped with
        a warning instead of raising, matching the error handling of get().

        :param delivery_tag: The delivery tag of the message to acknowledge.
        """
        if self._closing or not self._open or not self._channel:
            LOG.warning("Cannot ack message, subscriber is not connected.")
            return

        try:
            self._channel.basic_ack(delivery_tag)
        except ConnectionClosed as cc:
            LOG.warning("Attempted to ack message on closed connection.")
            LOG.debug(cc)
            self._open = False
            self._reconnect()

    def get(self):
        """Get a single message from the queue. If the subscriber is currently
        waiting to reconnect, it will return None. Note that you must
        manually ack the message after it has been successfully processed.

        :rtype: (None, None, None)|(spec.Basic.Get,
                                    spec.Basic.Properties,
                                    str or unicode)
        """

        # Sanity check one, are we closing?
        if self._closing:
            return None, None, None

        # Sanity check two, are we open, or reconnecting? Ask for a
        # reconnect on every poll: _reconnect() returns early while a timer
        # is pending, and without this a single failed reconnect attempt
        # would leave the subscriber disconnected for good.
        if not self._open:
            self._reconnect()
            return None, None, None

        try:
            return self._channel.basic_get(queue=self._queue_name,
                                           no_ack=False)
        except ConnectionClosed as cc:
            LOG.warning("Attempted to get message on closed connection.")
            LOG.debug(cc)
            self._open = False
            self._reconnect()
            return None, None, None