Merge "Move ConnectionPool and ConnectionContext outside amqp.py"

This commit is contained in:
Jenkins 2015-11-30 18:33:06 +00:00 committed by Gerrit Code Review
commit f4f40ea9a5
6 changed files with 140 additions and 134 deletions

View File

@ -31,7 +31,6 @@ from oslo_config import cfg
import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers import pool
deprecated_durable_opts = [
cfg.DeprecatedOpt('amqp_durable_queues',
@ -66,122 +65,6 @@ amqp_opts = [
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
# NOTE(sileht): Even if rabbit has only one Connection class,
# this connection can be used for two purposes:
# * wait and receive amqp messages (only do read stuffs on the socket)
# * send messages to the broker (only do write stuffs on the socket)
# The code inside a connection class is not concurrency safe.
# Using one Connection class instance for doing both, will result
# of eventlet complaining of multiple greenthreads that read/write the
# same fd concurrently... because 'send' and 'listen' run in different
# greenthread.
# So, a connection cannot be shared between thread/greenthread and
# this two variables permit to define the purpose of the connection
# to allow drivers to add special handling if needed (like heatbeat).
# amqp drivers create 3 kind of connections:
# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
# * driver internally have another 'PURPOSE_LISTEN' connection dedicated
# to wait replies of rpc call
PURPOSE_LISTEN = 'listen'
PURPOSE_SEND = 'send'
class ConnectionPool(pool.Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
self.connection_cls = connection_cls
self.conf = conf
self.url = url
super(ConnectionPool, self).__init__(rpc_conn_pool_size)
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self, purpose=None):
if purpose is None:
purpose = PURPOSE_SEND
LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf, self.url, purpose)
def empty(self):
for item in self.iter_free():
item.close()
class ConnectionContext(rpc_common.Connection):
"""The class that is actually returned to the create_connection() caller.
This is essentially a wrapper around Connection that supports 'with'.
It can also return a new Connection, or one from a pool.
The function will also catch when an instance of this class is to be
deleted. With that we can return Connections to the pool on exceptions
and so forth without making the caller be responsible for catching them.
If possible the function makes sure to return a connection to the pool.
"""
def __init__(self, connection_pool, purpose):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
pooled = purpose == PURPOSE_SEND
if pooled:
self.connection = connection_pool.get()
else:
# a non-pooled connection is requested, so create a new connection
self.connection = connection_pool.create(purpose)
self.pooled = pooled
self.connection.pooled = pooled
def __enter__(self):
"""When with ConnectionContext() is used, return self."""
return self
def _done(self):
"""If the connection came from a pool, clean it up and put it back.
If it did not come from a pool, close it.
"""
if self.connection:
if self.pooled:
# Reset the connection so it's ready for the next caller
# to grab from the pool
try:
self.connection.reset()
except Exception:
LOG.exception("Fail to reset the connection, drop it")
try:
self.connection.close()
except Exception:
pass
self.connection = self.connection_pool.create()
finally:
self.connection_pool.put(self.connection)
else:
try:
self.connection.close()
except Exception:
pass
self.connection = None
def __exit__(self, exc_type, exc_value, tb):
"""End of 'with' statement. We're done here."""
self._done()
def __del__(self):
"""Caller is done with this connection. Make sure we cleaned up."""
self._done()
def close(self):
"""Caller is done with this connection."""
self._done()
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance."""
if self.connection:
return getattr(self.connection, key)
else:
raise rpc_common.InvalidRPCConnectionReuse()
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call."""

View File

@ -100,7 +100,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
return
with self.listener.driver._get_connection(
rpc_amqp.PURPOSE_SEND) as conn:
rpc_common.PURPOSE_SEND) as conn:
if self.listener.driver.send_single_reply:
self._send_reply(conn, reply, failure, log_failure=log_failure,
ending=True)
@ -366,9 +366,9 @@ class AMQPDriverBase(base.BaseDriver):
def _get_exchange(self, target):
return target.exchange or self._default_exchange
def _get_connection(self, purpose=rpc_amqp.PURPOSE_SEND):
return rpc_amqp.ConnectionContext(self._connection_pool,
purpose=purpose)
def _get_connection(self, purpose=rpc_common.PURPOSE_SEND):
return rpc_common.ConnectionContext(self._connection_pool,
purpose=purpose)
def _get_reply_q(self):
with self._reply_q_lock:
@ -377,7 +377,7 @@ class AMQPDriverBase(base.BaseDriver):
reply_q = 'reply_' + uuid.uuid4().hex
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
self._waiter = ReplyWaiter(reply_q, conn,
self._allowed_remote_exmods)
@ -422,7 +422,7 @@ class AMQPDriverBase(base.BaseDriver):
log_msg = "CAST unique_id: %s " % unique_id
try:
with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
if notify:
exchange = self._get_exchange(target)
log_msg += "NOTIFY exchange '%(exchange)s'" \
@ -468,7 +468,7 @@ class AMQPDriverBase(base.BaseDriver):
envelope=(version == 2.0), notify=True, retry=retry)
def listen(self, target):
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
@ -484,7 +484,7 @@ class AMQPDriverBase(base.BaseDriver):
return listener
def listen_for_notifications(self, targets_and_priorities, pool):
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
for target, priority in targets_and_priorities:

View File

