diff --git a/doc/source/zmq_driver.rst b/doc/source/zmq_driver.rst index e73fdf9..bcc3d66 100644 --- a/doc/source/zmq_driver.rst +++ b/doc/source/zmq_driver.rst @@ -85,12 +85,14 @@ Configuration Enabling (mandatory) -------------------- -To enable the driver, in the section [DEFAULT] of the conf file, -the 'rpc_backend' flag must be set to 'zmq' and the 'rpc_zmq_host' flag +To enable the driver the 'transport_url' option must be set to 'zmq://' +in the section [DEFAULT] of the conf file, the 'rpc_zmq_host' flag must be set to the hostname of the current node. :: [DEFAULT] - rpc_backend = zmq + transport_url = "zmq://" + + [oslo_messaging_zmq] rpc_zmq_host = {hostname} @@ -110,27 +112,17 @@ RedisMatchMaker: loads the hash table from a remote Redis server, supports dynamic host/topic registrations, host expiration, and hooks for consuming applications to acknowledge or neg-acknowledge topic.host service availability. -To set the MatchMaker class, use option 'rpc_zmq_matchmaker' in [DEFAULT]. :: +For ZeroMQ driver Redis is configured in transport_url also. For using Redis +specify the URL as follows:: - rpc_zmq_matchmaker = dummy - -or:: - - rpc_zmq_matchmaker = redis - -To specify the Redis server for RedisMatchMaker, use options in -[matchmaker_redis] of each project. :: - - [matchmaker_redis] - host = 127.0.0.1 - port = 6379 + [DEFAULT] + transport_url = "zmq+redis://127.0.0.1:6379" In order to cleanup redis storage from expired records (e.g. target listener goes down) TTL may be applied for keys. Configure 'zmq_target_expire' option which is 120 (seconds) by default. The option is related not specifically to -redis so it is also defined in [DEFAULT] section. If option value is <= 0 -then keys don't expire and live forever in the storage. - +redis so it is also defined in [oslo_messaging_zmq] section. If option value +is <= 0 then keys don't expire and live forever in the storage. MatchMaker Data Source (mandatory) ---------------------------------- @@ -159,11 +151,10 @@ we use Sentinel solution and redis master-slave-slave configuration (if we have 3 controllers and run Redis on each of them). To deploy redis with HA follow the `sentinel-install`_ instructions. From the -messaging driver's side you will need to setup following configuration which -is different from a single-node redis deployment :: +messaging driver's side you will need to setup following configuration :: - [matchmaker_redis] - sentinel_hosts=host1:26379, host2:26379, host3:26379 + [DEFAULT] + transport_url = "zmq+redis://host1:26379,host2:26379,host3:26379" Restrict the number of TCP sockets on controller @@ -174,7 +165,7 @@ controller node in directly connected configuration. To solve the issue ROUTER proxy may be used. In order to configure driver to use ROUTER proxy set up the 'use_router_proxy' -option to true in [DEFAULT] section (false is set by default). +option to true in [oslo_messaging_zmq] section (false is set by default). For example:: @@ -198,7 +189,7 @@ direct DEALER/ROUTER unicast which is possible but less efficient and therefore is not recommended. In a case of direct DEALER/ROUTER unicast proxy is not needed. -This option can be set in [DEFAULT] section. +This option can be set in [oslo_messaging_zmq] section. For example:: @@ -218,7 +209,7 @@ All services bind to an IP address or Ethernet adapter. By default, all services bind to '*', effectively binding to 0.0.0.0. This may be changed with the option 'rpc_zmq_bind_address' which accepts a wildcard, IP address, or Ethernet adapter. -This configuration can be set in [DEFAULT] section. +This configuration can be set in [oslo_messaging_zmq] section. For example:: diff --git a/oslo_messaging/_cmd/zmq_proxy.py b/oslo_messaging/_cmd/zmq_proxy.py index a76b0b5..3126a41 100644 --- a/oslo_messaging/_cmd/zmq_proxy.py +++ b/oslo_messaging/_cmd/zmq_proxy.py @@ -17,12 +17,13 @@ import logging from oslo_config import cfg -from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy from oslo_messaging._drivers.zmq_driver.proxy import zmq_queue_proxy +from oslo_messaging._drivers.zmq_driver import zmq_options CONF = cfg.CONF -CONF.register_opts(impl_zmq.zmq_opts) + +zmq_options.register_opts(CONF) opt_group = cfg.OptGroup(name='zmq_proxy_opts', title='ZeroMQ proxy options') diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py old mode 100755 new mode 100644 diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 5636d01..90c2c20 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -14,10 +14,8 @@ import logging import os -import socket import threading -from oslo_config import cfg from stevedore import driver from oslo_messaging._drivers import base @@ -25,90 +23,14 @@ from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client import zmq_client from oslo_messaging._drivers.zmq_driver.server import zmq_server from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_options from oslo_messaging._i18n import _LE -from oslo_messaging import server RPCException = rpc_common.RPCException -_MATCHMAKER_BACKENDS = ('redis', 'dummy') -_MATCHMAKER_DEFAULT = 'redis' LOG = logging.getLogger(__name__) -zmq_opts = [ - cfg.StrOpt('rpc_zmq_bind_address', default='*', - help='ZeroMQ bind address. Should be a wildcard (*), ' - 'an ethernet interface, or IP. ' - 'The "host" option should point or resolve to this ' - 'address.'), - - cfg.StrOpt('rpc_zmq_matchmaker', default=_MATCHMAKER_DEFAULT, - choices=_MATCHMAKER_BACKENDS, - help='MatchMaker driver.'), - - cfg.IntOpt('rpc_zmq_contexts', default=1, - help='Number of ZeroMQ contexts, defaults to 1.'), - - cfg.IntOpt('rpc_zmq_topic_backlog', - help='Maximum number of ingress messages to locally buffer ' - 'per topic. Default is unlimited.'), - - cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', - help='Directory for holding IPC sockets.'), - - cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), - sample_default='localhost', - help='Name of this node. Must be a valid hostname, FQDN, or ' - 'IP address. Must match "host" option, if running Nova.'), - - cfg.IntOpt('rpc_cast_timeout', default=-1, - help='Seconds to wait before a cast expires (TTL). ' - 'The default value of -1 specifies an infinite linger ' - 'period. The value of 0 specifies no linger period. ' - 'Pending messages shall be discarded immediately ' - 'when the socket is closed. Only supported by impl_zmq.'), - - cfg.IntOpt('rpc_poll_timeout', default=1, - help='The default number of seconds that poll should wait. ' - 'Poll raises timeout exception when timeout expired.'), - - cfg.IntOpt('zmq_target_expire', default=300, - help='Expiration timeout in seconds of a name service record ' - 'about existing target ( < 0 means no timeout).'), - - cfg.IntOpt('zmq_target_update', default=180, - help='Update period in seconds of a name service record ' - 'about existing target.'), - - cfg.BoolOpt('use_pub_sub', default=True, - help='Use PUB/SUB pattern for fanout methods. ' - 'PUB/SUB always uses proxy.'), - - cfg.BoolOpt('use_router_proxy', default=True, - help='Use ROUTER remote proxy.'), - - cfg.PortOpt('rpc_zmq_min_port', - default=49153, - help='Minimal port number for random ports range.'), - - cfg.IntOpt('rpc_zmq_max_port', - min=1, - max=65536, - default=65536, - help='Maximal port number for random ports range.'), - - cfg.IntOpt('rpc_zmq_bind_port_retries', - default=100, - help='Number of retries to find free port number before ' - 'fail with ZMQBindError.'), - - cfg.StrOpt('rpc_zmq_serialization', default='json', - choices=('json', 'msgpack'), - help='Default serialization mechanism for ' - 'serializing/deserializing outgoing/incoming messages') -] - - class LazyDriverItem(object): def __init__(self, item_cls, *args, **kwargs): @@ -174,9 +96,7 @@ class ZmqDriver(base.BaseDriver): if zmq is None: raise ImportError(_LE("ZeroMQ is not available!")) - conf.register_opts(zmq_opts) - conf.register_opts(server._pool_opts) - conf.register_opts(base.base_opts) + zmq_options.register_opts(conf) self.conf = conf self.allowed_remote_exmods = allowed_remote_exmods @@ -186,9 +106,11 @@ class ZmqDriver(base.BaseDriver): ).driver(self.conf, url=url) client_cls = zmq_client.ZmqClientProxy - if conf.use_pub_sub and not conf.use_router_proxy: + if conf.oslo_messaging_zmq.use_pub_sub and not \ + conf.oslo_messaging_zmq.use_router_proxy: client_cls = zmq_client.ZmqClientMixDirectPubSub - elif not conf.use_pub_sub and not conf.use_router_proxy: + elif not conf.oslo_messaging_zmq.use_pub_sub and not \ + conf.oslo_messaging_zmq.use_router_proxy: client_cls = zmq_client.ZmqClientDirect self.client = LazyDriverItem( @@ -206,13 +128,13 @@ class ZmqDriver(base.BaseDriver): zmq_transport, p, matchmaker_backend = url.transport.partition('+') assert zmq_transport == 'zmq', "Needs to be zmq for this transport!" if not matchmaker_backend: - return self.conf.rpc_zmq_matchmaker - elif matchmaker_backend not in _MATCHMAKER_BACKENDS: + return self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker + elif matchmaker_backend not in zmq_options.MATCHMAKER_BACKENDS: raise rpc_common.RPCException( _LE("Incorrect matchmaker backend name %(backend_name)s!" "Available names are: %(available_names)s") % {"backend_name": matchmaker_backend, - "available_names": _MATCHMAKER_BACKENDS}) + "available_names": zmq_options.MATCHMAKER_BACKENDS}) return matchmaker_backend def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 29dd3fc..fb10ce7 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -63,7 +63,7 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase): else: return \ [zmq_address.target_to_subscribe_filter(request.target)] \ - if self.conf.use_pub_sub else \ + if self.conf.oslo_messaging_zmq.use_pub_sub else \ self.routing_table.get_all_hosts(request.target) except retrying.RetryError: return [] diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index e7362e2..0ec27e9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -39,7 +39,8 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): - if conf.use_router_proxy or not conf.use_pub_sub: + if conf.oslo_messaging_zmq.use_router_proxy or not \ + conf.oslo_messaging_zmq.use_pub_sub: raise WrongClientException() publisher_direct = \ @@ -68,7 +69,8 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): - if conf.use_pub_sub or conf.use_router_proxy: + if conf.oslo_messaging_zmq.use_pub_sub or \ + conf.oslo_messaging_zmq.use_router_proxy: raise WrongClientException() publisher = \ @@ -92,7 +94,7 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): - if not conf.use_router_proxy: + if not conf.oslo_messaging_zmq.use_router_proxy: raise WrongClientException() publisher = \ diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py index 63c683f..96ebead 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py @@ -87,7 +87,8 @@ class ReceiverBase(object): return self._requests.pop((message_id, message_type), None) def _run_loop(self): - data, socket = self._poller.poll(timeout=self.conf.rpc_poll_timeout) + data, socket = self._poller.poll( + timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout) if data is None: return reply_id, message_type, message_id, response = data diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py index 2abb21b..16de0bc 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py @@ -46,7 +46,8 @@ class RoutingTable(object): return host def _is_tm_expired(self, tm): - return 0 <= self.conf.zmq_target_expire <= time.time() - tm + return 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \ + <= time.time() - tm def _update_routing_table(self, target): routing_record = self.routing_table.get(str(target)) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py index 890d5a1..aa82b84 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py @@ -57,7 +57,8 @@ class SocketsManager(object): def _check_for_new_hosts(self, target): key = self._key_from_target(target) socket, tm = self.outbound_sockets[key] - if 0 <= self.conf.zmq_target_expire <= time.time() - tm: + if 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \ + <= time.time() - tm: self._get_hosts_and_connect(socket, target) return socket diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py index b35a7f9..15c7774 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_proxy.py @@ -85,7 +85,7 @@ class ZmqProxy(object): self.conf = conf self.matchmaker = driver.DriverManager( 'oslo.messaging.zmq.matchmaker', - self.conf.rpc_zmq_matchmaker, + self.conf.oslo_messaging_zmq.rpc_zmq_matchmaker, ).driver(self.conf) self.context = zmq.Context() self.proxy = proxy_cls(conf, self.context, self.matchmaker) diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py index 39de566..4c747ab 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/zmq_queue_proxy.py @@ -68,8 +68,9 @@ class UniversalQueueProxy(object): return msg_type = message[0] - if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE, - zmq_names.NOTIFY_TYPE): + if self.conf.oslo_messaging_zmq.use_pub_sub and \ + msg_type in (zmq_names.CAST_FANOUT_TYPE, + zmq_names.NOTIFY_TYPE): self.pub_publisher.send_request(message) else: self._redirect_message(self.be_router_socket.handle @@ -133,12 +134,13 @@ class RouterUpdater(zmq_updater.UpdaterBase): def _update_records(self): self.matchmaker.register_publisher( (self.publisher_address, self.fe_router_address), - expire=self.conf.zmq_target_expire) + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"), {"pub": self.publisher_address, "router": self.fe_router_address}) - self.matchmaker.register_router(self.be_router_address, - expire=self.conf.zmq_target_expire) + self.matchmaker.register_router( + self.be_router_address, + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"), {"router": self.be_router_address}) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index d413a98..69a7077 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -79,8 +79,8 @@ class SingleSocketConsumer(ConsumerBase): {"stype": zmq_names.socket_type_str(socket_type), "addr": socket.bind_address, "port": socket.port}) - self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, - socket.port) + self.host = zmq_address.combine_address( + self.conf.oslo_messaging_zmq.rpc_zmq_host, socket.port) self.poller.register(socket, self.receive_message) return socket except zmq.ZMQError as e: @@ -119,7 +119,7 @@ class TargetUpdater(zmq_updater.UpdaterBase): self.matchmaker.register( self.target, self.host, zmq_names.socket_type_str(self.socket_type), - expire=self.conf.zmq_target_expire) + expire=self.conf.oslo_messaging_zmq.zmq_target_expire) def stop(self): super(TargetUpdater, self).stop() diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index fa7b0bc..b40bdc0 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -41,11 +41,14 @@ class ZmqServer(base.PollStyleListener): self.poller = poller or zmq_async.get_poller() self.router_consumer = zmq_router_consumer.RouterConsumer( - conf, self.poller, self) if not conf.use_router_proxy else None + conf, self.poller, self) \ + if not conf.oslo_messaging_zmq.use_router_proxy else None self.dealer_consumer = zmq_dealer_consumer.DealerConsumer( - conf, self.poller, self) if conf.use_router_proxy else None + conf, self.poller, self) \ + if conf.oslo_messaging_zmq.use_router_proxy else None self.sub_consumer = zmq_sub_consumer.SubConsumer( - conf, self.poller, self) if conf.use_pub_sub else None + conf, self.poller, self) \ + if conf.oslo_messaging_zmq.use_pub_sub else None self.consumers = [] if self.router_consumer is not None: @@ -58,7 +61,7 @@ class ZmqServer(base.PollStyleListener): @base.batch_poll_helper def poll(self, timeout=None): message, socket = self.poller.poll( - timeout or self.conf.rpc_poll_timeout) + timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout) return message def stop(self): @@ -94,7 +97,7 @@ class ZmqNotificationServer(base.PollStyleListener): @base.batch_poll_helper def poll(self, timeout=None): message, socket = self.poller.poll( - timeout or self.conf.rpc_poll_timeout) + timeout or self.conf.oslo_messaging_zmq.rpc_poll_timeout) return message def stop(self): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_address.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py index b33c288..0175e7e 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_address.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_address.py @@ -24,11 +24,11 @@ def get_tcp_direct_address(host): def get_tcp_random_address(conf): - return "tcp://%s" % conf.rpc_zmq_bind_address + return "tcp://%s" % conf.oslo_messaging_zmq.rpc_zmq_bind_address def get_broker_address(conf): - return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir + return "ipc://%s/zmq-broker" % conf.oslo_messaging_zmq.rpc_zmq_ipc_dir def prefix_str(key, listener_type): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py new file mode 100644 index 0000000..2ac76f9 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py @@ -0,0 +1,122 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import socket + +from oslo_config import cfg + +from oslo_messaging._drivers import base +from oslo_messaging import server + + +MATCHMAKER_BACKENDS = ('redis', 'dummy') +MATCHMAKER_DEFAULT = 'redis' + + +zmq_opts = [ + cfg.StrOpt('rpc_zmq_bind_address', default='*', + deprecated_group='DEFAULT', + help='ZeroMQ bind address. Should be a wildcard (*), ' + 'an ethernet interface, or IP. ' + 'The "host" option should point or resolve to this ' + 'address.'), + + cfg.StrOpt('rpc_zmq_matchmaker', default=MATCHMAKER_DEFAULT, + choices=MATCHMAKER_BACKENDS, + deprecated_group='DEFAULT', + help='MatchMaker driver.'), + + cfg.IntOpt('rpc_zmq_contexts', default=1, + deprecated_group='DEFAULT', + help='Number of ZeroMQ contexts, defaults to 1.'), + + cfg.IntOpt('rpc_zmq_topic_backlog', + deprecated_group='DEFAULT', + help='Maximum number of ingress messages to locally buffer ' + 'per topic. Default is unlimited.'), + + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', + deprecated_group='DEFAULT', + help='Directory for holding IPC sockets.'), + + cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), + sample_default='localhost', + deprecated_group='DEFAULT', + help='Name of this node. Must be a valid hostname, FQDN, or ' + 'IP address. Must match "host" option, if running Nova.'), + + cfg.IntOpt('rpc_cast_timeout', default=-1, + deprecated_group='DEFAULT', + help='Seconds to wait before a cast expires (TTL). ' + 'The default value of -1 specifies an infinite linger ' + 'period. The value of 0 specifies no linger period. ' + 'Pending messages shall be discarded immediately ' + 'when the socket is closed. Only supported by impl_zmq.'), + + cfg.IntOpt('rpc_poll_timeout', default=1, + deprecated_group='DEFAULT', + help='The default number of seconds that poll should wait. ' + 'Poll raises timeout exception when timeout expired.'), + + cfg.IntOpt('zmq_target_expire', default=300, + deprecated_group='DEFAULT', + help='Expiration timeout in seconds of a name service record ' + 'about existing target ( < 0 means no timeout).'), + + cfg.IntOpt('zmq_target_update', default=180, + deprecated_group='DEFAULT', + help='Update period in seconds of a name service record ' + 'about existing target.'), + + cfg.BoolOpt('use_pub_sub', default=True, + deprecated_group='DEFAULT', + help='Use PUB/SUB pattern for fanout methods. ' + 'PUB/SUB always uses proxy.'), + + cfg.BoolOpt('use_router_proxy', default=True, + deprecated_group='DEFAULT', + help='Use ROUTER remote proxy.'), + + cfg.PortOpt('rpc_zmq_min_port', + default=49153, + deprecated_group='DEFAULT', + help='Minimal port number for random ports range.'), + + cfg.IntOpt('rpc_zmq_max_port', + min=1, + max=65536, + default=65536, + deprecated_group='DEFAULT', + help='Maximal port number for random ports range.'), + + cfg.IntOpt('rpc_zmq_bind_port_retries', + default=100, + deprecated_group='DEFAULT', + help='Number of retries to find free port number before ' + 'fail with ZMQBindError.'), + + cfg.StrOpt('rpc_zmq_serialization', default='json', + choices=('json', 'msgpack'), + deprecated_group='DEFAULT', + help='Default serialization mechanism for ' + 'serializing/deserializing outgoing/incoming messages') +] + + +def register_opts(conf): + opt_group = cfg.OptGroup(name='oslo_messaging_zmq', + title='ZeroMQ driver options') + conf.register_opts(zmq_opts, group=opt_group) + conf.register_opts(server._pool_opts) + conf.register_opts(base.base_opts) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 14061b2..285eafa 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -47,8 +47,9 @@ class ZmqSocket(object): self.handle.set_hwm(high_watermark) self.close_linger = -1 - if self.conf.rpc_cast_timeout > 0: - self.close_linger = self.conf.rpc_cast_timeout * 1000 + if self.conf.oslo_messaging_zmq.rpc_cast_timeout > 0: + self.close_linger = \ + self.conf.oslo_messaging_zmq.rpc_cast_timeout * 1000 self.handle.setsockopt(zmq.LINGER, self.close_linger) # Put messages to only connected queues self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0) @@ -96,8 +97,9 @@ class ZmqSocket(object): self.handle.send_multipart(*args, **kwargs) def send_dumped(self, obj, *args, **kwargs): - serialization = kwargs.pop('serialization', - self.conf.rpc_zmq_serialization) + serialization = kwargs.pop( + 'serialization', + self.conf.oslo_messaging_zmq.rpc_zmq_serialization) serializer = self._get_serializer(serialization) s = serializer.dump_as_bytes(obj) self.handle.send(s, *args, **kwargs) @@ -118,8 +120,9 @@ class ZmqSocket(object): return self.handle.recv_multipart(*args, **kwargs) def recv_loaded(self, *args, **kwargs): - serialization = kwargs.pop('serialization', - self.conf.rpc_zmq_serialization) + serialization = kwargs.pop( + 'serialization', + self.conf.oslo_messaging_zmq.rpc_zmq_serialization) serializer = self._get_serializer(serialization) s = self.handle.recv(*args, **kwargs) obj = serializer.load_from_bytes(s) @@ -170,13 +173,13 @@ class ZmqRandomPortSocket(ZmqSocket): high_watermark=high_watermark) self.bind_address = zmq_address.get_tcp_random_address(self.conf) if host is None: - host = conf.rpc_zmq_host + host = conf.oslo_messaging_zmq.rpc_zmq_host try: self.port = self.handle.bind_to_random_port( self.bind_address, - min_port=conf.rpc_zmq_min_port, - max_port=conf.rpc_zmq_max_port, - max_tries=conf.rpc_zmq_bind_port_retries) + min_port=conf.oslo_messaging_zmq.rpc_zmq_min_port, + max_port=conf.oslo_messaging_zmq.rpc_zmq_max_port, + max_tries=conf.oslo_messaging_zmq.rpc_zmq_bind_port_retries) self.connect_address = zmq_address.combine_address(host, self.port) except zmq.ZMQBindError: LOG.error(_LE("Random ports range exceeded!")) @@ -192,7 +195,8 @@ class ZmqFixedPortSocket(ZmqSocket): high_watermark=high_watermark) self.connect_address = zmq_address.combine_address(host, port) self.bind_address = zmq_address.get_tcp_direct_address( - zmq_address.combine_address(conf.rpc_zmq_bind_address, port)) + zmq_address.combine_address( + conf.oslo_messaging_zmq.rpc_zmq_bind_address, port)) self.host = host self.port = port diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py index 302915d..2d4f9e0 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py @@ -41,7 +41,7 @@ class UpdaterBase(object): def _update_loop(self): self.update_method() - time.sleep(self.conf.zmq_target_update) + time.sleep(self.conf.oslo_messaging_zmq.zmq_target_update) def cleanup(self): self.executor.stop() diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py index 4e6c9d5..5eb4e5e 100644 --- a/oslo_messaging/conffixture.py +++ b/oslo_messaging/conffixture.py @@ -58,7 +58,8 @@ class ConfFixture(fixtures.Fixture): 'oslo_messaging._drivers.amqp1_driver.opts', 'amqp1_opts', 'oslo_messaging_amqp') _import_opts(self.conf, - 'oslo_messaging._drivers.impl_zmq', 'zmq_opts') + 'oslo_messaging._drivers.zmq_driver.zmq_options', + 'zmq_opts', 'oslo_messaging_zmq') _import_opts(self.conf, 'oslo_messaging._drivers.zmq_driver.' 'matchmaker.matchmaker_redis', diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index b04768a..c252496 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -25,7 +25,7 @@ from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts from oslo_messaging._drivers import base as drivers_base from oslo_messaging._drivers import impl_pika from oslo_messaging._drivers import impl_rabbit -from oslo_messaging._drivers import impl_zmq +from oslo_messaging._drivers.impl_zmq import zmq_options from oslo_messaging._drivers.pika_driver import pika_connection_factory from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis from oslo_messaging.notify import notifier @@ -36,7 +36,7 @@ from oslo_messaging import transport _global_opt_lists = [ drivers_base.base_opts, - impl_zmq.zmq_opts, + zmq_options.zmq_opts, server._pool_opts, client._client_opts, transport._transport_opts, @@ -45,6 +45,7 @@ _global_opt_lists = [ _opts = [ (None, list(itertools.chain(*_global_opt_lists))), ('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts), + ('oslo_messaging_zmq', zmq_options.zmq_opts), ('oslo_messaging_amqp', amqp_opts.amqp1_opts), ('oslo_messaging_notifications', notifier._notifier_opts), ('oslo_messaging_rabbit', list( diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index 76b61cf..04d86d9 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -35,7 +35,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase): # Set config values kwargs = {'rpc_zmq_min_port': 5555, 'rpc_zmq_max_port': 5560} - self.config(**kwargs) + self.config(group='oslo_messaging_zmq', **kwargs) def test_ports_range(self): listeners = [] diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py index 02519de..2973521 100644 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py @@ -54,7 +54,7 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): kwargs = {'use_pub_sub': True, 'rpc_zmq_serialization': self.serialization} - self.config(**kwargs) + self.config(group='oslo_messaging_zmq', **kwargs) self.config(host="127.0.0.1", group="zmq_proxy_opts") self.config(publisher_port="0", group="zmq_proxy_opts") diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py index f0ef4a4..ff48bfb 100644 --- a/oslo_messaging/tests/drivers/zmq/zmq_common.py +++ b/oslo_messaging/tests/drivers/zmq/zmq_common.py @@ -20,6 +20,7 @@ import testtools import oslo_messaging from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_options from oslo_messaging._i18n import _LE from oslo_messaging.tests import utils as test_utils @@ -71,17 +72,18 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): def setUp(self): super(ZmqBaseTestCase, self).setUp() self.messaging_conf.transport_driver = 'zmq' + zmq_options.register_opts(self.conf) # Set config values self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path kwargs = {'rpc_zmq_bind_address': '127.0.0.1', 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, 'rpc_zmq_ipc_dir': self.internal_ipc_dir, 'use_pub_sub': False, 'use_router_proxy': False, 'rpc_zmq_matchmaker': 'dummy'} - self.config(**kwargs) + self.config(group='oslo_messaging_zmq', **kwargs) + self.config(rpc_response_timeout=5) # Get driver transport = oslo_messaging.get_transport(self.conf) diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 0dcc047..1d215a5 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -293,10 +293,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER') if zmq_matchmaker: - self.config(rpc_zmq_matchmaker=zmq_matchmaker) + self.config(rpc_zmq_matchmaker=zmq_matchmaker, + group="oslo_messaging_zmq") zmq_ipc_dir = os.environ.get('ZMQ_IPC_DIR') if zmq_ipc_dir: - self.config(rpc_zmq_ipc_dir=zmq_ipc_dir) + self.config(group="oslo_messaging_zmq", + rpc_zmq_ipc_dir=zmq_ipc_dir) zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT') if zmq_redis_port: self.config(port=zmq_redis_port, group="matchmaker_redis") @@ -304,10 +306,12 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): self.config(wait_timeout=1000, group="matchmaker_redis") zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB') if zmq_use_pub_sub: - self.config(use_pub_sub=zmq_use_pub_sub) + self.config(use_pub_sub=zmq_use_pub_sub, + group='oslo_messaging_zmq') zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY') if zmq_use_router_proxy: - self.config(use_router_proxy=zmq_use_router_proxy) + self.config(use_router_proxy=zmq_use_router_proxy, + group='oslo_messaging_zmq') class NotificationFixture(fixtures.Fixture): diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py index ee9f56e..4a1498a 100644 --- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py +++ b/oslo_messaging/tests/functional/zmq/multiproc_utils.py @@ -70,7 +70,8 @@ def listener_configurer(conf): '%(levelname)-8s %(message)s') h.setFormatter(f) root.addHandler(h) - log_path = conf.rpc_zmq_ipc_dir + "/" + "zmq_multiproc.log" + log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \ + "/" + "zmq_multiproc.log" file_handler = logging.StreamHandler(open(log_path, 'w')) file_handler.setFormatter(f) root.addHandler(file_handler) diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py index ea287b3..f1b89b0 100644 --- a/oslo_messaging/tests/functional/zmq/test_startup.py +++ b/oslo_messaging/tests/functional/zmq/test_startup.py @@ -30,10 +30,10 @@ class StartupOrderTestCase(multiproc_utils.MutliprocTestCase): self.conf.prog = "test_prog" self.conf.project = "test_project" - kwargs = {'rpc_response_timeout': 30} - self.config(**kwargs) + self.config(rpc_response_timeout=30) - log_path = self.conf.rpc_zmq_ipc_dir + "/" + str(os.getpid()) + ".log" + log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir, + str(os.getpid()) + ".log") sys.stdout = open(log_path, "w", buffering=0) def test_call_server_before_client(self): diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py index 2ca8f8a..0e4b1f8 100644 --- a/oslo_messaging/tests/test_opts.py +++ b/oslo_messaging/tests/test_opts.py @@ -32,11 +32,12 @@ class OptsTestCase(test_utils.BaseTestCase): super(OptsTestCase, self).setUp() def _test_list_opts(self, result): - self.assertEqual(5, len(result)) + self.assertEqual(6, len(result)) groups = [g for (g, l) in result] self.assertIn(None, groups) self.assertIn('matchmaker_redis', groups) + self.assertIn('oslo_messaging_zmq', groups) self.assertIn('oslo_messaging_amqp', groups) self.assertIn('oslo_messaging_notifications', groups) self.assertIn('oslo_messaging_rabbit', groups) diff --git a/setup-test-env-zmq-proxy.sh b/setup-test-env-zmq-proxy.sh index ebce12c..12649c8 100755 --- a/setup-test-env-zmq-proxy.sh +++ b/setup-test-env-zmq-proxy.sh @@ -18,6 +18,7 @@ export ZMQ_PROXY_HOST=127.0.0.1 cat > ${DATADIR}/zmq.conf < ${DATADIR}/zmq.conf < ${DATADIR}/zmq.conf <