Merge "Make notification driver configurable"

This commit is contained in:
Zuul 2018-06-08 22:19:00 +00:00 committed by Gerrit Code Review
commit ecd91398b8
11 changed files with 282 additions and 210 deletions

View File

@ -26,7 +26,7 @@ from storyboard.db.api import base as api_base
from storyboard.db.api import stories as stories_api
from storyboard.db.api import tasks as tasks_api
from storyboard.db import models
from storyboard.notifications.publisher import publish
from storyboard.notifications import publisher
CONF = cfg.CONF
@ -112,15 +112,15 @@ def event_create(values):
event_dict = tojson(TimeLineEvent,
TimeLineEvent.from_db_model(new_event))
publish(author_id=request.current_user_id or None,
method="POST",
url=request.headers.get('Referer') or None,
path=request.path or None,
query_string=request.query_string or None,
status=response.status_code or None,
resource="timeline_event",
resource_id=new_event.id or None,
resource_after=event_dict or None)
publisher.publish(author_id=request.current_user_id or None,
method="POST",
url=request.headers.get('Referer') or None,
path=request.path or None,
query_string=request.query_string or None,
status=response.status_code or None,
resource="timeline_event",
resource_id=new_event.id or None,
resource_after=event_dict or None)
return new_event

View File

@ -1,4 +1,4 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
# Copyright (c) 2018 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -17,31 +17,7 @@ from oslo_config import cfg
CONF = cfg.CONF
NOTIFICATION_OPTS = [
cfg.StrOpt("rabbit_exchange_name", default="storyboard",
help="The name of the topic exchange which storyboard will "
"use to broadcast its events."),
cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events",
help="The name of the queue that will be created for "
"API events."),
cfg.StrOpt("rabbit_application_name", default="storyboard",
help="The rabbit application identifier for storyboard's "
"connection."),
cfg.StrOpt("rabbit_host", default="localhost",
help="Host of the rabbitmq server."),
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
help="The RabbitMQ login method."),
cfg.StrOpt("rabbit_userid", default="storyboard",
help="The RabbitMQ userid."),
cfg.StrOpt("rabbit_password", default="storyboard",
help="The RabbitMQ password."),
cfg.IntOpt("rabbit_port", default=5672,
help="The RabbitMQ broker port where a single node is used."),
cfg.StrOpt("rabbit_virtual_host", default="/",
help="The virtual host within which our queues and exchanges "
"live."),
cfg.IntOpt("rabbit_connection_attempts", default=6,
help="The number of connection attempts before giving-up"),
cfg.IntOpt("rabbit_retry_delay", default=10,
help="The interval between connection attempts (in seconds)")
OPTS = [
cfg.StrOpt('driver', choices=['pika'],
help='The notification driver to use', default='pika')
]

View File

