From 63de855fef3e214029db606a4393bde8fae04352 Mon Sep 17 00:00:00 2001 From: Dmitriy Ukhlov Date: Thu, 28 Apr 2016 00:02:34 -0500 Subject: [PATCH] Implements configurable connection factory new - create new connection each times single - use single connection for whole transport read_write - use two connections for whole transport (one for listening and one for sending) Change-Id: I464c83beb498453b6df2237e7b8022d47ca3fa14 --- oslo_messaging/_drivers/impl_pika.py | 26 +- .../_drivers/pika_driver/pika_connection.py | 55 +++- .../pika_driver/pika_connection_factory.py | 307 ++++++++++++++++++ .../_drivers/pika_driver/pika_engine.py | 272 +++++----------- oslo_messaging/opts.py | 6 +- setup.cfg | 15 + 6 files changed, 454 insertions(+), 227 deletions(-) create mode 100644 oslo_messaging/_drivers/pika_driver/pika_connection_factory.py diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index 12590abe9..d9862c003 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -19,6 +19,8 @@ import pika_pool import retrying from oslo_messaging._drivers import base +from oslo_messaging._drivers.pika_driver import (pika_connection_factory as + pika_drv_conn_factory) from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc @@ -29,27 +31,6 @@ from oslo_messaging import exceptions LOG = logging.getLogger(__name__) -pika_opts = [ - cfg.IntOpt('channel_max', default=None, - help='Maximum number of channels to allow'), - cfg.IntOpt('frame_max', default=None, - help='The maximum byte size for an AMQP frame'), - cfg.IntOpt('heartbeat_interval', default=3, - help="How often to send heartbeats for consumer's connections"), - cfg.BoolOpt('ssl', default=None, - help='Enable SSL'), - cfg.DictOpt('ssl_options', default=None, - help='Arguments passed to ssl.wrap_socket'), - cfg.FloatOpt('socket_timeout', default=0.25, - help="Set socket timeout in seconds for connection's socket"), - cfg.FloatOpt('tcp_user_timeout', default=0.25, - help="Set TCP_USER_TIMEOUT in seconds for connection's " - "socket"), - cfg.FloatOpt('host_connection_reconnect_delay', default=0.25, - help="Set delay for reconnection to some host which has " - "connection error") -] - pika_pool_opts = [ cfg.IntOpt('pool_max_size', default=30, help="Maximum number of connections to keep queued."), @@ -141,7 +122,7 @@ class PikaDriver(base.BaseDriver): opt_group = cfg.OptGroup(name='oslo_messaging_pika', title='Pika driver options') conf.register_group(opt_group) - conf.register_opts(pika_opts, group=opt_group) + conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group) conf.register_opts(pika_pool_opts, group=opt_group) conf.register_opts(rpc_opts, group=opt_group) conf.register_opts(notification_opts, group=opt_group) @@ -350,3 +331,4 @@ class PikaDriver(base.BaseDriver): def cleanup(self): self._reply_listener.cleanup() + self._pika_engine.cleanup() diff --git a/oslo_messaging/_drivers/pika_driver/pika_connection.py b/oslo_messaging/_drivers/pika_driver/pika_connection.py index a2b1d00f8..f0dca5aee 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_connection.py +++ b/oslo_messaging/_drivers/pika_driver/pika_connection.py @@ -18,7 +18,6 @@ import os import threading import futurist -import pika from pika.adapters import select_connection from pika import exceptions as pika_exceptions from pika import spec as pika_spec @@ -31,8 +30,9 @@ LOG = logging.getLogger(__name__) class ThreadSafePikaConnection(object): - def __init__(self, params=None): - self.params = params + def __init__(self, parameters=None, + _impl_class=select_connection.SelectConnection): + self.params = parameters self._connection_lock = threading.Lock() self._evt_closed = threading.Event() self._task_queue = collections.deque() @@ -45,8 +45,8 @@ class ThreadSafePikaConnection(object): pika_exceptions.AMQPConnectionError(err) ) - self._impl = pika.SelectConnection( - parameters=params, + self._impl = _impl_class( + parameters=parameters, on_open_callback=create_connection_future.set_result, on_open_error_callback=on_open_error, on_close_callback=self._on_connection_close, @@ -64,6 +64,10 @@ class ThreadSafePikaConnection(object): create_connection_future.result() + def _check_called_not_from_event_loop(self): + if current_thread() == self._thread_id: + raise RuntimeError("This call is not allowed from ioloop thread") + def _execute_task(self, func, *args, **kwargs): if current_thread() == self._thread_id: return func(*args, **kwargs) @@ -150,6 +154,8 @@ class ThreadSafePikaConnection(object): LOG.exception("Error during processing connection's IO") def close(self, *args, **kwargs): + self._check_called_not_from_event_loop() + res = self._execute_task(self._impl.close, *args, **kwargs) self._evt_closed.wait() @@ -157,6 +163,8 @@ class ThreadSafePikaConnection(object): return res def channel(self, channel_number=None): + self._check_called_not_from_event_loop() + channel_opened_future = self._register_pending_future() impl_channel = self._execute_task( @@ -250,12 +258,18 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 self._impl.close(reply_code=reply_code, reply_text=reply_text) self._evt_closed.wait() + def _check_called_not_from_event_loop(self): + self._connection._check_called_not_from_event_loop() + def flow(self, active): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task( self._impl.flow, callback=self._current_future.set_result, active=active ) + return self._current_future.result() def basic_consume(self, # pylint: disable=R0913 @@ -265,6 +279,9 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 exclusive=False, consumer_tag=None, arguments=None): + + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task( self._impl.add_callback, self._current_future.set_result, @@ -288,6 +305,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 return tag def basic_cancel(self, consumer_tag): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task( self._impl.basic_cancel, @@ -310,6 +329,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 properties=None, mandatory=False, immediate=False): if self._delivery_confirmation: + self._check_called_not_from_event_loop() + # In publisher-acknowledgments mode self._message_returned = False self._current_future = futurist.Future() @@ -343,6 +364,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 immediate=immediate) def basic_qos(self, prefetch_size=0, prefetch_count=0, all_channels=False): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.basic_qos, callback=self._current_future.set_result, @@ -352,6 +375,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 self._current_future.result() def basic_recover(self, requeue=False): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task( self._impl.basic_recover, @@ -369,6 +394,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 self._message_returned = True def confirm_delivery(self): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.add_callback, callback=self._current_future.set_result, @@ -387,6 +414,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, arguments=None, **kwargs): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.exchange_declare, callback=self._current_future.set_result, @@ -403,6 +432,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 return self._current_future.result() def exchange_delete(self, exchange=None, if_unused=False): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.exchange_delete, callback=self._current_future.set_result, @@ -414,6 +445,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 def exchange_bind(self, destination=None, source=None, routing_key='', arguments=None): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.exchange_bind, callback=self._current_future.set_result, @@ -427,6 +460,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 def exchange_unbind(self, destination=None, source=None, routing_key='', arguments=None): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.exchange_unbind, callback=self._current_future.set_result, @@ -441,6 +476,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 def queue_declare(self, queue='', passive=False, durable=False, exclusive=False, auto_delete=False, arguments=None): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.queue_declare, callback=self._current_future.set_result, @@ -455,6 +492,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 return self._current_future.result() def queue_delete(self, queue='', if_unused=False, if_empty=False): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.queue_delete, callback=self._current_future.set_result, @@ -466,6 +505,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 return self._current_future.result() def queue_purge(self, queue=''): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.queue_purge, callback=self._current_future.set_result, @@ -475,6 +516,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 def queue_bind(self, queue, exchange, routing_key=None, arguments=None): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.queue_bind, callback=self._current_future.set_result, @@ -487,6 +530,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902 def queue_unbind(self, queue='', exchange=None, routing_key=None, arguments=None): + self._check_called_not_from_event_loop() + self._current_future = futurist.Future() self._execute_task(self._impl.queue_unbind, callback=self._current_future.set_result, diff --git a/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py b/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py new file mode 100644 index 000000000..3df08060a --- /dev/null +++ b/oslo_messaging/_drivers/pika_driver/pika_connection_factory.py @@ -0,0 +1,307 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import logging +import random +import socket +import threading +import time + +from oslo_config import cfg +import pika +from pika import credentials as pika_credentials + +from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns +from oslo_messaging._drivers.pika_driver import pika_connection +from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc + +LOG = logging.getLogger(__name__) + +# constant for setting tcp_user_timeout socket option +# (it should be defined in 'select' module of standard library in future) +TCP_USER_TIMEOUT = 18 + +# constants for creating connection statistics +HOST_CONNECTION_LAST_TRY_TIME = "last_try_time" +HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time" + +pika_opts = [ + cfg.IntOpt('channel_max', default=None, + help='Maximum number of channels to allow'), + cfg.IntOpt('frame_max', default=None, + help='The maximum byte size for an AMQP frame'), + cfg.IntOpt('heartbeat_interval', default=3, + help="How often to send heartbeats for consumer's connections"), + cfg.BoolOpt('ssl', default=None, + help='Enable SSL'), + cfg.DictOpt('ssl_options', default=None, + help='Arguments passed to ssl.wrap_socket'), + cfg.FloatOpt('socket_timeout', default=0.25, + help="Set socket timeout in seconds for connection's socket"), + cfg.FloatOpt('tcp_user_timeout', default=0.25, + help="Set TCP_USER_TIMEOUT in seconds for connection's " + "socket"), + cfg.FloatOpt('host_connection_reconnect_delay', default=0.25, + help="Set delay for reconnection to some host which has " + "connection error"), + cfg.StrOpt('connection_factory', default="single", + choices=["new", "single", "read_write"], + help='Connection factory implementation') +] + + +class PikaConnectionFactory(object): + + def __init__(self, url, conf): + self._url = url + self._conf = conf + + self._connection_lock = threading.RLock() + + if not url.hosts: + raise ValueError("You should provide at least one RabbitMQ host") + + # initializing connection parameters for configured RabbitMQ hosts + self._common_pika_params = { + 'virtual_host': url.virtual_host, + 'channel_max': conf.oslo_messaging_pika.channel_max, + 'frame_max': conf.oslo_messaging_pika.frame_max, + 'ssl': conf.oslo_messaging_pika.ssl, + 'ssl_options': conf.oslo_messaging_pika.ssl_options, + 'socket_timeout': conf.oslo_messaging_pika.socket_timeout + } + + self._host_list = url.hosts + self._heartbeat_interval = conf.oslo_messaging_pika.heartbeat_interval + self._host_connection_reconnect_delay = ( + conf.oslo_messaging_pika.host_connection_reconnect_delay + ) + self._tcp_user_timeout = conf.oslo_messaging_pika.tcp_user_timeout + + self._connection_host_status = {} + + self._cur_connection_host_num = random.randint( + 0, len(url.hosts) - 1 + ) + + def cleanup(self): + pass + + def create_connection(self, for_listening=False): + """Create and return connection to any available host. + + :return: created connection + :raise: ConnectionException if all hosts are not reachable + """ + + with self._connection_lock: + + host_count = len(self._host_list) + connection_attempts = host_count + + while connection_attempts > 0: + self._cur_connection_host_num += 1 + self._cur_connection_host_num %= host_count + try: + return self._create_host_connection( + self._cur_connection_host_num, for_listening + ) + except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e: + LOG.warning("Can't establish connection to host. %s", e) + except pika_drv_exc.HostConnectionNotAllowedException as e: + LOG.warning("Connection to host is not allowed. %s", e) + + connection_attempts -= 1 + + raise pika_drv_exc.EstablishConnectionException( + "Can not establish connection to any configured RabbitMQ " + "host: " + str(self._host_list) + ) + + def _set_tcp_user_timeout(self, s): + if not self._tcp_user_timeout: + return + try: + s.setsockopt( + socket.IPPROTO_TCP, TCP_USER_TIMEOUT, + int(self._tcp_user_timeout * 1000) + ) + except socket.error: + LOG.warning( + "Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT." + ) + + def _create_host_connection(self, host_index, for_listening): + """Create new connection to host #host_index + + :param host_index: Integer, number of host for connection establishing + :param for_listening: Boolean, creates connection for listening + if True + :return: New connection + """ + host = self._host_list[host_index] + + cur_time = time.time() + + host_connection_status = self._connection_host_status.get(host) + + if host_connection_status is None: + host_connection_status = { + HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0, + HOST_CONNECTION_LAST_TRY_TIME: 0 + } + self._connection_host_status[host] = host_connection_status + + last_success_time = host_connection_status[ + HOST_CONNECTION_LAST_SUCCESS_TRY_TIME + ] + last_time = host_connection_status[ + HOST_CONNECTION_LAST_TRY_TIME + ] + + # raise HostConnectionNotAllowedException if we tried to establish + # connection in last 'host_connection_reconnect_delay' and got + # failure + if (last_time != last_success_time and + cur_time - last_time < + self._host_connection_reconnect_delay): + raise pika_drv_exc.HostConnectionNotAllowedException( + "Connection to host #{} is not allowed now because of " + "previous failure".format(host_index) + ) + + try: + connection = self._do_create_host_connection( + host, for_listening + ) + self._connection_host_status[host][ + HOST_CONNECTION_LAST_SUCCESS_TRY_TIME + ] = cur_time + + return connection + finally: + self._connection_host_status[host][ + HOST_CONNECTION_LAST_TRY_TIME + ] = cur_time + + def _do_create_host_connection(self, host, for_listening): + connection_params = pika.ConnectionParameters( + host=host.hostname, + port=host.port, + credentials=pika_credentials.PlainCredentials( + host.username, host.password + ), + heartbeat_interval=( + self._heartbeat_interval if for_listening else None + ), + **self._common_pika_params + ) + if for_listening: + connection = pika_connection.ThreadSafePikaConnection( + parameters=connection_params + ) + else: + connection = pika.BlockingConnection( + parameters=connection_params + ) + connection.params = connection_params + + self._set_tcp_user_timeout(connection._impl.socket) + return connection + + +class NotClosableConnection(object): + def __init__(self, connection): + self._connection = connection + + def __getattr__(self, item): + return getattr(self._connection, item) + + def close(self): + pass + + +class SinglePikaConnectionFactory(PikaConnectionFactory): + def __init__(self, url, conf): + super(SinglePikaConnectionFactory, self).__init__(url, conf) + self._connection = None + + def create_connection(self, for_listening=False): + with self._connection_lock: + if self._connection is None or not self._connection.is_open: + self._connection = ( + super(SinglePikaConnectionFactory, self).create_connection( + True + ) + ) + return NotClosableConnection(self._connection) + + def cleanup(self): + with self._connection_lock: + if self._connection is not None and self._connection.is_open: + try: + self._connection.close() + except Exception: + LOG.warning( + "Unexpected exception during connection closing", + exc_info=True + ) + self._connection = None + + +class ReadWritePikaConnectionFactory(PikaConnectionFactory): + def __init__(self, url, conf): + super(ReadWritePikaConnectionFactory, self).__init__(url, conf) + self._read_connection = None + self._write_connection = None + + def create_connection(self, for_listening=False): + with self._connection_lock: + if for_listening: + if (self._read_connection is None or + not self._read_connection.is_open): + self._read_connection = super( + ReadWritePikaConnectionFactory, self + ).create_connection(True) + return NotClosableConnection(self._read_connection) + else: + if (self._write_connection is None or + not self._write_connection.is_open): + self._write_connection = super( + ReadWritePikaConnectionFactory, self + ).create_connection(True) + return NotClosableConnection(self._write_connection) + + def cleanup(self): + with self._connection_lock: + if (self._read_connection is not None and + self._read_connection.is_open): + try: + self._read_connection.close() + except Exception: + LOG.warning( + "Unexpected exception during connection closing", + exc_info=True + ) + self._read_connection = None + + if (self._write_connection is not None and + self._write_connection.is_open): + try: + self._write_connection.close() + except Exception: + LOG.warning( + "Unexpected exception during connection closing", + exc_info=True + ) + self._write_connection = None diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index 3c29f3069..8ff4e9103 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -11,28 +11,20 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import logging import os -import random -import socket import threading -import time - -from oslo_log import log as logging -from oslo_utils import eventletutils -import pika -from pika import credentials as pika_credentials - -import pika_pool import uuid +from oslo_utils import eventletutils +import pika_pool +from stevedore import driver + from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns -from oslo_messaging._drivers.pika_driver import pika_connection from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc LOG = logging.getLogger(__name__) -_PID = None - class _PooledConnectionWithConfirmations(pika_pool.Connection): """Derived from 'pika_pool.Connection' and extends its logic - adds @@ -53,17 +45,24 @@ class PikaEngine(object): etc. """ - # constants for creating connection statistics - HOST_CONNECTION_LAST_TRY_TIME = "last_try_time" - HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time" - - # constant for setting tcp_user_timeout socket option - # (it should be defined in 'select' module of standard library in future) - TCP_USER_TIMEOUT = 18 - def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=None): self.conf = conf + self.url = url + + self._connection_factory_type = ( + self.conf.oslo_messaging_pika.connection_factory + ) + + self._connection_factory = None + self._connection_without_confirmation_pool = None + self._connection_with_confirmation_pool = None + self._pid = None + self._init_lock = threading.Lock() + + self.host_connection_reconnect_delay = ( + conf.oslo_messaging_pika.host_connection_reconnect_delay + ) # processing rpc options self.default_rpc_exchange = ( @@ -136,201 +135,78 @@ class PikaEngine(object): raise ValueError("notification_retry_delay should be non-negative " "integer") - self._tcp_user_timeout = self.conf.oslo_messaging_pika.tcp_user_timeout - self.host_connection_reconnect_delay = ( - self.conf.oslo_messaging_pika.host_connection_reconnect_delay - ) - self._heartbeat_interval = ( - self.conf.oslo_messaging_pika.heartbeat_interval - ) - - # initializing connection parameters for configured RabbitMQ hosts - self._common_pika_params = { - 'virtual_host': url.virtual_host, - 'channel_max': self.conf.oslo_messaging_pika.channel_max, - 'frame_max': self.conf.oslo_messaging_pika.frame_max, - 'ssl': self.conf.oslo_messaging_pika.ssl, - 'ssl_options': self.conf.oslo_messaging_pika.ssl_options, - 'socket_timeout': self.conf.oslo_messaging_pika.socket_timeout, - } - - self._connection_lock = threading.RLock() - self._pid = None - - self._connection_host_status = {} - - if not url.hosts: - raise ValueError("You should provide at least one RabbitMQ host") - - self._host_list = url.hosts - - self._cur_connection_host_num = random.randint( - 0, len(self._host_list) - 1 - ) - - # initializing 2 connection pools: 1st for connections without - # confirmations, 2nd - with confirmations - self.connection_without_confirmation_pool = pika_pool.QueuedPool( - create=self.create_connection, - max_size=self.conf.oslo_messaging_pika.pool_max_size, - max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow, - timeout=self.conf.oslo_messaging_pika.pool_timeout, - recycle=self.conf.oslo_messaging_pika.pool_recycle, - stale=self.conf.oslo_messaging_pika.pool_stale, - ) - - self.connection_with_confirmation_pool = pika_pool.QueuedPool( - create=self.create_connection, - max_size=self.conf.oslo_messaging_pika.pool_max_size, - max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow, - timeout=self.conf.oslo_messaging_pika.pool_timeout, - recycle=self.conf.oslo_messaging_pika.pool_recycle, - stale=self.conf.oslo_messaging_pika.pool_stale, - ) - - self.connection_with_confirmation_pool.Connection = ( - _PooledConnectionWithConfirmations - ) - - def create_connection(self, for_listening=False): - """Create and return connection to any available host. - - :return: created connection - :raise: ConnectionException if all hosts are not reachable - """ - - with self._connection_lock: - self._init_if_needed() - - host_count = len(self._host_list) - connection_attempts = host_count - - while connection_attempts > 0: - self._cur_connection_host_num += 1 - self._cur_connection_host_num %= host_count - try: - return self.create_host_connection( - self._cur_connection_host_num, for_listening - ) - except pika_pool.Connection.connectivity_errors as e: - LOG.warning("Can't establish connection to host. %s", e) - except pika_drv_exc.HostConnectionNotAllowedException as e: - LOG.warning("Connection to host is not allowed. %s", e) - - connection_attempts -= 1 - - raise pika_drv_exc.EstablishConnectionException( - "Can not establish connection to any configured RabbitMQ " - "host: " + str(self._host_list) - ) - - def _set_tcp_user_timeout(self, s): - if not self._tcp_user_timeout: - return - try: - s.setsockopt( - socket.IPPROTO_TCP, self.TCP_USER_TIMEOUT, - int(self._tcp_user_timeout * 1000) - ) - except socket.error: - LOG.warning( - "Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT." - ) - def _init_if_needed(self): - global _PID - cur_pid = os.getpid() - if _PID != cur_pid: - if _PID: + if self._pid == cur_pid: + return + + with self._init_lock: + if self._pid == cur_pid: + return + + if self._pid: LOG.warning("New pid is detected. Old: %s, new: %s. " - "Cleaning up...", _PID, cur_pid) - # Note(dukhlov): we need to force select poller usage in case when - # 'thread' module is monkey patched becase current eventlet - # implementation does not support patching of poll/epoll/kqueue + "Cleaning up...", self._pid, cur_pid) + + # Note(dukhlov): we need to force select poller usage in case + # when 'thread' module is monkey patched becase current + # eventlet implementation does not support patching of + # poll/epoll/kqueue if eventletutils.is_monkey_patched("thread"): from pika.adapters import select_connection select_connection.SELECT_TYPE = "select" - _PID = cur_pid - - def create_host_connection(self, host_index, for_listening=False): - """Create new connection to host #host_index - - :param host_index: Integer, number of host for connection establishing - :param for_listening: Boolean, creates connection for listening - if True - :return: New connection - """ - with self._connection_lock: - self._init_if_needed() - - host = self._host_list[host_index] - - connection_params = pika.ConnectionParameters( - host=host.hostname, - port=host.port, - credentials=pika_credentials.PlainCredentials( - host.username, host.password - ), - heartbeat_interval=( - self._heartbeat_interval if for_listening else None - ), - **self._common_pika_params + mgr = driver.DriverManager( + 'oslo.messaging.pika.connection_factory', + self._connection_factory_type ) - cur_time = time.time() + self._connection_factory = mgr.driver(self.url, self.conf) - host_connection_status = self._connection_host_status.get(host) + # initializing 2 connection pools: 1st for connections without + # confirmations, 2nd - with confirmations + self._connection_without_confirmation_pool = pika_pool.QueuedPool( + create=self.create_connection, + max_size=self.conf.oslo_messaging_pika.pool_max_size, + max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow, + timeout=self.conf.oslo_messaging_pika.pool_timeout, + recycle=self.conf.oslo_messaging_pika.pool_recycle, + stale=self.conf.oslo_messaging_pika.pool_stale, + ) - if host_connection_status is None: - host_connection_status = { - self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0, - self.HOST_CONNECTION_LAST_TRY_TIME: 0 - } - self._connection_host_status[host] = host_connection_status + self._connection_with_confirmation_pool = pika_pool.QueuedPool( + create=self.create_connection, + max_size=self.conf.oslo_messaging_pika.pool_max_size, + max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow, + timeout=self.conf.oslo_messaging_pika.pool_timeout, + recycle=self.conf.oslo_messaging_pika.pool_recycle, + stale=self.conf.oslo_messaging_pika.pool_stale, + ) - last_success_time = host_connection_status[ - self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME - ] - last_time = host_connection_status[ - self.HOST_CONNECTION_LAST_TRY_TIME - ] + self._connection_with_confirmation_pool.Connection = ( + _PooledConnectionWithConfirmations + ) - # raise HostConnectionNotAllowedException if we tried to establish - # connection in last 'host_connection_reconnect_delay' and got - # failure - if (last_time != last_success_time and - cur_time - last_time < - self.host_connection_reconnect_delay): - raise pika_drv_exc.HostConnectionNotAllowedException( - "Connection to host #{} is not allowed now because of " - "previous failure".format(host_index) - ) + self._pid = cur_pid - try: - if for_listening: - connection = pika_connection.ThreadSafePikaConnection( - params=connection_params - ) - else: - connection = pika.BlockingConnection( - parameters=connection_params - ) - connection.params = connection_params + def create_connection(self, for_listening=False): + self._init_if_needed() + return self._connection_factory.create_connection(for_listening) - self._set_tcp_user_timeout(connection._impl.socket) + @property + def connection_without_confirmation_pool(self): + self._init_if_needed() + return self._connection_without_confirmation_pool - self._connection_host_status[host][ - self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME - ] = cur_time + @property + def connection_with_confirmation_pool(self): + self._init_if_needed() + return self._connection_with_confirmation_pool - return connection - finally: - self._connection_host_status[host][ - self.HOST_CONNECTION_LAST_TRY_TIME - ] = cur_time + def cleanup(self): + if self._connection_factory: + self._connection_factory.cleanup() def declare_exchange_by_channel(self, channel, exchange, exchange_type, durable): diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index eb0878d71..068774336 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -25,6 +25,7 @@ from oslo_messaging._drivers import base as drivers_base from oslo_messaging._drivers import impl_pika from oslo_messaging._drivers import impl_rabbit from oslo_messaging._drivers import impl_zmq +from oslo_messaging._drivers.pika_driver import pika_connection_factory from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis from oslo_messaging.notify import notifier @@ -48,8 +49,9 @@ _opts = [ ('oslo_messaging_notifications', notifier._notifier_opts), ('oslo_messaging_rabbit', list( itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts, - impl_pika.pika_opts, impl_pika.pika_pool_opts, - impl_pika.notification_opts, impl_pika.rpc_opts))), + pika_connection_factory.pika_opts, + impl_pika.pika_pool_opts, impl_pika.notification_opts, + impl_pika.rpc_opts))), ] diff --git a/setup.cfg b/setup.cfg index df33fc45e..05c0f3d1c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -55,6 +55,21 @@ oslo.messaging.notify.drivers = noop = oslo_messaging.notify._impl_noop:NoOpDriver routing = oslo_messaging.notify._impl_routing:RoutingDriver +oslo.messaging.pika.connection_factory = + # Creates new connection for each create_connection call. Old-style behaviour + # Uses a much more connections then single and read_write factories but still avalable as + # an option + new = oslo_messaging._drivers.pika_driver.pika_connection_factory:PikaConnectionFactory + + # Creates only one connection for transport and return it for each create connection call + # it is default, but you can not use it with synchronous executor + single = oslo_messaging._drivers.pika_driver.pika_connection_factory:SinglePikaConnectionFactory + + # Create two connections - one for listening and another one for sending and return them + # for each create connection call depending on connection purpose. Creates one more connection + # but you can use it with synchronous executor + read_write = oslo_messaging._drivers.pika_driver.pika_connection_factory:ReadWritePikaConnectionFactory + oslo.messaging.zmq.matchmaker = # Matchmakers for ZeroMQ dummy = oslo_messaging._drivers.zmq_driver.matchmaker.base:DummyMatchMaker