Merge "Make notification driver configurable"
This commit is contained in:
commit
ecd91398b8
|
@ -26,7 +26,7 @@ from storyboard.db.api import base as api_base
|
||||||
from storyboard.db.api import stories as stories_api
|
from storyboard.db.api import stories as stories_api
|
||||||
from storyboard.db.api import tasks as tasks_api
|
from storyboard.db.api import tasks as tasks_api
|
||||||
from storyboard.db import models
|
from storyboard.db import models
|
||||||
from storyboard.notifications.publisher import publish
|
from storyboard.notifications import publisher
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
@ -112,15 +112,15 @@ def event_create(values):
|
||||||
event_dict = tojson(TimeLineEvent,
|
event_dict = tojson(TimeLineEvent,
|
||||||
TimeLineEvent.from_db_model(new_event))
|
TimeLineEvent.from_db_model(new_event))
|
||||||
|
|
||||||
publish(author_id=request.current_user_id or None,
|
publisher.publish(author_id=request.current_user_id or None,
|
||||||
method="POST",
|
method="POST",
|
||||||
url=request.headers.get('Referer') or None,
|
url=request.headers.get('Referer') or None,
|
||||||
path=request.path or None,
|
path=request.path or None,
|
||||||
query_string=request.query_string or None,
|
query_string=request.query_string or None,
|
||||||
status=response.status_code or None,
|
status=response.status_code or None,
|
||||||
resource="timeline_event",
|
resource="timeline_event",
|
||||||
resource_id=new_event.id or None,
|
resource_id=new_event.id or None,
|
||||||
resource_after=event_dict or None)
|
resource_after=event_dict or None)
|
||||||
|
|
||||||
return new_event
|
return new_event
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
# Copyright (c) 2018 IBM Corp.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
|
@ -17,31 +17,7 @@ from oslo_config import cfg
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
|
|
||||||
NOTIFICATION_OPTS = [
|
OPTS = [
|
||||||
cfg.StrOpt("rabbit_exchange_name", default="storyboard",
|
cfg.StrOpt('driver', choices=['pika'],
|
||||||
help="The name of the topic exchange which storyboard will "
|
help='The notification driver to use', default='pika')
|
||||||
"use to broadcast its events."),
|
|
||||||
cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events",
|
|
||||||
help="The name of the queue that will be created for "
|
|
||||||
"API events."),
|
|
||||||
cfg.StrOpt("rabbit_application_name", default="storyboard",
|
|
||||||
help="The rabbit application identifier for storyboard's "
|
|
||||||
"connection."),
|
|
||||||
cfg.StrOpt("rabbit_host", default="localhost",
|
|
||||||
help="Host of the rabbitmq server."),
|
|
||||||
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
|
|
||||||
help="The RabbitMQ login method."),
|
|
||||||
cfg.StrOpt("rabbit_userid", default="storyboard",
|
|
||||||
help="The RabbitMQ userid."),
|
|
||||||
cfg.StrOpt("rabbit_password", default="storyboard",
|
|
||||||
help="The RabbitMQ password."),
|
|
||||||
cfg.IntOpt("rabbit_port", default=5672,
|
|
||||||
help="The RabbitMQ broker port where a single node is used."),
|
|
||||||
cfg.StrOpt("rabbit_virtual_host", default="/",
|
|
||||||
help="The virtual host within which our queues and exchanges "
|
|
||||||
"live."),
|
|
||||||
cfg.IntOpt("rabbit_connection_attempts", default=6,
|
|
||||||
help="The number of connection attempts before giving-up"),
|
|
||||||
cfg.IntOpt("rabbit_retry_delay", default=10,
|
|
||||||
help="The interval between connection attempts (in seconds)")
|
|
||||||
]
|
]
|
||||||
|
|
|
@ -23,7 +23,7 @@ from storyboard.api.v1 import wmodels
|
||||||
import storyboard.common.hook_priorities as priority
|
import storyboard.common.hook_priorities as priority
|
||||||
from storyboard.db.api import base as api_base
|
from storyboard.db.api import base as api_base
|
||||||
from storyboard.db import models
|
from storyboard.db import models
|
||||||
from storyboard.notifications.publisher import publish
|
from storyboard.notifications import publisher
|
||||||
|
|
||||||
|
|
||||||
class_mappings = {'task': [models.Task, wmodels.Task],
|
class_mappings = {'task': [models.Task, wmodels.Task],
|
||||||
|
@ -110,18 +110,18 @@ class NotificationHook(hooks.PecanHook):
|
||||||
# Build the payload. Use of None is included to ensure that we don't
|
# Build the payload. Use of None is included to ensure that we don't
|
||||||
# accidentally blow up the API call, but we don't anticipate it
|
# accidentally blow up the API call, but we don't anticipate it
|
||||||
# happening.
|
# happening.
|
||||||
publish(author_id=request.current_user_id,
|
publisher.publish(author_id=request.current_user_id,
|
||||||
method=request.method,
|
method=request.method,
|
||||||
url=request.headers.get('Referer'),
|
url=request.headers.get('Referer'),
|
||||||
path=request.path,
|
path=request.path,
|
||||||
query_string=request.query_string,
|
query_string=request.query_string,
|
||||||
status=response.status_code,
|
status=response.status_code,
|
||||||
resource=resource,
|
resource=resource,
|
||||||
resource_id=resource_id,
|
resource_id=resource_id,
|
||||||
sub_resource=subresource,
|
sub_resource=subresource,
|
||||||
sub_resource_id=subresource_id,
|
sub_resource_id=subresource_id,
|
||||||
resource_before=old_resource,
|
resource_before=old_resource,
|
||||||
resource_after=new_resource)
|
resource_after=new_resource)
|
||||||
|
|
||||||
def get_original_resource(self, resource, resource_id):
|
def get_original_resource(self, resource, resource_id):
|
||||||
"""Given a resource name and ID, will load that resource and map it
|
"""Given a resource name and ID, will load that resource and map it
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
|
||||||
|
NOTIFICATION_OPTS = [
|
||||||
|
cfg.StrOpt("rabbit_exchange_name", default="storyboard",
|
||||||
|
help="The name of the topic exchange which storyboard will "
|
||||||
|
"use to broadcast its events.",
|
||||||
|
deprecated_group='notifications'),
|
||||||
|
cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events",
|
||||||
|
help="The name of the queue that will be created for "
|
||||||
|
"API events.",
|
||||||
|
deprecated_group='notifications'),
|
||||||
|
cfg.StrOpt("rabbit_application_name", default="storyboard",
|
||||||
|
help="The rabbit application identifier for storyboard's "
|
||||||
|
"connection.",
|
||||||
|
deprecated_group='notifications'),
|
||||||
|
cfg.StrOpt("rabbit_host", default="localhost",
|
||||||
|
help="Host of the rabbitmq server.",
|
||||||
|
deprecated_group='notifications'),
|
||||||
|
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
|
||||||
|
help="The RabbitMQ login method.",
|
||||||
|
deprecated_group='notifications'),
|
||||||
|
cfg.StrOpt("rabbit_userid", default="storyboard",
|
||||||
|
help="The RabbitMQ userid.",
|
||||||
|
deprecated_group='notifications'),
|
||||||
|
cfg.StrOpt("rabbit_password", default="storyboard",
|
||||||
|
help="The RabbitMQ password.",
|
||||||
|
deprecated_group='notifications'),
|
||||||
|
cfg.IntOpt("rabbit_port", default=5672,
|
||||||
|
help="The RabbitMQ broker port where a single node is used."),
|
||||||
|
cfg.StrOpt("rabbit_virtual_host", default="/",
|
||||||
|
help="The virtual host within which our queues and exchanges "
|
||||||
|
"live.", deprecated_group='notifications'),
|
||||||
|
cfg.IntOpt("rabbit_connection_attempts", default=6,
|
||||||
|
help="The number of connection attempts before giving-up",
|
||||||
|
deprecated_group='notifications'),
|
||||||
|
cfg.IntOpt("rabbit_retry_delay", default=10,
|
||||||
|
help="The interval between connection attempts (in seconds)",
|
||||||
|
deprecated_group='notifications')
|
||||||
|
]
|
|
@ -0,0 +1,181 @@
|
||||||
|
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log
|
||||||
|
from pika.exceptions import ConnectionClosed
|
||||||
|
|
||||||
|
from storyboard.notifications.pika.conf import NOTIFICATION_OPTS
|
||||||
|
from storyboard.notifications.pika.connection_service import ConnectionService
|
||||||
|
from storyboard._i18n import _, _LW, _LE
|
||||||
|
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
LOG = log.getLogger(__name__)
|
||||||
|
PUBLISHER = None
|
||||||
|
|
||||||
|
|
||||||
|
class Publisher(ConnectionService):
|
||||||
|
"""A generic message publisher that uses delivery confirmation to ensure
|
||||||
|
that messages are delivered, and will keep a running cache of unsent
|
||||||
|
messages while the publisher is attempting to reconnect.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, conf):
|
||||||
|
"""Setup the publisher instance based on our configuration.
|
||||||
|
|
||||||
|
:param conf A configuration object.
|
||||||
|
"""
|
||||||
|
super(Publisher, self).__init__(conf)
|
||||||
|
|
||||||
|
self._pending = list()
|
||||||
|
|
||||||
|
self.add_open_hook(self._publish_pending)
|
||||||
|
|
||||||
|
def _publish_pending(self):
|
||||||
|
"""Publishes any pending messages that were broadcast while the
|
||||||
|
publisher was connecting.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Shallow copy, so we can iterate over it without having it be modified
|
||||||
|
# out of band.
|
||||||
|
pending = list(self._pending)
|
||||||
|
|
||||||
|
for payload in pending:
|
||||||
|
self._publish(payload)
|
||||||
|
|
||||||
|
def _publish(self, payload):
|
||||||
|
"""Publishes a payload to the passed exchange. If it encounters a
|
||||||
|
failure, will store the payload for later.
|
||||||
|
|
||||||
|
:param Payload payload: The payload to send.
|
||||||
|
"""
|
||||||
|
LOG.debug(_("Sending message to %(name)s [%(topic)s]") %
|
||||||
|
{'name': self._exchange_name, 'topic': payload.topic})
|
||||||
|
|
||||||
|
# First check, are we closing?
|
||||||
|
if self._closing:
|
||||||
|
LOG.warning(_LW("Cannot send message, publisher is closing."))
|
||||||
|
if payload not in self._pending:
|
||||||
|
self._pending.append(payload)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Second check, are we open?
|
||||||
|
if not self._open:
|
||||||
|
LOG.debug(_("Cannot send message, publisher is connecting."))
|
||||||
|
if payload not in self._pending:
|
||||||
|
self._pending.append(payload)
|
||||||
|
self._reconnect()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Third check, are we in a sane state? This should never happen,
|
||||||
|
# but just in case...
|
||||||
|
if not self._connection or not self._channel:
|
||||||
|
LOG.error(_LE("Cannot send message, publisher is "
|
||||||
|
"an unexpected state."))
|
||||||
|
if payload not in self._pending:
|
||||||
|
self._pending.append(payload)
|
||||||
|
self._reconnect()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Try to send a message. If we fail, schedule a reconnect and store
|
||||||
|
# the message.
|
||||||
|
try:
|
||||||
|
self._channel.basic_publish(self._exchange_name,
|
||||||
|
payload.topic,
|
||||||
|
json.dumps(payload.payload,
|
||||||
|
ensure_ascii=False),
|
||||||
|
self._properties)
|
||||||
|
if payload in self._pending:
|
||||||
|
self._pending.remove(payload)
|
||||||
|
return True
|
||||||
|
except (ConnectionClosed, AttributeError) as cc:
|
||||||
|
LOG.warning(_LW("Attempted to send message on closed connection."))
|
||||||
|
LOG.debug(cc)
|
||||||
|
self._open = False
|
||||||
|
if payload not in self._pending:
|
||||||
|
self._pending.append(payload)
|
||||||
|
self._reconnect()
|
||||||
|
return False
|
||||||
|
|
||||||
|
def publish_message(self, topic, payload):
|
||||||
|
"""Publishes a message to RabbitMQ.
|
||||||
|
"""
|
||||||
|
self._publish(Payload(topic, payload))
|
||||||
|
|
||||||
|
|
||||||
|
class Payload(object):
|
||||||
|
def __init__(self, topic, payload):
|
||||||
|
"""Setup the example publisher object, passing in the URL we will use
|
||||||
|
to connect to RabbitMQ.
|
||||||
|
|
||||||
|
:param topic string The exchange topic to broadcast on.
|
||||||
|
:param payload string The message payload to send.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.topic = topic
|
||||||
|
self.payload = payload
|
||||||
|
|
||||||
|
|
||||||
|
def publish(resource, author_id=None, method=None, url=None, path=None,
|
||||||
|
query_string=None, status=None, resource_id=None,
|
||||||
|
sub_resource=None, sub_resource_id=None, resource_before=None,
|
||||||
|
resource_after=None):
|
||||||
|
"""Send a message for an API event to the storyboard exchange.
|
||||||
|
|
||||||
|
The message will be automatically JSON encoded.
|
||||||
|
|
||||||
|
:param resource: The extrapolated resource type (project, story, etc).
|
||||||
|
:param author_id: The ID of the author who performed this action.
|
||||||
|
:param method: The HTTP Method used.
|
||||||
|
:param url: The Referer header from the request.
|
||||||
|
:param path: The HTTP Path used.
|
||||||
|
:param query_string: The HTTP query string used.
|
||||||
|
:param status: The HTTP Status code of the response.
|
||||||
|
:param resource_id: The ID of the resource.
|
||||||
|
:param sub_resource: The extracted subresource (user_token, etc)
|
||||||
|
:param sub_resource_id: THe ID of the subresource.
|
||||||
|
:param resource_before: The resource state before this event occurred.
|
||||||
|
:param resource_after: The resource state after this event occurred.
|
||||||
|
"""
|
||||||
|
global PUBLISHER
|
||||||
|
|
||||||
|
if not PUBLISHER:
|
||||||
|
CONF.register_opts(NOTIFICATION_OPTS, "pika-notifications")
|
||||||
|
PUBLISHER = Publisher(CONF.pika_notifications)
|
||||||
|
PUBLISHER.start()
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"author_id": author_id,
|
||||||
|
"method": method,
|
||||||
|
"url": url,
|
||||||
|
"path": path,
|
||||||
|
"query_string": query_string,
|
||||||
|
"status": status,
|
||||||
|
"resource": resource,
|
||||||
|
"resource_id": resource_id,
|
||||||
|
"sub_resource": sub_resource,
|
||||||
|
"sub_resource_id": sub_resource_id,
|
||||||
|
"resource_before": resource_before,
|
||||||
|
"resource_after": resource_after
|
||||||
|
}
|
||||||
|
|
||||||
|
if resource:
|
||||||
|
PUBLISHER.publish_message(resource, payload)
|
||||||
|
else:
|
||||||
|
LOG.warning("Attempted to send payload with no destination "
|
||||||
|
"resource.")
|
|
@ -21,8 +21,8 @@ from oslo_log import log
|
||||||
from pika.exceptions import ConnectionClosed
|
from pika.exceptions import ConnectionClosed
|
||||||
from stevedore import enabled
|
from stevedore import enabled
|
||||||
|
|
||||||
from storyboard.notifications.conf import NOTIFICATION_OPTS
|
from storyboard.notifications.pika.conf import NOTIFICATION_OPTS
|
||||||
from storyboard.notifications.connection_service import ConnectionService
|
from storyboard.notifications.pika.connection_service import ConnectionService
|
||||||
from storyboard._i18n import _, _LW
|
from storyboard._i18n import _, _LW
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
|
# Copyright (c) 2018 IBM Corp.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
|
@ -13,167 +13,26 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import json
|
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log
|
from oslo_utils import importutils
|
||||||
from pika.exceptions import ConnectionClosed
|
|
||||||
|
|
||||||
from storyboard.notifications.conf import NOTIFICATION_OPTS
|
|
||||||
from storyboard.notifications.connection_service import ConnectionService
|
|
||||||
from storyboard._i18n import _, _LW, _LE
|
|
||||||
|
|
||||||
|
from storyboard.notifications import conf
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
LOG = log.getLogger(__name__)
|
CONF.register_opts(conf.OPTS, 'notifications')
|
||||||
PUBLISHER = None
|
|
||||||
|
|
||||||
|
|
||||||
class Publisher(ConnectionService):
|
|
||||||
"""A generic message publisher that uses delivery confirmation to ensure
|
|
||||||
that messages are delivered, and will keep a running cache of unsent
|
|
||||||
messages while the publisher is attempting to reconnect.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, conf):
|
|
||||||
"""Setup the publisher instance based on our configuration.
|
|
||||||
|
|
||||||
:param conf A configuration object.
|
|
||||||
"""
|
|
||||||
super(Publisher, self).__init__(conf)
|
|
||||||
|
|
||||||
self._pending = list()
|
|
||||||
|
|
||||||
self.add_open_hook(self._publish_pending)
|
|
||||||
|
|
||||||
def _publish_pending(self):
|
|
||||||
"""Publishes any pending messages that were broadcast while the
|
|
||||||
publisher was connecting.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Shallow copy, so we can iterate over it without having it be modified
|
|
||||||
# out of band.
|
|
||||||
pending = list(self._pending)
|
|
||||||
|
|
||||||
for payload in pending:
|
|
||||||
self._publish(payload)
|
|
||||||
|
|
||||||
def _publish(self, payload):
|
|
||||||
"""Publishes a payload to the passed exchange. If it encounters a
|
|
||||||
failure, will store the payload for later.
|
|
||||||
|
|
||||||
:param Payload payload: The payload to send.
|
|
||||||
"""
|
|
||||||
LOG.debug(_("Sending message to %(name)s [%(topic)s]") %
|
|
||||||
{'name': self._exchange_name, 'topic': payload.topic})
|
|
||||||
|
|
||||||
# First check, are we closing?
|
|
||||||
if self._closing:
|
|
||||||
LOG.warning(_LW("Cannot send message, publisher is closing."))
|
|
||||||
if payload not in self._pending:
|
|
||||||
self._pending.append(payload)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Second check, are we open?
|
|
||||||
if not self._open:
|
|
||||||
LOG.debug(_("Cannot send message, publisher is connecting."))
|
|
||||||
if payload not in self._pending:
|
|
||||||
self._pending.append(payload)
|
|
||||||
self._reconnect()
|
|
||||||
return
|
|
||||||
|
|
||||||
# Third check, are we in a sane state? This should never happen,
|
|
||||||
# but just in case...
|
|
||||||
if not self._connection or not self._channel:
|
|
||||||
LOG.error(_LE("Cannot send message, publisher is "
|
|
||||||
"an unexpected state."))
|
|
||||||
if payload not in self._pending:
|
|
||||||
self._pending.append(payload)
|
|
||||||
self._reconnect()
|
|
||||||
return
|
|
||||||
|
|
||||||
# Try to send a message. If we fail, schedule a reconnect and store
|
|
||||||
# the message.
|
|
||||||
try:
|
|
||||||
self._channel.basic_publish(self._exchange_name,
|
|
||||||
payload.topic,
|
|
||||||
json.dumps(payload.payload,
|
|
||||||
ensure_ascii=False),
|
|
||||||
self._properties)
|
|
||||||
if payload in self._pending:
|
|
||||||
self._pending.remove(payload)
|
|
||||||
return True
|
|
||||||
except (ConnectionClosed, AttributeError) as cc:
|
|
||||||
LOG.warning(_LW("Attempted to send message on closed connection."))
|
|
||||||
LOG.debug(cc)
|
|
||||||
self._open = False
|
|
||||||
if payload not in self._pending:
|
|
||||||
self._pending.append(payload)
|
|
||||||
self._reconnect()
|
|
||||||
return False
|
|
||||||
|
|
||||||
def publish_message(self, topic, payload):
|
|
||||||
"""Publishes a message to RabbitMQ.
|
|
||||||
"""
|
|
||||||
self._publish(Payload(topic, payload))
|
|
||||||
|
|
||||||
|
|
||||||
class Payload(object):
|
|
||||||
def __init__(self, topic, payload):
|
|
||||||
"""Setup the example publisher object, passing in the URL we will use
|
|
||||||
to connect to RabbitMQ.
|
|
||||||
|
|
||||||
:param topic string The exchange topic to broadcast on.
|
|
||||||
:param payload string The message payload to send.
|
|
||||||
"""
|
|
||||||
|
|
||||||
self.topic = topic
|
|
||||||
self.payload = payload
|
|
||||||
|
|
||||||
|
|
||||||
def publish(resource, author_id=None, method=None, url=None, path=None,
|
def publish(resource, author_id=None, method=None, url=None, path=None,
|
||||||
query_string=None, status=None, resource_id=None,
|
query_string=None, status=None, resource_id=None,
|
||||||
sub_resource=None, sub_resource_id=None, resource_before=None,
|
sub_resource=None, sub_resource_id=None, resource_before=None,
|
||||||
resource_after=None):
|
resource_after=None):
|
||||||
"""Send a message for an API event to the storyboard exchange. The message
|
|
||||||
will be automatically JSON encoded.
|
|
||||||
|
|
||||||
:param resource: The extrapolated resource type (project, story, etc).
|
publisher_module = importutils.import_module(
|
||||||
:param author_id: The ID of the author who performed this action.
|
'storyboard.notifications.' + CONF.notifications.driver + '.publisher')
|
||||||
:param method: The HTTP Method used.
|
publisher_module.publish(resource, author_id=author_id, method=method,
|
||||||
:param url: The Referer header from the request.
|
url=url, path=path, query_string=query_string,
|
||||||
:param path: The HTTP Path used.
|
status=status, resource_id=resource_id,
|
||||||
:param query_string: The HTTP query string used.
|
sub_resource=sub_resource,
|
||||||
:param status: The HTTP Status code of the response.
|
sub_resource_id=sub_resource_id,
|
||||||
:param resource_id: The ID of the resource.
|
resource_before=resource_before,
|
||||||
:param sub_resource: The extracted subresource (user_token, etc)
|
resource_after=resource_after)
|
||||||
:param sub_resource_id: THe ID of the subresource.
|
|
||||||
:param resource_before: The resource state before this event occurred.
|
|
||||||
:param resource_after: The resource state after this event occurred.
|
|
||||||
"""
|
|
||||||
global PUBLISHER
|
|
||||||
|
|
||||||
if not PUBLISHER:
|
|
||||||
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
|
|
||||||
PUBLISHER = Publisher(CONF.notifications)
|
|
||||||
PUBLISHER.start()
|
|
||||||
|
|
||||||
payload = {
|
|
||||||
"author_id": author_id,
|
|
||||||
"method": method,
|
|
||||||
"url": url,
|
|
||||||
"path": path,
|
|
||||||
"query_string": query_string,
|
|
||||||
"status": status,
|
|
||||||
"resource": resource,
|
|
||||||
"resource_id": resource_id,
|
|
||||||
"sub_resource": sub_resource,
|
|
||||||
"sub_resource_id": sub_resource_id,
|
|
||||||
"resource_before": resource_before,
|
|
||||||
"resource_after": resource_after
|
|
||||||
}
|
|
||||||
|
|
||||||
if resource:
|
|
||||||
PUBLISHER.publish_message(resource, payload)
|
|
||||||
else:
|
|
||||||
LOG.warning("Attempted to send payload with no destination resource.")
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ import six
|
||||||
|
|
||||||
import storyboard.db.api.base as db_api
|
import storyboard.db.api.base as db_api
|
||||||
from storyboard.notifications.notification_hook import class_mappings
|
from storyboard.notifications.notification_hook import class_mappings
|
||||||
from storyboard.notifications.subscriber import subscribe
|
from storyboard.notifications.pika.subscriber import subscribe
|
||||||
from storyboard._i18n import _LI, _LW
|
from storyboard._i18n import _LI, _LW
|
||||||
from storyboard.plugin.base import PluginBase
|
from storyboard.plugin.base import PluginBase
|
||||||
|
|
||||||
|
|
|
@ -221,7 +221,7 @@ class TestNotificationHook(base.BaseDbTestCase):
|
||||||
self.assertEqual(mock_state.old_entity_values['priority'],
|
self.assertEqual(mock_state.old_entity_values['priority'],
|
||||||
sample_task_wmodel.priority)
|
sample_task_wmodel.priority)
|
||||||
|
|
||||||
@patch('storyboard.notifications.notification_hook.publish')
|
@patch('storyboard.notifications.notification_hook.publisher')
|
||||||
@patch.object(NotificationHook, 'get_original_resource')
|
@patch.object(NotificationHook, 'get_original_resource')
|
||||||
def test_after_publishes_payload(self, mock_get_original_resource,
|
def test_after_publishes_payload(self, mock_get_original_resource,
|
||||||
mock_publish):
|
mock_publish):
|
||||||
|
@ -261,7 +261,7 @@ class TestNotificationHook(base.BaseDbTestCase):
|
||||||
mock_get_original_resource.return_value = smt_json
|
mock_get_original_resource.return_value = smt_json
|
||||||
|
|
||||||
n.after(mock_state)
|
n.after(mock_state)
|
||||||
mock_publish.assert_called_with(
|
mock_publish.publish.assert_called_with(
|
||||||
author_id=mock_state.request.current_user_id,
|
author_id=mock_state.request.current_user_id,
|
||||||
method=mock_state.request.method,
|
method=mock_state.request.method,
|
||||||
url=mock_state.request.headers['Referer'],
|
url=mock_state.request.headers['Referer'],
|
||||||
|
|
Loading…
Reference in New Issue