@ -23,7 +23,7 @@ from storyboard.api.v1 import wmodels
import storyboard.common.hook_priorities as priority
from storyboard.db.api import base as api_base
from storyboard.db import models
from storyboard.notifications.publisher import publish
from storyboard.notifications import publisher
class_mappings = {'task': [models.Task, wmodels.Task],
@ -110,18 +110,18 @@ class NotificationHook(hooks.PecanHook):
# Build the payload. Use of None is included to ensure that we don't
# accidentally blow up the API call, but we don't anticipate it
# happening.
publish(author_id=request.current_user_id,
method=request.method,
url=request.headers.get('Referer'),
path=request.path,
query_string=request.query_string,
status=response.status_code,
resource=resource,
resource_id=resource_id,
sub_resource=subresource,
sub_resource_id=subresource_id,
resource_before=old_resource,
resource_after=new_resource)
publisher.publish(author_id=request.current_user_id,
method=request.method,
url=request.headers.get('Referer'),
path=request.path,
query_string=request.query_string,
status=response.status_code,
resource=resource,
resource_id=resource_id,
sub_resource=subresource,
sub_resource_id=subresource_id,
resource_before=old_resource,
resource_after=new_resource)
def get_original_resource(self, resource, resource_id):
"""Given a resource name and ID, will load that resource and map it

View File

@ -0,0 +1,56 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
CONF = cfg.CONF
NOTIFICATION_OPTS = [
cfg.StrOpt("rabbit_exchange_name", default="storyboard",
help="The name of the topic exchange which storyboard will "
"use to broadcast its events.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events",
help="The name of the queue that will be created for "
"API events.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_application_name", default="storyboard",
help="The rabbit application identifier for storyboard's "
"connection.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_host", default="localhost",
help="Host of the rabbitmq server.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
help="The RabbitMQ login method.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_userid", default="storyboard",
help="The RabbitMQ userid.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_password", default="storyboard",
help="The RabbitMQ password.",
deprecated_group='notifications'),
cfg.IntOpt("rabbit_port", default=5672,
help="The RabbitMQ broker port where a single node is used."),
cfg.StrOpt("rabbit_virtual_host", default="/",
help="The virtual host within which our queues and exchanges "
"live.", deprecated_group='notifications'),
cfg.IntOpt("rabbit_connection_attempts", default=6,
help="The number of connection attempts before giving-up",
deprecated_group='notifications'),
cfg.IntOpt("rabbit_retry_delay", default=10,
help="The interval between connection attempts (in seconds)",
deprecated_group='notifications')
]

View File

@ -0,0 +1,181 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_config import cfg
from oslo_log import log
from pika.exceptions import ConnectionClosed
from storyboard.notifications.pika.conf import NOTIFICATION_OPTS
from storyboard.notifications.pika.connection_service import ConnectionService
from storyboard._i18n import _, _LW, _LE
CONF = cfg.CONF
LOG = log.getLogger(__name__)
PUBLISHER = None
class Publisher(ConnectionService):
"""A generic message publisher that uses delivery confirmation to ensure
that messages are delivered, and will keep a running cache of unsent
messages while the publisher is attempting to reconnect.
"""
def __init__(self, conf):
"""Setup the publisher instance based on our configuration.
:param conf A configuration object.
"""
super(Publisher, self).__init__(conf)
self._pending = list()
self.add_open_hook(self._publish_pending)
def _publish_pending(self):
"""Publishes any pending messages that were broadcast while the
publisher was connecting.
"""
# Shallow copy, so we can iterate over it without having it be modified
# out of band.
pending = list(self._pending)
for payload in pending:
self._publish(payload)
def _publish(self, payload):
"""Publishes a payload to the passed exchange. If it encounters a
failure, will store the payload for later.
:param Payload payload: The payload to send.
"""
LOG.debug(_("Sending message to %(name)s [%(topic)s]") %
{'name': self._exchange_name, 'topic': payload.topic})
# First check, are we closing?
if self._closing:
LOG.warning(_LW("Cannot send message, publisher is closing."))
if payload not in self._pending:
self._pending.append(payload)
return
# Second check, are we open?
if not self._open:
LOG.debug(_("Cannot send message, publisher is connecting."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Third check, are we in a sane state? This should never happen,
# but just in case...
if not self._connection or not self._channel:
LOG.error(_LE("Cannot send message, publisher is "
"an unexpected state."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Try to send a message. If we fail, schedule a reconnect and store
# the message.
try:
self._channel.basic_publish(self._exchange_name,
payload.topic,
json.dumps(payload.payload,
ensure_ascii=False),
self._properties)
if payload in self._pending:
self._pending.remove(payload)
return True
except (ConnectionClosed, AttributeError) as cc:
LOG.warning(_LW("Attempted to send message on closed connection."))
LOG.debug(cc)
self._open = False
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return False
def publish_message(self, topic, payload):
"""Publishes a message to RabbitMQ.
"""
self._publish(Payload(topic, payload))
class Payload(object):
def __init__(self, topic, payload):
"""Setup the example publisher object, passing in the URL we will use
to connect to RabbitMQ.
:param topic string The exchange topic to broadcast on.
:param payload string The message payload to send.
"""
self.topic = topic
self.payload = payload
def publish(resource, author_id=None, method=None, url=None, path=None,
query_string=None, status=None, resource_id=None,
sub_resource=None, sub_resource_id=None, resource_before=None,
resource_after=None):
"""Send a message for an API event to the storyboard exchange.
The message will be automatically JSON encoded.
:param resource: The extrapolated resource type (project, story, etc).
:param author_id: The ID of the author who performed this action.
:param method: The HTTP Method used.
:param url: The Referer header from the request.
:param path: The HTTP Path used.
:param query_string: The HTTP query string used.
:param status: The HTTP Status code of the response.
:param resource_id: The ID of the resource.
:param sub_resource: The extracted subresource (user_token, etc)
:param sub_resource_id: THe ID of the subresource.
:param resource_before: The resource state before this event occurred.
:param resource_after: The resource state after this event occurred.
"""
global PUBLISHER
if not PUBLISHER:
CONF.register_opts(NOTIFICATION_OPTS, "pika-notifications")
PUBLISHER = Publisher(CONF.pika_notifications)
PUBLISHER.start()
payload = {
"author_id": author_id,
"method": method,
"url": url,
"path": path,
"query_string": query_string,
"status": status,
"resource": resource,
"resource_id": resource_id,
"sub_resource": sub_resource,
"sub_resource_id": sub_resource_id,
"resource_before": resource_before,
"resource_after": resource_after
}
if resource:
PUBLISHER.publish_message(resource, payload)
else:
LOG.warning("Attempted to send payload with no destination "
"resource.")

View File

@ -21,8 +21,8 @@ from oslo_log import log
from pika.exceptions import ConnectionClosed
from stevedore import enabled
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.notifications.connection_service import ConnectionService
from storyboard.notifications.pika.conf import NOTIFICATION_OPTS
from storyboard.notifications.pika.connection_service import ConnectionService
from storyboard._i18n import _, _LW

View File

@ -1,4 +1,4 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
# Copyright (c) 2018 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,167 +13,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_config import cfg
from oslo_log import log
from pika.exceptions import ConnectionClosed
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.notifications.connection_service import ConnectionService
from storyboard._i18n import _, _LW, _LE
from oslo_utils import importutils
from storyboard.notifications import conf
CONF = cfg.CONF
LOG = log.getLogger(__name__)
PUBLISHER = None
class Publisher(ConnectionService):
"""A generic message publisher that uses delivery confirmation to ensure
that messages are delivered, and will keep a running cache of unsent
messages while the publisher is attempting to reconnect.
"""
def __init__(self, conf):
"""Setup the publisher instance based on our configuration.
:param conf A configuration object.
"""
super(Publisher, self).__init__(conf)
self._pending = list()
self.add_open_hook(self._publish_pending)
def _publish_pending(self):
"""Publishes any pending messages that were broadcast while the
publisher was connecting.
"""
# Shallow copy, so we can iterate over it without having it be modified
# out of band.
pending = list(self._pending)
for payload in pending:
self._publish(payload)
def _publish(self, payload):
"""Publishes a payload to the passed exchange. If it encounters a
failure, will store the payload for later.
:param Payload payload: The payload to send.
"""
LOG.debug(_("Sending message to %(name)s [%(topic)s]") %
{'name': self._exchange_name, 'topic': payload.topic})
# First check, are we closing?
if self._closing:
LOG.warning(_LW("Cannot send message, publisher is closing."))
if payload not in self._pending:
self._pending.append(payload)
return
# Second check, are we open?
if not self._open:
LOG.debug(_("Cannot send message, publisher is connecting."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Third check, are we in a sane state? This should never happen,
# but just in case...
if not self._connection or not self._channel:
LOG.error(_LE("Cannot send message, publisher is "
"an unexpected state."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Try to send a message. If we fail, schedule a reconnect and store
# the message.
try:
self._channel.basic_publish(self._exchange_name,
payload.topic,
json.dumps(payload.payload,
ensure_ascii=False),
self._properties)
if payload in self._pending:
self._pending.remove(payload)
return True
except (ConnectionClosed, AttributeError) as cc:
LOG.warning(_LW("Attempted to send message on closed connection."))
LOG.debug(cc)
self._open = False
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return False
def publish_message(self, topic, payload):
"""Publishes a message to RabbitMQ.
"""
self._publish(Payload(topic, payload))
class Payload(object):
def __init__(self, topic, payload):
"""Setup the example publisher object, passing in the URL we will use
to connect to RabbitMQ.
:param topic string The exchange topic to broadcast on.
:param payload string The message payload to send.
"""
self.topic = topic
self.payload = payload
CONF.register_opts(conf.OPTS, 'notifications')
def publish(resource, author_id=None, method=None, url=None, path=None,
query_string=None, status=None, resource_id=None,
sub_resource=None, sub_resource_id=None, resource_before=None,
resource_after=None):
"""Send a message for an API event to the storyboard exchange. The message
will be automatically JSON encoded.
:param resource: The extrapolated resource type (project, story, etc).
:param author_id: The ID of the author who performed this action.
:param method: The HTTP Method used.
:param url: The Referer header from the request.
:param path: The HTTP Path used.
:param query_string: The HTTP query string used.
:param status: The HTTP Status code of the response.
:param resource_id: The ID of the resource.
:param sub_resource: The extracted subresource (user_token, etc)
:param sub_resource_id: THe ID of the subresource.
:param resource_before: The resource state before this event occurred.
:param resource_after: The resource state after this event occurred.
"""
global PUBLISHER
if not PUBLISHER:
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
PUBLISHER = Publisher(CONF.notifications)
PUBLISHER.start()
payload = {
"author_id": author_id,
"method": method,
"url": url,
"path": path,
"query_string": query_string,
"status": status,
"resource": resource,
"resource_id": resource_id,
"sub_resource": sub_resource,
"sub_resource_id": sub_resource_id,
"resource_before": resource_before,
"resource_after": resource_after
}
if resource:
PUBLISHER.publish_message(resource, payload)
else:
LOG.warning("Attempted to send payload with no destination resource.")
publisher_module = importutils.import_module(
'storyboard.notifications.' + CONF.notifications.driver + '.publisher')
publisher_module.publish(resource, author_id=author_id, method=method,
url=url, path=path, query_string=query_string,
status=status, resource_id=resource_id,
sub_resource=sub_resource,
sub_resource_id=sub_resource_id,
resource_before=resource_before,
resource_after=resource_after)

View File

@ -23,7 +23,7 @@ import six
import storyboard.db.api.base as db_api
from storyboard.notifications.notification_hook import class_mappings
from storyboard.notifications.subscriber import subscribe
from storyboard.notifications.pika.subscriber import subscribe
from storyboard._i18n import _LI, _LW
from storyboard.plugin.base import PluginBase

View File

@ -221,7 +221,7 @@ class TestNotificationHook(base.BaseDbTestCase):
self.assertEqual(mock_state.old_entity_values['priority'],
sample_task_wmodel.priority)
@patch('storyboard.notifications.notification_hook.publish')
@patch('storyboard.notifications.notification_hook.publisher')
@patch.object(NotificationHook, 'get_original_resource')
def test_after_publishes_payload(self, mock_get_original_resource,
mock_publish):
@ -261,7 +261,7 @@ class TestNotificationHook(base.BaseDbTestCase):
mock_get_original_resource.return_value = smt_json
n.after(mock_state)
mock_publish.assert_called_with(
mock_publish.publish.assert_called_with(
author_id=mock_state.request.current_user_id,
method=mock_state.request.method,
url=mock_state.request.headers['Referer'],