Revert "Make notification driver configurable"

This reverts commit c7e0e458ff because
it causes a regression. The noted symptom is that OpenID logins
consistently failed and/or timed out with "Lock wait timeout
exceeded; try restarting transaction" in the logs raised by the
"UPDATE users SET updated_at=%(updated_at)s,
last_login=%(last_login)s WHERE users.id = %(users_id)s" query.
Diagnosis revealed no persistently open tables where in_use>0 and no
unexpected processes reported in the MySQL show processlist output.
Restarts of apache2 and storyboard-workers services yielded no
change in behavior either, nor did restarting the VM providing
MySQL. Ultimately, bisecting the storyboard codebase revealed that
with this change applied the symptom was reliably manifest but the
regression disappeared on the commit before it merged.

Change-Id: I3d40abe04f53e9086020ec88e8610423197c3e9f
This commit is contained in:
Jeremy Stanley 2018-06-11 21:11:00 +00:00
parent 1686499b6e
commit 5b8266593f
11 changed files with 211 additions and 283 deletions

View File

@ -26,7 +26,7 @@ from storyboard.db.api import base as api_base
from storyboard.db.api import stories as stories_api
from storyboard.db.api import tasks as tasks_api
from storyboard.db import models
from storyboard.notifications import publisher
from storyboard.notifications.publisher import publish
CONF = cfg.CONF
@ -112,15 +112,15 @@ def event_create(values):
event_dict = tojson(TimeLineEvent,
TimeLineEvent.from_db_model(new_event))
publisher.publish(author_id=request.current_user_id or None,
method="POST",
url=request.headers.get('Referer') or None,
path=request.path or None,
query_string=request.query_string or None,
status=response.status_code or None,
resource="timeline_event",
resource_id=new_event.id or None,
resource_after=event_dict or None)
publish(author_id=request.current_user_id or None,
method="POST",
url=request.headers.get('Referer') or None,
path=request.path or None,
query_string=request.query_string or None,
status=response.status_code or None,
resource="timeline_event",
resource_id=new_event.id or None,
resource_after=event_dict or None)
return new_event

View File

@ -1,4 +1,4 @@
# Copyright (c) 2018 IBM Corp.
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -17,7 +17,31 @@ from oslo_config import cfg
CONF = cfg.CONF
OPTS = [
cfg.StrOpt('driver', choices=['pika'],
help='The notification driver to use', default='pika')
NOTIFICATION_OPTS = [
cfg.StrOpt("rabbit_exchange_name", default="storyboard",
help="The name of the topic exchange which storyboard will "
"use to broadcast its events."),
cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events",
help="The name of the queue that will be created for "
"API events."),
cfg.StrOpt("rabbit_application_name", default="storyboard",
help="The rabbit application identifier for storyboard's "
"connection."),
cfg.StrOpt("rabbit_host", default="localhost",
help="Host of the rabbitmq server."),
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
help="The RabbitMQ login method."),
cfg.StrOpt("rabbit_userid", default="storyboard",
help="The RabbitMQ userid."),
cfg.StrOpt("rabbit_password", default="storyboard",
help="The RabbitMQ password."),
cfg.IntOpt("rabbit_port", default=5672,
help="The RabbitMQ broker port where a single node is used."),
cfg.StrOpt("rabbit_virtual_host", default="/",
help="The virtual host within which our queues and exchanges "
"live."),
cfg.IntOpt("rabbit_connection_attempts", default=6,
help="The number of connection attempts before giving-up"),
cfg.IntOpt("rabbit_retry_delay", default=10,
help="The interval between connection attempts (in seconds)")
]

View File

