Move each drivers options into its own group

All drivers options are current stored into the DEFAULT group.
This change makes the configuration clearer by putting driver options
into a group named oslo_messaging_<driver>.

Closes-bug: #1417040
Change-Id: I96a9682afe7eb0caf1fbf47bbb0291833aec245b
This commit is contained in:
Mehdi Abaakouk 2015-01-28 08:57:21 +01:00
parent f5b9defce1
commit 824313ac9c
10 changed files with 156 additions and 87 deletions

View File

@ -42,11 +42,13 @@ amqp_opts = [
help='Use durable queues in AMQP.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
deprecated_group='DEFAULT',
help='Auto-delete queues in AMQP.'),
# FIXME(markmc): this was toplevel in openstack.common.rpc
cfg.IntOpt('rpc_conn_pool_size',
default=30,
deprecated_group='DEFAULT',
help='Size of RPC connection pool.'),
]
@ -56,11 +58,11 @@ LOG = logging.getLogger(__name__)
class ConnectionPool(pool.Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, conf, url, connection_cls):
def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
self.connection_cls = connection_cls
self.conf = conf
self.url = url
super(ConnectionPool, self).__init__(self.conf.rpc_conn_pool_size)
super(ConnectionPool, self).__init__(rpc_conn_pool_size)
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while

View File

