diff --git a/.gitignore b/.gitignore index 215d3f3cc..874bacdb3 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ doc/build/ doc/source/api/ dist/ .testrepository/ +releasenotes/build diff --git a/doc/source/AMQP1.0.rst b/doc/source/AMQP1.0.rst index 1483d321c..31c5cc351 100644 --- a/doc/source/AMQP1.0.rst +++ b/doc/source/AMQP1.0.rst @@ -120,7 +120,7 @@ The new driver is selected by specifying **amqp** as the transport name. For example:: from oslo import messaging - from olso.config import cfg + from oslo.config import cfg amqp_transport = messaging.get_transport(cfg.CONF, "amqp://me:passwd@host:5672") diff --git a/oslo_messaging/_drivers/protocols/__init__.py b/oslo_messaging/_drivers/amqp1_driver/__init__.py similarity index 100% rename from oslo_messaging/_drivers/protocols/__init__.py rename to oslo_messaging/_drivers/amqp1_driver/__init__.py diff --git a/oslo_messaging/_drivers/protocols/amqp/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py similarity index 99% rename from oslo_messaging/_drivers/protocols/amqp/controller.py rename to oslo_messaging/_drivers/amqp1_driver/controller.py index 2c115a865..440939c56 100644 --- a/oslo_messaging/_drivers/protocols/amqp/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -26,6 +26,7 @@ functions scheduled by the Controller. import abc import logging +import random import threading import uuid @@ -34,8 +35,8 @@ import proton import pyngus from six import moves -from oslo_messaging._drivers.protocols.amqp import eventloop -from oslo_messaging._drivers.protocols.amqp import opts +from oslo_messaging._drivers.amqp1_driver import eventloop +from oslo_messaging._drivers.amqp1_driver import opts from oslo_messaging._i18n import _LE, _LI, _LW from oslo_messaging import exceptions from oslo_messaging import transport @@ -298,7 +299,7 @@ class Hosts(object): entry.port = entry.port or 5672 entry.username = entry.username or default_username entry.password = entry.password or default_password - self._current = 0 + self._current = random.randint(0, len(self._entries) - 1) @property def current(self): diff --git a/oslo_messaging/_drivers/protocols/amqp/drivertasks.py b/oslo_messaging/_drivers/amqp1_driver/drivertasks.py similarity index 96% rename from oslo_messaging/_drivers/protocols/amqp/drivertasks.py rename to oslo_messaging/_drivers/amqp1_driver/drivertasks.py index 0addc0758..74e3d3f61 100644 --- a/oslo_messaging/_drivers/protocols/amqp/drivertasks.py +++ b/oslo_messaging/_drivers/amqp1_driver/drivertasks.py @@ -16,7 +16,7 @@ import logging import threading import time -from oslo_messaging._drivers.protocols.amqp import controller +from oslo_messaging._drivers.amqp1_driver import controller from oslo_messaging._i18n import _LW from oslo_messaging import exceptions @@ -94,11 +94,10 @@ class ListenTask(controller.Task): class ReplyTask(controller.Task): """A task that sends 'response' message to 'address'. """ - def __init__(self, address, response, log_failure): + def __init__(self, address, response): super(ReplyTask, self).__init__() self._address = address self._response = response - self._log_failure = log_failure self._wakeup = threading.Event() def wait(self): diff --git a/oslo_messaging/_drivers/protocols/amqp/eventloop.py b/oslo_messaging/_drivers/amqp1_driver/eventloop.py similarity index 100% rename from oslo_messaging/_drivers/protocols/amqp/eventloop.py rename to oslo_messaging/_drivers/amqp1_driver/eventloop.py diff --git a/oslo_messaging/_drivers/protocols/amqp/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py similarity index 100% rename from oslo_messaging/_drivers/protocols/amqp/opts.py rename to oslo_messaging/_drivers/amqp1_driver/opts.py diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index ed94bccc5..f02159222 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -50,14 +50,13 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): self.stopwatch = timeutils.StopWatch() self.stopwatch.start() - def _send_reply(self, conn, reply=None, failure=None, log_failure=True): + def _send_reply(self, conn, reply=None, failure=None): if not self._obsolete_reply_queues.reply_q_valid(self.reply_q, self.msg_id): return if failure: - failure = rpc_common.serialize_remote_exception(failure, - log_failure) + failure = rpc_common.serialize_remote_exception(failure) # NOTE(sileht): ending can be removed in N*, see Listener.wait() # for more detail. msg = {'result': reply, 'failure': failure, 'ending': True, @@ -74,7 +73,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): 'elapsed': self.stopwatch.elapsed()}) conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg)) - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): if not self.msg_id: # NOTE(Alexei_987) not sending reply, if msg_id is empty # because reply should not be expected by caller side @@ -96,8 +95,7 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): try: with self.listener.driver._get_connection( rpc_common.PURPOSE_SEND) as conn: - self._send_reply(conn, reply, failure, - log_failure=log_failure) + self._send_reply(conn, reply, failure) return except rpc_amqp.AMQPDestinationNotFound: if timer.check_return() > 0: diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index 78a379284..d21320998 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -92,7 +92,7 @@ class IncomingMessage(object): class RpcIncomingMessage(IncomingMessage): @abc.abstractmethod - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Send a reply or failure back to the client.""" diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index 7b446d74d..7ac1e4dff 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -162,18 +162,15 @@ class Connection(object): raise NotImplementedError() -def serialize_remote_exception(failure_info, log_failure=True): +def serialize_remote_exception(failure_info): """Prepares exception data to be sent over rpc. Failure_info should be a sys.exc_info() tuple. """ tb = traceback.format_exception(*failure_info) + failure = failure_info[1] - if log_failure: - LOG.error(_LE("Returning exception %s to caller"), - six.text_type(failure)) - LOG.error(tb) kwargs = {} if hasattr(failure, 'kwargs'): diff --git a/oslo_messaging/_drivers/protocols/amqp/driver.py b/oslo_messaging/_drivers/impl_amqp1.py similarity index 98% rename from oslo_messaging/_drivers/protocols/amqp/driver.py rename to oslo_messaging/_drivers/impl_amqp1.py index 50d0036a9..46fce39cd 100644 --- a/oslo_messaging/_drivers/protocols/amqp/driver.py +++ b/oslo_messaging/_drivers/impl_amqp1.py @@ -39,10 +39,10 @@ from oslo_messaging import target as messaging_target proton = importutils.try_import('proton') controller = importutils.try_import( - 'oslo_messaging._drivers.protocols.amqp.controller' + 'oslo_messaging._drivers.amqp1_driver.controller' ) drivertasks = importutils.try_import( - 'oslo_messaging._drivers.protocols.amqp.drivertasks' + 'oslo_messaging._drivers.amqp1_driver.drivertasks' ) LOG = logging.getLogger(__name__) @@ -98,13 +98,13 @@ class ProtonIncomingMessage(base.RpcIncomingMessage): self._reply_to = message.reply_to self._correlation_id = message.id - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Schedule a ReplyTask to send the reply.""" if self._reply_to: response = marshal_response(reply=reply, failure=failure) response.correlation_id = self._correlation_id LOG.debug("Replying to %s", self._correlation_id) - task = drivertasks.ReplyTask(self._reply_to, response, log_failure) + task = drivertasks.ReplyTask(self._reply_to, response) self.listener.driver._ctrl.add_task(task) else: LOG.debug("Ignoring reply as no reply address available") diff --git a/oslo_messaging/_drivers/impl_fake.py b/oslo_messaging/_drivers/impl_fake.py index f35980493..f25e8b7d1 100644 --- a/oslo_messaging/_drivers/impl_fake.py +++ b/oslo_messaging/_drivers/impl_fake.py @@ -30,7 +30,7 @@ class FakeIncomingMessage(base.RpcIncomingMessage): self.requeue_callback = requeue self._reply_q = reply_q - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): if self._reply_q: failure = failure[1] if failure else None self._reply_q.put((reply, failure)) diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 7264d5fda..e7c364700 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -33,9 +33,13 @@ PURPOSE_LISTEN = 'listen' kafka_opts = [ cfg.StrOpt('kafka_default_host', default='localhost', + deprecated_for_removal=True, + deprecated_reason="Replaced by [DEFAULT]/transport_url", help='Default Kafka broker Host'), cfg.PortOpt('kafka_default_port', default=9092, + deprecated_for_removal=True, + deprecated_reason="Replaced by [DEFAULT]/transport_url", help='Default Kafka broker Port'), cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024, @@ -97,21 +101,18 @@ class Connection(object): def _parse_url(self): driver_conf = self.conf.oslo_messaging_kafka - try: - self.host = self.url.hosts[0].hostname - except (NameError, IndexError): - self.host = driver_conf.kafka_default_host - try: - self.port = self.url.hosts[0].port - except (NameError, IndexError): - self.port = driver_conf.kafka_default_port + self.hostaddrs = [] - if self.host is None: - self.host = driver_conf.kafka_default_host + for host in self.url.hosts: + if host.hostname: + self.hostaddrs.append("%s:%s" % ( + host.hostname, + host.port or driver_conf.kafka_default_port)) - if self.port is None: - self.port = driver_conf.kafka_default_port + if not self.hostaddrs: + self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host, + driver_conf.kafka_default_port)) def notify_send(self, topic, ctxt, msg, retry): """Send messages to Kafka broker. @@ -215,7 +216,7 @@ class Connection(object): return try: self.kafka_client = kafka.KafkaClient( - "%s:%s" % (self.host, str(self.port))) + self.hostaddrs) self.producer = kafka.SimpleProducer(self.kafka_client) except KafkaError as e: LOG.exception(_LE("Kafka Connection is not available: %s"), e) @@ -227,7 +228,7 @@ class Connection(object): self.kafka_client.ensure_topic_exists(topic) self.consumer = kafka.KafkaConsumer( *topics, group_id=group, - bootstrap_servers=["%s:%s" % (self.host, str(self.port))], + bootstrap_servers=self.hostaddrs, fetch_message_max_bytes=self.fetch_messages_max_bytes) self._consume_loop_stopped = False @@ -240,7 +241,7 @@ class OsloKafkaMessage(base.RpcIncomingMessage): def requeue(self): LOG.warning(_LW("requeue is not supported")) - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): LOG.warning(_LW("reply is not supported")) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index f04bf610c..fcfabfdf1 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -261,7 +261,7 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, - nowait=True, rabbit_ha_queues=None, rabbit_queue_ttl=0): + nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0): """Init the Publisher class with the exchange_name, routing_key, type, durable auto_delete """ @@ -1024,11 +1024,31 @@ class Connection(object): if not self.connection.connected: raise self.connection.recoverable_connection_errors[0] - if self._new_tags: + consume_max_retries = 2 + while self._new_tags and consume_max_retries: for consumer, tag in self._consumers.items(): if tag in self._new_tags: - consumer.consume(tag=tag) - self._new_tags.remove(tag) + try: + consumer.consume(tag=tag) + self._new_tags.remove(tag) + except self.connection.channel_errors as exc: + # NOTE(kbespalov): during the interval between + # a queue declaration and consumer declaration + # the queue can disappear. In this case + # we must redeclare queue and try to re-consume. + # More details is here: + # bugs.launchpad.net/oslo.messaging/+bug/1581148 + if exc.code == 404 and consume_max_retries: + consumer.declare(self) + # NOTE(kbespalov): the broker closes a channel + # at any channel error. The py-amqp catches + # this situation and re-open a new channel. + # So, we must re-declare all consumers again. + self._new_tags = set(self._consumers.values()) + consume_max_retries -= 1 + break + else: + raise poll_timeout = (self._poll_timeout if timeout is None else min(timeout, self._poll_timeout)) diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 979148c52..529063def 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -87,7 +87,7 @@ zmq_opts = [ 'PUB/SUB always uses proxy.'), cfg.BoolOpt('use_router_proxy', default=True, - help='Use ROUTER remote proxy for direct methods.'), + help='Use ROUTER remote proxy.'), cfg.PortOpt('rpc_zmq_min_port', default=49153, @@ -182,12 +182,18 @@ class ZmqDriver(base.BaseDriver): self.get_matchmaker_backend(url), ).driver(self.conf, url=url) + client_cls = zmq_client.ZmqClientProxy + if conf.use_pub_sub and not conf.use_router_proxy: + client_cls = zmq_client.ZmqClientMixDirectPubSub + elif not conf.use_pub_sub and not conf.use_router_proxy: + client_cls = zmq_client.ZmqClientDirect + self.client = LazyDriverItem( - zmq_client.ZmqClient, self.conf, self.matchmaker, + client_cls, self.conf, self.matchmaker, self.allowed_remote_exmods) self.notifier = LazyDriverItem( - zmq_client.ZmqClient, self.conf, self.matchmaker, + client_cls, self.conf, self.matchmaker, self.allowed_remote_exmods) super(ZmqDriver, self).__init__(conf, url, default_exchange, diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py index 3ca4b0104..2802bedb1 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_message.py +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -175,13 +175,11 @@ class RpcPikaIncomingMessage(PikaIncomingMessage, base.RpcIncomingMessage): self.reply_q = properties.reply_to self.msg_id = properties.correlation_id - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Send back reply to the RPC client :param reply: Dictionary, reply. In case of exception should be None :param failure: Tuple, should be a sys.exc_info() tuple. Should be None if RPC request was successfully processed. - :param log_failure: Boolean, not used in this implementation. - It present here to be compatible with driver API :return RpcReplyPikaIncomingMessage, message with reply """ diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 61f3e37a0..215f0a347 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -73,7 +73,7 @@ class UniversalQueueProxy(object): if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE, zmq_names.NOTIFY_TYPE): self.pub_publisher.send_request(message) - elif msg_type in zmq_names.DIRECT_TYPES: + else: self._redirect_message(self.be_router_socket if socket is self.fe_router_socket else self.fe_router_socket, message) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 1064e7279..5cba7820a 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -35,6 +35,7 @@ class DealerPublisherProxy(object): """Used when publishing to a proxy. """ def __init__(self, conf, matchmaker, socket_to_proxy): + self.conf = conf self.sockets_manager = zmq_publisher_base.SocketsManager( conf, matchmaker, zmq.ROUTER, zmq.DEALER) self.socket = socket_to_proxy @@ -45,10 +46,17 @@ class DealerPublisherProxy(object): raise zmq_publisher_base.UnsupportedSendPattern( request.msg_type) - routing_key = self.routing_table.get_routable_host(request.target) \ - if request.msg_type in zmq_names.DIRECT_TYPES else \ - zmq_address.target_to_subscribe_filter(request.target) + if self.conf.use_pub_sub: + routing_key = self.routing_table.get_routable_host(request.target) \ + if request.msg_type in zmq_names.DIRECT_TYPES else \ + zmq_address.target_to_subscribe_filter(request.target) + self._do_send_request(request, routing_key) + else: + routing_keys = self.routing_table.get_all_hosts(request.target) + for routing_key in routing_keys: + self._do_send_request(request, routing_key) + def _do_send_request(self, request, routing_key): self.socket.send(b'', zmq.SNDMORE) self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) self.socket.send(six.b(routing_key), zmq.SNDMORE) @@ -132,6 +140,10 @@ class RoutingTable(object): self.routing_table = {} self.routable_hosts = {} + def get_all_hosts(self, target): + self._update_routing_table(target) + return list(self.routable_hosts.get(str(target)) or []) + def get_routable_host(self, target): self._update_routing_table(target) hosts_for_target = self.routable_hosts[str(target)] diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index 51e8d4181..bfaff0d4d 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -112,30 +112,37 @@ class SocketsManager(object): self.socket_to_publishers = None self.socket_to_routers = None - def _track_socket(self, socket, target): - self.outbound_sockets[str(target)] = (socket, time.time()) - def get_hosts(self, target): return self.matchmaker.get_hosts( target, zmq_names.socket_type_str(self.listener_type)) + @staticmethod + def _key_from_target(target): + return target.topic if target.fanout else str(target) + def _get_hosts_and_connect(self, socket, target): hosts = self.get_hosts(target) self._connect_to_hosts(socket, target, hosts) + def _track_socket(self, socket, target): + key = self._key_from_target(target) + self.outbound_sockets[key] = (socket, time.time()) + def _connect_to_hosts(self, socket, target, hosts): for host in hosts: socket.connect_to_host(host) self._track_socket(socket, target) def _check_for_new_hosts(self, target): - socket, tm = self.outbound_sockets[str(target)] + key = self._key_from_target(target) + socket, tm = self.outbound_sockets[key] if 0 <= self.conf.zmq_target_expire <= time.time() - tm: self._get_hosts_and_connect(socket, target) return socket def get_socket(self, target): - if str(target) in self.outbound_sockets: + key = self._key_from_target(target) + if key in self.outbound_sockets: socket = self._check_for_new_hosts(target) else: socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, @@ -143,16 +150,6 @@ class SocketsManager(object): self._get_hosts_and_connect(socket, target) return socket - def get_socket_to_hosts(self, target, hosts): - key = str(target) - if key in self.outbound_sockets: - socket, tm = self.outbound_sockets[key] - else: - socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, - self.socket_type) - self._connect_to_hosts(socket, target, hosts) - return socket - def get_socket_to_publishers(self): if self.socket_to_publishers is not None: return self.socket_to_publishers diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index 9f01eb23f..e5951cb9d 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -13,6 +13,7 @@ # under the License. +from oslo_messaging._drivers import common from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_call_publisher from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ @@ -28,45 +29,102 @@ from oslo_messaging._drivers.zmq_driver import zmq_names zmq = zmq_async.import_zmq() -class ZmqClient(zmq_client_base.ZmqClientBase): +class WrongClientException(common.RPCException): + """Raised if client type doesn't match configuration""" + + +class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase): + """Client for using with direct connections and fanout over proxy: + + use_pub_sub = true + use_router_proxy = false + + """ def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): + if conf.use_router_proxy or not conf.use_pub_sub: + raise WrongClientException() + self.sockets_manager = zmq_publisher_base.SocketsManager( conf, matchmaker, zmq.ROUTER, zmq.DEALER) - default_publisher = zmq_dealer_publisher.DealerPublisher( - conf, matchmaker) - - publisher_to_proxy = zmq_dealer_publisher_proxy.DealerPublisherProxy( + fanout_publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy( conf, matchmaker, self.sockets_manager.get_socket_to_publishers()) - call_publisher = zmq_dealer_publisher_proxy.DealerCallPublisherProxy( - conf, matchmaker, self.sockets_manager) if conf.use_router_proxy \ - else zmq_dealer_call_publisher.DealerCallPublisher( - conf, matchmaker, self.sockets_manager) - - cast_publisher = publisher_to_proxy if conf.use_router_proxy \ - else zmq_dealer_publisher.DealerPublisherAsync( - conf, matchmaker) - - fanout_publisher = publisher_to_proxy \ - if conf.use_pub_sub else default_publisher - - super(ZmqClient, self).__init__( + super(ZmqClientMixDirectPubSub, self).__init__( conf, matchmaker, allowed_remote_exmods, publishers={ - zmq_names.CALL_TYPE: call_publisher, + zmq_names.CALL_TYPE: + zmq_dealer_call_publisher.DealerCallPublisher( + conf, matchmaker, self.sockets_manager), - zmq_names.CAST_TYPE: cast_publisher, - - # Here use DealerPublisherLight for sending request to proxy - # which finally uses PubPublisher to send fanout in case of - # 'use_pub_sub' option configured. zmq_names.CAST_FANOUT_TYPE: fanout_publisher, zmq_names.NOTIFY_TYPE: fanout_publisher, - "default": default_publisher + "default": zmq_dealer_publisher.DealerPublisherAsync( + conf, matchmaker) + } + ) + + +class ZmqClientDirect(zmq_client_base.ZmqClientBase): + """This kind of client (publishers combination) is to be used for + direct connections only: + + use_pub_sub = false + use_router_proxy = false + """ + + def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): + + if conf.use_pub_sub or conf.use_router_proxy: + raise WrongClientException() + + self.sockets_manager = zmq_publisher_base.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER) + + super(ZmqClientDirect, self).__init__( + conf, matchmaker, allowed_remote_exmods, + publishers={ + zmq_names.CALL_TYPE: + zmq_dealer_call_publisher.DealerCallPublisher( + conf, matchmaker, self.sockets_manager), + + "default": zmq_dealer_publisher.DealerPublisher( + conf, matchmaker) + } + ) + + +class ZmqClientProxy(zmq_client_base.ZmqClientBase): + """Client for using with proxy: + + use_pub_sub = true + use_router_proxy = true + or + use_pub_sub = false + use_router_proxy = true + """ + + def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): + + if not conf.use_router_proxy: + raise WrongClientException() + + self.sockets_manager = zmq_publisher_base.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER) + + super(ZmqClientProxy, self).__init__( + conf, matchmaker, allowed_remote_exmods, + publishers={ + zmq_names.CALL_TYPE: + zmq_dealer_publisher_proxy.DealerCallPublisherProxy( + conf, matchmaker, self.sockets_manager), + + "default": zmq_dealer_publisher_proxy.DealerPublisherProxy( + conf, matchmaker, + self.sockets_manager.get_socket_to_publishers()) } ) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py index 9342bafb0..b6a7b7563 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_response.py @@ -18,8 +18,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_names class Response(object): def __init__(self, id=None, type=None, message_id=None, - reply_id=None, reply_body=None, - failure=None, log_failure=None): + reply_id=None, reply_body=None, failure=None): self._id = id self._type = type @@ -27,7 +26,6 @@ class Response(object): self._reply_id = reply_id self._reply_body = reply_body self._failure = failure - self._log_failure = log_failure @property def id_(self): @@ -53,18 +51,13 @@ class Response(object): def failure(self): return self._failure - @property - def log_failure(self): - return self._log_failure - def to_dict(self): return {zmq_names.FIELD_ID: self._id, zmq_names.FIELD_TYPE: self._type, zmq_names.FIELD_MSG_ID: self._message_id, zmq_names.FIELD_REPLY_ID: self._reply_id, zmq_names.FIELD_REPLY: self._reply_body, - zmq_names.FIELD_FAILURE: self._failure, - zmq_names.FIELD_LOG_FAILURE: self._log_failure} + zmq_names.FIELD_FAILURE: self._failure} def __str__(self): return str(self.to_dict()) diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py index 8d53bec5e..c73cb872a 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py @@ -190,7 +190,7 @@ class RedisMatchMaker(base.MatchMakerBase): key = zmq_address.target_to_key(target, listener_type) hosts.extend(self._get_hosts_by_key(key)) - if not hosts and target.topic and target.server: + if (not hosts or target.fanout) and target.topic and target.server: key = zmq_address.prefix_str(target.topic, listener_type) hosts.extend(self._get_hosts_by_key(key)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index 6fbebf63f..f1f5d6018 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -37,7 +37,7 @@ class DealerIncomingMessage(base.RpcIncomingMessage): def __init__(self, context, message): super(DealerIncomingMessage, self).__init__(context, message) - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Reply is not needed for non-call messages""" def acknowledge(self): @@ -55,16 +55,14 @@ class DealerIncomingRequest(base.RpcIncomingMessage): self.reply_id = reply_id self.message_id = message_id - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): if failure is not None: - failure = rpc_common.serialize_remote_exception(failure, - log_failure) + failure = rpc_common.serialize_remote_exception(failure) response = zmq_response.Response(type=zmq_names.REPLY_TYPE, message_id=self.message_id, reply_id=self.reply_id, reply_body=reply, - failure=failure, - log_failure=log_failure) + failure=failure) LOG.debug("Replying %s", self.message_id) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py index a69602982..719c24e4e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py @@ -31,7 +31,7 @@ class PullIncomingMessage(base.RpcIncomingMessage): def __init__(self, context, message): super(PullIncomingMessage, self).__init__(context, message) - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Reply is not needed for non-call messages.""" def acknowledge(self): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 8eb3dad7a..da487f5c9 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -37,7 +37,7 @@ class RouterIncomingMessage(base.RpcIncomingMessage): self.msg_id = msg_id self.message = message - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Reply is not needed for non-call messages""" def acknowledge(self): diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index 0d1c5213e..6aa8ec4eb 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -34,7 +34,7 @@ class SubIncomingMessage(base.RpcIncomingMessage): def __init__(self, context, message): super(SubIncomingMessage, self).__init__(context, message) - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): """Reply is not needed for non-call messages.""" def acknowledge(self): diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index e3bd186d2..2dc8ec309 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -39,16 +39,14 @@ class ZmqIncomingRequest(base.RpcIncomingMessage): self.received = None self.poller = poller - def reply(self, reply=None, failure=None, log_failure=True): + def reply(self, reply=None, failure=None): if failure is not None: - failure = rpc_common.serialize_remote_exception(failure, - log_failure) + failure = rpc_common.serialize_remote_exception(failure) response = zmq_response.Response(type=zmq_names.REPLY_TYPE, message_id=self.request.message_id, reply_id=self.reply_id, reply_body=reply, - failure=failure, - log_failure=log_failure) + failure=failure) LOG.debug("Replying %s", (str(self.request.message_id))) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index ae477e6df..51f68c6e8 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -20,7 +20,6 @@ zmq = zmq_async.import_zmq() FIELD_TYPE = 'type' FIELD_FAILURE = 'failure' FIELD_REPLY = 'reply' -FIELD_LOG_FAILURE = 'log_failure' FIELD_ID = 'id' FIELD_MSG_ID = 'message_id' FIELD_MSG_TYPE = 'msg_type' diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py index bb037b8db..c6edddd07 100644 --- a/oslo_messaging/conffixture.py +++ b/oslo_messaging/conffixture.py @@ -55,7 +55,7 @@ class ConfFixture(fixtures.Fixture): 'oslo_messaging._drivers.amqp', 'amqp_opts', 'oslo_messaging_qpid') _import_opts(self.conf, - 'oslo_messaging._drivers.protocols.amqp.opts', + 'oslo_messaging._drivers.amqp1_driver.opts', 'amqp1_opts', 'oslo_messaging_amqp') _import_opts(self.conf, 'oslo_messaging._drivers.impl_zmq', 'zmq_opts') diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index 068774336..b04768a28 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -21,12 +21,12 @@ import copy import itertools from oslo_messaging._drivers import amqp +from oslo_messaging._drivers.amqp1_driver import opts as amqp_opts from oslo_messaging._drivers import base as drivers_base from oslo_messaging._drivers import impl_pika from oslo_messaging._drivers import impl_rabbit from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers.pika_driver import pika_connection_factory -from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis from oslo_messaging.notify import notifier from oslo_messaging.rpc import client diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py index 63f388cf4..88e21a549 100644 --- a/oslo_messaging/rpc/client.py +++ b/oslo_messaging/rpc/client.py @@ -383,7 +383,7 @@ class RPCClient(_BaseCallContext): :type kwargs: dict :raises: MessageDeliveryFailure """ - super(RPCClient, self).cast(ctxt, method, **kwargs) + self.prepare().cast(ctxt, method, **kwargs) def call(self, ctxt, method, **kwargs): """Invoke a method and wait for a reply. @@ -425,8 +425,8 @@ class RPCClient(_BaseCallContext): :type kwargs: dict :raises: MessagingTimeout, RemoteError, MessageDeliveryFailure """ - return super(RPCClient, self).call(ctxt, method, **kwargs) + return self.prepare().call(ctxt, method, **kwargs) def can_send_version(self, version=_marker): """Check to see if a version is compatible with the version cap.""" - return super(RPCClient, self).can_send_version(version) + return self.prepare(version=version).can_send_version() diff --git a/oslo_messaging/rpc/server.py b/oslo_messaging/rpc/server.py index 2fbdda77f..c51ffb92c 100644 --- a/oslo_messaging/rpc/server.py +++ b/oslo_messaging/rpc/server.py @@ -132,15 +132,14 @@ class RPCServer(msg_server.MessageHandlingServer): try: res = self.dispatcher.dispatch(message) except rpc_dispatcher.ExpectedException as e: - LOG.debug(u'Expected exception during message handling (%s)', - e.exc_info[1]) failure = e.exc_info - except Exception as e: + LOG.debug(u'Expected exception during message handling (%s)', e) + except Exception: # current sys.exc_info() content can be overriden - # by another exception raise by a log handler during + # by another exception raised by a log handler during # LOG.exception(). So keep a copy and delete it later. failure = sys.exc_info() - LOG.exception(_LE('Exception during handling message')) + LOG.exception(_LE('Exception during message handling')) try: if failure is None: diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index 8b4f4197c..ac4a96474 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -38,7 +38,7 @@ import six from stevedore import driver from oslo_messaging._drivers import base as driver_base -from oslo_messaging._i18n import _LW +from oslo_messaging._i18n import _LW, _LI from oslo_messaging import exceptions LOG = logging.getLogger(__name__) @@ -313,7 +313,8 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): The executor parameter controls how incoming messages will be received and dispatched. By default, the most simple executor is used - the - blocking executor. + blocking executor. It handles only one message at once. It's + recommended to use threading or eventlet. :param transport: the messaging transport :type transport: Transport @@ -330,6 +331,14 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): self.transport = transport self.dispatcher = dispatcher self.executor_type = executor + if self.executor_type == 'blocking': + # NOTE(sileht): We keep blocking as default to not enforce the + # application to use threading or eventlet. Because application + # have to be preprepared accordingly for each one (monkeypatching, + # threadsafe, ...) + LOG.info(_LI("blocking executor handles only one message at " + "once. threading or eventlet executor is " + "recommended.")) self.listener = None diff --git a/oslo_messaging/tests/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py similarity index 96% rename from oslo_messaging/tests/test_amqp_driver.py rename to oslo_messaging/tests/drivers/test_amqp_driver.py index af6724dba..e07f26c17 100644 --- a/oslo_messaging/tests/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -36,7 +36,7 @@ from oslo_messaging.tests import utils as test_utils # are available in the base repos for all supported platforms. pyngus = importutils.try_import("pyngus") if pyngus: - from oslo_messaging._drivers.protocols.amqp import driver as amqp_driver + import oslo_messaging._drivers.impl_amqp1 as amqp_driver # The Cyrus-based SASL tests can only be run if the installed version of proton # has been built with Cyrus SASL support. @@ -512,6 +512,8 @@ class TestFailover(test_utils.BaseTestCase): def setUp(self): super(TestFailover, self).setUp() self._brokers = [FakeBroker(), FakeBroker()] + self._primary = 0 + self._backup = 1 hosts = [] for broker in self._brokers: hosts.append(oslo_messaging.TransportHost(hostname=broker.host, @@ -526,8 +528,10 @@ class TestFailover(test_utils.BaseTestCase): if broker.isAlive(): broker.stop() - def _failover(self, fail_brokers): + def _failover(self, fail_broker): self._brokers[0].start() + self._brokers[1].start() + # self.config(trace=True, group="oslo_messaging_amqp") driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) @@ -535,12 +539,17 @@ class TestFailover(test_utils.BaseTestCase): listener = _ListenerThread( driver.listen(target, None, None)._poll_style_listener, 2) - # wait for listener links to come up + # wait for listener links to come up on either broker # 4 == 3 links per listener + 1 for the global reply queue - predicate = lambda: self._brokers[0].sender_link_count == 4 + predicate = lambda: ((self._brokers[0].sender_link_count == 4) or + (self._brokers[1].sender_link_count == 4)) _wait_until(predicate, 30) self.assertTrue(predicate()) + if self._brokers[1].sender_link_count == 4: + self._primary = 1 + self._backup = 0 + rc = driver.send(target, {"context": "whatever"}, {"method": "echo", "id": "echo-1"}, wait_for_reply=True, @@ -549,15 +558,15 @@ class TestFailover(test_utils.BaseTestCase): self.assertEqual(rc.get('correlation-id'), 'echo-1') # 1 request msg, 1 response: - self.assertEqual(self._brokers[0].topic_count, 1) - self.assertEqual(self._brokers[0].direct_count, 1) + self.assertEqual(self._brokers[self._primary].topic_count, 1) + self.assertEqual(self._brokers[self._primary].direct_count, 1) # invoke failover method - fail_brokers(self._brokers[0], self._brokers[1]) + fail_broker(self._brokers[self._primary]) # wait for listener links to re-establish on broker 1 # 4 = 3 links per listener + 1 for the global reply queue - predicate = lambda: self._brokers[1].sender_link_count == 4 + predicate = lambda: self._brokers[self._backup].sender_link_count == 4 _wait_until(predicate, 30) self.assertTrue(predicate()) @@ -570,44 +579,41 @@ class TestFailover(test_utils.BaseTestCase): self.assertEqual(rc.get('correlation-id'), 'echo-2') # 1 request msg, 1 response: - self.assertEqual(self._brokers[1].topic_count, 1) - self.assertEqual(self._brokers[1].direct_count, 1) + self.assertEqual(self._brokers[self._backup].topic_count, 1) + self.assertEqual(self._brokers[self._backup].direct_count, 1) listener.join(timeout=30) self.assertFalse(listener.isAlive()) # note: stopping the broker first tests cleaning up driver without a # connection active - self._brokers[1].stop() + self._brokers[self._backup].stop() driver.cleanup() def test_broker_crash(self): """Simulate a failure of one broker.""" - def _meth(broker0, broker1): - # fail broker 0 and start broker 1: - broker0.stop() + def _meth(broker): + # fail broker: + broker.stop() time.sleep(0.5) - broker1.start() self._failover(_meth) def test_broker_shutdown(self): """Simulate a normal shutdown of a broker.""" - def _meth(broker0, broker1): - broker0.stop(clean=True) + def _meth(broker): + broker.stop(clean=True) time.sleep(0.5) - broker1.start() self._failover(_meth) def test_heartbeat_failover(self): """Simulate broker heartbeat timeout.""" - def _meth(broker0, broker1): - # keep alive heartbeat from broker 0 will stop, which should force - # failover to broker 1 in about two seconds - broker0.pause() - broker1.start() + def _meth(broker): + # keep alive heartbeat from primary broker will stop, which should + # force failover to backup broker in about two seconds + broker.pause() self.config(idle_timeout=2, group="oslo_messaging_amqp") self._failover(_meth) - self._brokers[0].stop() + self._brokers[self._primary].stop() def test_listener_failover(self): """Verify that Listeners sharing the same topic are re-established diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 1ba3a85ed..4579453a7 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -57,13 +57,17 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): scenarios = [ ('none', dict(url=None, - expected=[dict(host='localhost', port=9092)])), + expected=dict(hostaddrs=['localhost:9092']))), ('empty', dict(url='kafka:///', - expected=[dict(host='localhost', port=9092)])), + expected=dict(hostaddrs=['localhost:9092']))), ('host', dict(url='kafka://127.0.0.1', - expected=[dict(host='127.0.0.1', port=9092)])), + expected=dict(hostaddrs=['127.0.0.1:9092']))), ('port', dict(url='kafka://localhost:1234', - expected=[dict(host='localhost', port=1234)])), + expected=dict(hostaddrs=['localhost:1234']))), + ('two', dict(url='kafka://localhost:1234,localhost2:1234', + expected=dict(hostaddrs=['localhost:1234', + 'localhost2:1234']))), + ] def setUp(self): @@ -76,8 +80,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): driver = transport._driver conn = driver._get_connection(kafka_driver.PURPOSE_SEND) - self.assertEqual(self.expected[0]['host'], conn.host) - self.assertEqual(self.expected[0]['port'], conn.port) + self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs) class TestKafkaDriver(test_utils.BaseTestCase): diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index e4f86917f..ac8e9cbb1 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -321,6 +321,51 @@ class TestRabbitConsume(test_utils.BaseTestCase): self.assertEqual(0, int(deadline - time.time())) + def test_consume_from_missing_queue(self): + transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://') + self.addCleanup(transport.cleanup) + with transport._driver._get_connection( + driver_common.PURPOSE_LISTEN) as conn: + with mock.patch('kombu.Queue.consume') as consume, mock.patch( + 'kombu.Queue.declare') as declare: + conn.declare_topic_consumer(exchange_name='test', + topic='test', + callback=lambda msg: True) + import amqp + consume.side_effect = [amqp.NotFound, None] + conn.connection.connection.recoverable_connection_errors = () + conn.connection.connection.recoverable_channel_errors = () + self.assertEqual(1, declare.call_count) + conn.connection.connection.transport.drain_events = mock.Mock() + # Ensure that a queue will be re-declared if the consume method + # of kombu.Queue raise amqp.NotFound + conn.consume() + self.assertEqual(2, declare.call_count) + + def test_consume_from_missing_queue_with_io_error_on_redeclaration(self): + transport = oslo_messaging.get_transport(self.conf, 'kombu+memory://') + self.addCleanup(transport.cleanup) + with transport._driver._get_connection( + driver_common.PURPOSE_LISTEN) as conn: + with mock.patch('kombu.Queue.consume') as consume, mock.patch( + 'kombu.Queue.declare') as declare: + conn.declare_topic_consumer(exchange_name='test', + topic='test', + callback=lambda msg: True) + import amqp + consume.side_effect = [amqp.NotFound, None] + declare.side_effect = [IOError, None] + + conn.connection.connection.recoverable_connection_errors = ( + IOError,) + conn.connection.connection.recoverable_channel_errors = () + self.assertEqual(1, declare.call_count) + conn.connection.connection.transport.drain_events = mock.Mock() + # Ensure that a queue will be re-declared after + # 'queue not found' exception despite on connection error. + conn.consume() + self.assertEqual(3, declare.call_count) + def test_connection_ack_have_disconnected_kombu_connection(self): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') @@ -451,14 +496,6 @@ class TestSendReceive(test_utils.BaseTestCase): senders = [] replies = [] msgs = [] - errors = [] - - def stub_error(msg, *a, **kw): - if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): - a = a[0] - errors.append(str(msg) % a) - - self.stubs.Set(driver_common.LOG, 'error', stub_error) def send_and_wait_for_reply(i): try: @@ -500,8 +537,7 @@ class TestSendReceive(test_utils.BaseTestCase): raise ZeroDivisionError except Exception: failure = sys.exc_info() - msgs[i].reply(failure=failure, - log_failure=not self.expected) + msgs[i].reply(failure=failure) elif self.rx_id: msgs[i].reply({'rx_id': i}) else: @@ -519,11 +555,6 @@ class TestSendReceive(test_utils.BaseTestCase): else: self.assertEqual(self.reply, reply) - if not self.timeout and self.failure and not self.expected: - self.assertTrue(len(errors) > 0, errors) - else: - self.assertEqual(0, len(errors), errors) - TestSendReceive.generate_scenarios() diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 9d0cb6000..62a547f20 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -21,6 +21,7 @@ import testscenarios import mock import oslo_messaging +from oslo_messaging.rpc import server as rpc_server_module from oslo_messaging import server as server_module from oslo_messaging.tests import utils as test_utils @@ -326,6 +327,22 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): def ping(self, ctxt, arg): raise ValueError(arg) + debugs = [] + errors = [] + + def stub_debug(msg, *a, **kw): + if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): + a = a[0] + debugs.append(str(msg) % a) + + def stub_error(msg, *a, **kw): + if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): + a = a[0] + errors.append(str(msg) % a) + + self.stubs.Set(rpc_server_module.LOG, 'debug', stub_debug) + self.stubs.Set(rpc_server_module.LOG, 'error', stub_error) + server_thread = self._setup_server(transport, TestEndpoint()) client = self._setup_client(transport) @@ -334,6 +351,8 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): except Exception as ex: self.assertIsInstance(ex, ValueError) self.assertEqual('dsfoo', str(ex)) + self.assertTrue(len(debugs) == 0) + self.assertTrue(len(errors) > 0) else: self.assertTrue(False) @@ -342,6 +361,22 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): def test_expected_failure(self): transport = oslo_messaging.get_transport(self.conf, url='fake:') + debugs = [] + errors = [] + + def stub_debug(msg, *a, **kw): + if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): + a = a[0] + debugs.append(str(msg) % a) + + def stub_error(msg, *a, **kw): + if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): + a = a[0] + errors.append(str(msg) % a) + + self.stubs.Set(rpc_server_module.LOG, 'debug', stub_debug) + self.stubs.Set(rpc_server_module.LOG, 'error', stub_error) + class TestEndpoint(object): @oslo_messaging.expected_exceptions(ValueError) def ping(self, ctxt, arg): @@ -355,6 +390,8 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): except Exception as ex: self.assertIsInstance(ex, ValueError) self.assertEqual('dsfoo', str(ex)) + self.assertTrue(len(debugs) > 0) + self.assertTrue(len(errors) == 0) else: self.assertTrue(False) diff --git a/oslo_messaging/tests/test_exception_serialization.py b/oslo_messaging/tests/test_exception_serialization.py index c1079c0a4..ca4f92b02 100644 --- a/oslo_messaging/tests/test_exception_serialization.py +++ b/oslo_messaging/tests/test_exception_serialization.py @@ -61,11 +61,6 @@ def add_remote_postfix(ex): class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase): - _log_failure = [ - ('log_failure', dict(log_failure=True)), - ('do_not_log_failure', dict(log_failure=False)), - ] - _add_remote = [ ('add_remote', dict(add_remote=True)), ('do_not_add_remote', dict(add_remote=False)), @@ -100,27 +95,19 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase): @classmethod def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._log_failure, - cls._add_remote, + cls.scenarios = testscenarios.multiply_scenarios(cls._add_remote, cls._exception_types) def setUp(self): super(SerializeRemoteExceptionTestCase, self).setUp() def test_serialize_remote_exception(self): - errors = [] - - def stub_error(msg, *a, **kw): - if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]): - a = a[0] - errors.append(str(msg) % a) - - self.stubs.Set(exceptions.LOG, 'error', stub_error) - try: try: raise self.cls(*self.args, **self.kwargs) except Exception as ex: + # Note: in Python 3 ex variable will be cleared at the end of + # the except clause, so explicitly make an extra copy of it cls_error = ex if self.add_remote: ex = add_remote_postfix(ex) @@ -128,8 +115,7 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase): except Exception: exc_info = sys.exc_info() - serialized = exceptions.serialize_remote_exception( - exc_info, log_failure=self.log_failure) + serialized = exceptions.serialize_remote_exception(exc_info) failure = jsonutils.loads(serialized) @@ -143,11 +129,6 @@ class SerializeRemoteExceptionTestCase(test_utils.BaseTestCase): tb = cls_error.__class__.__name__ + ': ' + self.msg self.assertIn(tb, ''.join(failure['tb'])) - if self.log_failure: - self.assertTrue(len(errors) > 0, errors) - else: - self.assertEqual(0, len(errors), errors) - SerializeRemoteExceptionTestCase.generate_scenarios() diff --git a/oslo_messaging/tests/test_transport.py b/oslo_messaging/tests/test_transport.py index a9d7dc6b2..b8561f116 100644 --- a/oslo_messaging/tests/test_transport.py +++ b/oslo_messaging/tests/test_transport.py @@ -14,6 +14,7 @@ # under the License. import fixtures +import mock from mox3 import mox from oslo_config import cfg import six @@ -112,7 +113,8 @@ class GetTransportTestCase(test_utils.BaseTestCase): allowed=[]))), ] - def test_get_transport(self): + @mock.patch('oslo_messaging.transport.LOG') + def test_get_transport(self, fake_logger): self.config(rpc_backend=self.rpc_backend, control_exchange=self.control_exchange, transport_url=self.transport_url) @@ -142,6 +144,12 @@ class GetTransportTestCase(test_utils.BaseTestCase): kwargs['aliases'] = self.aliases transport_ = oslo_messaging.get_transport(self.conf, **kwargs) + if self.aliases is not None: + self.assertEqual(fake_logger.warning.mock_calls, + [mock.call('legacy "rpc_backend" is deprecated, ' + '"testfoo" must be replaced by ' + '"%s"' % self.aliases.get('testfoo'))]) + self.assertIsNotNone(transport_) self.assertIs(transport_.conf, self.conf) self.assertIs(transport_._driver, drvr) diff --git a/oslo_messaging/transport.py b/oslo_messaging/transport.py index aa8867dde..2a3c9aaf7 100644 --- a/oslo_messaging/transport.py +++ b/oslo_messaging/transport.py @@ -27,6 +27,8 @@ __all__ = [ 'set_transport_defaults', ] +import logging + from oslo_config import cfg import six from six.moves.urllib import parse @@ -34,6 +36,7 @@ from stevedore import driver from oslo_messaging import exceptions +LOG = logging.getLogger(__name__) _transport_opts = [ cfg.StrOpt('transport_url', @@ -227,7 +230,7 @@ class TransportURL(object): Transport URLs take the form:: - transport://user:pass@host1:port[,hostN:portN]/virtual_host + transport://user:pass@host:port[,userN:passN@hostN:portN]/virtual_host i.e. the scheme selects the transport driver, you may include multiple hosts in netloc and the path part is a "virtual host" partition path. @@ -240,7 +243,7 @@ class TransportURL(object): :type virtual_host: str :param hosts: a list of TransportHost objects :type hosts: list - :param aliases: A map of transport alias to transport name + :param aliases: DEPRECATED: A map of transport alias to transport name :type aliases: dict """ @@ -259,13 +262,28 @@ class TransportURL(object): else: self.aliases = aliases + self._deprecation_logged = False + @property def transport(self): if self._transport is None: transport = self.conf.rpc_backend else: transport = self._transport - return self.aliases.get(transport, transport) + final_transport = self.aliases.get(transport, transport) + if not self._deprecation_logged and final_transport != transport: + # NOTE(sileht): The first step is deprecate this one cycle. + # To ensure deployer have updated they configuration during Octavia + # Then in P we will deprecate aliases kwargs of TransportURL() and + # get_transport() for consuming application + LOG.warning('legacy "rpc_backend" is deprecated, ' + '"%(legacy_transport)s" must be replaced by ' + '"%(final_transport)s"' % { + 'legacy_transport': transport, + 'final_transport': final_transport}) + self._deprecation_logged = True + + return final_transport @transport.setter def transport(self, value): @@ -336,7 +354,7 @@ class TransportURL(object): Assuming a URL takes the form of:: - transport://user:pass@host1:port[,hostN:portN]/virtual_host + transport://user:pass@host:port[,userN:passN@hostN:portN]/virtual_host then parse the URL and return a TransportURL object. diff --git a/oslo_messaging/version.py b/oslo_messaging/version.py new file mode 100644 index 000000000..b4cd76b76 --- /dev/null +++ b/oslo_messaging/version.py @@ -0,0 +1,18 @@ +# Copyright 2016 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import pbr.version + +version_info = pbr.version.VersionInfo('oslo_messaging') diff --git a/releasenotes/notes/add_reno-3b4ae0789e9c45b4.yaml b/releasenotes/notes/add_reno-3b4ae0789e9c45b4.yaml new file mode 100644 index 000000000..46a2da6a9 --- /dev/null +++ b/releasenotes/notes/add_reno-3b4ae0789e9c45b4.yaml @@ -0,0 +1,3 @@ +--- +other: + - Switch to reno for managing release notes. \ No newline at end of file diff --git a/oslo_messaging/_drivers/protocols/amqp/__init__.py b/releasenotes/source/_static/.placeholder similarity index 100% rename from oslo_messaging/_drivers/protocols/amqp/__init__.py rename to releasenotes/source/_static/.placeholder diff --git a/releasenotes/source/_templates/.placeholder b/releasenotes/source/_templates/.placeholder new file mode 100644 index 000000000..e69de29bb diff --git a/releasenotes/source/conf.py b/releasenotes/source/conf.py new file mode 100644 index 000000000..392c957ec --- /dev/null +++ b/releasenotes/source/conf.py @@ -0,0 +1,276 @@ +# -*- coding: utf-8 -*- +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# oslo.log Release Notes documentation build configuration file, created by +# sphinx-quickstart on Tue Nov 3 17:40:50 2015. +# +# This file is execfile()d with the current directory set to its +# containing dir. +# +# Note that not all possible configuration values are present in this +# autogenerated file. +# +# All configuration values have a default; values that are commented out +# serve to show the default. + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +# sys.path.insert(0, os.path.abspath('.')) + +# -- General configuration ------------------------------------------------ + +# If your documentation needs a minimal Sphinx version, state it here. +# needs_sphinx = '1.0' + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [ + 'oslosphinx', + 'reno.sphinxext', +] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# The suffix of source filenames. +source_suffix = '.rst' + +# The encoding of source files. +# source_encoding = 'utf-8-sig' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = u'oslo.messaging Release Notes' +copyright = u'2016, oslo.messaging Developers' + +# The version info for the project you're documenting, acts as replacement for +# |version| and |release|, also used in various other places throughout the +# built documents. +# +# The short X.Y version. +from oslo_messaging.version import version_info as oslo_messaging_version +# The full version, including alpha/beta/rc tags. +release = oslo_messaging_version.version_string_with_vcs() +# The short X.Y version. +version = oslo_messaging_version.canonical_version_string() + +# The language for content autogenerated by Sphinx. Refer to documentation +# for a list of supported languages. +# language = None + +# There are two options for replacing |today|: either, you set today to some +# non-false value, then it is used: +# today = '' +# Else, today_fmt is used as the format for a strftime call. +# today_fmt = '%B %d, %Y' + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +exclude_patterns = [] + +# The reST default role (used for this markup: `text`) to use for all +# documents. +# default_role = None + +# If true, '()' will be appended to :func: etc. cross-reference text. +# add_function_parentheses = True + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +# add_module_names = True + +# If true, sectionauthor and moduleauthor directives will be shown in the +# output. They are ignored by default. +# show_authors = False + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# A list of ignored prefixes for module index sorting. +# modindex_common_prefix = [] + +# If true, keep warnings as "system message" paragraphs in the built documents. +# keep_warnings = False + + +# -- Options for HTML output ---------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +html_theme = 'default' + +# Theme options are theme-specific and customize the look and feel of a theme +# further. For a list of options available for each theme, see the +# documentation. +# html_theme_options = {} + +# Add any paths that contain custom themes here, relative to this directory. +# html_theme_path = [] + +# The name for this set of Sphinx documents. If None, it defaults to +# " v documentation". +# html_title = None + +# A shorter title for the navigation bar. Default is the same as html_title. +# html_short_title = None + +# The name of an image file (relative to this directory) to place at the top +# of the sidebar. +# html_logo = None + +# The name of an image file (within the static path) to use as favicon of the +# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 +# pixels large. +# html_favicon = None + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + +# Add any extra paths that contain custom files (such as robots.txt or +# .htaccess) here, relative to this directory. These files are copied +# directly to the root of the documentation. +# html_extra_path = [] + +# If not '', a 'Last updated on:' timestamp is inserted at every page bottom, +# using the given strftime format. +# html_last_updated_fmt = '%b %d, %Y' + +# If true, SmartyPants will be used to convert quotes and dashes to +# typographically correct entities. +# html_use_smartypants = True + +# Custom sidebar templates, maps document names to template names. +# html_sidebars = {} + +# Additional templates that should be rendered to pages, maps page names to +# template names. +# html_additional_pages = {} + +# If false, no module index is generated. +# html_domain_indices = True + +# If false, no index is generated. +# html_use_index = True + +# If true, the index is split into individual pages for each letter. +# html_split_index = False + +# If true, links to the reST sources are added to the pages. +# html_show_sourcelink = True + +# If true, "Created using Sphinx" is shown in the HTML footer. Default is True. +# html_show_sphinx = True + +# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True. +# html_show_copyright = True + +# If true, an OpenSearch description file will be output, and all pages will +# contain a tag referring to it. The value of this option must be the +# base URL from which the finished HTML is served. +# html_use_opensearch = '' + +# This is the file name suffix for HTML files (e.g. ".xhtml"). +# html_file_suffix = None + +# Output file base name for HTML help builder. +htmlhelp_basename = 'oslo.messagingReleaseNotesDoc' + + +# -- Options for LaTeX output --------------------------------------------- + +latex_elements = { + # The paper size ('letterpaper' or 'a4paper'). + # 'papersize': 'letterpaper', + + # The font size ('10pt', '11pt' or '12pt'). + # 'pointsize': '10pt', + + # Additional stuff for the LaTeX preamble. + # 'preamble': '', +} + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, +# author, documentclass [howto, manual, or own class]). +latex_documents = [ + ('index', 'oslo.messagingReleaseNotes.tex', + u'oslo.messaging Release Notes Documentation', + u'oslo.messaging Developers', 'manual'), +] + +# The name of an image file (relative to this directory) to place at the top of +# the title page. +# latex_logo = None + +# For "manual" documents, if this is true, then toplevel headings are parts, +# not chapters. +# latex_use_parts = False + +# If true, show page references after internal links. +# latex_show_pagerefs = False + +# If true, show URL addresses after external links. +# latex_show_urls = False + +# Documents to append as an appendix to all manuals. +# latex_appendices = [] + +# If false, no module index is generated. +# latex_domain_indices = True + + +# -- Options for manual page output --------------------------------------- + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). +man_pages = [ + ('index', 'oslo.messagingReleaseNotes', + u'oslo.messaging Release Notes Documentation', + [u'oslo.messaging Developers'], 1) +] + +# If true, show URL addresses after external links. +# man_show_urls = False + + +# -- Options for Texinfo output ------------------------------------------- + +# Grouping the document tree into Texinfo files. List of tuples +# (source start file, target name, title, author, +# dir menu entry, description, category) +texinfo_documents = [ + ('index', 'oslo.messagingReleaseNotes', + u'oslo.messaging Release Notes Documentation', + u'oslo.messaging Developers', 'oslo.messagingReleaseNotes', + 'One line description of project.', + 'Miscellaneous'), +] + +# Documents to append as an appendix to all manuals. +# texinfo_appendices = [] + +# If false, no module index is generated. +# texinfo_domain_indices = True + +# How to display URL addresses: 'footnote', 'no', or 'inline'. +# texinfo_show_urls = 'footnote' + +# If true, do not generate a @detailmenu in the "Top" node's menu. +# texinfo_no_detailmenu = False diff --git a/releasenotes/source/index.rst b/releasenotes/source/index.rst new file mode 100644 index 000000000..97dbb6b99 --- /dev/null +++ b/releasenotes/source/index.rst @@ -0,0 +1,8 @@ +============================= + oslo.messaging Release Notes +============================= + + .. toctree:: + :maxdepth: 1 + + unreleased diff --git a/releasenotes/source/unreleased.rst b/releasenotes/source/unreleased.rst new file mode 100644 index 000000000..5860a4691 --- /dev/null +++ b/releasenotes/source/unreleased.rst @@ -0,0 +1,5 @@ +========================== + Unreleased Release Notes +========================== + +.. release-notes:: diff --git a/requirements.txt b/requirements.txt index d3e99c4a5..7ec46a906 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,9 +8,9 @@ futurist>=0.11.0 # Apache-2.0 oslo.config>=3.9.0 # Apache-2.0 oslo.context>=2.2.0 # Apache-2.0 oslo.log>=1.14.0 # Apache-2.0 -oslo.utils>=3.5.0 # Apache-2.0 +oslo.utils>=3.11.0 # Apache-2.0 oslo.serialization>=1.10.0 # Apache-2.0 -oslo.service>=1.0.0 # Apache-2.0 +oslo.service>=1.10.0 # Apache-2.0 oslo.i18n>=2.1.0 # Apache-2.0 stevedore>=1.10.0 # Apache-2.0 debtcollector>=1.2.0 # Apache-2.0 @@ -33,7 +33,7 @@ PyYAML>=3.1.0 # MIT # rabbit driver is the default # we set the amqp version to ensure heartbeat works -amqp>=1.4.0 # LGPL +amqp<2.0,>=1.4.0 # LGPL kombu>=3.0.25 # BSD pika>=0.10.0 # BSD pika-pool>=0.1.3 # BSD diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh index a8ed0dd32..ba767cacc 100755 --- a/setup-test-env-zmq.sh +++ b/setup-test-env-zmq.sh @@ -16,6 +16,7 @@ cat > ${DATADIR}/zmq.conf <=0.10.0 discover # BSD fixtures<2.0,>=1.3.1 # Apache-2.0/BSD -mock>=1.2 # BSD +mock>=2.0 # BSD mox3>=0.7.0 # Apache-2.0 python-subunit>=0.0.18 # Apache-2.0/BSD testrepository>=0.0.18 # Apache-2.0/BSD @@ -32,6 +32,7 @@ coverage>=3.6 # Apache-2.0 # this is required for the docs build jobs sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2 # BSD oslosphinx!=3.4.0,>=2.5.0 # Apache-2.0 +reno>=1.6.2 # Apache2 # AMQP 1.0 support depends on the Qpid Proton AMQP 1.0 # development libraries. diff --git a/tox.ini b/tox.ini index 5a2abeb2e..e39daba8e 100644 --- a/tox.ini +++ b/tox.ini @@ -64,3 +64,6 @@ local-check-factory = oslo_messaging.hacking.checks.factory # of the requirements.txt files deps = pip_missing_reqs commands = pip-missing-reqs -d --ignore-module=oslo_messaging* --ignore-file=oslo_messaging/tests/* --ignore-file=tests/ oslo_messaging + +[testenv:releasenotes] +commands = sphinx-build -a -E -W -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html