Remove the deprecated Pika driver

It is recommended that all users of the Pika driver transition to
using the Rabbit driver instead.  Typically this is done by changing
the prefix of the transport_url configuration option from "pika://..."
to "rabbit://...".  There are no changes required to the RabbitMQ
server configuration.
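
For example (hypothetical host and credentials), a configuration that
previously contained:

    [DEFAULT]
    transport_url = pika://guest:guest@localhost:5672

would become:

    [DEFAULT]
    transport_url = rabbit://guest:guest@localhost:5672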

Change-Id: I52ea5ccb7e7c247abd95e2d8d50dac4c4ad11246
Closes-Bug: #1744741
Kenneth Giusti 2018-01-23 15:23:15 -05:00
parent dec178257b
commit 222a939361
30 changed files with 13 additions and 4551 deletions

@@ -11,13 +11,6 @@
tox_envlist: py27-func-kafka
bindep_profile: kafka
- job:
name: oslo.messaging-tox-py27-func-pika
parent: openstack-tox-py27
vars:
tox_envlist: py27-func-pika
bindep_profile: pika
- job:
name: oslo.messaging-tox-py27-func-rabbit
parent: openstack-tox-py27
@@ -76,17 +69,6 @@
- openstack-infra/devstack-gate
- openstack/oslo.messaging
- job:
name: oslo.messaging-src-dsvm-full-pika-default
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-src-dsvm-full-pika-default/run.yaml
post-run: playbooks/oslo.messaging-src-dsvm-full-pika-default/post.yaml
timeout: 10800
required-projects:
- openstack-infra/devstack-gate
- openstack/devstack-plugin-pika
- openstack/oslo.messaging
- job:
name: oslo.messaging-src-dsvm-full-amqp1-dual-centos-7
parent: legacy-dsvm-base
@@ -211,24 +193,6 @@
- openstack/dib-utils
- openstack/diskimage-builder
- job:
name: oslo.messaging-telemetry-dsvm-integration-pika
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-telemetry-dsvm-integration-pika/run.yaml
post-run: playbooks/oslo.messaging-telemetry-dsvm-integration-pika/post.yaml
timeout: 4200
required-projects:
- openstack-infra/devstack-gate
- openstack/aodh
- openstack/ceilometer
- openstack/devstack-plugin-pika
- openstack/oslo.messaging
- openstack/panko
# the following are required when DEVSTACK_GATE_HEAT, which this
# job turns on, is set
- openstack/dib-utils
- openstack/diskimage-builder
- job:
name: oslo.messaging-telemetry-dsvm-integration-zmq
parent: legacy-dsvm-base
@@ -304,19 +268,6 @@
- openstack/oslo.messaging
- openstack/tempest
- job:
name: oslo.messaging-tempest-neutron-dsvm-src-pika-default
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/run.yaml
post-run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-pika-default/post.yaml
timeout: 7800
required-projects:
- openstack-infra/devstack-gate
- openstack/devstack-plugin-pika
- openstack/neutron
- openstack/oslo.messaging
- openstack/tempest
- job:
name: oslo.messaging-tempest-neutron-dsvm-src-zmq-default
parent: legacy-dsvm-base
@@ -338,7 +289,6 @@
voting: false
- oslo.messaging-tox-py27-func-kafka:
voting: false
- oslo.messaging-tox-py27-func-pika
- oslo.messaging-tox-py27-func-rabbit
- oslo.messaging-tox-py27-func-zmq-proxy:
voting: false
@@ -364,8 +314,6 @@
voting: false
- oslo.messaging-src-dsvm-full-kafka-default:
voting: false
- oslo.messaging-src-dsvm-full-pika-default:
voting: false
- oslo.messaging-src-dsvm-full-zmq-default:
voting: false
@@ -379,8 +327,6 @@
voting: false
- oslo.messaging-telemetry-dsvm-integration-kafka:
voting: false
- oslo.messaging-telemetry-dsvm-integration-pika:
voting: false
- oslo.messaging-telemetry-dsvm-integration-zmq:
voting: false
@@ -390,15 +336,12 @@
branches: ^(?!stable/ocata).*$
- oslo.messaging-tempest-neutron-dsvm-src-kafka-default:
voting: false
- oslo.messaging-tempest-neutron-dsvm-src-pika-default:
voting: false
- oslo.messaging-tempest-neutron-dsvm-src-zmq-default:
voting: false
gate:
jobs:
- oslo.messaging-tox-py27-func-rabbit
- oslo.messaging-tox-py27-func-pika
- oslo.messaging-telemetry-dsvm-integration-rabbit
- oslo.messaging-src-dsvm-full-rabbit-default
- oslo.messaging-tempest-neutron-dsvm-src-rabbit-default

@@ -10,11 +10,9 @@ make [platform:rpm]
pkgconfig [platform:rpm]
libffi-devel [platform:rpm]
# kombu/pika dpkg
rabbitmq-server [platform:dpkg rabbit pika]
# kombu/pika rpm
rabbitmq-server [platform:rpm rabbit pika]
# RabbitMQ message broker
rabbitmq-server [platform:dpkg rabbit]
rabbitmq-server [platform:rpm rabbit]
# zmq
redis [platform:rpm zmq]

@@ -7,5 +7,4 @@ Deployment Guide
drivers
AMQP1.0
pika_driver
zmq_driver

@@ -1,160 +0,0 @@
------------------------------
Pika Driver Deployment Guide
------------------------------
.. currentmodule:: oslo_messaging
.. warning:: the Pika driver is no longer maintained and will be
removed from Oslo.Messaging at a future date. It is recommended that
all users of the Pika driver transition to using the Rabbit driver.
============
Introduction
============
Pika is a pure-Python implementation of the AMQP 0-9-1 protocol, including
RabbitMQ's extensions. It is very actively supported and recommended by the
RabbitMQ developers.
========
Abstract
========
PikaDriver is one of the oslo.messaging backend drivers. It supports the RPC
and Notify patterns. Currently it can serve as the sole oslo.messaging driver
across an OpenStack cluster. This document provides deployment information for
this driver in oslo_messaging.
This driver is able to work with a single RabbitMQ server instance or with a
RabbitMQ cluster.
=============
Configuration
=============
Enabling (mandatory)
--------------------
To enable the driver, in the section [DEFAULT] of the conf file,
the 'transport_url' parameter should be set to
`pika://user:pass@host1:port[,hostN:portN]`
[DEFAULT]
transport_url = pika://guest:guest@localhost:5672
Connection options (optional)
-----------------------------
In section [oslo_messaging_pika]:
#. channel_max: Maximum number of channels to allow,
#. frame_max (default - pika default value): The maximum byte size for
an AMQP frame,
#. heartbeat_interval (default=3): How often to send heartbeats for
consumer's connections in seconds. If 0, heartbeats are disabled,
#. ssl (default=False): Enable SSL if True,
#. ssl_options (default=None): Arguments passed to ssl.wrap_socket,
#. socket_timeout (default=0.25): Set timeout for opening new connection's
socket,
#. tcp_user_timeout (default=0.25): Set TCP_USER_TIMEOUT in seconds for
connection's socket,
#. host_connection_reconnect_delay (default=0.25): Set delay for reconnection
to a host after a connection error
Connection pool options (optional)
----------------------------------
In section [oslo_messaging_pika]:
#. pool_max_size (default=30): Maximum number of connections to keep queued,
#. pool_max_overflow (default=0): Maximum number of connections to create above
`pool_max_size`,
#. pool_timeout (default=30): Default number of seconds to wait for a
connection to become available,
#. pool_recycle (default=600): Lifetime of a connection (since creation) in
seconds or None for no recycling. Expired connections are closed on acquire,
#. pool_stale (default=60): Threshold in seconds at which inactive (since
release) connections are considered stale, or None for no staleness.
Stale connections are closed on acquire.
RPC related options (optional)
------------------------------
In section [oslo_messaging_pika]:
#. rpc_queue_expiration (default=60): Time to live for rpc queues without
consumers in seconds,
#. default_rpc_exchange (default="${control_exchange}_rpc"): Exchange name for
sending RPC messages,
#. rpc_reply_exchange (default="${control_exchange}_rpc_reply"): Exchange
name for receiving RPC replies,
#. rpc_listener_prefetch_count (default=100): Max number of unacknowledged
messages which RabbitMQ can send to the rpc listener,
#. rpc_reply_listener_prefetch_count (default=100): Max number of
unacknowledged messages which RabbitMQ can send to the rpc reply listener,
#. rpc_reply_retry_attempts (default=-1): Reconnecting retry count in case of
connectivity problem during sending reply. -1 means infinite retry during
rpc_timeout,
#. rpc_reply_retry_delay (default=0.25): Reconnecting retry delay in case of
connectivity problem during sending reply,
#. default_rpc_retry_attempts (default=-1): Reconnecting retry count in case of
connectivity problem during sending RPC message, -1 means infinite retry. If
the actual number of retry attempts is not 0, the rpc request could be
processed more than once,
#. rpc_retry_delay (default=0.25): Reconnecting retry delay in case of
connectivity problem during sending RPC message
${control_exchange} here is the value of the [DEFAULT].control_exchange
option, which is "openstack" by default
Notification related options (optional)
---------------------------------------
In section [oslo_messaging_pika]:
#. notification_persistence (default=False): Persist notification messages,
#. default_notification_exchange (default="${control_exchange}_notification"):
Exchange name for sending notifications,
#. notification_listener_prefetch_count (default=100): Max number of
unacknowledged messages which RabbitMQ can send to the notification listener,
#. default_notification_retry_attempts (default=-1): Reconnecting retry count
in case of connectivity problem during sending notification, -1 means
infinite retry,
#. notification_retry_delay (default=0.25): Reconnecting retry delay in case of
connectivity problem during sending notification message
${control_exchange} here is the value of the [DEFAULT].control_exchange
option, which is "openstack" by default
DevStack Support
----------------
The Pika driver is supported by DevStack. To enable it, edit the [localrc]
section of local.conf and add the following:
enable_plugin pika https://git.openstack.org/openstack/devstack-plugin-pika
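
For reference, a minimal sketch of a configuration using the option
sections described above (values are illustrative, not recommendations):

    [DEFAULT]
    transport_url = pika://guest:guest@localhost:5672

    [oslo_messaging_pika]
    pool_max_size = 30
    pool_timeout = 30
    rpc_listener_prefetch_count = 100
    notification_persistence = False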

@@ -1,366 +0,0 @@
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from debtcollector import deprecate
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
import pika_pool
import tenacity
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common
from oslo_messaging._drivers.pika_driver import (pika_connection_factory as
pika_drv_conn_factory)
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
from oslo_messaging import exceptions
LOG = logging.getLogger(__name__)
pika_pool_opts = [
cfg.IntOpt('pool_max_size', default=30,
help="Maximum number of connections to keep queued."),
cfg.IntOpt('pool_max_overflow', default=0,
help="Maximum number of connections to create above "
"`pool_max_size`."),
cfg.IntOpt('pool_timeout', default=30,
help="Default number of seconds to wait for a connections to "
"available"),
cfg.IntOpt('pool_recycle', default=600,
help="Lifetime of a connection (since creation) in seconds "
"or None for no recycling. Expired connections are "
"closed on acquire."),
cfg.IntOpt('pool_stale', default=60,
help="Threshold at which inactive (since release) connections "
"are considered stale in seconds or None for no "
"staleness. Stale connections are closed on acquire.")
]
message_opts = [
cfg.StrOpt('default_serializer_type', default='json',
choices=('json', 'msgpack'),
help="Default serialization mechanism for "
"serializing/deserializing outgoing/incoming messages")
]
notification_opts = [
cfg.BoolOpt('notification_persistence', default=False,
help="Persist notification messages."),
cfg.StrOpt('default_notification_exchange',
default="${control_exchange}_notification",
help="Exchange name for sending notifications"),
cfg.IntOpt(
'notification_listener_prefetch_count', default=100,
help="Max number of not acknowledged message which RabbitMQ can send "
"to notification listener."
),
cfg.IntOpt(
'default_notification_retry_attempts', default=-1,
help="Reconnecting retry count in case of connectivity problem during "
"sending notification, -1 means infinite retry."
),
cfg.FloatOpt(
'notification_retry_delay', default=0.25,
help="Reconnecting retry delay in case of connectivity problem during "
"sending notification message"
)
]
rpc_opts = [
cfg.IntOpt('rpc_queue_expiration', default=60,
help="Time to live for rpc queues without consumers in "
"seconds."),
cfg.StrOpt('default_rpc_exchange', default="${control_exchange}_rpc",
help="Exchange name for sending RPC messages"),
cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply",
help="Exchange name for receiving RPC replies"),
cfg.IntOpt(
'rpc_listener_prefetch_count', default=100,
help="Max number of not acknowledged message which RabbitMQ can send "
"to rpc listener."
),
cfg.IntOpt(
'rpc_reply_listener_prefetch_count', default=100,
help="Max number of not acknowledged message which RabbitMQ can send "
"to rpc reply listener."
),
cfg.IntOpt(
'rpc_reply_retry_attempts', default=-1,
help="Reconnecting retry count in case of connectivity problem during "
"sending reply. -1 means infinite retry during rpc_timeout"
),
cfg.FloatOpt(
'rpc_reply_retry_delay', default=0.25,
help="Reconnecting retry delay in case of connectivity problem during "
"sending reply."
),
cfg.IntOpt(
'default_rpc_retry_attempts', default=-1,
help="Reconnecting retry count in case of connectivity problem during "
"sending RPC message, -1 means infinite retry. If actual "
"retry attempts in not 0 the rpc request could be processed more "
"than one time"
),
cfg.FloatOpt(
'rpc_retry_delay', default=0.25,
help="Reconnecting retry delay in case of connectivity problem during "
"sending RPC message"
)
]
class PikaDriver(base.BaseDriver):
"""Pika Driver
**Warning**: The ``pika`` driver has been deprecated and will be removed in
a future release. It is recommended that all users of the ``pika`` driver
transition to using the ``rabbit`` driver.
"""
def __init__(self, conf, url, default_exchange=None,
allowed_remote_exmods=None):
deprecate("The pika driver is no longer maintained. It has been"
" deprecated",
message="It is recommended that all users of the pika driver"
" transition to using the rabbit driver.",
version="pike", removal_version="rocky")
opt_group = cfg.OptGroup(name='oslo_messaging_pika',
title='Pika driver options')
conf.register_group(opt_group)
conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group)
conf.register_opts(pika_pool_opts, group=opt_group)
conf.register_opts(message_opts, group=opt_group)
conf.register_opts(rpc_opts, group=opt_group)
conf.register_opts(notification_opts, group=opt_group)
conf = common.ConfigOptsProxy(conf, url, opt_group.name)
self._pika_engine = pika_drv_engine.PikaEngine(
conf, url, default_exchange, allowed_remote_exmods
)
self._reply_listener = pika_drv_lstnr.RpcReplyPikaListener(
self._pika_engine
)
super(PikaDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
def require_features(self, requeue=False):
pass
def _declare_rpc_exchange(self, exchange, stopwatch):
timeout = stopwatch.leftover(return_none=True)
with (self._pika_engine.connection_without_confirmation_pool
.acquire(timeout=timeout)) as conn:
try:
self._pika_engine.declare_exchange_by_channel(
conn.channel,
self._pika_engine.get_rpc_exchange_name(
exchange
), "direct", False
)
except pika_pool.Timeout as e:
raise exceptions.MessagingTimeout(
"Timeout for current operation was expired. {}.".format(
str(e)
)
)
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
with timeutils.StopWatch(duration=timeout) as stopwatch:
if retry is None:
retry = self._pika_engine.default_rpc_retry_attempts
exchange = self._pika_engine.get_rpc_exchange_name(
target.exchange
)
def on_exception(ex):
if isinstance(ex, pika_drv_exc.ExchangeNotFoundException):
# it is desirable to create the exchange because if we send to an
# exchange which does not exist, we get a ChannelClosed exception
# and need to reconnect
try:
self._declare_rpc_exchange(exchange, stopwatch)
except pika_drv_exc.ConnectionException as e:
LOG.warning("Problem during declaring exchange. %s", e)
return True
elif isinstance(ex, (pika_drv_exc.ConnectionException,
exceptions.MessageDeliveryFailure)):
LOG.warning("Problem during message sending. %s", ex)
return True
else:
return False
if retry:
retrier = tenacity.retry(
stop=(tenacity.stop_never if retry == -1 else
tenacity.stop_after_attempt(retry)),
retry=tenacity.retry_if_exception(on_exception),
wait=tenacity.wait_fixed(self._pika_engine.rpc_retry_delay)
)
else:
retrier = None
if target.fanout:
return self.cast_all_workers(
exchange, target.topic, ctxt, message, stopwatch, retrier
)
routing_key = self._pika_engine.get_rpc_queue_name(
target.topic, target.server, retrier is None
)
msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine,
message, ctxt)
try:
reply = msg.send(
exchange=exchange,
routing_key=routing_key,
reply_listener=(
self._reply_listener if wait_for_reply else None
),
stopwatch=stopwatch,
retrier=retrier
)
except pika_drv_exc.ExchangeNotFoundException as ex:
try:
self._declare_rpc_exchange(exchange, stopwatch)
except pika_drv_exc.ConnectionException as e:
LOG.warning("Problem during declaring exchange. %s", e)
raise ex
if reply is not None:
if reply.failure is not None:
raise reply.failure
return reply.result
def cast_all_workers(self, exchange, topic, ctxt, message, stopwatch,
retrier=None):
msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
ctxt)
try:
msg.send(
exchange=exchange,
routing_key=self._pika_engine.get_rpc_queue_name(
topic, "all_workers", retrier is None
),
mandatory=False,
stopwatch=stopwatch,
retrier=retrier
)
except pika_drv_exc.ExchangeNotFoundException:
try:
self._declare_rpc_exchange(exchange, stopwatch)
except pika_drv_exc.ConnectionException as e:
LOG.warning("Problem during declaring exchange. %s", e)
def _declare_notification_queue_binding(
self, target, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH):
if stopwatch.expired():
raise exceptions.MessagingTimeout(
"Timeout for current operation was expired."
)
try:
timeout = stopwatch.leftover(return_none=True)
with (self._pika_engine.connection_without_confirmation_pool
.acquire)(timeout=timeout) as conn:
self._pika_engine.declare_queue_binding_by_channel(
conn.channel,
exchange=(
target.exchange or
self._pika_engine.default_notification_exchange
),
queue=target.topic,
routing_key=target.topic,
exchange_type='direct',
queue_expiration=None,
durable=self._pika_engine.notification_persistence,
)
except pika_pool.Timeout as e:
raise exceptions.MessagingTimeout(
"Timeout for current operation was expired. {}.".format(str(e))
)
def send_notification(self, target, ctxt, message, version, retry=None):
if retry is None:
retry = self._pika_engine.default_notification_retry_attempts
def on_exception(ex):
if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException,
pika_drv_exc.RoutingException)):
LOG.warning("Problem during sending notification. %s", ex)
try:
self._declare_notification_queue_binding(target)
except pika_drv_exc.ConnectionException as e:
LOG.warning("Problem during declaring notification queue "
"binding. %s", e)
return True
elif isinstance(ex, (pika_drv_exc.ConnectionException,
pika_drv_exc.MessageRejectedException)):
LOG.warning("Problem during sending notification. %s", ex)
return True
else:
return False
if retry:
retrier = tenacity.retry(
stop=(tenacity.stop_never if retry == -1 else
tenacity.stop_after_attempt(retry)),
retry=tenacity.retry_if_exception(on_exception),
wait=tenacity.wait_fixed(
self._pika_engine.notification_retry_delay
)
)
else:
retrier = None
msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
ctxt)
return msg.send(
exchange=(
target.exchange or
self._pika_engine.default_notification_exchange
),
routing_key=target.topic,
confirm=True,
mandatory=True,
persistent=self._pika_engine.notification_persistence,
retrier=retrier
)
def listen(self, target, batch_size, batch_timeout):
return pika_drv_poller.RpcServicePikaPoller(
self._pika_engine, target, batch_size, batch_timeout,
self._pika_engine.rpc_listener_prefetch_count
)
def listen_for_notifications(self, targets_and_priorities, pool,
batch_size, batch_timeout):
return pika_drv_poller.NotificationPikaPoller(
self._pika_engine, targets_and_priorities, batch_size,
batch_timeout,
self._pika_engine.notification_listener_prefetch_count, pool
)
def cleanup(self):
self._reply_listener.cleanup()
self._pika_engine.cleanup()

@@ -1,40 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import select
import socket
from oslo_serialization.serializer import json_serializer
from oslo_serialization.serializer import msgpack_serializer
from oslo_utils import timeutils
from pika import exceptions as pika_exceptions
import six
PIKA_CONNECTIVITY_ERRORS = (
pika_exceptions.AMQPConnectionError,
pika_exceptions.ConnectionClosed,
pika_exceptions.ChannelClosed,
socket.timeout,
select.error
)
EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
INFINITE_STOP_WATCH = timeutils.StopWatch(duration=None).start()
MESSAGE_SERIALIZERS = {
'application/json': json_serializer.JSONSerializer(),
'application/msgpack': msgpack_serializer.MessagePackSerializer()
}

@@ -1,542 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import logging
import os
import threading
import futurist
from pika.adapters import select_connection
from pika import exceptions as pika_exceptions
from pika import spec as pika_spec
from oslo_utils import eventletutils
current_thread = eventletutils.fetch_current_thread_functor()
LOG = logging.getLogger(__name__)
class ThreadSafePikaConnection(object):
def __init__(self, parameters=None,
_impl_class=select_connection.SelectConnection):
self.params = parameters
self._connection_lock = threading.Lock()
self._evt_closed = threading.Event()
self._task_queue = collections.deque()
self._pending_connection_futures = set()
create_connection_future = self._register_pending_future()
def on_open_error(conn, err):
create_connection_future.set_exception(
pika_exceptions.AMQPConnectionError(err)
)
self._impl = _impl_class(
parameters=parameters,
on_open_callback=create_connection_future.set_result,
on_open_error_callback=on_open_error,
on_close_callback=self._on_connection_close,
stop_ioloop_on_close=False,
)
self._interrupt_pipein, self._interrupt_pipeout = os.pipe()
self._impl.ioloop.add_handler(self._interrupt_pipein,
self._impl.ioloop.read_interrupt,
select_connection.READ)
self._thread = threading.Thread(target=self._process_io)
self._thread.daemon = True
self._thread_id = None
self._thread.start()
create_connection_future.result()
def _check_called_not_from_event_loop(self):
if current_thread() == self._thread_id:
raise RuntimeError("This call is not allowed from ioloop thread")
def _execute_task(self, func, *args, **kwargs):
if current_thread() == self._thread_id:
return func(*args, **kwargs)
future = futurist.Future()
self._task_queue.append((func, args, kwargs, future))
if self._evt_closed.is_set():
self._notify_all_futures_connection_close()
elif self._interrupt_pipeout is not None:
os.write(self._interrupt_pipeout, b'X')
return future.result()
def _register_pending_future(self):
future = futurist.Future()
self._pending_connection_futures.add(future)
def on_done_callback(fut):
try:
self._pending_connection_futures.remove(fut)
except KeyError:
pass
future.add_done_callback(on_done_callback)
if self._evt_closed.is_set():
self._notify_all_futures_connection_close()
return future
def _notify_all_futures_connection_close(self):
while self._task_queue:
try:
method_res_future = self._task_queue.pop()[3]
except KeyError:
break
else:
method_res_future.set_exception(
pika_exceptions.ConnectionClosed()
)
while self._pending_connection_futures:
try:
pending_connection_future = (
self._pending_connection_futures.pop()
)
except KeyError:
break
else:
pending_connection_future.set_exception(
pika_exceptions.ConnectionClosed()
)
def _on_connection_close(self, conn, reply_code, reply_text):
self._evt_closed.set()
self._notify_all_futures_connection_close()
if self._interrupt_pipeout:
os.close(self._interrupt_pipeout)
os.close(self._interrupt_pipein)
def add_on_close_callback(self, callback):
return self._execute_task(self._impl.add_on_close_callback, callback)
def _do_process_io(self):
while self._task_queue:
func, args, kwargs, future = self._task_queue.pop()
try:
res = func(*args, **kwargs)
except BaseException as e:
LOG.exception(e)
future.set_exception(e)
else:
future.set_result(res)
self._impl.ioloop.poll()
self._impl.ioloop.process_timeouts()
def _process_io(self):
self._thread_id = current_thread()
while not self._evt_closed.is_set():
try:
self._do_process_io()
except BaseException:
LOG.exception("Error during processing connection's IO")
def close(self, *args, **kwargs):
self._check_called_not_from_event_loop()
res = self._execute_task(self._impl.close, *args, **kwargs)
self._evt_closed.wait()
self._thread.join()
return res
def channel(self, channel_number=None):
self._check_called_not_from_event_loop()
channel_opened_future = self._register_pending_future()
impl_channel = self._execute_task(
self._impl.channel,
on_open_callback=channel_opened_future.set_result,
channel_number=channel_number
)
# Create our proxy channel
channel = ThreadSafePikaChannel(impl_channel, self)
# Link implementation channel with our proxy channel
impl_channel._set_cookie(channel)
channel_opened_future.result()
return channel
def add_timeout(self, timeout, callback):
return self._execute_task(self._impl.add_timeout, timeout, callback)
def remove_timeout(self, timeout_id):
return self._execute_task(self._impl.remove_timeout, timeout_id)
@property
def is_closed(self):
return self._impl.is_closed
@property
def is_closing(self):
return self._impl.is_closing
@property
def is_open(self):
return self._impl.is_open
class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
def __init__(self, channel_impl, connection):
self._impl = channel_impl
self._connection = connection
self._delivery_confirmation = False
self._message_returned = False
self._current_future = None
self._evt_closed = threading.Event()
self.add_on_close_callback(self._on_channel_close)
def _execute_task(self, func, *args, **kwargs):
return self._connection._execute_task(func, *args, **kwargs)
def _on_channel_close(self, channel, reply_code, reply_text):
self._evt_closed.set()
if self._current_future:
self._current_future.set_exception(
pika_exceptions.ChannelClosed(reply_code, reply_text))
def _on_message_confirmation(self, frame):
self._current_future.set_result(frame)
def add_on_close_callback(self, callback):
self._execute_task(self._impl.add_on_close_callback, callback)
def add_on_cancel_callback(self, callback):
self._execute_task(self._impl.add_on_cancel_callback, callback)
def __int__(self):
return self.channel_number
@property
def channel_number(self):
return self._impl.channel_number
@property
def is_closed(self):
return self._impl.is_closed
@property
def is_closing(self):
return self._impl.is_closing
@property
def is_open(self):
return self._impl.is_open
def close(self, reply_code=0, reply_text="Normal Shutdown"):
self._impl.close(reply_code=reply_code, reply_text=reply_text)
self._evt_closed.wait()
def _check_called_not_from_event_loop(self):
self._connection._check_called_not_from_event_loop()
def flow(self, active):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(
self._impl.flow, callback=self._current_future.set_result,
active=active
)
return self._current_future.result()
def basic_consume(self, # pylint: disable=R0913
consumer_callback,
queue,
no_ack=False,
exclusive=False,
consumer_tag=None,
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(
self._impl.add_callback, self._current_future.set_result,
replies=[pika_spec.Basic.ConsumeOk], one_shot=True
)
self._impl.add_callback(self._current_future.set_result,
replies=[pika_spec.Basic.ConsumeOk],
one_shot=True)
tag = self._execute_task(
self._impl.basic_consume,
consumer_callback=consumer_callback,
queue=queue,
no_ack=no_ack,
exclusive=exclusive,
consumer_tag=consumer_tag,
arguments=arguments
)
self._current_future.result()
return tag
def basic_cancel(self, consumer_tag):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(
self._impl.basic_cancel,
callback=self._current_future.set_result,
consumer_tag=consumer_tag,
nowait=False)
self._current_future.result()
def basic_ack(self, delivery_tag=0, multiple=False):
return self._execute_task(
self._impl.basic_ack, delivery_tag=delivery_tag, multiple=multiple)
def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
return self._execute_task(
self._impl.basic_nack, delivery_tag=delivery_tag,
multiple=multiple, requeue=requeue
)
def publish(self, exchange, routing_key, body, # pylint: disable=R0913
properties=None, mandatory=False, immediate=False):
if self._delivery_confirmation:
self._check_called_not_from_event_loop()
# In publisher-acknowledgments mode
self._message_returned = False
self._current_future = futurist.Future()
self._execute_task(self._impl.basic_publish,
exchange=exchange,
routing_key=routing_key,
body=body,
properties=properties,
mandatory=mandatory,
immediate=immediate)
conf_method = self._current_future.result().method
if isinstance(conf_method, pika_spec.Basic.Nack):
raise pika_exceptions.NackError((None,))
else:
assert isinstance(conf_method, pika_spec.Basic.Ack), (
conf_method)
if self._message_returned:
raise pika_exceptions.UnroutableError((None,))
else:
# In non-publisher-acknowledgments mode
self._execute_task(self._impl.basic_publish,
exchange=exchange,
routing_key=routing_key,
body=body,
properties=properties,
mandatory=mandatory,
immediate=immediate)
def basic_qos(self, prefetch_size=0, prefetch_count=0, all_channels=False):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.basic_qos,
callback=self._current_future.set_result,
prefetch_size=prefetch_size,
prefetch_count=prefetch_count,
all_channels=all_channels)
self._current_future.result()
def basic_recover(self, requeue=False):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(
self._impl.basic_recover,
callback=lambda: self._current_future.set_result(None),
requeue=requeue
)
self._current_future.result()
def basic_reject(self, delivery_tag=None, requeue=True):
self._execute_task(self._impl.basic_reject,
delivery_tag=delivery_tag,
requeue=requeue)
def _on_message_returned(self, *args, **kwargs):
self._message_returned = True
def confirm_delivery(self):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.add_callback,
callback=self._current_future.set_result,
replies=[pika_spec.Confirm.SelectOk],
one_shot=True)
self._execute_task(self._impl.confirm_delivery,
callback=self._on_message_confirmation,
nowait=False)
self._current_future.result()
self._delivery_confirmation = True
self._execute_task(self._impl.add_on_return_callback,
self._on_message_returned)
def exchange_declare(self, exchange=None, # pylint: disable=R0913
exchange_type='direct', passive=False, durable=False,
auto_delete=False, internal=False,
arguments=None, **kwargs):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.exchange_declare,
callback=self._current_future.set_result,
exchange=exchange,
exchange_type=exchange_type,
passive=passive,
durable=durable,
auto_delete=auto_delete,
internal=internal,
nowait=False,
arguments=arguments,
type=kwargs["type"] if kwargs else None)
return self._current_future.result()
def exchange_delete(self, exchange=None, if_unused=False):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.exchange_delete,
callback=self._current_future.set_result,
exchange=exchange,
if_unused=if_unused,
nowait=False)
return self._current_future.result()
def exchange_bind(self, destination=None, source=None, routing_key='',
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.exchange_bind,
callback=self._current_future.set_result,
destination=destination,
source=source,
routing_key=routing_key,
nowait=False,
arguments=arguments)
return self._current_future.result()
def exchange_unbind(self, destination=None, source=None, routing_key='',
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.exchange_unbind,
callback=self._current_future.set_result,
destination=destination,
source=source,
routing_key=routing_key,
nowait=False,
arguments=arguments)
return self._current_future.result()
def queue_declare(self, queue='', passive=False, durable=False,
exclusive=False, auto_delete=False,
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.queue_declare,
callback=self._current_future.set_result,
queue=queue,
passive=passive,
durable=durable,
exclusive=exclusive,
auto_delete=auto_delete,
nowait=False,
arguments=arguments)
return self._current_future.result()
def queue_delete(self, queue='', if_unused=False, if_empty=False):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.queue_delete,
callback=self._current_future.set_result,
queue=queue,
if_unused=if_unused,
if_empty=if_empty,
nowait=False)
return self._current_future.result()
def queue_purge(self, queue=''):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.queue_purge,
callback=self._current_future.set_result,
queue=queue,
nowait=False)
return self._current_future.result()
def queue_bind(self, queue, exchange, routing_key=None,
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.queue_bind,
callback=self._current_future.set_result,
queue=queue,
exchange=exchange,
routing_key=routing_key,
nowait=False,
arguments=arguments)
return self._current_future.result()
def queue_unbind(self, queue='', exchange=None, routing_key=None,
arguments=None):
self._check_called_not_from_event_loop()
self._current_future = futurist.Future()
self._execute_task(self._impl.queue_unbind,
callback=self._current_future.set_result,
queue=queue,
exchange=exchange,
routing_key=routing_key,
arguments=arguments)
return self._current_future.result()
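A minimal sketch of how these thread-safe proxies fit together (hypothetical
host and queue name; error handling omitted):

    import pika

    conn = ThreadSafePikaConnection(
        parameters=pika.ConnectionParameters(host="localhost")
    )
    channel = conn.channel()             # proxy channel, opened synchronously
    channel.confirm_delivery()           # enable publisher acknowledgments
    channel.queue_declare(queue="demo")  # blocking declare via the ioloop thread
    channel.publish(exchange="", routing_key="demo",
                    body=b"hello", mandatory=True)
    conn.close()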

@@ -1,307 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import random
import socket
import threading
import time
from oslo_config import cfg
import pika
from pika import credentials as pika_credentials
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
from oslo_messaging._drivers.pika_driver import pika_connection
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
LOG = logging.getLogger(__name__)
# constant for setting tcp_user_timeout socket option
# (it should be defined in the 'socket' module of the standard library in
# the future)
TCP_USER_TIMEOUT = 18
# constants for creating connection statistics
HOST_CONNECTION_LAST_TRY_TIME = "last_try_time"
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time"
pika_opts = [
cfg.IntOpt('channel_max',
help='Maximum number of channels to allow'),
cfg.IntOpt('frame_max',
help='The maximum byte size for an AMQP frame'),
cfg.IntOpt('heartbeat_interval', default=3,
help="How often to send heartbeats for consumer's connections"),
cfg.BoolOpt('ssl',
help='Enable SSL'),
cfg.DictOpt('ssl_options',
help='Arguments passed to ssl.wrap_socket'),
cfg.FloatOpt('socket_timeout', default=0.25,
help="Set socket timeout in seconds for connection's socket"),
cfg.FloatOpt('tcp_user_timeout', default=0.25,
help="Set TCP_USER_TIMEOUT in seconds for connection's "
"socket"),
cfg.FloatOpt('host_connection_reconnect_delay', default=0.25,
help="Set delay for reconnection to some host which has "
"connection error"),
cfg.StrOpt('connection_factory', default="single",
choices=["new", "single", "read_write"],
help='Connection factory implementation')
]
class PikaConnectionFactory(object):
def __init__(self, url, conf):
self._url = url
self._conf = conf
self._connection_lock = threading.RLock()
if not url.hosts:
raise ValueError("You should provide at least one RabbitMQ host")
# initializing connection parameters for configured RabbitMQ hosts
self._common_pika_params = {
'virtual_host': url.virtual_host,
'channel_max': conf.oslo_messaging_pika.channel_max,
'frame_max': conf.oslo_messaging_pika.frame_max,
'ssl': conf.oslo_messaging_pika.ssl,
'ssl_options': conf.oslo_messaging_pika.ssl_options,
'socket_timeout': conf.oslo_messaging_pika.socket_timeout
}
self._host_list = url.hosts
self._heartbeat_interval = conf.oslo_messaging_pika.heartbeat_interval
self._host_connection_reconnect_delay = (
conf.oslo_messaging_pika.host_connection_reconnect_delay
)
self._tcp_user_timeout = conf.oslo_messaging_pika.tcp_user_timeout
self._connection_host_status = {}
self._cur_connection_host_num = random.randint(
0, len(url.hosts) - 1
)
def cleanup(self):
pass
def create_connection(self, for_listening=False):
"""Create and return connection to any available host.
:return: created connection
:raise: ConnectionException if all hosts are not reachable
"""
with self._connection_lock:
host_count = len(self._host_list)
connection_attempts = host_count
while connection_attempts > 0:
self._cur_connection_host_num += 1
self._cur_connection_host_num %= host_count
try:
return self._create_host_connection(
self._cur_connection_host_num, for_listening
)
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
LOG.warning("Can't establish connection to host. %s", e)
except pika_drv_exc.HostConnectionNotAllowedException as e:
LOG.warning("Connection to host is not allowed. %s", e)
connection_attempts -= 1
raise pika_drv_exc.EstablishConnectionException(
"Can not establish connection to any configured RabbitMQ "
"host: " + str(self._host_list)
)
def _set_tcp_user_timeout(self, s):
if not self._tcp_user_timeout:
return
try:
s.setsockopt(
socket.IPPROTO_TCP, TCP_USER_TIMEOUT,
int(self._tcp_user_timeout * 1000)
)
except socket.error:
LOG.warning(
"Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT."
)
def _create_host_connection(self, host_index, for_listening):
"""Create new connection to host #host_index
:param host_index: Integer, number of host for connection establishing
:param for_listening: Boolean, creates connection for listening
if True
:return: New connection
"""
host = self._host_list[host_index]
cur_time = time.time()
host_connection_status = self._connection_host_status.get(host)
if host_connection_status is None:
host_connection_status = {
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0,
HOST_CONNECTION_LAST_TRY_TIME: 0
}
self._connection_host_status[host] = host_connection_status
last_success_time = host_connection_status[
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
]
last_time = host_connection_status[
HOST_CONNECTION_LAST_TRY_TIME
]
# raise HostConnectionNotAllowedException if we tried to establish
# a connection within the last 'host_connection_reconnect_delay'
# seconds and failed
if (last_time != last_success_time and
cur_time - last_time <
self._host_connection_reconnect_delay):
raise pika_drv_exc.HostConnectionNotAllowedException(
"Connection to host #{} is not allowed now because of "
"previous failure".format(host_index)
)
try:
connection = self._do_create_host_connection(
host, for_listening
)
self._connection_host_status[host][
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
] = cur_time
return connection
finally:
self._connection_host_status[host][
HOST_CONNECTION_LAST_TRY_TIME
] = cur_time
def _do_create_host_connection(self, host, for_listening):
connection_params = pika.ConnectionParameters(
host=host.hostname,
port=host.port,
credentials=pika_credentials.PlainCredentials(
host.username, host.password
),
heartbeat_interval=(
self._heartbeat_interval if for_listening else None
),
**self._common_pika_params
)
if for_listening:
connection = pika_connection.ThreadSafePikaConnection(
parameters=connection_params
)
else:
connection = pika.BlockingConnection(
parameters=connection_params
)
connection.params = connection_params
self._set_tcp_user_timeout(connection._impl.socket)
return connection
class NotClosableConnection(object):
def __init__(self, connection):
self._connection = connection
def __getattr__(self, item):
return getattr(self._connection, item)
def close(self):
pass
class SinglePikaConnectionFactory(PikaConnectionFactory):
def __init__(self, url, conf):
super(SinglePikaConnectionFactory, self).__init__(url, conf)
self._connection = None
def create_connection(self, for_listening=False):
with self._connection_lock:
if self._connection is None or not self._connection.is_open:
self._connection = (
super(SinglePikaConnectionFactory, self).create_connection(
True
)
)
return NotClosableConnection(self._connection)
def cleanup(self):
with self._connection_lock:
if self._connection is not None and self._connection.is_open:
try:
self._connection.close()
except Exception:
LOG.warning(
"Unexpected exception during connection closing",
exc_info=True
)
self._connection = None
class ReadWritePikaConnectionFactory(PikaConnectionFactory):
def __init__(self, url, conf):
super(ReadWritePikaConnectionFactory, self).__init__(url, conf)
self._read_connection = None
self._write_connection = None
def create_connection(self, for_listening=False):
with self._connection_lock:
if for_listening:
if (self._read_connection is None or
not self._read_connection.is_open):
self._read_connection = super(
ReadWritePikaConnectionFactory, self
).create_connection(True)
return NotClosableConnection(self._read_connection)
else:
if (self._write_connection is None or
not self._write_connection.is_open):
self._write_connection = super(
ReadWritePikaConnectionFactory, self
).create_connection(True)
return NotClosableConnection(self._write_connection)
def cleanup(self):
with self._connection_lock:
if (self._read_connection is not None and
self._read_connection.is_open):
try:
self._read_connection.close()
except Exception:
LOG.warning(
"Unexpected exception during connection closing",
exc_info=True
)
self._read_connection = None
if (self._write_connection is not None and
self._write_connection.is_open):
try:
self._write_connection.close()
except Exception:
LOG.warning(
"Unexpected exception during connection closing",
exc_info=True
)
self._write_connection = None

@@ -1,303 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import threading
import uuid
from oslo_utils import eventletutils
import pika_pool
from stevedore import driver
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
LOG = logging.getLogger(__name__)
class _PooledConnectionWithConfirmations(pika_pool.Connection):
"""Derived from 'pika_pool.Connection' and extends its logic - adds
'confirm_delivery' call after channel creation to enable delivery
confirmation for channel
"""
@property
def channel(self):
if self.fairy.channel is None:
self.fairy.channel = self.fairy.cxn.channel()
self.fairy.channel.confirm_delivery()
return self.fairy.channel
class PikaEngine(object):
"""Used for shared functionality between other pika driver modules, like
connection factory, connection pools, processing and holding configuration,
etc.
"""
def __init__(self, conf, url, default_exchange=None,
allowed_remote_exmods=None):
self.conf = conf
self.url = url
self._connection_factory_type = (
self.conf.oslo_messaging_pika.connection_factory
)
self._connection_factory = None
self._connection_without_confirmation_pool = None
self._connection_with_confirmation_pool = None
self._pid = None
self._init_lock = threading.Lock()
self.host_connection_reconnect_delay = (
conf.oslo_messaging_pika.host_connection_reconnect_delay
)
# processing rpc options
self.default_rpc_exchange = (
conf.oslo_messaging_pika.default_rpc_exchange
)
self.rpc_reply_exchange = (
conf.oslo_messaging_pika.rpc_reply_exchange
)
self.allowed_remote_exmods = [pika_drv_cmns.EXCEPTIONS_MODULE]
if allowed_remote_exmods:
self.allowed_remote_exmods.extend(allowed_remote_exmods)
self.rpc_listener_prefetch_count = (
conf.oslo_messaging_pika.rpc_listener_prefetch_count
)
self.default_rpc_retry_attempts = (
conf.oslo_messaging_pika.default_rpc_retry_attempts
)
self.rpc_retry_delay = (
conf.oslo_messaging_pika.rpc_retry_delay
)
if self.rpc_retry_delay < 0:
raise ValueError("rpc_retry_delay should be non-negative integer")
self.rpc_reply_listener_prefetch_count = (
conf.oslo_messaging_pika.rpc_listener_prefetch_count
)
self.rpc_reply_retry_attempts = (
conf.oslo_messaging_pika.rpc_reply_retry_attempts
)
self.rpc_reply_retry_delay = (
conf.oslo_messaging_pika.rpc_reply_retry_delay
)
if self.rpc_reply_retry_delay < 0:
raise ValueError("rpc_reply_retry_delay should be non-negative "
"integer")
self.rpc_queue_expiration = (
self.conf.oslo_messaging_pika.rpc_queue_expiration
)
# processing notification options
self.default_notification_exchange = (
conf.oslo_messaging_pika.default_notification_exchange
)
self.notification_persistence = (
conf.oslo_messaging_pika.notification_persistence
)
self.notification_listener_prefetch_count = (
conf.oslo_messaging_pika.notification_listener_prefetch_count
)
self.default_notification_retry_attempts = (
conf.oslo_messaging_pika.default_notification_retry_attempts
)
if self.default_notification_retry_attempts is None:
raise ValueError("default_notification_retry_attempts should be "
"an integer")
self.notification_retry_delay = (
conf.oslo_messaging_pika.notification_retry_delay
)
if (self.notification_retry_delay is None or
self.notification_retry_delay < 0):
raise ValueError("notification_retry_delay should be non-negative "
"integer")
self.default_content_type = (
'application/' + conf.oslo_messaging_pika.default_serializer_type
)
def _init_if_needed(self):
cur_pid = os.getpid()
if self._pid == cur_pid:
return
with self._init_lock:
if self._pid == cur_pid:
return
if self._pid:
LOG.warning("New pid is detected. Old: %s, new: %s. "
"Cleaning up...", self._pid, cur_pid)
# Note(dukhlov): we need to force select poller usage when the
# 'thread' module is monkey patched because the current eventlet
# implementation does not support patching of poll/epoll/kqueue
if eventletutils.is_monkey_patched("thread"):
from pika.adapters import select_connection
select_connection.SELECT_TYPE = "select"
mgr = driver.DriverManager(
'oslo.messaging.pika.connection_factory',
self._connection_factory_type
)
self._connection_factory = mgr.driver(self.url, self.conf)
# initializing 2 connection pools: 1st for connections without
# confirmations, 2nd - with confirmations
self._connection_without_confirmation_pool = pika_pool.QueuedPool(
create=self.create_connection,
max_size=self.conf.oslo_messaging_pika.pool_max_size,
max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
timeout=self.conf.oslo_messaging_pika.pool_timeout,
recycle=self.conf.oslo_messaging_pika.pool_recycle,
stale=self.conf.oslo_messaging_pika.pool_stale,
)
self._connection_with_confirmation_pool = pika_pool.QueuedPool(
create=self.create_connection,
max_size=self.conf.oslo_messaging_pika.pool_max_size,
max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
timeout=self.conf.oslo_messaging_pika.pool_timeout,
recycle=self.conf.oslo_messaging_pika.pool_recycle,
stale=self.conf.oslo_messaging_pika.pool_stale,
)
self._connection_with_confirmation_pool.Connection = (
_PooledConnectionWithConfirmations
)
self._pid = cur_pid
def create_connection(self, for_listening=False):
self._init_if_needed()
return self._connection_factory.create_connection(for_listening)
@property
def connection_without_confirmation_pool(self):
self._init_if_needed()
return self._connection_without_confirmation_pool
@property
def connection_with_confirmation_pool(self):
self._init_if_needed()
return self._connection_with_confirmation_pool
def cleanup(self):
if self._connection_factory:
self._connection_factory.cleanup()
def declare_exchange_by_channel(self, channel, exchange, exchange_type,
durable):
"""Declare exchange using already created channel, if they don't exist
:param channel: Channel for communication with RabbitMQ
:param exchange: String, RabbitMQ exchange name
:param exchange_type: String ('direct', 'topic' or 'fanout')
exchange type for exchange to be declared
:param durable: Boolean, creates durable exchange if true
"""
try:
channel.exchange_declare(
exchange, exchange_type, auto_delete=True, durable=durable
)
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
raise pika_drv_exc.ConnectionException(
"Connectivity problem detected during declaring exchange: "
"exchange:{}, exchange_type: {}, durable: {}. {}".format(
exchange, exchange_type, durable, str(e)
)
)
def declare_queue_binding_by_channel(self, channel, exchange, queue,
routing_key, exchange_type,
queue_expiration, durable):
"""Declare exchange, queue and bind them using already created
channel, if they don't exist
:param channel: Channel for communication with RabbitMQ
:param exchange: String, RabbitMQ exchange name
:param queue: Sting, RabbitMQ queue name
:param routing_key: Sting, RabbitMQ routing key for queue binding
:param exchange_type: String ('direct', 'topic' or 'fanout')
exchange type for exchange to be declared
:param queue_expiration: Integer, time in seconds which queue will
remain existing in RabbitMQ when there no consumers connected
:param durable: Boolean, creates durable exchange and queue if true
"""
try:
channel.exchange_declare(
exchange, exchange_type, auto_delete=True, durable=durable
)
arguments = {}
if queue_expiration > 0:
arguments['x-expires'] = queue_expiration * 1000
channel.queue_declare(queue, durable=durable, arguments=arguments)
channel.queue_bind(queue, exchange, routing_key)
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
raise pika_drv_exc.ConnectionException(
"Connectivity problem detected during declaring queue "
"binding: exchange:{}, queue: {}, routing_key: {}, "
"exchange_type: {}, queue_expiration: {}, "
"durable: {}. {}".format(
exchange, queue, routing_key, exchange_type,
queue_expiration, durable, str(e)
)
)
def get_rpc_exchange_name(self, exchange):
"""Returns RabbitMQ exchange name for given rpc request
:param exchange: String, oslo.messaging target's exchange
:return: String, RabbitMQ exchange name
"""
return exchange or self.default_rpc_exchange
@staticmethod
def get_rpc_queue_name(topic, server, no_ack, worker=False):
"""Returns RabbitMQ queue name for given rpc request
:param topic: String, oslo.messaging target's topic
:param server: String, oslo.messaging target's server
:param no_ack: Boolean, use message delivery with acknowledgements or not
:param worker: Boolean, use queue by single worker only or not
:return: String, RabbitMQ queue name
"""
queue_parts = ["no_ack" if no_ack else "with_ack", topic]
if server is not None:
queue_parts.append(server)
if worker:
queue_parts.append("worker")
queue_parts.append(uuid.uuid4().hex)
queue = '.'.join(queue_parts)
return queue
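For illustration, the naming scheme implemented above produces queue names
such as the following (doctest-style sketch):

    >>> PikaEngine.get_rpc_queue_name("compute", "host-1", no_ack=False)
    'with_ack.compute.host-1'
    >>> # with worker=True, a "worker" segment plus a random uuid4 hex
    >>> # suffix are appended as well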

@@ -1,68 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging import exceptions
class ExchangeNotFoundException(exceptions.MessageDeliveryFailure):
"""Is raised if specified exchange is not found in RabbitMQ."""
pass
class MessageRejectedException(exceptions.MessageDeliveryFailure):
"""Is raised if message which you are trying to send was nacked by RabbitMQ
it may happen if RabbitMQ is not able to process message
"""
pass
class RoutingException(exceptions.MessageDeliveryFailure):
"""Is raised if message can not be delivered to any queue. Usually it means
that any queue is not binded to given exchange with given routing key.
Raised if 'mandatory' flag specified only
"""
pass
class ConnectionException(exceptions.MessagingException):
"""Is raised if some operation can not be performed due to connectivity
problem
"""
pass
class TimeoutConnectionException(ConnectionException):
"""Is raised if socket timeout was expired during network interaction"""
pass
class EstablishConnectionException(ConnectionException):
"""Is raised if we have some problem during establishing connection
procedure
"""
pass
class HostConnectionNotAllowedException(EstablishConnectionException):
"""Is raised in case of try to establish connection to temporary
not allowed host (because of reconnection policy for example)
"""
pass
class UnsupportedDriverVersion(exceptions.MessagingException):
"""Is raised when message is received but was sent by different,
not supported driver version
"""
pass

@@ -1,123 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import uuid
from concurrent import futures
from oslo_log import log as logging
from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
LOG = logging.getLogger(__name__)
class RpcReplyPikaListener(object):
"""Provide functionality for listening RPC replies. Create and handle
reply poller and coroutine for performing polling job
"""
def __init__(self, pika_engine):
super(RpcReplyPikaListener, self).__init__()
self._pika_engine = pika_engine
# preparing poller for listening replies
self._reply_queue = None
self._reply_poller = None
self._reply_waiting_futures = {}
self._reply_consumer_initialized = False
self._reply_consumer_initialization_lock = threading.Lock()
self._shutdown = False
def get_reply_qname(self):
"""As result return reply queue name, shared for whole process,
but before this check is RPC listener initialized or not and perform
initialization if needed
:return: String, queue name which hould be used for reply sending
"""
if self._reply_consumer_initialized:
return self._reply_queue
with self._reply_consumer_initialization_lock:
if self._reply_consumer_initialized:
return self._reply_queue
# generate reply queue name if needed
if self._reply_queue is None:
self._reply_queue = "reply.{}.{}.{}".format(
self._pika_engine.conf.project,
self._pika_engine.conf.prog, uuid.uuid4().hex
)
# initialize reply poller if needed
if self._reply_poller is None:
self._reply_poller = pika_drv_poller.RpcReplyPikaPoller(
self._pika_engine, self._pika_engine.rpc_reply_exchange,
self._reply_queue, 1, None,
self._pika_engine.rpc_reply_listener_prefetch_count
)
self._reply_poller.start(self._on_incoming)
self._reply_consumer_initialized = True
return self._reply_queue
def _on_incoming(self, incoming):
"""Reply polling job. Poll replies in infinite loop and notify
registered features
"""
for message in incoming:
try:
message.acknowledge()
future = self._reply_waiting_futures.pop(
message.msg_id, None
)
if future is not None:
future.set_result(message)
except Exception:
LOG.exception("Unexpected exception during processing"
"reply message")
def register_reply_waiter(self, msg_id):
"""Register reply waiter. Should be called before message sending to
the server
:param msg_id: String, message_id of expected reply
:return future: Future, container for expected reply to be returned
over
"""
future = futures.Future()
self._reply_waiting_futures[msg_id] = future
return future
def unregister_reply_waiter(self, msg_id):
"""Unregister reply waiter. Should be called if client has not got
reply and doesn't want to continue waiting (if timeout_expired for
example)
:param msg_id:
"""
self._reply_waiting_futures.pop(msg_id, None)
def cleanup(self):
"""Stop replies consuming and cleanup resources"""
self._shutdown = True
if self._reply_poller:
self._reply_poller.stop()
self._reply_poller.cleanup()
self._reply_poller = None
self._reply_queue = None
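
# Editor's note: the reply-waiter mechanism above is essentially a map of
# msg_id -> Future. A standalone sketch of the same pattern (names are
# illustrative, not the driver's API):
from concurrent import futures

class _SampleReplyWaiters(object):
    def __init__(self):
        self._futures = {}

    def register(self, msg_id):
        # called by the sender before publishing the request
        future = futures.Future()
        self._futures[msg_id] = future
        return future

    def resolve(self, msg_id, reply):
        # called by the consumer thread when a reply arrives
        future = self._futures.pop(msg_id, None)
        if future is not None:
            future.set_result(reply)

# sender:   future = waiters.register(msg_id); future.result(timeout=30)
# consumer: waiters.resolve(message.msg_id, message)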


@ -1,618 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import socket
import time
import traceback
import uuid
from concurrent import futures
from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import timeutils
from pika import exceptions as pika_exceptions
from pika import spec as pika_spec
import pika_pool
import six
import tenacity
import oslo_messaging
from oslo_messaging._drivers import base
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_messaging import _utils as utils
from oslo_messaging import exceptions
LOG = logging.getLogger(__name__)
_VERSION_HEADER = "version"
_VERSION = "1.0"
class RemoteExceptionMixin(object):
"""Used for constructing dynamic exception type during deserialization of
remote exception. It defines unified '__init__' method signature and
exception message format
"""
def __init__(self, module, clazz, message, trace):
"""Store serialized data
:param module: String, module name for importing original exception
class of serialized remote exception
:param clazz: String, original class name of serialized remote
exception
:param message: String, original message of serialized remote
exception
:param trace: String, original trace of serialized remote exception
"""
self.module = module
self.clazz = clazz
self.message = message
self.trace = trace
self._str_msgs = message + "\n" + "\n".join(trace)
def __str__(self):
return self._str_msgs
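
# Editor's note: a short sketch of how the mixin above is combined with the
# original exception class during deserialization (see
# RpcReplyPikaIncomingMessage below); _SampleRemoteError is illustrative.
class _SampleRemoteError(Exception):
    pass

_ex_type = type(_SampleRemoteError.__name__,
                (RemoteExceptionMixin, _SampleRemoteError), {})
_exc = _ex_type("sample.module", "_SampleRemoteError", "boom", ["line 1"])
assert isinstance(_exc, _SampleRemoteError)
assert str(_exc) == "boom\nline 1"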
class PikaIncomingMessage(base.IncomingMessage):
"""Driver friendly adapter for received message. Extract message
information from RabbitMQ message and provide access to it
"""
def __init__(self, pika_engine, channel, method, properties, body):
"""Parse RabbitMQ message
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param channel: Channel, RabbitMQ channel which was used for
this message delivery, used for sending ack back.
If None - ack is not required
:param method: Method, RabbitMQ message method
:param properties: Properties, RabbitMQ message properties
:param body: Bytes, RabbitMQ message body
"""
headers = getattr(properties, "headers", {})
version = headers.get(_VERSION_HEADER, None)
if not utils.version_is_compatible(version, _VERSION):
raise pika_drv_exc.UnsupportedDriverVersion(
"Message's version: {} is not compatible with driver version: "
"{}".format(version, _VERSION))
self._pika_engine = pika_engine
self._channel = channel
self._delivery_tag = method.delivery_tag
self._version = version
self._content_type = properties.content_type
self.unique_id = properties.message_id
self.expiration_time = (
None if properties.expiration is None else
time.time() + float(properties.expiration) / 1000
)
try:
serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[self._content_type]
except KeyError:
raise NotImplementedError(
"Content-type['{}'] is not supported.".format(
self._content_type
)
)
message_dict = serializer.load_from_bytes(body)
context_dict = {}
for key in list(message_dict.keys()):
key = six.text_type(key)
if key.startswith('_$_'):
value = message_dict.pop(key)
context_dict[key[3:]] = value
super(PikaIncomingMessage, self).__init__(context_dict, message_dict)
def need_ack(self):
return self._channel is not None
def acknowledge(self):
"""Ack the message. Should be called by message processing logic when
it considered as consumed (means that we don't need redelivery of this
message anymore)
"""
if self.need_ack():
self._channel.basic_ack(delivery_tag=self._delivery_tag)
def requeue(self):
"""Rollback the message. Should be called by message processing logic
when it can not process the message right now and should be redelivered
later if it is possible
"""
if self.need_ack():
return self._channel.basic_nack(delivery_tag=self._delivery_tag,
requeue=True)
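
# Editor's note: a standalone sketch of the '_$_' context-key convention used
# in __init__ above and in PikaOutgoingMessage below (illustrative only):
def _pack(message, context):
    msg = dict(message)
    for key, value in context.items():
        msg['_$_' + key] = value
    return msg

def _unpack(msg):
    msg = dict(msg)
    ctxt = {k[3:]: msg.pop(k) for k in list(msg) if k.startswith('_$_')}
    return msg, ctxt

_packed = _pack({"payload_key": "v"}, {"request_id": 555})
assert _unpack(_packed) == ({"payload_key": "v"}, {"request_id": 555})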
class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage):
"""PikaIncomingMessage implementation for RPC messages. It expects
extra RPC related fields in message body (msg_id and reply_q). Also 'reply'
method added to allow consumer to send RPC reply back to the RPC client
"""
def __init__(self, pika_engine, channel, method, properties, body):
"""Defines default values of msg_id and reply_q fields and just call
super.__init__ method
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param channel: Channel, RabbitMQ channel which was used for
this message delivery, used for sending ack back.
If None - ack is not required
:param method: Method, RabbitMQ message method
:param properties: Properties, RabbitMQ message properties
:param body: Bytes, RabbitMQ message body
"""
super(RpcPikaIncomingMessage, self).__init__(
pika_engine, channel, method, properties, body
)
self.reply_q = properties.reply_to
self.msg_id = properties.correlation_id
def reply(self, reply=None, failure=None):
"""Send back reply to the RPC client
:param reply: Dictionary, reply. In case of exception should be None
:param failure: Tuple, should be a sys.exc_info() tuple.
Should be None if RPC request was successfully processed.
:return RpcReplyPikaIncomingMessage: message with reply
"""
if self.reply_q is None:
return
reply_outgoing_message = RpcReplyPikaOutgoingMessage(
self._pika_engine, self.msg_id, reply=reply, failure_info=failure,
content_type=self._content_type,
)
def on_exception(ex):
if isinstance(ex, pika_drv_exc.ConnectionException):
LOG.warning(
"Connectivity related problem during reply sending. %s",
ex
)
return True
else:
return False
if self._pika_engine.rpc_reply_retry_attempts:
retrier = tenacity.retry(
stop=(
tenacity.stop_never
if self._pika_engine.rpc_reply_retry_attempts == -1 else
tenacity.stop_after_attempt(
self._pika_engine.rpc_reply_retry_attempts
)
),
retry=tenacity.retry_if_exception(on_exception),
wait=tenacity.wait_fixed(
self._pika_engine.rpc_reply_retry_delay
)
)
else:
retrier = None
try:
timeout = (None if self.expiration_time is None else
max(self.expiration_time - time.time(), 0))
with timeutils.StopWatch(duration=timeout) as stopwatch:
reply_outgoing_message.send(
reply_q=self.reply_q,
stopwatch=stopwatch,
retrier=retrier
)
LOG.debug(
"Message [id:'%s'] replied to '%s'.", self.msg_id, self.reply_q
)
except Exception:
LOG.exception(
"Message [id:'%s'] wasn't replied to : %s", self.msg_id,
self.reply_q
)
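
# Editor's note: a standalone sketch of the retry policy built in reply()
# above: retry only on connectivity errors, wait a fixed delay between
# attempts, and treat -1 attempts as "retry forever" (the helper name and
# example values are illustrative):
import tenacity

def _build_retrier(attempts, delay, is_retryable):
    return tenacity.retry(
        stop=(tenacity.stop_never if attempts == -1
              else tenacity.stop_after_attempt(attempts)),
        retry=tenacity.retry_if_exception(is_retryable),
        wait=tenacity.wait_fixed(delay),
    )

# retrier = _build_retrier(3, 0.25,
#                          lambda e: isinstance(e, ConnectionError))
# retrier(publish_function)(...)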
class RpcReplyPikaIncomingMessage(PikaIncomingMessage):
"""PikaIncomingMessage implementation for RPC reply messages. It expects
extra RPC reply related fields in message body (result and failure).
"""
def __init__(self, pika_engine, channel, method, properties, body):
"""Defines default values of result and failure fields, call
super.__init__ method and then construct Exception object if failure is
not None
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param channel: Channel, RabbitMQ channel which was used for
this message delivery, used for sending ack back.
If None - ack is not required
:param method: Method, RabbitMQ message method
:param properties: Properties, RabbitMQ message properties
:param body: Bytes, RabbitMQ message body
"""
super(RpcReplyPikaIncomingMessage, self).__init__(
pika_engine, channel, method, properties, body
)
self.msg_id = properties.correlation_id
self.result = self.message.get("s", None)
self.failure = self.message.get("e", None)
if self.failure is not None:
trace = self.failure.get('t', [])
message = self.failure.get('s', "")
class_name = self.failure.get('c')
module_name = self.failure.get('m')
res_exc = None
if module_name in pika_engine.allowed_remote_exmods:
try:
module = importutils.import_module(module_name)
klass = getattr(module, class_name)
ex_type = type(
klass.__name__,
(RemoteExceptionMixin, klass),
{}
)
res_exc = ex_type(module_name, class_name, message, trace)
except ImportError as e:
LOG.warning(
"Can not deserialize remote exception [module:%s, "
"class:%s]. %s", module_name, class_name, e
)
# if we have not processed failure yet, use RemoteError class
if res_exc is None:
res_exc = oslo_messaging.RemoteError(
class_name, message, trace
)
self.failure = res_exc
class PikaOutgoingMessage(object):
"""Driver friendly adapter for sending message. Construct RabbitMQ message
and send it
"""
def __init__(self, pika_engine, message, context, content_type=None):
"""Parse RabbitMQ message
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param message: Dictionary, user's message fields
:param context: Dictionary, request context's fields
:param content_type: String, content-type header, defines serialization
mechanism, if None default content-type from pika_engine is used
"""
self._pika_engine = pika_engine
self._content_type = (
content_type if content_type is not None else
self._pika_engine.default_content_type
)
try:
self._serializer = pika_drv_cmns.MESSAGE_SERIALIZERS[
self._content_type
]
except KeyError:
raise NotImplementedError(
"Content-type['{}'] is not supported.".format(
self._content_type
)
)
self.message = message
self.context = context
self.unique_id = uuid.uuid4().hex
def _prepare_message_to_send(self):
"""Combine user's message fields an system fields (_unique_id,
context's data etc)
"""
msg = self.message.copy()
if self.context:
for key, value in self.context.items():
key = six.text_type(key)
msg['_$_' + key] = value
props = pika_spec.BasicProperties(
content_type=self._content_type,
headers={_VERSION_HEADER: _VERSION},
message_id=self.unique_id,
)
return msg, props
@staticmethod
def _publish(pool, exchange, routing_key, body, properties, mandatory,
stopwatch):
"""Execute pika publish method using connection from connection pool
Also this message catches all pika related exceptions and raise
oslo.messaging specific exceptions
:param pool: Pool, pika connection pool for connection choosing
:param exchange: String, RabbitMQ exchange name for message sending
:param routing_key: String, RabbitMQ routing key for message routing
:param body: Bytes, RabbitMQ message payload
:param properties: Properties, RabbitMQ message properties
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
exception if it is not possible to deliver message to any queue)
:param stopwatch: StopWatch, stopwatch object for calculating
allowed timeouts
"""
if stopwatch.expired():
raise exceptions.MessagingTimeout(
"Timeout for current operation was expired."
)
try:
timeout = stopwatch.leftover(return_none=True)
with pool.acquire(timeout=timeout) as conn:
if timeout is not None:
properties.expiration = str(int(timeout * 1000))
conn.channel.publish(
exchange=exchange,
routing_key=routing_key,
body=body,
properties=properties,
mandatory=mandatory
)
except pika_exceptions.NackError as e:
raise pika_drv_exc.MessageRejectedException(
"Can not send message: [body: {}], properties: {}] to "
"target [exchange: {}, routing_key: {}]. {}".format(
body, properties, exchange, routing_key, str(e)
)
)
except pika_exceptions.UnroutableError as e:
raise pika_drv_exc.RoutingException(
"Can not deliver message:[body:{}, properties: {}] to any "
"queue using target: [exchange:{}, "
"routing_key:{}]. {}".format(
body, properties, exchange, routing_key, str(e)
)
)
except pika_pool.Timeout as e:
raise exceptions.MessagingTimeout(
"Timeout for current operation was expired. {}".format(str(e))
)
except pika_pool.Connection.connectivity_errors as e:
if (isinstance(e, pika_exceptions.ChannelClosed)
and e.args and e.args[0] == 404):
raise pika_drv_exc.ExchangeNotFoundException(
"Attempt to send message to not existing exchange "
"detected, message: [body:{}, properties: {}], target: "
"[exchange:{}, routing_key:{}]. {}".format(
body, properties, exchange, routing_key, str(e)
)
)
raise pika_drv_exc.ConnectionException(
"Connectivity problem detected during sending the message: "
"[body:{}, properties: {}] to target: [exchange:{}, "
"routing_key:{}]. {}".format(
body, properties, exchange, routing_key, str(e)
)
)
except socket.timeout:
raise pika_drv_exc.TimeoutConnectionException(
"Socket timeout exceeded."
)
def _do_send(self, exchange, routing_key, msg_dict, msg_props,
confirm=True, mandatory=True, persistent=False,
stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None):
"""Send prepared message with configured retrying
:param exchange: String, RabbitMQ exchange name for message sending
:param routing_key: String, RabbitMQ routing key for message routing
:param msg_dict: Dictionary, message payload
:param msg_props: Properties, message properties
:param confirm: Boolean, enable publisher confirmation if True
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
exception if it is not possible to deliver message to any queue)
:param persistent: Boolean, send persistent message if True, works only
for routing into durable queues
:param stopwatch: StopWatch, stopwatch object for calculating
allowed timeouts
:param retrier: tenacity.Retrying, configured retrier object for
sending message, if None no retrying is performed
"""
msg_props.delivery_mode = 2 if persistent else 1
pool = (self._pika_engine.connection_with_confirmation_pool
if confirm else
self._pika_engine.connection_without_confirmation_pool)
body = self._serializer.dump_as_bytes(msg_dict)
LOG.debug(
"Sending message:[body:%s; properties: %s] to target: "
"[exchange:%s; routing_key:%s]", body, msg_props, exchange,
routing_key
)
publish = (self._publish if retrier is None else
retrier(self._publish))
return publish(pool, exchange, routing_key, body, msg_props,
mandatory, stopwatch)
def send(self, exchange, routing_key='', confirm=True, mandatory=True,
persistent=False, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH,
retrier=None):
"""Send message with configured retrying
:param exchange: String, RabbitMQ exchange name for message sending
:param routing_key: String, RabbitMQ routing key for message routing
:param confirm: Boolean, enable publisher confirmation if True
:param mandatory: Boolean, RabbitMQ publish mandatory flag (raise
exception if it is not possible to deliver message to any queue)
:param persistent: Boolean, send persistent message if True, works only
for routing into durable queues
:param stopwatch: StopWatch, stopwatch object for calculating
allowed timeouts
:param retrier: tenacity.Retrying, configured retrier object for
sending message, if None no retrying is performed
"""
msg_dict, msg_props = self._prepare_message_to_send()
return self._do_send(exchange, routing_key, msg_dict, msg_props,
confirm, mandatory, persistent,
stopwatch, retrier)
class RpcPikaOutgoingMessage(PikaOutgoingMessage):
"""PikaOutgoingMessage implementation for RPC messages. It adds
possibility to wait and receive RPC reply
"""
def __init__(self, pika_engine, message, context, content_type=None):
super(RpcPikaOutgoingMessage, self).__init__(
pika_engine, message, context, content_type
)
self.msg_id = None
self.reply_q = None
def send(self, exchange, routing_key, reply_listener=None,
stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH, retrier=None):
"""Send RPC message with configured retrying
:param exchange: String, RabbitMQ exchange name for message sending
:param routing_key: String, RabbitMQ routing key for message routing
        :param reply_listener: RpcReplyPikaListener, listener used to wait
        for the reply. If None - return immediately without waiting for a
        reply
:param stopwatch: StopWatch, stopwatch object for calculating
allowed timeouts
:param retrier: tenacity.Retrying, configured retrier object for
sending message, if None no retrying is performed
"""
msg_dict, msg_props = self._prepare_message_to_send()
if reply_listener:
self.msg_id = uuid.uuid4().hex
msg_props.correlation_id = self.msg_id
LOG.debug('MSG_ID is %s', self.msg_id)
self.reply_q = reply_listener.get_reply_qname()
msg_props.reply_to = self.reply_q
future = reply_listener.register_reply_waiter(msg_id=self.msg_id)
self._do_send(
exchange=exchange, routing_key=routing_key, msg_dict=msg_dict,
msg_props=msg_props, confirm=True, mandatory=True,
persistent=False, stopwatch=stopwatch, retrier=retrier
)
try:
return future.result(stopwatch.leftover(return_none=True))
except BaseException as e:
reply_listener.unregister_reply_waiter(self.msg_id)
if isinstance(e, futures.TimeoutError):
e = exceptions.MessagingTimeout()
raise e
else:
self._do_send(
exchange=exchange, routing_key=routing_key, msg_dict=msg_dict,
msg_props=msg_props, confirm=True, mandatory=True,
persistent=False, stopwatch=stopwatch, retrier=retrier
)
class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage):
"""PikaOutgoingMessage implementation for RPC reply messages. It sets
correlation_id AMQP property to link this reply with response
"""
def __init__(self, pika_engine, msg_id, reply=None, failure_info=None,
content_type=None):
"""Initialize with reply information for sending
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param msg_id: String, msg_id of RPC request, which waits for reply
:param reply: Dictionary, reply. In case of exception should be None
:param failure_info: Tuple, should be a sys.exc_info() tuple.
Should be None if RPC request was successfully processed.
:param content_type: String, content-type header, defines serialization
mechanism, if None default content-type from pika_engine is used
"""
self.msg_id = msg_id
if failure_info is not None:
ex_class = failure_info[0]
ex = failure_info[1]
tb = traceback.format_exception(*failure_info)
if issubclass(ex_class, RemoteExceptionMixin):
failure_data = {
'c': ex.clazz,
'm': ex.module,
's': ex.message,
't': tb
}
else:
failure_data = {
'c': six.text_type(ex_class.__name__),
'm': six.text_type(ex_class.__module__),
's': six.text_type(ex),
't': tb
}
msg = {'e': failure_data}
else:
msg = {'s': reply}
super(RpcReplyPikaOutgoingMessage, self).__init__(
pika_engine, msg, None, content_type
)
def send(self, reply_q, stopwatch=pika_drv_cmns.INFINITE_STOP_WATCH,
retrier=None):
"""Send RPC message with configured retrying
:param reply_q: String, queue name for sending reply
:param stopwatch: StopWatch, stopwatch object for calculating
allowed timeouts
:param retrier: tenacity.Retrying, configured retrier object for
sending message, if None no retrying is performed
"""
msg_dict, msg_props = self._prepare_message_to_send()
msg_props.correlation_id = self.msg_id
self._do_send(
exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q,
msg_dict=msg_dict, msg_props=msg_props, confirm=True,
mandatory=True, persistent=False, stopwatch=stopwatch,
retrier=retrier
)
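
# Editor's note: a standalone sketch of the failure payload built in
# __init__ above; the 'c'/'m'/'s'/'t' keys carry the exception class, module,
# string and traceback (str is used here instead of six.text_type):
import sys
import traceback

def _sample_failure_data():
    try:
        raise ValueError("boom")
    except ValueError:
        failure_info = sys.exc_info()
        ex_class, ex = failure_info[0], failure_info[1]
        return {
            'c': ex_class.__name__,
            'm': ex_class.__module__,
            's': str(ex),
            't': traceback.format_exception(*failure_info),
        }

# _sample_failure_data() -> {'c': 'ValueError', 'm': 'builtins', 's': 'boom',
#                            't': [...formatted traceback lines...]}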


@ -1,538 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_messaging._drivers import base
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
LOG = logging.getLogger(__name__)
class PikaPoller(base.Listener):
"""Provides user friendly functionality for RabbitMQ message consuming,
handles low level connectivity problems and restore connection if some
connectivity related problem detected
"""
def __init__(self, pika_engine, batch_size, batch_timeout, prefetch_count,
incoming_message_class):
"""Initialize required fields
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
        :param batch_size: desired number of messages passed to a single
        on_incoming_callback call
        :param batch_timeout: defines how long we should wait for batch_size
        messages if we already have some messages waiting for processing
:param prefetch_count: Integer, maximum count of unacknowledged
messages which RabbitMQ broker sends to this consumer
:param incoming_message_class: PikaIncomingMessage, wrapper for
consumed RabbitMQ message
"""
super(PikaPoller, self).__init__(batch_size, batch_timeout,
prefetch_count)
self._pika_engine = pika_engine
self._incoming_message_class = incoming_message_class
self._connection = None
self._channel = None
self._recover_loopingcall = None
self._lock = threading.RLock()
self._cur_batch_buffer = None
self._cur_batch_timeout_id = None
self._started = False
self._closing_connection_by_poller = False
self._queues_to_consume = None
def _on_connection_close(self, connection, reply_code, reply_text):
self._deliver_cur_batch()
if self._closing_connection_by_poller:
return
with self._lock:
self._connection = None
self._start_recover_consuming_task()
def _on_channel_close(self, channel, reply_code, reply_text):
if self._cur_batch_buffer:
self._cur_batch_buffer = [
message for message in self._cur_batch_buffer
if not message.need_ack()
]
if self._closing_connection_by_poller:
return
with self._lock:
self._channel = None
self._start_recover_consuming_task()
def _on_consumer_cancel(self, method_frame):
with self._lock:
if self._queues_to_consume:
consumer_tag = method_frame.method.consumer_tag
for queue_info in self._queues_to_consume:
if queue_info["consumer_tag"] == consumer_tag:
queue_info["consumer_tag"] = None
self._start_recover_consuming_task()
def _on_message_no_ack_callback(self, unused, method, properties, body):
"""Is called by Pika when message was received from queue listened with
no_ack=True mode
"""
incoming_message = self._incoming_message_class(
self._pika_engine, None, method, properties, body
)
self._on_incoming_message(incoming_message)
def _on_message_with_ack_callback(self, unused, method, properties, body):
"""Is called by Pika when message was received from queue listened with
no_ack=False mode
"""
incoming_message = self._incoming_message_class(
self._pika_engine, self._channel, method, properties, body
)
self._on_incoming_message(incoming_message)
def _deliver_cur_batch(self):
if self._cur_batch_timeout_id is not None:
self._connection.remove_timeout(self._cur_batch_timeout_id)
self._cur_batch_timeout_id = None
if self._cur_batch_buffer:
buf_to_send = self._cur_batch_buffer
self._cur_batch_buffer = None
try:
self.on_incoming_callback(buf_to_send)
except Exception:
LOG.exception("Unexpected exception during incoming delivery")
def _on_incoming_message(self, incoming_message):
if self._cur_batch_buffer is None:
self._cur_batch_buffer = [incoming_message]
else:
self._cur_batch_buffer.append(incoming_message)
if len(self._cur_batch_buffer) >= self.batch_size:
self._deliver_cur_batch()
return
if self._cur_batch_timeout_id is None:
self._cur_batch_timeout_id = self._connection.add_timeout(
self.batch_timeout, self._deliver_cur_batch)
def _start_recover_consuming_task(self):
"""Start async job for checking connection to the broker."""
if self._recover_loopingcall is None and self._started:
self._recover_loopingcall = (
loopingcall.DynamicLoopingCall(
self._try_recover_consuming
)
)
LOG.info("Starting recover consuming job for listener: %s", self)
self._recover_loopingcall.start()
def _try_recover_consuming(self):
with self._lock:
try:
if self._started:
self._start_or_recover_consuming()
except pika_drv_exc.EstablishConnectionException as e:
LOG.warning(
"Problem during establishing connection for pika "
"poller %s", e, exc_info=True
)
return self._pika_engine.host_connection_reconnect_delay
except pika_drv_exc.ConnectionException as e:
LOG.warning(
"Connectivity exception during starting/recovering pika "
"poller %s", e, exc_info=True
)
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
LOG.warning(
"Connectivity exception during starting/recovering pika "
"poller %s", e, exc_info=True
)
except BaseException:
            # NOTE (dukhlov): BaseException is used here because if this
            # method raises such an exception the LoopingCall stops
            # execution. That should never happen and Exception would
            # probably be enough, but in case of a programmer mistake it
            # could occur, and the problem would be hard to catch if the
            # background task stopped. It is better to keep it running
            # and log this error repeatedly
LOG.exception("Unexpected exception during "
"starting/recovering pika poller")
else:
self._recover_loopingcall = None
LOG.info("Recover consuming job was finished for listener: %s",
self)
raise loopingcall.LoopingCallDone(True)
return 0
def _start_or_recover_consuming(self):
"""Performs reconnection to the broker. It is unsafe method for
internal use only
"""
if self._connection is None or not self._connection.is_open:
self._connection = self._pika_engine.create_connection(
for_listening=True
)
self._connection.add_on_close_callback(self._on_connection_close)
self._channel = None
if self._channel is None or not self._channel.is_open:
if self._queues_to_consume:
for queue_info in self._queues_to_consume:
queue_info["consumer_tag"] = None
self._channel = self._connection.channel()
self._channel.add_on_close_callback(self._on_channel_close)
self._channel.add_on_cancel_callback(self._on_consumer_cancel)
self._channel.basic_qos(prefetch_count=self.prefetch_size)
if self._queues_to_consume is None:
self._queues_to_consume = self._declare_queue_binding()
self._start_consuming()
def _declare_queue_binding(self):
"""Is called by recovering connection logic if target RabbitMQ
exchange and (or) queue do not exist. Should be overridden in child
classes
:return Dictionary: declared_queue_name -> no_ack_mode
"""
raise NotImplementedError(
"It is base class. Please declare exchanges and queues here"
)
def _start_consuming(self):
"""Is called by recovering connection logic for starting consumption
of configured RabbitMQ queues
"""
assert self._queues_to_consume is not None
try:
for queue_info in self._queues_to_consume:
if queue_info["consumer_tag"] is not None:
continue
no_ack = queue_info["no_ack"]
on_message_callback = (
self._on_message_no_ack_callback if no_ack
else self._on_message_with_ack_callback
)
queue_info["consumer_tag"] = self._channel.basic_consume(
on_message_callback, queue_info["queue_name"],
no_ack=no_ack
)
except Exception:
self._queues_to_consume = None
raise
def _stop_consuming(self):
"""Is called by poller's stop logic for stopping consumption
of configured RabbitMQ queues
"""
assert self._queues_to_consume is not None
for queue_info in self._queues_to_consume:
consumer_tag = queue_info["consumer_tag"]
if consumer_tag is not None:
self._channel.basic_cancel(consumer_tag)
queue_info["consumer_tag"] = None
def start(self, on_incoming_callback):
"""Starts poller. Should be called before polling to allow message
consuming
:param on_incoming_callback: callback function to be executed when
listener received messages. Messages should be processed and
acked/nacked by callback
"""
super(PikaPoller, self).start(on_incoming_callback)
with self._lock:
if self._started:
return
connected = False
try:
self._start_or_recover_consuming()
except pika_drv_exc.EstablishConnectionException as exc:
LOG.warning(
"Can not establish connection during pika poller's "
"start(). %s", exc, exc_info=True
)
except pika_drv_exc.ConnectionException as exc:
LOG.warning(
"Connectivity problem during pika poller's start(). %s",
exc, exc_info=True
)
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
LOG.warning(
"Connectivity problem during pika poller's start(). %s",
exc, exc_info=True
)
else:
connected = True
self._started = True
if not connected:
self._start_recover_consuming_task()
def stop(self):
"""Stops poller. Should be called when polling is not needed anymore to
stop new message consuming. After that it is necessary to poll already
prefetched messages
"""
super(PikaPoller, self).stop()
with self._lock:
if not self._started:
return
if self._recover_loopingcall is not None:
self._recover_loopingcall.stop()
self._recover_loopingcall = None
if (self._queues_to_consume and self._channel and
self._channel.is_open):
try:
self._stop_consuming()
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
LOG.warning(
"Connectivity problem detected during consumer "
"cancellation. %s", exc, exc_info=True
)
self._deliver_cur_batch()
self._started = False
def cleanup(self):
"""Cleanup allocated resources (channel, connection, etc)."""
with self._lock:
if self._connection and self._connection.is_open:
try:
self._closing_connection_by_poller = True
self._connection.close()
self._closing_connection_by_poller = False
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS:
# expected errors
pass
except Exception:
LOG.exception("Unexpected error during closing connection")
finally:
self._channel = None
self._connection = None
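
# Editor's note: a standalone sketch of the batching behaviour implemented by
# _on_incoming_message/_deliver_cur_batch above: a batch is delivered when it
# reaches batch_size, or when batch_timeout expires first. threading.Timer
# stands in for the connection's add_timeout; the sketch assumes a numeric
# batch_timeout and is not thread-safe (illustration only).
import threading

class _SampleBatcher(object):
    def __init__(self, batch_size, batch_timeout, on_batch):
        self._size = batch_size
        self._timeout = batch_timeout
        self._on_batch = on_batch
        self._buf = []
        self._timer = None

    def add(self, message):
        self._buf.append(message)
        if len(self._buf) >= self._size:
            self._flush()
        elif self._timer is None:
            self._timer = threading.Timer(self._timeout, self._flush)
            self._timer.start()

    def _flush(self):
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        buf, self._buf = self._buf, []
        if buf:
            self._on_batch(buf)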
class RpcServicePikaPoller(PikaPoller):
"""PikaPoller implementation for polling RPC messages. Overrides base
functionality according to RPC specific
"""
def __init__(self, pika_engine, target, batch_size, batch_timeout,
prefetch_count):
"""Adds target parameter for declaring RPC specific exchanges and
queues
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param target: Target, oslo.messaging Target object which defines RPC
endpoint
        :param batch_size: desired number of messages passed to a single
        on_incoming_callback call
        :param batch_timeout: defines how long we should wait for batch_size
        messages if we already have some messages waiting for processing
:param prefetch_count: Integer, maximum count of unacknowledged
messages which RabbitMQ broker sends to this consumer
"""
self._target = target
super(RpcServicePikaPoller, self).__init__(
pika_engine, batch_size, batch_timeout, prefetch_count,
pika_drv_msg.RpcPikaIncomingMessage
)
def _declare_queue_binding(self):
"""Overrides base method and perform declaration of RabbitMQ exchanges
and queues which correspond to oslo.messaging RPC target
:return Dictionary: declared_queue_name -> no_ack_mode
"""
queue_expiration = self._pika_engine.rpc_queue_expiration
exchange = self._pika_engine.get_rpc_exchange_name(
self._target.exchange
)
queues_to_consume = []
for no_ack in [True, False]:
queue = self._pika_engine.get_rpc_queue_name(
self._target.topic, None, no_ack
)
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=exchange, queue=queue,
routing_key=queue, exchange_type='direct', durable=False,
queue_expiration=queue_expiration
)
queues_to_consume.append(
{"queue_name": queue, "no_ack": no_ack, "consumer_tag": None}
)
if self._target.server:
server_queue = self._pika_engine.get_rpc_queue_name(
self._target.topic, self._target.server, no_ack
)
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=exchange, durable=False,
queue=server_queue, routing_key=server_queue,
exchange_type='direct', queue_expiration=queue_expiration
)
queues_to_consume.append(
{"queue_name": server_queue, "no_ack": no_ack,
"consumer_tag": None}
)
worker_queue = self._pika_engine.get_rpc_queue_name(
self._target.topic, self._target.server, no_ack, True
)
all_workers_routing_key = self._pika_engine.get_rpc_queue_name(
self._target.topic, "all_workers", no_ack
)
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=exchange, durable=False,
queue=worker_queue, routing_key=all_workers_routing_key,
exchange_type='direct', queue_expiration=queue_expiration
)
queues_to_consume.append(
{"queue_name": worker_queue, "no_ack": no_ack,
"consumer_tag": None}
)
return queues_to_consume
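
# Editor's note (illustrative summary): for a target with topic "compute" and
# server "host1", the method above declares, per ack mode, three queues on a
# direct exchange:
#   with_ack.compute                      - shared by all servers of the topic
#   with_ack.compute.host1                - addressed to one specific server
#   with_ack.compute.host1.worker.<hex>   - per-process queue, bound to the
#                                           "with_ack.compute.all_workers" key
# plus the same three queues with the "no_ack." prefix.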
class RpcReplyPikaPoller(PikaPoller):
"""PikaPoller implementation for polling RPC reply messages. Overrides
base functionality according to RPC reply specific
"""
def __init__(self, pika_engine, exchange, queue, batch_size, batch_timeout,
prefetch_count):
"""Adds exchange and queue parameter for declaring exchange and queue
used for RPC reply delivery
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param exchange: String, exchange name used for RPC reply delivery
:param queue: String, queue name used for RPC reply delivery
        :param batch_size: desired number of messages passed to a single
        on_incoming_callback call
        :param batch_timeout: defines how long we should wait for batch_size
        messages if we already have some messages waiting for processing
:param prefetch_count: Integer, maximum count of unacknowledged
messages which RabbitMQ broker sends to this consumer
"""
self._exchange = exchange
self._queue = queue
super(RpcReplyPikaPoller, self).__init__(
pika_engine, batch_size, batch_timeout, prefetch_count,
pika_drv_msg.RpcReplyPikaIncomingMessage
)
def _declare_queue_binding(self):
"""Overrides base method and perform declaration of RabbitMQ exchange
and queue used for RPC reply delivery
:return Dictionary: declared_queue_name -> no_ack_mode
"""
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel,
exchange=self._exchange, queue=self._queue,
routing_key=self._queue, exchange_type='direct',
queue_expiration=self._pika_engine.rpc_queue_expiration,
durable=False
)
return [{"queue_name": self._queue, "no_ack": False,
"consumer_tag": None}]
class NotificationPikaPoller(PikaPoller):
"""PikaPoller implementation for polling Notification messages. Overrides
base functionality according to Notification specific
"""
def __init__(self, pika_engine, targets_and_priorities,
batch_size, batch_timeout, prefetch_count, queue_name=None):
"""Adds targets_and_priorities and queue_name parameter
for declaring exchanges and queues used for notification delivery
:param pika_engine: PikaEngine, shared object with configuration and
shared driver functionality
:param targets_and_priorities: list of (target, priority), defines
default queue names for corresponding notification types
        :param batch_size: desired number of messages passed to a single
        on_incoming_callback call
        :param batch_timeout: defines how long we should wait for batch_size
        messages if we already have some messages waiting for processing
:param prefetch_count: Integer, maximum count of unacknowledged
messages which RabbitMQ broker sends to this consumer
        :param queue_name: String, alternative queue name used for this
        poller instead of the default queue name
"""
self._targets_and_priorities = targets_and_priorities
self._queue_name = queue_name
super(NotificationPikaPoller, self).__init__(
pika_engine, batch_size, batch_timeout, prefetch_count,
pika_drv_msg.PikaIncomingMessage
)
def _declare_queue_binding(self):
"""Overrides base method and perform declaration of RabbitMQ exchanges
and queues used for notification delivery
:return Dictionary: declared_queue_name -> no_ack_mode
"""
queues_to_consume = []
for target, priority in self._targets_and_priorities:
routing_key = '%s.%s' % (target.topic, priority)
queue = self._queue_name or routing_key
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel,
exchange=(
target.exchange or
self._pika_engine.default_notification_exchange
),
queue=queue,
routing_key=routing_key,
exchange_type='direct',
queue_expiration=None,
durable=self._pika_engine.notification_persistence,
)
queues_to_consume.append(
{"queue_name": queue, "no_ack": False, "consumer_tag": None}
)
return queues_to_consume
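
# Editor's note: a standalone sketch of the notification routing above; each
# (topic, priority) pair maps to a "<topic>.<priority>" routing key, and the
# queue name defaults to the routing key unless queue_name overrides it
# (plain topic strings are used here instead of Target objects):
def _notification_queues(topics_and_priorities, queue_name=None):
    return [
        (queue_name or '%s.%s' % (topic, priority),
         '%s.%s' % (topic, priority))
        for topic, priority in topics_and_priorities
    ]

# _notification_queues([("notifications", "info")])
# -> [("notifications.info", "notifications.info")]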


@ -23,11 +23,9 @@ import itertools
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts
from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_pika
from oslo_messaging._drivers import impl_rabbit
from oslo_messaging._drivers.impl_zmq import zmq_options
from oslo_messaging._drivers.kafka_driver import kafka_options
from oslo_messaging._drivers.pika_driver import pika_connection_factory
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
from oslo_messaging.notify import notifier
from oslo_messaging.rpc import client
@ -50,10 +48,7 @@ _opts = [
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_notifications', notifier._notifier_opts),
('oslo_messaging_rabbit', list(
itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts,
pika_connection_factory.pika_opts,
impl_pika.pika_pool_opts, impl_pika.message_opts,
impl_pika.notification_opts, impl_pika.rpc_opts))),
itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts))),
('oslo_messaging_kafka', kafka_options.KAFKA_OPTS),
]
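
# Editor's note: a minimal sketch (helper name is an assumption, not part of
# this change) of how a grouped opts list like _opts above is typically
# consumed with oslo.config:
from oslo_config import cfg

def _register_all(conf, grouped_opts):
    for group, opts in grouped_opts:
        conf.register_opts(opts, group=group)

# _register_all(cfg.ConfigOpts(), _opts)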


@ -1,615 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import unittest
from concurrent import futures
from oslo_serialization import jsonutils
from oslo_utils import timeutils
import pika
from six.moves import mock
import oslo_messaging
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
class PikaIncomingMessageTestCase(unittest.TestCase):
def setUp(self):
self._pika_engine = mock.Mock()
self._channel = mock.Mock()
self._delivery_tag = 12345
self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
self._properties = pika.BasicProperties(
content_type="application/json",
headers={"version": "1.0"},
)
self._body = (
b'{"_$_key_context":"context_value",'
b'"payload_key": "payload_value"}'
)
def test_message_body_parsing(self):
message = pika_drv_msg.PikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
self._body
)
self.assertEqual("context_value",
message.ctxt.get("key_context", None))
self.assertEqual("payload_value",
message.message.get("payload_key", None))
def test_message_acknowledge(self):
message = pika_drv_msg.PikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
self._body
)
message.acknowledge()
self.assertEqual(1, self._channel.basic_ack.call_count)
self.assertEqual({"delivery_tag": self._delivery_tag},
self._channel.basic_ack.call_args[1])
def test_message_acknowledge_no_ack(self):
message = pika_drv_msg.PikaIncomingMessage(
self._pika_engine, None, self._method, self._properties,
self._body
)
message.acknowledge()
self.assertEqual(0, self._channel.basic_ack.call_count)
def test_message_requeue(self):
message = pika_drv_msg.PikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
self._body
)
message.requeue()
self.assertEqual(1, self._channel.basic_nack.call_count)
self.assertEqual({"delivery_tag": self._delivery_tag, 'requeue': True},
self._channel.basic_nack.call_args[1])
def test_message_requeue_no_ack(self):
message = pika_drv_msg.PikaIncomingMessage(
self._pika_engine, None, self._method, self._properties,
self._body
)
message.requeue()
self.assertEqual(0, self._channel.basic_nack.call_count)
class RpcPikaIncomingMessageTestCase(unittest.TestCase):
def setUp(self):
self._pika_engine = mock.Mock()
self._pika_engine.rpc_reply_retry_attempts = 3
self._pika_engine.rpc_reply_retry_delay = 0.25
self._channel = mock.Mock()
self._delivery_tag = 12345
self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
self._body = (
b'{"_$_key_context":"context_value",'
b'"payload_key":"payload_value"}'
)
self._properties = pika.BasicProperties(
content_type="application/json",
headers={"version": "1.0"},
)
def test_call_message_body_parsing(self):
self._properties.correlation_id = 123456789
self._properties.reply_to = "reply_queue"
message = pika_drv_msg.RpcPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
self._body
)
self.assertEqual("context_value",
message.ctxt.get("key_context", None))
self.assertEqual(123456789, message.msg_id)
self.assertEqual("reply_queue", message.reply_q)
self.assertEqual("payload_value",
message.message.get("payload_key", None))
def test_cast_message_body_parsing(self):
message = pika_drv_msg.RpcPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
self._body
)
self.assertEqual("context_value",
message.ctxt.get("key_context", None))
self.assertIsNone(message.msg_id)
self.assertIsNone(message.reply_q)
self.assertEqual("payload_value",
message.message.get("payload_key", None))
@mock.patch(("oslo_messaging._drivers.pika_driver.pika_message."
"PikaOutgoingMessage.send"))
def test_reply_for_cast_message(self, send_reply_mock):
message = pika_drv_msg.RpcPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
self._body
)
self.assertEqual("context_value",
message.ctxt.get("key_context", None))
self.assertIsNone(message.msg_id)
self.assertIsNone(message.reply_q)
self.assertEqual("payload_value",
message.message.get("payload_key", None))
message.reply(reply=object())
self.assertEqual(0, send_reply_mock.call_count)
@mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
"RpcReplyPikaOutgoingMessage")
@mock.patch("tenacity.retry")
def test_positive_reply_for_call_message(self,
retry_mock,
outgoing_message_mock):
self._properties.correlation_id = 123456789
self._properties.reply_to = "reply_queue"
message = pika_drv_msg.RpcPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
self._body
)
self.assertEqual("context_value",
message.ctxt.get("key_context", None))
self.assertEqual(123456789, message.msg_id)
self.assertEqual("reply_queue", message.reply_q)
self.assertEqual("payload_value",
message.message.get("payload_key", None))
reply = "all_fine"
message.reply(reply=reply)
outgoing_message_mock.assert_called_once_with(
self._pika_engine, 123456789, failure_info=None, reply='all_fine',
content_type='application/json'
)
outgoing_message_mock().send.assert_called_once_with(
reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
)
retry_mock.assert_called_once_with(
stop=mock.ANY, retry=mock.ANY, wait=mock.ANY
)
@mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
"RpcReplyPikaOutgoingMessage")
@mock.patch("tenacity.retry")
def test_negative_reply_for_call_message(self,
retry_mock,
outgoing_message_mock):
self._properties.correlation_id = 123456789
self._properties.reply_to = "reply_queue"
message = pika_drv_msg.RpcPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
self._body
)
self.assertEqual("context_value",
message.ctxt.get("key_context", None))
self.assertEqual(123456789, message.msg_id)
self.assertEqual("reply_queue", message.reply_q)
self.assertEqual("payload_value",
message.message.get("payload_key", None))
failure_info = object()
message.reply(failure=failure_info)
outgoing_message_mock.assert_called_once_with(
self._pika_engine, 123456789,
failure_info=failure_info,
reply=None,
content_type='application/json'
)
outgoing_message_mock().send.assert_called_once_with(
reply_q='reply_queue', stopwatch=mock.ANY, retrier=mock.ANY
)
retry_mock.assert_called_once_with(
stop=mock.ANY, retry=mock.ANY, wait=mock.ANY
)
class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
def setUp(self):
self._pika_engine = mock.Mock()
self._pika_engine.allowed_remote_exmods = [
pika_drv_cmns.EXCEPTIONS_MODULE, "oslo_messaging.exceptions"
]
self._channel = mock.Mock()
self._delivery_tag = 12345
self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag)
self._properties = pika.BasicProperties(
content_type="application/json",
headers={"version": "1.0"},
correlation_id=123456789
)
def test_positive_reply_message_body_parsing(self):
body = b'{"s": "all fine"}'
message = pika_drv_msg.RpcReplyPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
body
)
self.assertEqual(123456789, message.msg_id)
self.assertIsNone(message.failure)
self.assertEqual("all fine", message.result)
def test_negative_reply_message_body_parsing(self):
body = (b'{'
b' "e": {'
b' "s": "Error message",'
b' "t": ["TRACE HERE"],'
b' "c": "MessagingException",'
b' "m": "oslo_messaging.exceptions"'
b' }'
b'}')
message = pika_drv_msg.RpcReplyPikaIncomingMessage(
self._pika_engine, self._channel, self._method, self._properties,
body
)
self.assertEqual(123456789, message.msg_id)
self.assertIsNone(message.result)
self.assertEqual(
'Error message\n'
'TRACE HERE',
str(message.failure)
)
self.assertIsInstance(message.failure,
oslo_messaging.MessagingException)
class PikaOutgoingMessageTestCase(unittest.TestCase):
def setUp(self):
self._pika_engine = mock.MagicMock()
self._pika_engine.default_content_type = "application/json"
self._exchange = "it is exchange"
self._routing_key = "it is routing key"
self._expiration = 1
self._stopwatch = (
timeutils.StopWatch(duration=self._expiration).start()
)
self._mandatory = object()
self._message = {"msg_type": 1, "msg_str": "hello"}
self._context = {"request_id": 555, "token": "it is a token"}
@mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_send_with_confirmation(self):
message = pika_drv_msg.PikaOutgoingMessage(
self._pika_engine, self._message, self._context
)
message.send(
exchange=self._exchange,
routing_key=self._routing_key,
confirm=True,
mandatory=self._mandatory,
persistent=True,
stopwatch=self._stopwatch,
retrier=None
)
self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.assert_called_once_with(
body=mock.ANY,
exchange=self._exchange, mandatory=self._mandatory,
properties=mock.ANY,
routing_key=self._routing_key
)
body = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["body"]
self.assertEqual(
b'{"_$_request_id": 555, "_$_token": "it is a token", '
b'"msg_str": "hello", "msg_type": 1}',
body
)
props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('application/json', props.content_type)
self.assertEqual(2, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
100)
self.assertEqual({'version': '1.0'}, props.headers)
self.assertTrue(props.message_id)
@mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_send_without_confirmation(self):
message = pika_drv_msg.PikaOutgoingMessage(
self._pika_engine, self._message, self._context
)
message.send(
exchange=self._exchange,
routing_key=self._routing_key,
confirm=False,
mandatory=self._mandatory,
persistent=False,
stopwatch=self._stopwatch,
retrier=None
)
self._pika_engine.connection_without_confirmation_pool.acquire(
).__enter__().channel.publish.assert_called_once_with(
body=mock.ANY,
exchange=self._exchange, mandatory=self._mandatory,
properties=mock.ANY,
routing_key=self._routing_key
)
body = self._pika_engine.connection_without_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["body"]
self.assertEqual(
b'{"_$_request_id": 555, "_$_token": "it is a token", '
b'"msg_str": "hello", "msg_type": 1}',
body
)
props = self._pika_engine.connection_without_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('application/json', props.content_type)
self.assertEqual(1, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration)
< 100)
self.assertEqual({'version': '1.0'}, props.headers)
self.assertTrue(props.message_id)
class RpcPikaOutgoingMessageTestCase(unittest.TestCase):
def setUp(self):
self._exchange = "it is exchange"
self._routing_key = "it is routing key"
self._pika_engine = mock.MagicMock()
self._pika_engine.get_rpc_exchange_name.return_value = self._exchange
self._pika_engine.get_rpc_queue_name.return_value = self._routing_key
self._pika_engine.default_content_type = "application/json"
self._message = {"msg_type": 1, "msg_str": "hello"}
self._context = {"request_id": 555, "token": "it is a token"}
@mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_send_cast_message(self):
message = pika_drv_msg.RpcPikaOutgoingMessage(
self._pika_engine, self._message, self._context
)
expiration = 1
stopwatch = timeutils.StopWatch(duration=expiration).start()
message.send(
exchange=self._exchange,
routing_key=self._routing_key,
reply_listener=None,
stopwatch=stopwatch,
retrier=None
)
self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.assert_called_once_with(
body=mock.ANY,
exchange=self._exchange, mandatory=True,
properties=mock.ANY,
routing_key=self._routing_key
)
body = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["body"]
self.assertEqual(
b'{"_$_request_id": 555, "_$_token": "it is a token", '
b'"msg_str": "hello", "msg_type": 1}',
body
)
props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('application/json', props.content_type)
self.assertEqual(1, props.delivery_mode)
self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
self.assertEqual({'version': '1.0'}, props.headers)
self.assertIsNone(props.correlation_id)
self.assertIsNone(props.reply_to)
self.assertTrue(props.message_id)
@mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_send_call_message(self):
message = pika_drv_msg.RpcPikaOutgoingMessage(
self._pika_engine, self._message, self._context
)
expiration = 1
stopwatch = timeutils.StopWatch(duration=expiration).start()
result = "it is a result"
reply_queue_name = "reply_queue_name"
future = futures.Future()
future.set_result(result)
reply_listener = mock.Mock()
reply_listener.register_reply_waiter.return_value = future
reply_listener.get_reply_qname.return_value = reply_queue_name
res = message.send(
exchange=self._exchange,
routing_key=self._routing_key,
reply_listener=reply_listener,
stopwatch=stopwatch,
retrier=None
)
self.assertEqual(result, res)
self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.assert_called_once_with(
body=mock.ANY,
exchange=self._exchange, mandatory=True,
properties=mock.ANY,
routing_key=self._routing_key
)
body = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["body"]
self.assertEqual(
b'{"_$_request_id": 555, "_$_token": "it is a token", '
b'"msg_str": "hello", "msg_type": 1}',
body
)
props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('application/json', props.content_type)
self.assertEqual(1, props.delivery_mode)
self.assertTrue(expiration * 1000 - float(props.expiration) < 100)
self.assertEqual({'version': '1.0'}, props.headers)
self.assertEqual(message.msg_id, props.correlation_id)
self.assertEqual(reply_queue_name, props.reply_to)
self.assertTrue(props.message_id)
class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase):
def setUp(self):
self._reply_q = "reply_queue_name"
self._expiration = 1
self._stopwatch = (
timeutils.StopWatch(duration=self._expiration).start()
)
self._pika_engine = mock.MagicMock()
self._rpc_reply_exchange = "rpc_reply_exchange"
self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange
self._pika_engine.default_content_type = "application/json"
self._msg_id = 12345567
@mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_success_message_send(self):
message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
self._pika_engine, self._msg_id, reply="all_fine"
)
message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None)
self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.assert_called_once_with(
body=b'{"s": "all_fine"}',
exchange=self._rpc_reply_exchange, mandatory=True,
properties=mock.ANY,
routing_key=self._reply_q
)
props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('application/json', props.content_type)
self.assertEqual(1, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
100)
self.assertEqual({'version': '1.0'}, props.headers)
self.assertEqual(message.msg_id, props.correlation_id)
self.assertIsNone(props.reply_to)
self.assertTrue(props.message_id)
@mock.patch("traceback.format_exception", new=lambda x, y, z: z)
@mock.patch("oslo_serialization.jsonutils.dump_as_bytes",
new=functools.partial(jsonutils.dump_as_bytes, sort_keys=True))
def test_failure_message_send(self):
failure_info = (oslo_messaging.MessagingException,
oslo_messaging.MessagingException("Error message"),
['It is a trace'])
message = pika_drv_msg.RpcReplyPikaOutgoingMessage(
self._pika_engine, self._msg_id, failure_info=failure_info
)
message.send(self._reply_q, stopwatch=self._stopwatch, retrier=None)
self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.assert_called_once_with(
body=mock.ANY,
exchange=self._rpc_reply_exchange,
mandatory=True,
properties=mock.ANY,
routing_key=self._reply_q
)
body = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["body"]
self.assertEqual(
b'{"e": {"c": "MessagingException", '
b'"m": "oslo_messaging.exceptions", "s": "Error message", '
b'"t": ["It is a trace"]}}',
body
)
props = self._pika_engine.connection_with_confirmation_pool.acquire(
).__enter__().channel.publish.call_args[1]["properties"]
self.assertEqual('application/json', props.content_type)
self.assertEqual(1, props.delivery_mode)
self.assertTrue(self._expiration * 1000 - float(props.expiration) <
100)
self.assertEqual({'version': '1.0'}, props.headers)
self.assertEqual(message.msg_id, props.correlation_id)
self.assertIsNone(props.reply_to)
self.assertTrue(props.message_id)
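
The sort_keys patching in the decorators above exists so that the serialized
message bodies are byte-for-byte deterministic and can be compared with
assertEqual. A minimal standalone illustration of that technique, using only
oslo.serialization (independent of the removed driver):

    import functools

    from oslo_serialization import jsonutils

    dump = functools.partial(jsonutils.dump_as_bytes, sort_keys=True)

    # Key order in the input dict does not affect the output bytes, so
    # the result is stable enough to assert against a literal.
    assert dump({"b": 2, "a": 1}) == dump({"a": 1, "b": 2})
    assert dump({"a": 1, "b": 2}) == b'{"a": 1, "b": 2}'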

View File

@ -1,482 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import time
import unittest
from concurrent import futures
from six.moves import mock
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_messaging._drivers.pika_driver import pika_poller

class PikaPollerTestCase(unittest.TestCase):
def setUp(self):
self._pika_engine = mock.Mock()
self._poller_connection_mock = mock.Mock()
self._poller_channel_mock = mock.Mock()
self._poller_connection_mock.channel.return_value = (
self._poller_channel_mock
)
self._pika_engine.create_connection.return_value = (
self._poller_connection_mock
)
self._executor = futures.ThreadPoolExecutor(1)
def timer_task(timeout, callback):
time.sleep(timeout)
callback()
self._poller_connection_mock.add_timeout.side_effect = (
lambda *args: self._executor.submit(timer_task, *args)
)
self._prefetch_count = 123
@mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
"_declare_queue_binding")
def test_start(self, declare_queue_binding_mock):
poller = pika_poller.PikaPoller(
self._pika_engine, 1, None, self._prefetch_count, None
)
poller.start(None)
self.assertTrue(self._pika_engine.create_connection.called)
self.assertTrue(self._poller_connection_mock.channel.called)
self.assertTrue(declare_queue_binding_mock.called)
def test_start_when_connection_unavailable(self):
poller = pika_poller.PikaPoller(
self._pika_engine, 1, None, self._prefetch_count, None
)
self._pika_engine.create_connection.side_effect = (
pika_drv_exc.EstablishConnectionException
)
# start() should not raise a socket.timeout exception
poller.start(None)
# stop() is needed to stop the reconnection background job
poller.stop()
@mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
"_declare_queue_binding")
def test_message_processing(self, declare_queue_binding_mock):
res = []
def on_incoming_callback(incoming):
res.append(incoming)
incoming_message_class_mock = mock.Mock()
poller = pika_poller.PikaPoller(
self._pika_engine, 1, None, self._prefetch_count,
incoming_message_class=incoming_message_class_mock
)
unused = object()
method = object()
properties = object()
body = object()
poller.start(on_incoming_callback)
poller._on_message_with_ack_callback(
unused, method, properties, body
)
self.assertEqual(1, len(res))
self.assertEqual([incoming_message_class_mock.return_value], res[0])
incoming_message_class_mock.assert_called_once_with(
self._pika_engine, self._poller_channel_mock, method, properties,
body
)
self.assertTrue(self._pika_engine.create_connection.called)
self.assertTrue(self._poller_connection_mock.channel.called)
self.assertTrue(declare_queue_binding_mock.called)
@mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
"_declare_queue_binding")
def test_message_processing_batch(self, declare_queue_binding_mock):
incoming_message_class_mock = mock.Mock()
n = 10
params = []
res = []
def on_incoming_callback(incoming):
res.append(incoming)
poller = pika_poller.PikaPoller(
self._pika_engine, n, None, self._prefetch_count,
incoming_message_class=incoming_message_class_mock
)
for i in range(n):
params.append((object(), object(), object(), object()))
poller.start(on_incoming_callback)
for i in range(n):
poller._on_message_with_ack_callback(
*params[i]
)
self.assertEqual(1, len(res))
self.assertEqual(10, len(res[0]))
self.assertEqual(n, incoming_message_class_mock.call_count)
for i in range(n):
self.assertEqual(incoming_message_class_mock.return_value,
res[0][i])
self.assertEqual(
(self._pika_engine, self._poller_channel_mock) + params[i][1:],
incoming_message_class_mock.call_args_list[i][0]
)
self.assertTrue(self._pika_engine.create_connection.called)
self.assertTrue(self._poller_connection_mock.channel.called)
self.assertTrue(declare_queue_binding_mock.called)
@mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
"_declare_queue_binding")
def test_message_processing_batch_with_timeout(self,
declare_queue_binding_mock):
incoming_message_class_mock = mock.Mock()
n = 10
timeout = 1
res = []
evt = threading.Event()
def on_incoming_callback(incoming):
res.append(incoming)
evt.set()
poller = pika_poller.PikaPoller(
self._pika_engine, n, timeout, self._prefetch_count,
incoming_message_class=incoming_message_class_mock
)
params = []
success_count = 5
poller.start(on_incoming_callback)
for i in range(n):
params.append((object(), object(), object(), object()))
for i in range(success_count):
poller._on_message_with_ack_callback(
*params[i]
)
self.assertTrue(evt.wait(timeout * 2))
self.assertEqual(1, len(res))
self.assertEqual(success_count, len(res[0]))
self.assertEqual(success_count, incoming_message_class_mock.call_count)
for i in range(success_count):
self.assertEqual(incoming_message_class_mock.return_value,
res[0][i])
self.assertEqual(
(self._pika_engine, self._poller_channel_mock) + params[i][1:],
incoming_message_class_mock.call_args_list[i][0]
)
self.assertTrue(self._pika_engine.create_connection.called)
self.assertTrue(self._poller_connection_mock.channel.called)
self.assertTrue(declare_queue_binding_mock.called)

class RpcServicePikaPollerTestCase(unittest.TestCase):
def setUp(self):
self._pika_engine = mock.Mock()
self._poller_connection_mock = mock.Mock()
self._poller_channel_mock = mock.Mock()
self._poller_connection_mock.channel.return_value = (
self._poller_channel_mock
)
self._pika_engine.create_connection.return_value = (
self._poller_connection_mock
)
self._pika_engine.get_rpc_queue_name.side_effect = (
lambda topic, server, no_ack, worker=False:
"_".join([topic, str(server), str(no_ack), str(worker)])
)
self._pika_engine.get_rpc_exchange_name.side_effect = (
lambda exchange: exchange
)
self._prefetch_count = 123
self._target = mock.Mock(exchange="exchange", topic="topic",
server="server")
self._pika_engine.rpc_queue_expiration = 12345
@mock.patch("oslo_messaging._drivers.pika_driver.pika_message."
"RpcPikaIncomingMessage")
def test_declare_rpc_queue_bindings(self, rpc_pika_incoming_message_mock):
poller = pika_poller.RpcServicePikaPoller(
self._pika_engine, self._target, 1, None,
self._prefetch_count
)
poller.start(None)
self.assertTrue(self._pika_engine.create_connection.called)
self.assertTrue(self._poller_connection_mock.channel.called)
declare_queue_binding_by_channel_mock = (
self._pika_engine.declare_queue_binding_by_channel
)
self.assertEqual(
6, declare_queue_binding_by_channel_mock.call_count
)
declare_queue_binding_by_channel_mock.assert_has_calls((
mock.call(
channel=self._poller_channel_mock, durable=False,
exchange="exchange",
exchange_type='direct',
queue="topic_None_True_False",
queue_expiration=12345,
routing_key="topic_None_True_False"
),
mock.call(
channel=self._poller_channel_mock, durable=False,
exchange="exchange",
exchange_type='direct',
queue="topic_server_True_False",
queue_expiration=12345,
routing_key="topic_server_True_False"
),
mock.call(
channel=self._poller_channel_mock, durable=False,
exchange="exchange",
exchange_type='direct',
queue="topic_server_True_True",
queue_expiration=12345,
routing_key="topic_all_workers_True_False"
),
mock.call(
channel=self._poller_channel_mock, durable=False,
exchange="exchange",
exchange_type='direct',
queue="topic_None_False_False",
queue_expiration=12345,
routing_key="topic_None_False_False"
),
mock.call(
channel=self._poller_channel_mock, durable=False,
exchange="exchange",
exchange_type='direct',
queue="topic_server_False_False",
queue_expiration=12345,
routing_key='topic_server_False_False'
),
mock.call(
channel=self._poller_channel_mock, durable=False,
exchange="exchange",
exchange_type='direct',
queue="topic_server_False_True",
queue_expiration=12345,
routing_key='topic_all_workers_False_False'
)
))

class RpcReplyServicePikaPollerTestCase(unittest.TestCase):
def setUp(self):
self._pika_engine = mock.Mock()
self._poller_connection_mock = mock.Mock()
self._poller_channel_mock = mock.Mock()
self._poller_connection_mock.channel.return_value = (
self._poller_channel_mock
)
self._pika_engine.create_connection.return_value = (
self._poller_connection_mock
)
self._prefetch_count = 123
self._exchange = "rpc_reply_exchange"
self._queue = "rpc_reply_queue"
self._pika_engine.rpc_reply_retry_delay = 12132543456
self._pika_engine.rpc_queue_expiration = 12345
self._pika_engine.rpc_reply_retry_attempts = 3
def test_declare_rpc_reply_queue_binding(self):
poller = pika_poller.RpcReplyPikaPoller(
self._pika_engine, self._exchange, self._queue, 1, None,
self._prefetch_count,
)
poller.start(None)
poller.stop()
declare_queue_binding_by_channel_mock = (
self._pika_engine.declare_queue_binding_by_channel
)
self.assertEqual(
1, declare_queue_binding_by_channel_mock.call_count
)
declare_queue_binding_by_channel_mock.assert_called_once_with(
channel=self._poller_channel_mock, durable=False,
exchange='rpc_reply_exchange', exchange_type='direct',
queue='rpc_reply_queue', queue_expiration=12345,
routing_key='rpc_reply_queue'
)

class NotificationPikaPollerTestCase(unittest.TestCase):
def setUp(self):
self._pika_engine = mock.Mock()
self._poller_connection_mock = mock.Mock()
self._poller_channel_mock = mock.Mock()
self._poller_connection_mock.channel.return_value = (
self._poller_channel_mock
)
self._pika_engine.create_connection.return_value = (
self._poller_connection_mock
)
self._prefetch_count = 123
self._target_and_priorities = (
(
mock.Mock(exchange="exchange1", topic="topic1",
server="server1"), 1
),
(
mock.Mock(exchange="exchange1", topic="topic1"), 2
),
(
mock.Mock(exchange="exchange2", topic="topic2",), 1
),
)
self._pika_engine.notification_persistence = object()
def test_declare_notification_queue_bindings_default_queue(self):
poller = pika_poller.NotificationPikaPoller(
self._pika_engine, self._target_and_priorities, 1, None,
self._prefetch_count, None
)
poller.start(None)
self.assertTrue(self._pika_engine.create_connection.called)
self.assertTrue(self._poller_connection_mock.channel.called)
declare_queue_binding_by_channel_mock = (
self._pika_engine.declare_queue_binding_by_channel
)
self.assertEqual(
3, declare_queue_binding_by_channel_mock.call_count
)
declare_queue_binding_by_channel_mock.assert_has_calls((
mock.call(
channel=self._poller_channel_mock,
durable=self._pika_engine.notification_persistence,
exchange="exchange1",
exchange_type='direct',
queue="topic1.1",
queue_expiration=None,
routing_key="topic1.1"
),
mock.call(
channel=self._poller_channel_mock,
durable=self._pika_engine.notification_persistence,
exchange="exchange1",
exchange_type='direct',
queue="topic1.2",
queue_expiration=None,
routing_key="topic1.2"
),
mock.call(
channel=self._poller_channel_mock,
durable=self._pika_engine.notification_persistence,
exchange="exchange2",
exchange_type='direct',
queue="topic2.1",
queue_expiration=None,
routing_key="topic2.1"
)
))
def test_declare_notification_queue_bindings_custom_queue(self):
poller = pika_poller.NotificationPikaPoller(
self._pika_engine, self._target_and_priorities, 1, None,
self._prefetch_count, "custom_queue_name"
)
poller.start(None)
self.assertTrue(self._pika_engine.create_connection.called)
self.assertTrue(self._poller_connection_mock.channel.called)
declare_queue_binding_by_channel_mock = (
self._pika_engine.declare_queue_binding_by_channel
)
self.assertEqual(
3, declare_queue_binding_by_channel_mock.call_count
)
declare_queue_binding_by_channel_mock.assert_has_calls((
mock.call(
channel=self._poller_channel_mock,
durable=self._pika_engine.notification_persistence,
exchange="exchange1",
exchange_type='direct',
queue="custom_queue_name",
queue_expiration=None,
routing_key="topic1.1"
),
mock.call(
channel=self._poller_channel_mock,
durable=self._pika_engine.notification_persistence,
exchange="exchange1",
exchange_type='direct',
queue="custom_queue_name",
queue_expiration=None,
routing_key="topic1.2"
),
mock.call(
channel=self._poller_channel_mock,
durable=self._pika_engine.notification_persistence,
exchange="exchange2",
exchange_type='direct',
queue="custom_queue_name",
queue_expiration=None,
routing_key="topic2.1"
)
))

View File

@ -63,10 +63,6 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
self.n2 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME2"]
self.n3 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME3"]
# NOTE(gdavoian): additional tweak for pika driver
if self.driver == "pika":
self.url = self.url.replace("rabbit", "pika")
# ensure connections will be established to the first node
self.pifpaf.stop_node(self.n2)
self.pifpaf.stop_node(self.n3)
@ -115,19 +111,6 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
return "callback done"
def _check_ports(self, port):
getattr(self, '_check_ports_%s_driver' % self.driver)(port)
def _check_ports_pika_driver(self, port):
rpc_server = self.servers.servers[0].server
# FIXME(sileht): Check other connections
connections = [
rpc_server.listener._connection
]
for conn in connections:
self.assertEqual(
port, conn._impl.socket.getpeername()[1])
def _check_ports_rabbit_driver(self, port):
rpc_server = self.servers.servers[0].server
connection_contexts = [
# rpc server

View File

@ -304,8 +304,6 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
driver = os.environ.get("TRANSPORT_DRIVER")
if driver:
self.url = os.environ.get('PIFPAF_URL')
if driver == "pika" and self.url:
self.url = self.url.replace("rabbit://", "pika://")
else:
self.url = os.environ.get('TRANSPORT_URL')

View File

@ -1,15 +0,0 @@
- hosts: primary
tasks:
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=/logs/**
- --include=*/
- --exclude=*
- --prune-empty-dirs

View File

@ -1,42 +0,0 @@
- hosts: all
name: Autoconverted job legacy-oslo.messaging-src-dsvm-full-pika-default from old
job gate-oslo.messaging-src-dsvm-full-pika-default-ubuntu-xenial-nv
tasks:
- name: Ensure legacy workspace directory
file:
path: '{{ ansible_user_dir }}/workspace'
state: directory
- shell:
cmd: |
set -e
set -x
cat > clonemap.yaml << EOF
clonemap:
- name: openstack-infra/devstack-gate
dest: devstack-gate
EOF
/usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
git://git.openstack.org \
openstack-infra/devstack-gate
executable: /bin/bash
chdir: '{{ ansible_user_dir }}/workspace'
environment: '{{ zuul | zuul_legacy_vars }}'
- shell:
cmd: |
set -e
set -x
export PYTHONUNBUFFERED=true
export DEVSTACK_GATE_TEMPEST=1
export DEVSTACK_GATE_TEMPEST_FULL=1
export PROJECTS="openstack/devstack-plugin-pika $PROJECTS"
export DEVSTACK_LOCAL_CONFIG="enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika"
export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging"
cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh
./safe-devstack-vm-gate-wrap.sh
executable: /bin/bash
chdir: '{{ ansible_user_dir }}/workspace'
environment: '{{ zuul | zuul_legacy_vars }}'

View File

@ -1,80 +0,0 @@
- hosts: primary
tasks:
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=**/*nose_results.html
- --include=*/
- --exclude=*
- --prune-empty-dirs
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=**/*testr_results.html.gz
- --include=*/
- --exclude=*
- --prune-empty-dirs
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=/.testrepository/tmp*
- --include=*/
- --exclude=*
- --prune-empty-dirs
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=**/*testrepository.subunit.gz
- --include=*/
- --exclude=*
- --prune-empty-dirs
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}/tox'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=/.tox/*/log/*
- --include=*/
- --exclude=*
- --prune-empty-dirs
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=/logs/**
- --include=*/
- --exclude=*
- --prune-empty-dirs

View File

@ -1,79 +0,0 @@
- hosts: all
name: Autoconverted job legacy-oslo.messaging-telemetry-dsvm-integration-pika from
old job gate-oslo.messaging-telemetry-dsvm-integration-pika-ubuntu-xenial-nv
tasks:
- name: Ensure legacy workspace directory
file:
path: '{{ ansible_user_dir }}/workspace'
state: directory
- shell:
cmd: |
set -e
set -x
cat > clonemap.yaml << EOF
clonemap:
- name: openstack-infra/devstack-gate
dest: devstack-gate
EOF
/usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
git://git.openstack.org \
openstack-infra/devstack-gate
executable: /bin/bash
chdir: '{{ ansible_user_dir }}/workspace'
environment: '{{ zuul | zuul_legacy_vars }}'
- shell:
cmd: |
set -e
set -x
export PYTHONUNBUFFERED=true
export DEVSTACK_GATE_HEAT=1
export DEVSTACK_GATE_NEUTRON=1
export DEVSTACK_GATE_TEMPEST=1
export DEVSTACK_GATE_EXERCISES=0
export DEVSTACK_GATE_INSTALL_TESTONLY=1
export PROJECTS="openstack/ceilometer $PROJECTS"
export PROJECTS="openstack/aodh $PROJECTS"
export PROJECTS="openstack/devstack-plugin-pika $PROJECTS"
case "$ZUUL_BRANCH" in
"stable/ocata")
export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin gnocchi git://git.openstack.org/openstack/gnocchi"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin panko git://git.openstack.org/openstack/panko"
export OVERRIDE_GNOCCHI_PROJECT_BRANCH="stable/3.1"
export PROJECTS="openstack/panko $PROJECTS openstack/gnocchi"
;;
*)
export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin panko git://git.openstack.org/openstack/panko"
export PROJECTS="openstack/panko $PROJECTS"
;;
esac
export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin ceilometer git://git.openstack.org/openstack/ceilometer"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin aodh git://git.openstack.org/openstack/aodh"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin heat git://git.openstack.org/openstack/heat"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"CEILOMETER_BACKEND=gnocchi"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"GNOCCHI_ARCHIVE_POLICY=high"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"CEILOMETER_PIPELINE_INTERVAL=5"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"GNOCCHI_STORAGE_BACKEND=file"
export DEVSTACK_LOCAL_CONFIG+=$'\n'"enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika"
export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging"
function post_test_hook {
cd /opt/stack/new/ceilometer/ceilometer/tests/integration/hooks/
./post_test_hook.sh
}
export -f post_test_hook
cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh
./safe-devstack-vm-gate-wrap.sh
executable: /bin/bash
chdir: '{{ ansible_user_dir }}/workspace'
environment: '{{ zuul | zuul_legacy_vars }}'

View File

@ -1,15 +0,0 @@
- hosts: primary
tasks:
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=/logs/**
- --include=*/
- --exclude=*
- --prune-empty-dirs

View File

@ -1,45 +0,0 @@
- hosts: all
name: Autoconverted job legacy-tempest-neutron-dsvm-src-oslo.messaging-pika-default
from old job gate-tempest-neutron-dsvm-src-oslo.messaging-pika-default-ubuntu-xenial-nv
tasks:
- name: Ensure legacy workspace directory
file:
path: '{{ ansible_user_dir }}/workspace'
state: directory
- shell:
cmd: |
set -e
set -x
cat > clonemap.yaml << EOF
clonemap:
- name: openstack-infra/devstack-gate
dest: devstack-gate
EOF
/usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
git://git.openstack.org \
openstack-infra/devstack-gate
executable: /bin/bash
chdir: '{{ ansible_user_dir }}/workspace'
environment: '{{ zuul | zuul_legacy_vars }}'
- shell:
cmd: |
set -e
set -x
export PYTHONUNBUFFERED=true
export DEVSTACK_GATE_TEMPEST=1
export DEVSTACK_GATE_TEMPEST_FULL=1
export DEVSTACK_GATE_NEUTRON=1
export PROJECTS="openstack/devstack-plugin-pika $PROJECTS"
export DEVSTACK_LOCAL_CONFIG="enable_plugin devstack-plugin-pika git://git.openstack.org/openstack/devstack-plugin-pika"
export DEVSTACK_PROJECT_FROM_GIT="oslo.messaging"
cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh
./safe-devstack-vm-gate-wrap.sh
executable: /bin/bash
chdir: '{{ ansible_user_dir }}/workspace'
environment: '{{ zuul | zuul_legacy_vars }}'

View File

@ -0,0 +1,8 @@
---
prelude: >
  The Pika-based driver for RabbitMQ has been removed.
upgrade:
  - |
    Users of the Pika-based driver must change the prefix of all
    transport_url configuration options from "pika://..." to
    "rabbit://..." to use the default kombu-based RabbitMQ driver.

View File

@ -28,10 +28,8 @@ PyYAML>=3.12 # MIT
# we set the amqp version to ensure heartbeat works
amqp!=2.1.4,>=2.1.1 # BSD
kombu!=4.0.2,>=4.0.0 # BSD
pika>=0.10.0 # BSD
pika-pool>=0.1.3 # BSD
# used by pika and zmq drivers
# used by zmq driver
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # BSD
tenacity>=3.2.1 # Apache-2.0

View File

@ -41,7 +41,6 @@ oslo.messaging.drivers =
# This is just for internal testing
fake = oslo_messaging._drivers.impl_fake:FakeDriver
pika = oslo_messaging._drivers.impl_pika:PikaDriver
oslo.messaging.executors =
blocking = futurist:SynchronousExecutor
@ -56,21 +55,6 @@ oslo.messaging.notify.drivers =
noop = oslo_messaging.notify._impl_noop:NoOpDriver
routing = oslo_messaging.notify._impl_routing:RoutingDriver
oslo.messaging.pika.connection_factory =
# Creates a new connection for each create_connection call. Old-style
# behaviour; uses many more connections than the single and read_write
# factories, but still available as an option.
new = oslo_messaging._drivers.pika_driver.pika_connection_factory:PikaConnectionFactory
# Creates only one connection per transport and returns it for each
# create_connection call. It is the default, but it cannot be used with the
# synchronous executor.
single = oslo_messaging._drivers.pika_driver.pika_connection_factory:SinglePikaConnectionFactory
# Creates two connections (one for listening, one for sending) and returns
# the appropriate one for each create_connection call depending on the
# connection's purpose. Uses one more connection, but can be used with the
# synchronous executor.
read_write = oslo_messaging._drivers.pika_driver.pika_connection_factory:ReadWritePikaConnectionFactory
oslo.messaging.zmq.matchmaker =
# Matchmakers for ZeroMQ
dummy = oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base:MatchmakerDummy
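
For background on the entry points removed above: names in a namespace such
as oslo.messaging.pika.connection_factory are conventionally resolved at
runtime with stevedore. A brief, illustrative sketch of such a lookup
('single' mirrors the former default; with the namespace removed, this
lookup would now fail, since no matching entry point exists):

    from stevedore import driver

    # DriverManager resolves the named entry point within the namespace
    # and, with invoke_on_load=False, exposes the registered class on
    # the .driver attribute.
    mgr = driver.DriverManager(
        namespace='oslo.messaging.pika.connection_factory',
        name='single',
        invoke_on_load=False)
    factory_cls = mgr.driver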

View File

@ -44,12 +44,6 @@ setenv =
basepython = python3.5
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:py27-func-pika]
setenv =
{[testenv]setenv}
TRANSPORT_DRIVER=pika
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:py27-func-kafka]
setenv =
{[testenv]setenv}