@ -41,41 +41,52 @@ LOG = logging.getLogger(__name__)
qpid_opts = [
cfg.StrOpt('qpid_hostname',
default='localhost',
deprecated_group='DEFAULT',
help='Qpid broker hostname.'),
cfg.IntOpt('qpid_port',
default=5672,
deprecated_group='DEFAULT',
help='Qpid broker port.'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
deprecated_group='DEFAULT',
help='Qpid HA cluster host:port pairs.'),
cfg.StrOpt('qpid_username',
default='',
deprecated_group='DEFAULT',
help='Username for Qpid connection.'),
cfg.StrOpt('qpid_password',
default='',
deprecated_group='DEFAULT',
help='Password for Qpid connection.',
secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
deprecated_group='DEFAULT',
help='Space separated list of SASL mechanisms to use for '
'auth.'),
cfg.IntOpt('qpid_heartbeat',
default=60,
deprecated_group='DEFAULT',
help='Seconds between connection keepalive heartbeats.'),
cfg.StrOpt('qpid_protocol',
default='tcp',
deprecated_group='DEFAULT',
help="Transport to use, either 'tcp' or 'ssl'."),
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
deprecated_group='DEFAULT',
help='Whether to disable the Nagle algorithm.'),
cfg.IntOpt('qpid_receiver_capacity',
default=1,
deprecated_group='DEFAULT',
help='The number of prefetched messages held by receiver.'),
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
# this file could probably use some additional refactoring so that the
# differences between each version are split into different classes.
cfg.IntOpt('qpid_topology_version',
default=1,
deprecated_group='DEFAULT',
help="The qpid topology version to use. Version 1 is what "
"was originally used by impl_qpid. Version 2 includes "
"some backwards-incompatible changes that allow broker "
@ -459,6 +470,7 @@ class Connection(object):
self.session = None
self.consumers = {}
self.conf = conf
self.driver_conf = conf.oslo_messaging_qpid
self._consume_loop_stopped = False
@ -476,7 +488,7 @@ class Connection(object):
self.brokers_params.append(params)
else:
# Old configuration format
for adr in self.conf.qpid_hosts:
for adr in self.driver_conf.qpid_hosts:
hostname, port = netutils.parse_host_port(
adr, default_port=5672)
@ -485,8 +497,8 @@ class Connection(object):
params = {
'host': '%s:%d' % (hostname, port),
'username': self.conf.qpid_username,
'password': self.conf.qpid_password,
'username': self.driver_conf.qpid_username,
'password': self.driver_conf.qpid_password,
}
self.brokers_params.append(params)
@ -505,12 +517,12 @@ class Connection(object):
self.connection.username = broker['username']
self.connection.password = broker['password']
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
self.connection.sasl_mechanisms = self.driver_conf.qpid_sasl_mechanisms
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.transport = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
self.connection.heartbeat = self.driver_conf.qpid_heartbeat
self.connection.transport = self.driver_conf.qpid_protocol
self.connection.tcp_nodelay = self.driver_conf.qpid_tcp_nodelay
self.connection.open()
def _register_consumer(self, consumer):
@ -633,7 +645,8 @@ class Connection(object):
"%(err_str)s"), log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.session, topic, callback)
consumer = consumer_cls(self.driver_conf, self.session, topic,
callback)
self._register_consumer(consumer)
return consumer
@ -693,7 +706,8 @@ class Connection(object):
"'%(topic)s': %(err_str)s"), log_info)
def _publisher_send():
publisher = cls(self.conf, self.session, topic=topic, **kwargs)
publisher = cls(self.driver_conf, self.session, topic=topic,
**kwargs)
publisher.send(msg)
return self.ensure(_connect_error, _publisher_send, retry=retry)
@ -764,10 +778,15 @@ class QpidDriver(amqpdriver.AMQPDriverBase):
def __init__(self, conf, url,
default_exchange=None, allowed_remote_exmods=None):
conf.register_opts(qpid_opts)
conf.register_opts(rpc_amqp.amqp_opts)
opt_group = cfg.OptGroup(name='oslo_messaging_qpid',
title='QPID driver options')
conf.register_group(opt_group)
conf.register_opts(qpid_opts, group=opt_group)
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection)
connection_pool = rpc_amqp.ConnectionPool(
conf, conf.oslo_messaging_qpid.rpc_conn_pool_size,
url, Connection)
super(QpidDriver, self).__init__(conf, url,
connection_pool,

View File

@ -42,6 +42,7 @@ from oslo_messaging import exceptions
rabbit_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
deprecated_group='DEFAULT',
help='SSL version to use (valid only if SSL enabled). '
'Valid values are TLSv1 and SSLv23. SSLv2, SSLv3, '
'TLSv1_1, and TLSv1_2 may be available on some '
@ -49,57 +50,72 @@ rabbit_opts = [
),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
deprecated_group='DEFAULT',
help='SSL key file (valid only if SSL enabled).'),
cfg.StrOpt('kombu_ssl_certfile',
default='',
deprecated_group='DEFAULT',
help='SSL cert file (valid only if SSL enabled).'),
cfg.StrOpt('kombu_ssl_ca_certs',
default='',
deprecated_group='DEFAULT',
help='SSL certification authority file '
'(valid only if SSL enabled).'),
cfg.FloatOpt('kombu_reconnect_delay',
default=1.0,
deprecated_group='DEFAULT',
help='How long to wait before reconnecting in response to an '
'AMQP consumer cancel notification.'),
cfg.StrOpt('rabbit_host',
default='localhost',
deprecated_group='DEFAULT',
help='The RabbitMQ broker address where a single node is '
'used.'),
cfg.IntOpt('rabbit_port',
default=5672,
deprecated_group='DEFAULT',
help='The RabbitMQ broker port where a single node is used.'),
cfg.ListOpt('rabbit_hosts',
default=['$rabbit_host:$rabbit_port'],
deprecated_group='DEFAULT',
help='RabbitMQ HA cluster host:port pairs.'),
cfg.BoolOpt('rabbit_use_ssl',
default=False,
deprecated_group='DEFAULT',
help='Connect over SSL for RabbitMQ.'),
cfg.StrOpt('rabbit_userid',
default='guest',
deprecated_group='DEFAULT',
help='The RabbitMQ userid.'),
cfg.StrOpt('rabbit_password',
default='guest',
deprecated_group='DEFAULT',
help='The RabbitMQ password.',
secret=True),
cfg.StrOpt('rabbit_login_method',
default='AMQPLAIN',
deprecated_group='DEFAULT',
help='The RabbitMQ login method.'),
cfg.StrOpt('rabbit_virtual_host',
default='/',
deprecated_group='DEFAULT',
help='The RabbitMQ virtual host.'),
cfg.IntOpt('rabbit_retry_interval',
default=1,
help='How frequently to retry connecting with RabbitMQ.'),
cfg.IntOpt('rabbit_retry_backoff',
default=2,
deprecated_group='DEFAULT',
help='How long to backoff for between retries when connecting '
'to RabbitMQ.'),
cfg.IntOpt('rabbit_max_retries',
default=0,
deprecated_group='DEFAULT',
help='Maximum number of RabbitMQ connection retries. '
'Default is 0 (infinite retry count).'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
deprecated_group='DEFAULT',
help='Use HA queues in RabbitMQ (x-ha-policy: all). '
'If you change this option, you must wipe the '
'RabbitMQ database.'),
@ -107,6 +123,7 @@ rabbit_opts = [
# NOTE(sileht): deprecated option since oslo_messaging 1.5.0,
cfg.BoolOpt('fake_rabbit',
default=False,
deprecated_group='DEFAULT',
help='Deprecated, use rpc_backend=kombu+memory or '
'rpc_backend=fake'),
]
@ -447,25 +464,26 @@ class Connection(object):
self.consumers = []
self.consumer_num = itertools.count(1)
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
self.driver_conf = self.conf.oslo_messaging_rabbit
self.max_retries = self.driver_conf.rabbit_max_retries
# Try forever?
if self.max_retries <= 0:
self.max_retries = None
self.interval_start = self.conf.rabbit_retry_interval
self.interval_stepping = self.conf.rabbit_retry_backoff
self.interval_start = self.driver_conf.rabbit_retry_interval
self.interval_stepping = self.driver_conf.rabbit_retry_backoff
# max retry-interval = 30 seconds
self.interval_max = 30
self._ssl_params = self._fetch_ssl_params()
self._login_method = self.conf.rabbit_login_method
self._login_method = self.driver_conf.rabbit_login_method
if url.virtual_host is not None:
virtual_host = url.virtual_host
else:
virtual_host = self.conf.rabbit_virtual_host
virtual_host = self.driver_conf.rabbit_virtual_host
self._url = ''
if self.conf.fake_rabbit:
if self.driver_conf.fake_rabbit:
LOG.warn("Deprecated: fake_rabbit option is deprecated, set "
"rpc_backend to kombu+memory or use the fake "
"driver instead.")
@ -487,13 +505,13 @@ class Connection(object):
transport = url.transport.replace('kombu+', '')
self._url = "%s://%s" % (transport, virtual_host)
else:
for adr in self.conf.rabbit_hosts:
for adr in self.driver_conf.rabbit_hosts:
hostname, port = netutils.parse_host_port(
adr, default_port=self.conf.rabbit_port)
adr, default_port=self.driver_conf.rabbit_port)
self._url += '%samqp://%s:%s@%s:%s/%s' % (
";" if self._url else '',
parse.quote(self.conf.rabbit_userid),
parse.quote(self.conf.rabbit_password),
parse.quote(self.driver_conf.rabbit_userid),
parse.quote(self.driver_conf.rabbit_password),
hostname, port,
virtual_host)
@ -561,15 +579,15 @@ class Connection(object):
ssl_params = dict()
# http://docs.python.org/library/ssl.html - ssl.wrap_socket
if self.conf.kombu_ssl_version:
if self.driver_conf.kombu_ssl_version:
ssl_params['ssl_version'] = self.validate_ssl_version(
self.conf.kombu_ssl_version)
if self.conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
if self.conf.kombu_ssl_certfile:
ssl_params['certfile'] = self.conf.kombu_ssl_certfile
if self.conf.kombu_ssl_ca_certs:
ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
self.driver_conf.kombu_ssl_version)
if self.driver_conf.kombu_ssl_keyfile:
ssl_params['keyfile'] = self.driver_conf.kombu_ssl_keyfile
if self.driver_conf.kombu_ssl_certfile:
ssl_params['certfile'] = self.driver_conf.kombu_ssl_certfile
if self.driver_conf.kombu_ssl_ca_certs:
ssl_params['ca_certs'] = self.driver_conf.kombu_ssl_ca_certs
# We might want to allow variations in the
# future with this?
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
@ -602,8 +620,9 @@ class Connection(object):
def on_error(exc, interval):
error_callback and error_callback(exc)
interval = (self.conf.kombu_reconnect_delay + interval
if self.conf.kombu_reconnect_delay > 0 else interval)
interval = (self.driver_conf.kombu_reconnect_delay + interval
if self.driver_conf.kombu_reconnect_delay > 0
else interval)
info = {'hostname': self.connection.hostname,
'port': self.connection.port,
@ -628,8 +647,8 @@ class Connection(object):
# use kombu for HA connection, the interval_step
# should sufficient, because the underlying kombu transport
# connection object freed.
if self.conf.kombu_reconnect_delay > 0:
time.sleep(self.conf.kombu_reconnect_delay)
if self.driver_conf.kombu_reconnect_delay > 0:
time.sleep(self.driver_conf.kombu_reconnect_delay)
def on_reconnection(new_channel):
"""Callback invoked when the kombu reconnects and creates
@ -706,8 +725,8 @@ class Connection(object):
"%(err_str)s"), log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
six.next(self.consumer_num))
consumer = consumer_cls(self.driver_conf, self.channel, topic,
callback, six.next(self.consumer_num))
self.consumers.append(consumer)
return consumer
@ -766,7 +785,8 @@ class Connection(object):
"'%(topic)s': %(err_str)s"), log_info)
def _publish():
publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
publisher = cls(self.driver_conf, self.channel, topic=topic,
**kwargs)
publisher.send(msg, timeout)
self.ensure(_error_callback, _publish, retry=retry)
@ -851,10 +871,15 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
def __init__(self, conf, url,
default_exchange=None,
allowed_remote_exmods=None):
conf.register_opts(rabbit_opts)
conf.register_opts(rpc_amqp.amqp_opts)
opt_group = cfg.OptGroup(name='oslo_messaging_rabbit',
title='RabbitMQ driver options')
conf.register_group(opt_group)
conf.register_opts(rabbit_opts, group=opt_group)
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
connection_pool = rpc_amqp.ConnectionPool(conf, url, Connection)
connection_pool = rpc_amqp.ConnectionPool(
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
url, Connection)
super(RabbitDriver, self).__init__(conf, url,
connection_pool,

View File

@ -20,9 +20,9 @@ import sys
import fixtures
def _import_opts(conf, module, opts):
def _import_opts(conf, module, opts, group=None):
__import__(module)
conf.register_opts(getattr(sys.modules[module], opts))
conf.register_opts(getattr(sys.modules[module], opts), group=group)
class ConfFixture(fixtures.Fixture):
@ -45,11 +45,17 @@ class ConfFixture(fixtures.Fixture):
def __init__(self, conf):
self.conf = conf
_import_opts(self.conf,
'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts')
'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts',
'oslo_messaging_rabbit')
_import_opts(self.conf,
'oslo_messaging._drivers.impl_qpid', 'qpid_opts')
'oslo_messaging._drivers.amqp', 'amqp_opts',
'oslo_messaging_rabbit')
_import_opts(self.conf,
'oslo_messaging._drivers.amqp', 'amqp_opts')
'oslo_messaging._drivers.impl_qpid', 'qpid_opts',
'oslo_messaging_qpid')
_import_opts(self.conf,
'oslo_messaging._drivers.amqp', 'amqp_opts',
'oslo_messaging_qpid')
_import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts')
_import_opts(self.conf, 'oslo_messaging.transport', '_transport_opts')
_import_opts(self.conf,

View File

@ -34,9 +34,6 @@ from oslo_messaging.rpc import client
from oslo_messaging import transport
_global_opt_lists = [
amqp.amqp_opts,
impl_qpid.qpid_opts,
impl_rabbit.rabbit_opts,
impl_zmq.zmq_opts,
matchmaker.matchmaker_opts,
base._pool_opts,
@ -50,6 +47,10 @@ _opts = [
('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts),
('matchmaker_ring', matchmaker_ring.matchmaker_opts),
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts,
impl_rabbit.rabbit_opts))),
('oslo_messaging_qpid', list(itertools.chain(amqp.amqp_opts,
impl_qpid.qpid_opts)))
]

View File

@ -187,7 +187,8 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
def setUp(self):
super(TestQpidInvalidTopologyVersion, self).setUp()
self.config(qpid_topology_version=-1)
self.config(qpid_topology_version=-1,
group='oslo_messaging_qpid')
def test_invalid_topology_version(self):
def consumer_callback(msg):
@ -199,11 +200,11 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
# 1. qpid driver raises Exception(msg) for invalid topology version
# 2. flake8 - H202 assertRaises Exception too broad
exception_msg = ("Invalid value for qpid_topology_version: %d" %
self.conf.qpid_topology_version)
self.conf.oslo_messaging_qpid.qpid_topology_version)
recvd_exc_msg = ''
try:
self.consumer_cls(self.conf,
self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
msgid_or_topic,
consumer_callback,
@ -215,7 +216,7 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
recvd_exc_msg = ''
try:
self.publisher_cls(self.conf,
self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=msgid_or_topic,
**self.publisher_kwargs)
@ -258,13 +259,15 @@ class TestQpidDirectConsumerPublisher(_QpidBaseTestCase):
self.msgid = str(random.randint(1, 100))
# create a DirectConsumer and DirectPublisher class objects
self.dir_cons = qpid_driver.DirectConsumer(self.conf,
self.session_receive,
self.msgid,
self.consumer_callback)
self.dir_pub = qpid_driver.DirectPublisher(self.conf,
self.session_send,
self.msgid)
self.dir_cons = qpid_driver.DirectConsumer(
self.conf.oslo_messaging_qpid,
self.session_receive,
self.msgid,
self.consumer_callback)
self.dir_pub = qpid_driver.DirectPublisher(
self.conf.oslo_messaging_qpid,
self.session_send,
self.msgid)
def try_send_msg(no_msgs):
for i in range(no_msgs):
@ -418,7 +421,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
def test_qpid_topic_and_fanout(self):
for receiver_id in range(self.no_receivers):
consumer = self.consumer_cls(self.conf,
consumer = self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
self.receive_topic,
self.consumer_callback,
@ -431,7 +434,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
self._receiver_threads.append(thread)
for sender_id in range(self.no_senders):
publisher = self.publisher_cls(self.conf,
publisher = self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=self.topic,
**self.publisher_kwargs)
@ -483,7 +486,8 @@ class TestDriverInterface(_QpidBaseTestCase):
def setUp(self):
super(TestDriverInterface, self).setUp()
self.config(qpid_topology_version=2)
self.config(qpid_topology_version=2,
group='oslo_messaging_qpid')
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
@ -554,7 +558,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
brokers_count = len(brokers)
self.config(qpid_hosts=brokers)
self.config(qpid_hosts=brokers,
group='oslo_messaging_qpid')
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
@ -777,7 +782,8 @@ class QPidHATestCase(test_utils.BaseTestCase):
self.config(qpid_hosts=self.brokers,
qpid_username=None,
qpid_password=None)
qpid_password=None,
group='oslo_messaging_qpid')
hostname_sets = set()
self.info = {'attempt': 0,

View File

@ -41,7 +41,7 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
super(TestDeprecatedRabbitDriverLoad, self).setUp(
conf=cfg.ConfigOpts())
self.messaging_conf.transport_driver = 'rabbit'
self.config(fake_rabbit=True)
self.config(fake_rabbit=True, group="oslo_messaging_rabbit")
def test_driver_load(self):
transport = oslo_messaging.get_transport(self.conf)
@ -673,7 +673,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
self.config(rabbit_hosts=self.brokers,
rabbit_retry_interval=0.01,
rabbit_retry_backoff=0.01,
kombu_reconnect_delay=0)
kombu_reconnect_delay=0,
group="oslo_messaging_rabbit")
self.kombu_connect = mock.Mock()
self.useFixture(mockpatch.Patch(

View File

@ -29,13 +29,15 @@ class OptsTestCase(test_utils.BaseTestCase):
super(OptsTestCase, self).setUp()
def _test_list_opts(self, result):
self.assertEqual(4, len(result))
self.assertEqual(6, len(result))
groups = [g for (g, l) in result]
self.assertIn(None, groups)
self.assertIn('matchmaker_ring', groups)
self.assertIn('matchmaker_redis', groups)
self.assertIn('oslo_messaging_amqp', groups)
self.assertIn('oslo_messaging_rabbit', groups)
self.assertIn('oslo_messaging_qpid', groups)
opt_names = [o.name for (g, l) in result for o in l]
self.assertIn('rpc_backend', opt_names)

View File

@ -187,7 +187,8 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
def setUp(self):
super(TestQpidInvalidTopologyVersion, self).setUp()
self.config(qpid_topology_version=-1)
self.config(qpid_topology_version=-1,
group='oslo_messaging_qpid')
def test_invalid_topology_version(self):
def consumer_callback(msg):
@ -199,11 +200,11 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
# 1. qpid driver raises Exception(msg) for invalid topology version
# 2. flake8 - H202 assertRaises Exception too broad
exception_msg = ("Invalid value for qpid_topology_version: %d" %
self.conf.qpid_topology_version)
self.conf.oslo_messaging_qpid.qpid_topology_version)
recvd_exc_msg = ''
try:
self.consumer_cls(self.conf,
self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
msgid_or_topic,
consumer_callback,
@ -215,7 +216,7 @@ class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
recvd_exc_msg = ''
try:
self.publisher_cls(self.conf,
self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=msgid_or_topic,
**self.publisher_kwargs)
@ -258,13 +259,15 @@ class TestQpidDirectConsumerPublisher(_QpidBaseTestCase):
self.msgid = str(random.randint(1, 100))
# create a DirectConsumer and DirectPublisher class objects
self.dir_cons = qpid_driver.DirectConsumer(self.conf,
self.session_receive,
self.msgid,
self.consumer_callback)
self.dir_pub = qpid_driver.DirectPublisher(self.conf,
self.session_send,
self.msgid)
self.dir_cons = qpid_driver.DirectConsumer(
self.conf.oslo_messaging_qpid,
self.session_receive,
self.msgid,
self.consumer_callback)
self.dir_pub = qpid_driver.DirectPublisher(
self.conf.oslo_messaging_qpid,
self.session_send,
self.msgid)
def try_send_msg(no_msgs):
for i in range(no_msgs):
@ -418,7 +421,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
def test_qpid_topic_and_fanout(self):
for receiver_id in range(self.no_receivers):
consumer = self.consumer_cls(self.conf,
consumer = self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
self.receive_topic,
self.consumer_callback,
@ -431,7 +434,7 @@ class TestQpidTopicAndFanout(_QpidBaseTestCase):
self._receiver_threads.append(thread)
for sender_id in range(self.no_senders):
publisher = self.publisher_cls(self.conf,
publisher = self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=self.topic,
**self.publisher_kwargs)
@ -483,7 +486,8 @@ class TestDriverInterface(_QpidBaseTestCase):
def setUp(self):
super(TestDriverInterface, self).setUp()
self.config(qpid_topology_version=2)
self.config(qpid_topology_version=2,
group='oslo_messaging_qpid')
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
@ -554,7 +558,8 @@ class TestQpidReconnectOrder(test_utils.BaseTestCase):
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
brokers_count = len(brokers)
self.config(qpid_hosts=brokers)
self.config(qpid_hosts=brokers,
group='oslo_messaging_qpid')
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
@ -777,7 +782,8 @@ class QPidHATestCase(test_utils.BaseTestCase):
self.config(qpid_hosts=self.brokers,
qpid_username=None,
qpid_password=None)
qpid_password=None,
group='oslo_messaging_qpid')
hostname_sets = set()
self.info = {'attempt': 0,

View File

@ -41,7 +41,7 @@ class TestDeprecatedRabbitDriverLoad(test_utils.BaseTestCase):
super(TestDeprecatedRabbitDriverLoad, self).setUp(
conf=cfg.ConfigOpts())
self.messaging_conf.transport_driver = 'rabbit'
self.config(fake_rabbit=True)
self.config(fake_rabbit=True, group="oslo_messaging_rabbit")
def test_driver_load(self):
transport = messaging.get_transport(self.conf)
@ -686,7 +686,8 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
self.config(rabbit_hosts=self.brokers,
rabbit_retry_interval=0.01,
rabbit_retry_backoff=0.01,
kombu_reconnect_delay=0)
kombu_reconnect_delay=0,
group="oslo_messaging_rabbit")
self.kombu_connect = mock.Mock()
self.useFixture(mockpatch.Patch(