@ -23,7 +23,7 @@ from storyboard.api.v1 import wmodels
import storyboard.common.hook_priorities as priority
from storyboard.db.api import base as api_base
from storyboard.db import models
from storyboard.notifications import publisher
from storyboard.notifications.publisher import publish
class_mappings = {'task': [models.Task, wmodels.Task],
@ -110,18 +110,18 @@ class NotificationHook(hooks.PecanHook):
# Build the payload. Use of None is included to ensure that we don't
# accidentally blow up the API call, but we don't anticipate it
# happening.
publisher.publish(author_id=request.current_user_id,
method=request.method,
url=request.headers.get('Referer'),
path=request.path,
query_string=request.query_string,
status=response.status_code,
resource=resource,
resource_id=resource_id,
sub_resource=subresource,
sub_resource_id=subresource_id,
resource_before=old_resource,
resource_after=new_resource)
publish(author_id=request.current_user_id,
method=request.method,
url=request.headers.get('Referer'),
path=request.path,
query_string=request.query_string,
status=response.status_code,
resource=resource,
resource_id=resource_id,
sub_resource=subresource,
sub_resource_id=subresource_id,
resource_before=old_resource,
resource_after=new_resource)
def get_original_resource(self, resource, resource_id):
"""Given a resource name and ID, will load that resource and map it

View File

@ -1,56 +0,0 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
CONF = cfg.CONF
NOTIFICATION_OPTS = [
cfg.StrOpt("rabbit_exchange_name", default="storyboard",
help="The name of the topic exchange which storyboard will "
"use to broadcast its events.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events",
help="The name of the queue that will be created for "
"API events.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_application_name", default="storyboard",
help="The rabbit application identifier for storyboard's "
"connection.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_host", default="localhost",
help="Host of the rabbitmq server.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
help="The RabbitMQ login method.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_userid", default="storyboard",
help="The RabbitMQ userid.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_password", default="storyboard",
help="The RabbitMQ password.",
deprecated_group='notifications'),
cfg.IntOpt("rabbit_port", default=5672,
help="The RabbitMQ broker port where a single node is used."),
cfg.StrOpt("rabbit_virtual_host", default="/",
help="The virtual host within which our queues and exchanges "
"live.", deprecated_group='notifications'),
cfg.IntOpt("rabbit_connection_attempts", default=6,
help="The number of connection attempts before giving-up",
deprecated_group='notifications'),
cfg.IntOpt("rabbit_retry_delay", default=10,
help="The interval between connection attempts (in seconds)",
deprecated_group='notifications')
]

View File

@ -1,181 +0,0 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_config import cfg
from oslo_log import log
from pika.exceptions import ConnectionClosed
from storyboard.notifications.pika.conf import NOTIFICATION_OPTS
from storyboard.notifications.pika.connection_service import ConnectionService
from storyboard._i18n import _, _LW, _LE
CONF = cfg.CONF
LOG = log.getLogger(__name__)
PUBLISHER = None
class Publisher(ConnectionService):
"""A generic message publisher that uses delivery confirmation to ensure
that messages are delivered, and will keep a running cache of unsent
messages while the publisher is attempting to reconnect.
"""
def __init__(self, conf):
"""Setup the publisher instance based on our configuration.
:param conf A configuration object.
"""
super(Publisher, self).__init__(conf)
self._pending = list()
self.add_open_hook(self._publish_pending)
def _publish_pending(self):
"""Publishes any pending messages that were broadcast while the
publisher was connecting.
"""
# Shallow copy, so we can iterate over it without having it be modified
# out of band.
pending = list(self._pending)
for payload in pending:
self._publish(payload)
def _publish(self, payload):
"""Publishes a payload to the passed exchange. If it encounters a
failure, will store the payload for later.
:param Payload payload: The payload to send.
"""
LOG.debug(_("Sending message to %(name)s [%(topic)s]") %
{'name': self._exchange_name, 'topic': payload.topic})
# First check, are we closing?
if self._closing:
LOG.warning(_LW("Cannot send message, publisher is closing."))
if payload not in self._pending:
self._pending.append(payload)
return
# Second check, are we open?
if not self._open:
LOG.debug(_("Cannot send message, publisher is connecting."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Third check, are we in a sane state? This should never happen,
# but just in case...
if not self._connection or not self._channel:
LOG.error(_LE("Cannot send message, publisher is "
"an unexpected state."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Try to send a message. If we fail, schedule a reconnect and store
# the message.
try:
self._channel.basic_publish(self._exchange_name,
payload.topic,
json.dumps(payload.payload,
ensure_ascii=False),
self._properties)
if payload in self._pending:
self._pending.remove(payload)
return True
except (ConnectionClosed, AttributeError) as cc:
LOG.warning(_LW("Attempted to send message on closed connection."))
LOG.debug(cc)
self._open = False
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return False
def publish_message(self, topic, payload):
"""Publishes a message to RabbitMQ.
"""
self._publish(Payload(topic, payload))
class Payload(object):
def __init__(self, topic, payload):
"""Setup the example publisher object, passing in the URL we will use
to connect to RabbitMQ.
:param topic string The exchange topic to broadcast on.
:param payload string The message payload to send.
"""
self.topic = topic
self.payload = payload
def publish(resource, author_id=None, method=None, url=None, path=None,
query_string=None, status=None, resource_id=None,
sub_resource=None, sub_resource_id=None, resource_before=None,
resource_after=None):
"""Send a message for an API event to the storyboard exchange.
The message will be automatically JSON encoded.
:param resource: The extrapolated resource type (project, story, etc).
:param author_id: The ID of the author who performed this action.
:param method: The HTTP Method used.
:param url: The Referer header from the request.
:param path: The HTTP Path used.
:param query_string: The HTTP query string used.
:param status: The HTTP Status code of the response.
:param resource_id: The ID of the resource.
:param sub_resource: The extracted subresource (user_token, etc)
:param sub_resource_id: THe ID of the subresource.
:param resource_before: The resource state before this event occurred.
:param resource_after: The resource state after this event occurred.
"""
global PUBLISHER
if not PUBLISHER:
CONF.register_opts(NOTIFICATION_OPTS, "pika-notifications")
PUBLISHER = Publisher(CONF.pika_notifications)
PUBLISHER.start()
payload = {
"author_id": author_id,
"method": method,
"url": url,
"path": path,
"query_string": query_string,
"status": status,
"resource": resource,
"resource_id": resource_id,
"sub_resource": sub_resource,
"sub_resource_id": sub_resource_id,
"resource_before": resource_before,
"resource_after": resource_after
}
if resource:
PUBLISHER.publish_message(resource, payload)
else:
LOG.warning("Attempted to send payload with no destination "
"resource.")

View File

@ -1,4 +1,4 @@
# Copyright (c) 2018 IBM Corp.
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,26 +13,167 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_utils import importutils
import json
from oslo_config import cfg
from oslo_log import log
from pika.exceptions import ConnectionClosed
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.notifications.connection_service import ConnectionService
from storyboard._i18n import _, _LW, _LE
from storyboard.notifications import conf
CONF = cfg.CONF
CONF.register_opts(conf.OPTS, 'notifications')
LOG = log.getLogger(__name__)
PUBLISHER = None
class Publisher(ConnectionService):
"""A generic message publisher that uses delivery confirmation to ensure
that messages are delivered, and will keep a running cache of unsent
messages while the publisher is attempting to reconnect.
"""
def __init__(self, conf):
"""Setup the publisher instance based on our configuration.
:param conf A configuration object.
"""
super(Publisher, self).__init__(conf)
self._pending = list()
self.add_open_hook(self._publish_pending)
def _publish_pending(self):
"""Publishes any pending messages that were broadcast while the
publisher was connecting.
"""
# Shallow copy, so we can iterate over it without having it be modified
# out of band.
pending = list(self._pending)
for payload in pending:
self._publish(payload)
def _publish(self, payload):
"""Publishes a payload to the passed exchange. If it encounters a
failure, will store the payload for later.
:param Payload payload: The payload to send.
"""
LOG.debug(_("Sending message to %(name)s [%(topic)s]") %
{'name': self._exchange_name, 'topic': payload.topic})
# First check, are we closing?
if self._closing:
LOG.warning(_LW("Cannot send message, publisher is closing."))
if payload not in self._pending:
self._pending.append(payload)
return
# Second check, are we open?
if not self._open:
LOG.debug(_("Cannot send message, publisher is connecting."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Third check, are we in a sane state? This should never happen,
# but just in case...
if not self._connection or not self._channel:
LOG.error(_LE("Cannot send message, publisher is "
"an unexpected state."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Try to send a message. If we fail, schedule a reconnect and store
# the message.
try:
self._channel.basic_publish(self._exchange_name,
payload.topic,
json.dumps(payload.payload,
ensure_ascii=False),
self._properties)
if payload in self._pending:
self._pending.remove(payload)
return True
except (ConnectionClosed, AttributeError) as cc:
LOG.warning(_LW("Attempted to send message on closed connection."))
LOG.debug(cc)
self._open = False
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return False
def publish_message(self, topic, payload):
"""Publishes a message to RabbitMQ.
"""
self._publish(Payload(topic, payload))
class Payload(object):
def __init__(self, topic, payload):
"""Setup the example publisher object, passing in the URL we will use
to connect to RabbitMQ.
:param topic string The exchange topic to broadcast on.
:param payload string The message payload to send.
"""
self.topic = topic
self.payload = payload
def publish(resource, author_id=None, method=None, url=None, path=None,
query_string=None, status=None, resource_id=None,
sub_resource=None, sub_resource_id=None, resource_before=None,
resource_after=None):
"""Send a message for an API event to the storyboard exchange. The message
will be automatically JSON encoded.
publisher_module = importutils.import_module(
'storyboard.notifications.' + CONF.notifications.driver + '.publisher')
publisher_module.publish(resource, author_id=author_id, method=method,
url=url, path=path, query_string=query_string,
status=status, resource_id=resource_id,
sub_resource=sub_resource,
sub_resource_id=sub_resource_id,
resource_before=resource_before,
resource_after=resource_after)
:param resource: The extrapolated resource type (project, story, etc).
:param author_id: The ID of the author who performed this action.
:param method: The HTTP Method used.
:param url: The Referer header from the request.
:param path: The HTTP Path used.
:param query_string: The HTTP query string used.
:param status: The HTTP Status code of the response.
:param resource_id: The ID of the resource.
:param sub_resource: The extracted subresource (user_token, etc)
:param sub_resource_id: THe ID of the subresource.
:param resource_before: The resource state before this event occurred.
:param resource_after: The resource state after this event occurred.
"""
global PUBLISHER
if not PUBLISHER:
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
PUBLISHER = Publisher(CONF.notifications)
PUBLISHER.start()
payload = {
"author_id": author_id,
"method": method,
"url": url,
"path": path,
"query_string": query_string,
"status": status,
"resource": resource,
"resource_id": resource_id,
"sub_resource": sub_resource,
"sub_resource_id": sub_resource_id,
"resource_before": resource_before,
"resource_after": resource_after
}
if resource:
PUBLISHER.publish_message(resource, payload)
else:
LOG.warning("Attempted to send payload with no destination resource.")

View File

@ -21,8 +21,8 @@ from oslo_log import log
from pika.exceptions import ConnectionClosed
from stevedore import enabled
from storyboard.notifications.pika.conf import NOTIFICATION_OPTS
from storyboard.notifications.pika.connection_service import ConnectionService
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.notifications.connection_service import ConnectionService
from storyboard._i18n import _, _LW

View File

@ -23,7 +23,7 @@ import six
import storyboard.db.api.base as db_api
from storyboard.notifications.notification_hook import class_mappings
from storyboard.notifications.pika.subscriber import subscribe
from storyboard.notifications.subscriber import subscribe
from storyboard._i18n import _LI, _LW
from storyboard.plugin.base import PluginBase

View File

@ -221,7 +221,7 @@ class TestNotificationHook(base.BaseDbTestCase):
self.assertEqual(mock_state.old_entity_values['priority'],
sample_task_wmodel.priority)
@patch('storyboard.notifications.notification_hook.publisher')
@patch('storyboard.notifications.notification_hook.publish')
@patch.object(NotificationHook, 'get_original_resource')
def test_after_publishes_payload(self, mock_get_original_resource,
mock_publish):
@ -261,7 +261,7 @@ class TestNotificationHook(base.BaseDbTestCase):
mock_get_original_resource.return_value = smt_json
n.after(mock_state)
mock_publish.publish.assert_called_with(
mock_publish.assert_called_with(
author_id=mock_state.request.current_user_id,
method=mock_state.request.method,
url=mock_state.request.headers['Referer'],