From 357dcb75abdfe1fc78e034d1913f478357cde18f Mon Sep 17 00:00:00 2001 From: Davanum Srinivas Date: Sun, 29 Nov 2015 18:26:32 -0500 Subject: [PATCH] Move ConnectionPool and ConnectionContext outside amqp.py ConnectionPool and ConnectionContext can be used by other drivers (like Kafka) and hence should be outside of amqp.py. * Moving ConnectionPool to pool.py * Moving ConnectionContext to common.py * Moving a couple of global variables to common.py No other logic changes, just refactoring Change-Id: I85154509a361690426772ef116590d38a965ca8d --- oslo_messaging/_drivers/amqp.py | 117 ------------------ oslo_messaging/_drivers/amqpdriver.py | 16 +-- oslo_messaging/_drivers/common.py | 96 ++++++++++++++ oslo_messaging/_drivers/impl_rabbit.py | 7 +- oslo_messaging/_drivers/pool.py | 26 ++++ .../tests/drivers/test_impl_rabbit.py | 12 +- 6 files changed, 140 insertions(+), 134 deletions(-) diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index 2308b80d1..86f41adde 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -31,7 +31,6 @@ from oslo_config import cfg import six from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers import pool deprecated_durable_opts = [ cfg.DeprecatedOpt('amqp_durable_queues', @@ -66,122 +65,6 @@ amqp_opts = [ UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) -# NOTE(sileht): Even if rabbit has only one Connection class, -# this connection can be used for two purposes: -# * wait and receive amqp messages (only do read stuffs on the socket) -# * send messages to the broker (only do write stuffs on the socket) -# The code inside a connection class is not concurrency safe. -# Using one Connection class instance for doing both, will result -# of eventlet complaining of multiple greenthreads that read/write the -# same fd concurrently... because 'send' and 'listen' run in different -# greenthread. -# So, a connection cannot be shared between thread/greenthread and -# this two variables permit to define the purpose of the connection -# to allow drivers to add special handling if needed (like heatbeat). -# amqp drivers create 3 kind of connections: -# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection -# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used -# * driver internally have another 'PURPOSE_LISTEN' connection dedicated -# to wait replies of rpc call -PURPOSE_LISTEN = 'listen' -PURPOSE_SEND = 'send' - - -class ConnectionPool(pool.Pool): - """Class that implements a Pool of Connections.""" - def __init__(self, conf, rpc_conn_pool_size, url, connection_cls): - self.connection_cls = connection_cls - self.conf = conf - self.url = url - super(ConnectionPool, self).__init__(rpc_conn_pool_size) - self.reply_proxy = None - - # TODO(comstud): Timeout connections not used in a while - def create(self, purpose=None): - if purpose is None: - purpose = PURPOSE_SEND - LOG.debug('Pool creating new connection') - return self.connection_cls(self.conf, self.url, purpose) - - def empty(self): - for item in self.iter_free(): - item.close() - - -class ConnectionContext(rpc_common.Connection): - """The class that is actually returned to the create_connection() caller. - - This is essentially a wrapper around Connection that supports 'with'. - It can also return a new Connection, or one from a pool. - - The function will also catch when an instance of this class is to be - deleted. With that we can return Connections to the pool on exceptions - and so forth without making the caller be responsible for catching them. - If possible the function makes sure to return a connection to the pool. - """ - - def __init__(self, connection_pool, purpose): - """Create a new connection, or get one from the pool.""" - self.connection = None - self.connection_pool = connection_pool - pooled = purpose == PURPOSE_SEND - if pooled: - self.connection = connection_pool.get() - else: - # a non-pooled connection is requested, so create a new connection - self.connection = connection_pool.create(purpose) - self.pooled = pooled - self.connection.pooled = pooled - - def __enter__(self): - """When with ConnectionContext() is used, return self.""" - return self - - def _done(self): - """If the connection came from a pool, clean it up and put it back. - If it did not come from a pool, close it. - """ - if self.connection: - if self.pooled: - # Reset the connection so it's ready for the next caller - # to grab from the pool - try: - self.connection.reset() - except Exception: - LOG.exception("Fail to reset the connection, drop it") - try: - self.connection.close() - except Exception: - pass - self.connection = self.connection_pool.create() - finally: - self.connection_pool.put(self.connection) - else: - try: - self.connection.close() - except Exception: - pass - self.connection = None - - def __exit__(self, exc_type, exc_value, tb): - """End of 'with' statement. We're done here.""" - self._done() - - def __del__(self): - """Caller is done with this connection. Make sure we cleaned up.""" - self._done() - - def close(self): - """Caller is done with this connection.""" - self._done() - - def __getattr__(self, key): - """Proxy all other calls to the Connection instance.""" - if self.connection: - return getattr(self.connection, key) - else: - raise rpc_common.InvalidRPCConnectionReuse() - class RpcContext(rpc_common.CommonRpcContext): """Context that supports replying to a rpc.call.""" diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index d3405086a..420587c45 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -100,7 +100,7 @@ class AMQPIncomingMessage(base.IncomingMessage): return with self.listener.driver._get_connection( - rpc_amqp.PURPOSE_SEND) as conn: + rpc_common.PURPOSE_SEND) as conn: if self.listener.driver.send_single_reply: self._send_reply(conn, reply, failure, log_failure=log_failure, ending=True) @@ -366,9 +366,9 @@ class AMQPDriverBase(base.BaseDriver): def _get_exchange(self, target): return target.exchange or self._default_exchange - def _get_connection(self, purpose=rpc_amqp.PURPOSE_SEND): - return rpc_amqp.ConnectionContext(self._connection_pool, - purpose=purpose) + def _get_connection(self, purpose=rpc_common.PURPOSE_SEND): + return rpc_common.ConnectionContext(self._connection_pool, + purpose=purpose) def _get_reply_q(self): with self._reply_q_lock: @@ -377,7 +377,7 @@ class AMQPDriverBase(base.BaseDriver): reply_q = 'reply_' + uuid.uuid4().hex - conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN) + conn = self._get_connection(rpc_common.PURPOSE_LISTEN) self._waiter = ReplyWaiter(reply_q, conn, self._allowed_remote_exmods) @@ -422,7 +422,7 @@ class AMQPDriverBase(base.BaseDriver): log_msg = "CAST unique_id: %s " % unique_id try: - with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn: + with self._get_connection(rpc_common.PURPOSE_SEND) as conn: if notify: exchange = self._get_exchange(target) log_msg += "NOTIFY exchange '%(exchange)s'" \ @@ -468,7 +468,7 @@ class AMQPDriverBase(base.BaseDriver): envelope=(version == 2.0), notify=True, retry=retry) def listen(self, target): - conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN) + conn = self._get_connection(rpc_common.PURPOSE_LISTEN) listener = AMQPListener(self, conn) @@ -484,7 +484,7 @@ class AMQPDriverBase(base.BaseDriver): return listener def listen_for_notifications(self, targets_and_priorities, pool): - conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN) + conn = self._get_connection(rpc_common.PURPOSE_LISTEN) listener = AMQPListener(self, conn) for target, priority in targets_and_priorities: diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index 78bdd9239..02c04805d 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -348,3 +348,99 @@ class DecayingTimer(object): if left <= 0 and timeout_callback is not None: timeout_callback(*args, **kwargs) return left if maximum is None else min(left, maximum) + + +# NOTE(sileht): Even if rabbit has only one Connection class, +# this connection can be used for two purposes: +# * wait and receive amqp messages (only do read stuffs on the socket) +# * send messages to the broker (only do write stuffs on the socket) +# The code inside a connection class is not concurrency safe. +# Using one Connection class instance for doing both, will result +# of eventlet complaining of multiple greenthreads that read/write the +# same fd concurrently... because 'send' and 'listen' run in different +# greenthread. +# So, a connection cannot be shared between thread/greenthread and +# this two variables permit to define the purpose of the connection +# to allow drivers to add special handling if needed (like heatbeat). +# amqp drivers create 3 kind of connections: +# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection +# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used +# * driver internally have another 'PURPOSE_LISTEN' connection dedicated +# to wait replies of rpc call +PURPOSE_LISTEN = 'listen' +PURPOSE_SEND = 'send' + + +class ConnectionContext(Connection): + """The class that is actually returned to the create_connection() caller. + + This is essentially a wrapper around Connection that supports 'with'. + It can also return a new Connection, or one from a pool. + + The function will also catch when an instance of this class is to be + deleted. With that we can return Connections to the pool on exceptions + and so forth without making the caller be responsible for catching them. + If possible the function makes sure to return a connection to the pool. + """ + + def __init__(self, connection_pool, purpose): + """Create a new connection, or get one from the pool.""" + self.connection = None + self.connection_pool = connection_pool + pooled = purpose == PURPOSE_SEND + if pooled: + self.connection = connection_pool.get() + else: + # a non-pooled connection is requested, so create a new connection + self.connection = connection_pool.create(purpose) + self.pooled = pooled + self.connection.pooled = pooled + + def __enter__(self): + """When with ConnectionContext() is used, return self.""" + return self + + def _done(self): + """If the connection came from a pool, clean it up and put it back. + If it did not come from a pool, close it. + """ + if self.connection: + if self.pooled: + # Reset the connection so it's ready for the next caller + # to grab from the pool + try: + self.connection.reset() + except Exception: + LOG.exception("Fail to reset the connection, drop it") + try: + self.connection.close() + except Exception: + pass + self.connection = self.connection_pool.create() + finally: + self.connection_pool.put(self.connection) + else: + try: + self.connection.close() + except Exception: + pass + self.connection = None + + def __exit__(self, exc_type, exc_value, tb): + """End of 'with' statement. We're done here.""" + self._done() + + def __del__(self): + """Caller is done with this connection. Make sure we cleaned up.""" + self._done() + + def close(self): + """Caller is done with this connection.""" + self._done() + + def __getattr__(self, key): + """Proxy all other calls to the Connection instance.""" + if self.connection: + return getattr(self.connection, key) + else: + raise InvalidRPCConnectionReuse() diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index bfcc1c63a..69f413540 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -37,6 +37,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers import pool from oslo_messaging._i18n import _ from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LI @@ -448,7 +449,7 @@ class Connection(object): # NOTE(sileht): if purpose is PURPOSE_LISTEN # we don't need the lock because we don't # have a heartbeat thread - if purpose == rpc_amqp.PURPOSE_SEND: + if purpose == rpc_common.PURPOSE_SEND: self._connection_lock = ConnectionLock() else: self._connection_lock = DummyConnectionLock() @@ -488,7 +489,7 @@ class Connection(object): # the consume code does the heartbeat stuff # we don't need a thread self._heartbeat_thread = None - if purpose == rpc_amqp.PURPOSE_SEND: + if purpose == rpc_common.PURPOSE_SEND: self._heartbeat_start() LOG.debug('Connected to AMQP server on %(hostname)s:%(port)s ' @@ -1151,7 +1152,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase): conf.register_opts(rpc_amqp.amqp_opts, group=opt_group) conf.register_opts(base.base_opts, group=opt_group) - connection_pool = rpc_amqp.ConnectionPool( + connection_pool = pool.ConnectionPool( conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size, url, Connection) diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py index e689d678a..699ce5c10 100644 --- a/oslo_messaging/_drivers/pool.py +++ b/oslo_messaging/_drivers/pool.py @@ -17,8 +17,13 @@ import abc import collections import threading +from oslo_log import log as logging import six +from oslo_messaging._drivers import common + +LOG = logging.getLogger(__name__) + @six.add_metaclass(abc.ABCMeta) class Pool(object): @@ -86,3 +91,24 @@ class Pool(object): @abc.abstractmethod def create(self): """Construct a new item.""" + + +class ConnectionPool(Pool): + """Class that implements a Pool of Connections.""" + def __init__(self, conf, rpc_conn_pool_size, url, connection_cls): + self.connection_cls = connection_cls + self.conf = conf + self.url = url + super(ConnectionPool, self).__init__(rpc_conn_pool_size) + self.reply_proxy = None + + # TODO(comstud): Timeout connections not used in a while + def create(self, purpose=None): + if purpose is None: + purpose = common.PURPOSE_SEND + LOG.debug('Pool creating new connection') + return self.connection_cls(self.conf, self.url, purpose) + + def empty(self): + for item in self.iter_free(): + item.close() diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 06c78982a..441548d9a 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -177,7 +177,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase): def test_send_with_timeout(self, fake_publish): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') - with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn: + with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn: conn = pool_conn.connection conn._publish(mock.Mock(), 'msg', routing_key='routing_key', timeout=1) @@ -187,7 +187,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase): def test_send_no_timeout(self, fake_publish): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') - with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn: + with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn: conn = pool_conn.connection conn._publish(mock.Mock(), 'msg', routing_key='routing_key') fake_publish.assert_called_with('msg', expiration=None) @@ -207,7 +207,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase): type='topic', passive=False) - with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn: + with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn: conn = pool_conn.connection exc = conn.connection.channel_errors[0] @@ -240,7 +240,7 @@ class TestRabbitConsume(test_utils.BaseTestCase): 'kombu+memory:////') self.addCleanup(transport.cleanup) deadline = time.time() + 6 - with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: + with transport._driver._get_connection(driver_common.PURPOSE_LISTEN) as conn: self.assertRaises(driver_common.Timeout, conn.consume, timeout=3) @@ -259,7 +259,7 @@ class TestRabbitConsume(test_utils.BaseTestCase): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) - with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: + with transport._driver._get_connection(driver_common.PURPOSE_LISTEN) as conn: channel = conn.connection.channel with mock.patch('kombu.connection.Connection.connected', new_callable=mock.PropertyMock, @@ -902,7 +902,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): # starting from the first broker in the list url = oslo_messaging.TransportURL.parse(self.conf, None) self.connection = rabbit_driver.Connection(self.conf, url, - amqp.PURPOSE_SEND) + driver_common.PURPOSE_SEND) self.addCleanup(self.connection.close) def test_ensure_four_retry(self):