@ -348,3 +348,99 @@ class DecayingTimer(object):
if left <= 0 and timeout_callback is not None:
timeout_callback(*args, **kwargs)
return left if maximum is None else min(left, maximum)
# NOTE(sileht): Even if rabbit has only one Connection class,
# this connection can be used for two purposes:
# * wait and receive amqp messages (only do read stuffs on the socket)
# * send messages to the broker (only do write stuffs on the socket)
# The code inside a connection class is not concurrency safe.
# Using one Connection class instance for doing both, will result
# of eventlet complaining of multiple greenthreads that read/write the
# same fd concurrently... because 'send' and 'listen' run in different
# greenthread.
# So, a connection cannot be shared between thread/greenthread and
# this two variables permit to define the purpose of the connection
# to allow drivers to add special handling if needed (like heatbeat).
# amqp drivers create 3 kind of connections:
# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
# * driver internally have another 'PURPOSE_LISTEN' connection dedicated
# to wait replies of rpc call
PURPOSE_LISTEN = 'listen'
PURPOSE_SEND = 'send'
class ConnectionContext(Connection):
"""The class that is actually returned to the create_connection() caller.
This is essentially a wrapper around Connection that supports 'with'.
It can also return a new Connection, or one from a pool.
The function will also catch when an instance of this class is to be
deleted. With that we can return Connections to the pool on exceptions
and so forth without making the caller be responsible for catching them.
If possible the function makes sure to return a connection to the pool.
"""
def __init__(self, connection_pool, purpose):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
pooled = purpose == PURPOSE_SEND
if pooled:
self.connection = connection_pool.get()
else:
# a non-pooled connection is requested, so create a new connection
self.connection = connection_pool.create(purpose)
self.pooled = pooled
self.connection.pooled = pooled
def __enter__(self):
"""When with ConnectionContext() is used, return self."""
return self
def _done(self):
"""If the connection came from a pool, clean it up and put it back.
If it did not come from a pool, close it.
"""
if self.connection:
if self.pooled:
# Reset the connection so it's ready for the next caller
# to grab from the pool
try:
self.connection.reset()
except Exception:
LOG.exception("Fail to reset the connection, drop it")
try:
self.connection.close()
except Exception:
pass
self.connection = self.connection_pool.create()
finally:
self.connection_pool.put(self.connection)
else:
try:
self.connection.close()
except Exception:
pass
self.connection = None
def __exit__(self, exc_type, exc_value, tb):
"""End of 'with' statement. We're done here."""
self._done()
def __del__(self):
"""Caller is done with this connection. Make sure we cleaned up."""
self._done()
def close(self):
"""Caller is done with this connection."""
self._done()
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance."""
if self.connection:
return getattr(self.connection, key)
else:
raise InvalidRPCConnectionReuse()

View File

@ -37,6 +37,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers import pool
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LI
@ -456,7 +457,7 @@ class Connection(object):
# NOTE(sileht): if purpose is PURPOSE_LISTEN
# we don't need the lock because we don't
# have a heartbeat thread
if purpose == rpc_amqp.PURPOSE_SEND:
if purpose == rpc_common.PURPOSE_SEND:
self._connection_lock = ConnectionLock()
else:
self._connection_lock = DummyConnectionLock()
@ -496,7 +497,7 @@ class Connection(object):
# the consume code does the heartbeat stuff
# we don't need a thread
self._heartbeat_thread = None
if purpose == rpc_amqp.PURPOSE_SEND:
if purpose == rpc_common.PURPOSE_SEND:
self._heartbeat_start()
LOG.debug('Connected to AMQP server on %(hostname)s:%(port)s '
@ -1159,7 +1160,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
conf.register_opts(base.base_opts, group=opt_group)
connection_pool = rpc_amqp.ConnectionPool(
connection_pool = pool.ConnectionPool(
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
url, Connection)

View File

@ -17,8 +17,13 @@ import abc
import collections
import threading
from oslo_log import log as logging
import six
from oslo_messaging._drivers import common
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Pool(object):
@ -86,3 +91,24 @@ class Pool(object):
@abc.abstractmethod
def create(self):
"""Construct a new item."""
class ConnectionPool(Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
self.connection_cls = connection_cls
self.conf = conf
self.url = url
super(ConnectionPool, self).__init__(rpc_conn_pool_size)
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self, purpose=None):
if purpose is None:
purpose = common.PURPOSE_SEND
LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf, self.url, purpose)
def empty(self):
for item in self.iter_free():
item.close()

View File

@ -177,7 +177,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
def test_send_with_timeout(self, fake_publish):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection
conn._publish(mock.Mock(), 'msg', routing_key='routing_key',
timeout=1)
@ -187,7 +187,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
def test_send_no_timeout(self, fake_publish):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection
conn._publish(mock.Mock(), 'msg', routing_key='routing_key')
fake_publish.assert_called_with('msg', expiration=None)
@ -207,7 +207,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
type='topic',
passive=False)
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection
exc = conn.connection.channel_errors[0]
@ -240,7 +240,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
'kombu+memory:////')
self.addCleanup(transport.cleanup)
deadline = time.time() + 6
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
with transport._driver._get_connection(driver_common.PURPOSE_LISTEN) as conn:
self.assertRaises(driver_common.Timeout,
conn.consume, timeout=3)
@ -259,7 +259,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
with transport._driver._get_connection(driver_common.PURPOSE_LISTEN) as conn:
channel = conn.connection.channel
with mock.patch('kombu.connection.Connection.connected',
new_callable=mock.PropertyMock,
@ -902,7 +902,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
# starting from the first broker in the list
url = oslo_messaging.TransportURL.parse(self.conf, None)
self.connection = rabbit_driver.Connection(self.conf, url,
amqp.PURPOSE_SEND)
driver_common.PURPOSE_SEND)
self.addCleanup(self.connection.close)
def test_ensure_four_retry(self):