From 141f59bd9b9d69a38abc533e4b10e2bf500d0f86 Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Wed, 29 Jul 2015 14:55:43 +0300 Subject: [PATCH] Notifier implementation Notifier implementation for zmq driver (ROUTER/DEALER variant). Publishers/consumers refactoring in order to make them pluggable. Change-Id: I2dd42cc805aa72b929a4dfa17498cd8b9c0ed7af --- oslo_messaging/_drivers/base.py | 2 +- oslo_messaging/_drivers/impl_zmq.py | 21 ++-- .../zmq_driver/{rpc => client}/__init__.py | 0 .../client => client/publishers}/__init__.py | 0 .../client/publishers/zmq_dealer_publisher.py | 77 ++++++++++++++ .../client/publishers/zmq_publisher_base.py | 56 ++++++++++ .../client/publishers/zmq_req_publisher.py | 85 +++++++++++++++ .../_drivers/zmq_driver/client/zmq_client.py | 73 +++++++++++++ .../_drivers/zmq_driver/client/zmq_request.py | 95 +++++++++++++++++ .../_drivers/zmq_driver/notifier/__init__.py | 1 - .../zmq_driver/rpc/client/zmq_call_request.py | 76 ------------- .../zmq_driver/rpc/client/zmq_cast_dealer.py | 100 ------------------ .../rpc/client/zmq_cast_publisher.py | 31 ------ .../zmq_driver/rpc/client/zmq_client.py | 41 ------- .../zmq_driver/rpc/client/zmq_request.py | 66 ------------ .../zmq_driver/{rpc => }/server/__init__.py | 0 .../zmq_driver/server/consumers/__init__.py | 0 .../consumers/zmq_router_consumer.py} | 63 +++++------ .../{rpc => }/server/zmq_incoming_message.py | 29 +++-- .../_drivers/zmq_driver/server/zmq_server.py | 80 ++++++++++++++ .../{zmq_target.py => zmq_address.py} | 0 .../{zmq_serializer.py => zmq_names.py} | 17 ++- .../tests/drivers/zmq/test_impl_zmq.py | 27 ++++- .../tests/functional/test_functional.py | 5 - tools/simulator.py | 8 +- tox.ini | 2 +- 26 files changed, 572 insertions(+), 383 deletions(-) rename oslo_messaging/_drivers/zmq_driver/{rpc => client}/__init__.py (100%) rename oslo_messaging/_drivers/zmq_driver/{rpc/client => client/publishers}/__init__.py (100%) create mode 100644 oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py create mode 100644 oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py create mode 100644 oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py create mode 100644 oslo_messaging/_drivers/zmq_driver/client/zmq_client.py create mode 100644 oslo_messaging/_drivers/zmq_driver/client/zmq_request.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/notifier/__init__.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py rename oslo_messaging/_drivers/zmq_driver/{rpc => }/server/__init__.py (100%) create mode 100644 oslo_messaging/_drivers/zmq_driver/server/consumers/__init__.py rename oslo_messaging/_drivers/zmq_driver/{rpc/server/zmq_server.py => server/consumers/zmq_router_consumer.py} (55%) rename oslo_messaging/_drivers/zmq_driver/{rpc => }/server/zmq_incoming_message.py (71%) create mode 100644 oslo_messaging/_drivers/zmq_driver/server/zmq_server.py rename oslo_messaging/_drivers/zmq_driver/{zmq_target.py => zmq_address.py} (100%) rename oslo_messaging/_drivers/zmq_driver/{zmq_serializer.py => zmq_names.py} (64%) diff --git a/oslo_messaging/_drivers/base.py b/oslo_messaging/_drivers/base.py index 1d2620825..607821faa 100644 --- a/oslo_messaging/_drivers/base.py +++ b/oslo_messaging/_drivers/base.py @@ -111,7 +111,7 @@ class BaseDriver(object): """Construct a Listener for the given target.""" @abc.abstractmethod - def listen_for_notifications(self, targets_and_priorities): + def listen_for_notifications(self, targets_and_priorities, pool): """Construct a notification Listener for the given list of tuple of (target, priority). """ diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 18086eb43..7a4086009 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -21,8 +21,8 @@ from stevedore import driver from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_client -from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_server +from oslo_messaging._drivers.zmq_driver.client import zmq_client +from oslo_messaging._drivers.zmq_driver.server import zmq_server from oslo_messaging._executors import base as executor_base @@ -108,21 +108,28 @@ class ZmqDriver(base.BaseDriver): def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, retry=None): + timeout = timeout or self.conf.rpc_response_timeout if wait_for_reply: - return self.client.call(target, ctxt, message, timeout, retry) + return self.client.send_call(target, ctxt, message, timeout, retry) + elif target.fanout: + self.client.send_fanout(target, ctxt, message, timeout, retry) else: - self.client.cast(target, ctxt, message, timeout, retry) - return None + self.client.send_cast(target, ctxt, message, timeout, retry) def send_notification(self, target, ctxt, message, version, retry=None): - return None + if target.fanout: + self.client.send_notify_fanout(target, ctxt, message, version, + retry) + else: + self.client.send_notify(target, ctxt, message, version, retry) def listen(self, target): self.server.listen(target) return self.server def listen_for_notifications(self, targets_and_priorities, pool): - return None + self.server.listen_notification(targets_and_priorities) + return self.server def cleanup(self): self.client.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/__init__.py b/oslo_messaging/_drivers/zmq_driver/client/__init__.py similarity index 100% rename from oslo_messaging/_drivers/zmq_driver/rpc/__init__.py rename to oslo_messaging/_drivers/zmq_driver/client/__init__.py diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/__init__.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/__init__.py similarity index 100% rename from oslo_messaging/_drivers/zmq_driver/rpc/client/__init__.py rename to oslo_messaging/_drivers/zmq_driver/client/publishers/__init__.py diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py new file mode 100644 index 000000000..bf6f253f9 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py @@ -0,0 +1,77 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client.publishers\ + import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LE, _LI + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class DealerPublisher(zmq_publisher_base.PublisherBase): + + def send_request(self, request): + + if request.msg_type == zmq_names.CALL_TYPE: + raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + + dealer_socket, hosts = self._check_hosts_connections(request.target) + + if request.msg_type in zmq_names.MULTISEND_TYPES: + for _ in range(len(hosts)): + self._send_request(dealer_socket, request) + else: + self._send_request(dealer_socket, request) + + def _send_request(self, socket, request): + + socket.send(b'', zmq.SNDMORE) + super(DealerPublisher, self)._send_request(socket, request) + + LOG.info(_LI("Sending message %(message)s to a target %(target)s") + % {"message": request.message, + "target": request.target}) + + def _check_hosts_connections(self, target): + if str(target) in self.outbound_sockets: + dealer_socket, hosts = self.outbound_sockets[str(target)] + else: + dealer_socket = zmq.Context().socket(zmq.DEALER) + hosts = self.matchmaker.get_hosts(target) + for host in hosts: + self._connect_to_host(dealer_socket, host, target) + self.outbound_sockets[str(target)] = (dealer_socket, hosts) + return dealer_socket, hosts + + @staticmethod + def _connect_to_host(socket, host, target): + address = zmq_address.get_tcp_direct_address(host) + try: + LOG.info(_LI("Connecting DEALER to %(address)s for %(target)s") + % {"address": address, + "target": target}) + socket.connect(address) + except zmq.ZMQError as e: + errmsg = _LE("Failed connecting DEALER to %(address)s: %(e)s")\ + % (address, e) + LOG.error(errmsg) + raise rpc_common.RPCException(errmsg) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py new file mode 100644 index 000000000..0f32f5884 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -0,0 +1,56 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +import six + +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _LE + + +zmq = zmq_async.import_zmq() + + +class UnsupportedSendPattern(rpc_common.RPCException): + + def __init__(self, pattern_name): + errmsg = _LE("Sending pattern %s is unsupported.") % pattern_name + super(UnsupportedSendPattern, self).__init__(errmsg) + + +@six.add_metaclass(abc.ABCMeta) +class PublisherBase(object): + + def __init__(self, conf, matchmaker): + self.conf = conf + self.zmq_context = zmq.Context() + self.matchmaker = matchmaker + self.outbound_sockets = {} + super(PublisherBase, self).__init__() + + @abc.abstractmethod + def send_request(self, request): + """Send request to consumer""" + + def _send_request(self, socket, request): + socket.send_string(request.msg_type, zmq.SNDMORE) + socket.send_json(request.context, zmq.SNDMORE) + socket.send_json(request.message) + + def cleanup(self): + for socket, hosts in self.outbound_sockets.values(): + socket.setsockopt(zmq.LINGER, 0) + socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py new file mode 100644 index 000000000..68beab903 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py @@ -0,0 +1,85 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import logging + +import oslo_messaging +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client.publishers\ + import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LE, _LI + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class ReqPublisher(zmq_publisher_base.PublisherBase): + + def send_request(self, request): + + if request.msg_type != zmq_names.CALL_TYPE: + raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + + socket = self._connect_to_host(request.target) + self._send_request(socket, request) + return self._receive_reply(socket, request) + + def _connect_to_host(self, target): + + try: + self.zmq_context = zmq.Context() + socket = self.zmq_context.socket(zmq.REQ) + + host = self.matchmaker.get_single_host(target) + connect_address = zmq_address.get_tcp_direct_address(host) + + LOG.info(_LI("Connecting REQ to %s") % connect_address) + + socket.connect(connect_address) + self.outbound_sockets[str(target)] = (socket, [host]) + return socket + + except zmq.ZMQError as e: + errmsg = _LE("Error connecting to socket: %s") % str(e) + LOG.error(errmsg) + raise rpc_common.RPCException(errmsg) + + @staticmethod + def _receive_reply(socket, request): + + def _receive_method(socket): + return socket.recv_json() + + # NOTE(ozamiatin): Check for retry here (no retries now) + with contextlib.closing(zmq_async.get_reply_poller()) as poller: + poller.register(socket, recv_method=_receive_method) + reply, socket = poller.poll(timeout=request.timeout) + if reply is None: + raise oslo_messaging.MessagingTimeout( + "Timeout %s seconds was reached" % request.timeout) + if reply[zmq_names.FIELD_FAILURE]: + raise rpc_common.deserialize_remote_exception( + reply[zmq_names.FIELD_FAILURE], + request.allowed_remote_exmods) + else: + return reply[zmq_names.FIELD_REPLY] + + def close(self): + # For contextlib compatibility + self.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py new file mode 100644 index 000000000..23dfd09eb --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -0,0 +1,73 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib + + +from oslo_messaging._drivers.zmq_driver.client.publishers\ + import zmq_dealer_publisher +from oslo_messaging._drivers.zmq_driver.client.publishers\ + import zmq_req_publisher +from oslo_messaging._drivers.zmq_driver.client import zmq_request +from oslo_messaging._drivers.zmq_driver import zmq_async + +zmq = zmq_async.import_zmq() + + +class ZmqClient(object): + + def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): + self.conf = conf + self.context = zmq.Context() + self.matchmaker = matchmaker + self.allowed_remote_exmods = allowed_remote_exmods or [] + self.dealer_publisher = zmq_dealer_publisher.DealerPublisher( + conf, matchmaker) + + def send_call(self, target, context, message, timeout=None, retry=None): + with contextlib.closing(zmq_request.CallRequest( + target, context=context, message=message, + timeout=timeout, retry=retry, + allowed_remote_exmods=self.allowed_remote_exmods)) as request: + with contextlib.closing(zmq_req_publisher.ReqPublisher( + self.conf, self.matchmaker)) as req_publisher: + return req_publisher.send_request(request) + + def send_cast(self, target, context, message, timeout=None, retry=None): + with contextlib.closing(zmq_request.CastRequest( + target, context=context, message=message, + timeout=timeout, retry=retry)) as request: + self.dealer_publisher.send_request(request) + + def send_fanout(self, target, context, message, timeout=None, retry=None): + with contextlib.closing(zmq_request.FanoutRequest( + target, context=context, message=message, + timeout=timeout, retry=retry)) as request: + self.dealer_publisher.send_request(request) + + def send_notify(self, target, context, message, version, retry=None): + with contextlib.closing(zmq_request.NotificationRequest( + target, context, message, version=version, + retry=retry)) as request: + self.dealer_publisher.send_request(request) + + def send_notify_fanout(self, target, context, message, version, + retry=None): + with contextlib.closing(zmq_request.NotificationFanoutRequest( + target, context, message, version=version, + retry=retry)) as request: + self.dealer_publisher.send_request(request) + + def cleanup(self): + self.dealer_publisher.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py new file mode 100644 index 000000000..1caedff3e --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_request.py @@ -0,0 +1,95 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import logging + +import six + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LE + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +@six.add_metaclass(abc.ABCMeta) +class Request(object): + + def __init__(self, target, context=None, message=None, retry=None): + + if self.msg_type not in zmq_names.MESSAGE_TYPES: + raise RuntimeError("Unknown message type!") + + self.target = target + self.context = context + self.message = message + self.retry = retry + + @abc.abstractproperty + def msg_type(self): + """ZMQ message type""" + + def close(self): + """Nothing to close in base request""" + + +class RpcRequest(Request): + + def __init__(self, *args, **kwargs): + message = kwargs.get("message") + if message['method'] is None: + errmsg = _LE("No method specified for RPC call") + LOG.error(errmsg) + raise KeyError(errmsg) + + self.timeout = kwargs.pop("timeout") + assert self.timeout is not None, "Timeout should be specified!" + + super(RpcRequest, self).__init__(*args, **kwargs) + + +class CallRequest(RpcRequest): + + msg_type = zmq_names.CALL_TYPE + + def __init__(self, *args, **kwargs): + self.allowed_remote_exmods = kwargs.pop("allowed_remote_exmods") + super(CallRequest, self).__init__(*args, **kwargs) + + +class CastRequest(RpcRequest): + + msg_type = zmq_names.CAST_TYPE + + +class FanoutRequest(RpcRequest): + + msg_type = zmq_names.CAST_FANOUT_TYPE + + +class NotificationRequest(Request): + + msg_type = zmq_names.NOTIFY_TYPE + + def __init__(self, *args, **kwargs): + self.version = kwargs.pop("version") + super(NotificationRequest, self).__init__(*args, **kwargs) + + +class NotificationFanoutRequest(NotificationRequest): + + msg_type = zmq_names.NOTIFY_FANOUT_TYPE diff --git a/oslo_messaging/_drivers/zmq_driver/notifier/__init__.py b/oslo_messaging/_drivers/zmq_driver/notifier/__init__.py deleted file mode 100644 index 8af3e63a7..000000000 --- a/oslo_messaging/_drivers/zmq_driver/notifier/__init__.py +++ /dev/null @@ -1 +0,0 @@ -__author__ = 'ozamiatin' diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py deleted file mode 100644 index 0d35c31a6..000000000 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -import oslo_messaging -from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_serializer -from oslo_messaging._drivers.zmq_driver import zmq_target -from oslo_messaging._i18n import _LE, _LI - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class CallRequest(Request): - - msg_type = zmq_serializer.CALL_TYPE - - def __init__(self, conf, target, context, message, timeout=None, - retry=None, allowed_remote_exmods=None, matchmaker=None): - self.allowed_remote_exmods = allowed_remote_exmods or [] - self.matchmaker = matchmaker - self.reply_poller = zmq_async.get_reply_poller() - - try: - self.zmq_context = zmq.Context() - socket = self.zmq_context.socket(zmq.REQ) - super(CallRequest, self).__init__(conf, target, context, - message, socket, - timeout, retry) - self.host = self.matchmaker.get_single_host(self.target) - self.connect_address = zmq_target.get_tcp_direct_address( - self.host) - LOG.info(_LI("Connecting REQ to %s") % self.connect_address) - self.socket.connect(self.connect_address) - self.reply_poller.register( - self.socket, recv_method=lambda socket: socket.recv_json()) - - except zmq.ZMQError as e: - errmsg = _LE("Error connecting to socket: %s") % str(e) - LOG.error(errmsg) - raise rpc_common.RPCException(errmsg) - - def close(self): - self.reply_poller.close() - self.socket.setsockopt(zmq.LINGER, 0) - self.socket.close() - - def receive_reply(self): - # NOTE(ozamiatin): Check for retry here (no retries now) - reply, socket = self.reply_poller.poll(timeout=self.timeout) - if reply is None: - raise oslo_messaging.MessagingTimeout( - "Timeout %s seconds was reached" % self.timeout) - - if reply[zmq_serializer.FIELD_FAILURE]: - raise rpc_common.deserialize_remote_exception( - reply[zmq_serializer.FIELD_FAILURE], - self.allowed_remote_exmods) - else: - return reply[zmq_serializer.FIELD_REPLY] diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py deleted file mode 100644 index f1257badb..000000000 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py +++ /dev/null @@ -1,100 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_publisher -from oslo_messaging._drivers.zmq_driver.rpc.client.zmq_request import Request -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_serializer -from oslo_messaging._drivers.zmq_driver import zmq_target -from oslo_messaging._i18n import _LE, _LI - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class CastRequest(Request): - - msg_type = zmq_serializer.CAST_TYPE - - def __call__(self, *args, **kwargs): - self.send_request() - - def send_request(self): - self.socket.send(b'', zmq.SNDMORE) - super(CastRequest, self).send_request() - - def receive_reply(self): - # Ignore reply for CAST - pass - - -class FanoutRequest(CastRequest): - - msg_type = zmq_serializer.FANOUT_TYPE - - def __init__(self, *args, **kwargs): - self.hosts_count = kwargs.pop("hosts_count") - super(FanoutRequest, self).__init__(*args, **kwargs) - - def send_request(self): - for _ in range(self.hosts_count): - super(FanoutRequest, self).send_request() - - -class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase): - - def __init__(self, conf, matchmaker): - super(DealerCastPublisher, self).__init__(conf) - self.matchmaker = matchmaker - - def cast(self, target, context, - message, timeout=None, retry=None): - if str(target) in self.outbound_sockets: - dealer_socket, hosts = self.outbound_sockets[str(target)] - else: - dealer_socket = zmq.Context().socket(zmq.DEALER) - hosts = self.matchmaker.get_hosts(target) - for host in hosts: - self._connect_to_host(dealer_socket, host) - self.outbound_sockets[str(target)] = (dealer_socket, hosts) - - if target.fanout: - request = FanoutRequest(self.conf, target, context, message, - dealer_socket, timeout, retry, - hosts_count=len(hosts)) - else: - request = CastRequest(self.conf, target, context, message, - dealer_socket, timeout, retry) - - request.send_request() - - def _connect_to_host(self, socket, host): - address = zmq_target.get_tcp_direct_address(host) - try: - LOG.info(_LI("Connecting DEALER to %s") % address) - socket.connect(address) - except zmq.ZMQError as e: - errmsg = _LE("Failed connecting DEALER to %(address)s: %(e)s")\ - % (address, e) - LOG.error(errmsg) - raise rpc_common.RPCException(errmsg) - - def cleanup(self): - for socket, hosts in self.outbound_sockets.values(): - socket.setsockopt(zmq.LINGER, 0) - socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py deleted file mode 100644 index 38a470ba8..000000000 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_publisher.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc - -import six - - -@six.add_metaclass(abc.ABCMeta) -class CastPublisherBase(object): - - def __init__(self, conf): - self.conf = conf - self.outbound_sockets = {} - super(CastPublisherBase, self).__init__() - - @abc.abstractmethod - def cast(self, target, context, - message, timeout=None, retry=None): - "Send CAST to target" diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py deleted file mode 100644 index 2bdbee18b..000000000 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import contextlib - -from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_call_request -from oslo_messaging._drivers.zmq_driver.rpc.client import zmq_cast_dealer - - -class ZmqClient(object): - - def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): - self.conf = conf - self.matchmaker = matchmaker - self.allowed_remote_exmods = allowed_remote_exmods or [] - self.cast_publisher = zmq_cast_dealer.DealerCastPublisher(conf, - matchmaker) - - def call(self, target, context, message, timeout=None, retry=None): - with contextlib.closing(zmq_call_request.CallRequest( - self.conf, target, context, message, timeout, retry, - self.allowed_remote_exmods, - self.matchmaker)) as request: - return request() - - def cast(self, target, context, message, timeout=None, retry=None): - self.cast_publisher.cast(target, context, message, timeout, retry) - - def cleanup(self): - self.cast_publisher.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py deleted file mode 100644 index b06699d93..000000000 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_request.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc -import logging - -import six - -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_serializer -from oslo_messaging._i18n import _LE - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -@six.add_metaclass(abc.ABCMeta) -class Request(object): - - def __init__(self, conf, target, context, message, - socket, timeout=None, retry=None): - - if self.msg_type not in zmq_serializer.MESSAGE_TYPES: - raise RuntimeError("Unknown msg type!") - - if message['method'] is None: - errmsg = _LE("No method specified for RPC call") - LOG.error(errmsg) - raise KeyError(errmsg) - - self.target = target - self.context = context - self.message = message - self.timeout = timeout or conf.rpc_response_timeout - self.retry = retry - self.reply = None - self.socket = socket - - @abc.abstractproperty - def msg_type(self): - """ZMQ message type""" - - def send_request(self): - self.socket.send_string(self.msg_type, zmq.SNDMORE) - self.socket.send_json(self.context, zmq.SNDMORE) - self.socket.send_json(self.message) - - def __call__(self): - self.send_request() - return self.receive_reply() - - @abc.abstractmethod - def receive_reply(self): - "Receive reply from server side" diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/__init__.py b/oslo_messaging/_drivers/zmq_driver/server/__init__.py similarity index 100% rename from oslo_messaging/_drivers/zmq_driver/rpc/server/__init__.py rename to oslo_messaging/_drivers/zmq_driver/server/__init__.py diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/__init__.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py similarity index 55% rename from oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py rename to oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 981966ddc..58680da90 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -14,60 +14,50 @@ import logging -from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver.rpc.server import zmq_incoming_message +from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_serializer -from oslo_messaging._drivers.zmq_driver import zmq_target -from oslo_messaging._i18n import _LE +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LE, _LI LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class ZmqServer(base.Listener): +class RouterConsumer(object): + + def __init__(self, conf, poller, server): + + self.poller = poller + self.server = server - def __init__(self, conf, matchmaker=None): - self.conf = conf try: self.context = zmq.Context() self.socket = self.context.socket(zmq.ROUTER) - self.address = zmq_target.get_tcp_random_address(conf) + self.address = zmq_address.get_tcp_random_address(conf) self.port = self.socket.bind_to_random_port(self.address) - LOG.info("Run server on %s:%d" % (self.address, self.port)) + LOG.info(_LI("Run ROUTER consumer on %(addr)s:%(port)d"), + {"addr": self.address, + "port": self.port}) except zmq.ZMQError as e: errmsg = _LE("Failed binding to port %(port)d: %(e)s")\ % (self.port, e) LOG.error(errmsg) raise rpc_common.RPCException(errmsg) - self.poller = zmq_async.get_poller() - self.matchmaker = matchmaker - - def poll(self, timeout=None): + def listen(self, target): + LOG.info(_LI("Listen to target %s") % str(target)) self.poller.register(self.socket, self._receive_message) - incoming = self.poller.poll(timeout or self.conf.rpc_poll_timeout) - return incoming[0] - - def stop(self): - LOG.info("Stop server tcp://%s:%d" % (self.address, self.port)) def cleanup(self): - self.poller.close() if not self.socket.closed: self.socket.setsockopt(zmq.LINGER, 0) self.socket.close() - def listen(self, target): - LOG.info("Listen to Target %s on tcp://%s:%d" % - (target, self.address, self.port)) - host = zmq_target.combine_address(self.conf.rpc_zmq_host, self.port) - self.matchmaker.register(target=target, - hostname=host) - def _receive_message(self, socket): + try: reply_id = socket.recv() empty = socket.recv() @@ -76,15 +66,20 @@ class ZmqServer(base.Listener): assert msg_type is not None, 'Bad format: msg type expected' context = socket.recv_json() message = socket.recv_json() - LOG.debug("Received CALL message %s" % str(message)) + LOG.debug("Received %s message %s" % (msg_type, str(message))) - direct_type = (zmq_serializer.CALL_TYPE, zmq_serializer.CAST_TYPE) - if msg_type in direct_type: + if msg_type == zmq_names.CALL_TYPE: return zmq_incoming_message.ZmqIncomingRequest( - self, context, message, socket, reply_id, self.poller) - elif msg_type == zmq_serializer.FANOUT_TYPE: - return zmq_incoming_message.ZmqFanoutMessage( - self, context, message, socket, self.poller) + self.server, context, message, socket, reply_id, + self.poller) + elif msg_type in zmq_names.CAST_TYPES: + return zmq_incoming_message.ZmqCastMessage( + self.server, context, message, socket, self.poller) + elif msg_type in zmq_names.NOTIFY_TYPES: + return zmq_incoming_message.ZmqNotificationMessage( + self.server, context, message, socket, self.poller) + else: + LOG.error(_LE("Unknown message type: %s") % msg_type) except zmq.ZMQError as e: LOG.error(_LE("Receiving message failed: %s") % str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py similarity index 71% rename from oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_incoming_message.py rename to oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index 1373019e1..d953e9334 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -18,7 +18,7 @@ import logging from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_serializer +from oslo_messaging._drivers.zmq_driver import zmq_names LOG = logging.getLogger(__name__) @@ -39,9 +39,9 @@ class ZmqIncomingRequest(base.IncomingMessage): if failure is not None: failure = rpc_common.serialize_remote_exception(failure, log_failure) - message_reply = {zmq_serializer.FIELD_REPLY: reply, - zmq_serializer.FIELD_FAILURE: failure, - zmq_serializer.FIELD_LOG_FAILURE: log_failure} + message_reply = {zmq_names.FIELD_REPLY: reply, + zmq_names.FIELD_FAILURE: failure, + zmq_names.FIELD_LOG_FAILURE: log_failure} LOG.debug("Replying %s REP", (str(message_reply))) self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) @@ -56,10 +56,10 @@ class ZmqIncomingRequest(base.IncomingMessage): pass -class ZmqFanoutMessage(base.IncomingMessage): +class ZmqCastMessage(base.IncomingMessage): def __init__(self, listener, context, message, socket, poller): - super(ZmqFanoutMessage, self).__init__(listener, context, message) + super(ZmqCastMessage, self).__init__(listener, context, message) poller.resume_polling(socket) def reply(self, reply=None, failure=None, log_failure=True): @@ -70,3 +70,20 @@ class ZmqFanoutMessage(base.IncomingMessage): def requeue(self): pass + + +class ZmqNotificationMessage(base.IncomingMessage): + + def __init__(self, listener, context, message, socket, poller): + super(ZmqNotificationMessage, self).__init__(listener, context, + message) + poller.resume_polling(socket) + + def reply(self, reply=None, failure=None, log_failure=True): + """Reply is not needed for notification messages""" + + def acknowledge(self): + pass + + def requeue(self): + pass diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py new file mode 100644 index 000000000..30cacd409 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -0,0 +1,80 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import logging + +from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.server.consumers\ + import zmq_router_consumer +from oslo_messaging._drivers.zmq_driver import zmq_address +from oslo_messaging._drivers.zmq_driver import zmq_async + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class ZmqServer(base.Listener): + + def __init__(self, conf, matchmaker=None): + self.conf = conf + self.matchmaker = matchmaker + self.poller = zmq_async.get_poller() + self.rpc_consumer = zmq_router_consumer.RouterConsumer( + conf, self.poller, self) + self.notify_consumer = self.rpc_consumer + self.consumers = [self.rpc_consumer] + + def poll(self, timeout=None): + message, socket = self.poller.poll( + timeout or self.conf.rpc_poll_timeout) + return message + + def stop(self): + consumer = self.rpc_consumer + LOG.info("Stop server %s:%d" % (consumer.address, consumer.port)) + + def cleanup(self): + self.poller.close() + for consumer in self.consumers: + consumer.cleanup() + + def listen(self, target): + + consumer = self.rpc_consumer + consumer.listen(target) + + LOG.info("Listen to target %s on %s:%d" % + (target, consumer.address, consumer.port)) + + host = zmq_address.combine_address(self.conf.rpc_zmq_host, + consumer.port) + self.matchmaker.register(target=target, + hostname=host) + + def listen_notification(self, targets_and_priorities): + + consumer = self.notify_consumer + + LOG.info("Listen for notifications on %s:%d" + % (consumer.address, consumer.port)) + + for target, priority in targets_and_priorities: + host = zmq_address.combine_address(self.conf.rpc_zmq_host, + consumer.port) + t = copy.deepcopy(target) + t.topic = target.topic + '.' + priority + self.matchmaker.register(target=t, hostname=host) + consumer.listen(t) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_target.py b/oslo_messaging/_drivers/zmq_driver/zmq_address.py similarity index 100% rename from oslo_messaging/_drivers/zmq_driver/zmq_target.py rename to oslo_messaging/_drivers/zmq_driver/zmq_address.py diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py similarity index 64% rename from oslo_messaging/_drivers/zmq_driver/zmq_serializer.py rename to oslo_messaging/_drivers/zmq_driver/zmq_names.py index 6026ca655..583600ec4 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_serializer.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -12,9 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -MESSAGE_CALL_TYPE_POSITION = 1 -MESSAGE_CALL_TARGET_POSITION = 2 -MESSAGE_CALL_TOPIC_POSITION = 3 FIELD_FAILURE = 'failure' FIELD_REPLY = 'reply' @@ -22,7 +19,17 @@ FIELD_LOG_FAILURE = 'log_failure' CALL_TYPE = 'call' CAST_TYPE = 'cast' -FANOUT_TYPE = 'fanout' +CAST_FANOUT_TYPE = 'cast-f' NOTIFY_TYPE = 'notify' +NOTIFY_FANOUT_TYPE = 'notify-f' -MESSAGE_TYPES = (CALL_TYPE, CAST_TYPE, FANOUT_TYPE, NOTIFY_TYPE) +MESSAGE_TYPES = (CALL_TYPE, + CAST_TYPE, + CAST_FANOUT_TYPE, + NOTIFY_TYPE, + NOTIFY_FANOUT_TYPE) + +MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_FANOUT_TYPE) +DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, NOTIFY_TYPE) +CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE) +NOTIFY_TYPES = (NOTIFY_TYPE, NOTIFY_FANOUT_TYPE) diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py index d191ae64c..ca15f61ca 100644 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py @@ -29,11 +29,10 @@ LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class TestRPCServerListener(object): +class TestServerListener(object): def __init__(self, driver): self.driver = driver - self.target = None self.listener = None self.executor = zmq_async.get_executor(self._run) self._stop = threading.Event() @@ -41,8 +40,12 @@ class TestRPCServerListener(object): self.message = None def listen(self, target): - self.target = target - self.listener = self.driver.listen(self.target) + self.listener = self.driver.listen(target) + self.executor.execute() + + def listen_notifications(self, targets_and_priorities): + self.listener = self.driver.listen_for_notifications( + targets_and_priorities, {}) self.executor.execute() def _run(self): @@ -80,7 +83,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase): transport = oslo_messaging.get_transport(self.conf) self.driver = transport._driver - self.listener = TestRPCServerListener(self.driver) + self.listener = TestServerListener(self.driver) self.addCleanup(stopRpc(self.__dict__)) @@ -174,6 +177,20 @@ class TestZmqBasics(ZmqBaseTestCase): wait_for_reply=True) self.assertTrue(result) + def test_send_receive_notification(self): + """Notify() test""" + + target = oslo_messaging.Target(topic='t1', + server='notification@server') + self.listener.listen_notifications([(target, 'info')]) + + message = {'method': 'hello-world', 'tx_id': 1} + context = {} + target.topic = target.topic + '.info' + self.driver.send_notification(target, context, message, '3.0') + self.listener._received.wait() + self.assertTrue(self.listener._received.isSet()) + class TestPoller(test_utils.BaseTestCase): diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 0e56e0c1c..ebca74aea 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -187,11 +187,6 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): # NOTE(sileht): Each test must not use the same topics # to be run in parallel - def setUp(self): - super(NotifyTestCase, self).setUp() - if self.url.startswith("zmq"): - self.skipTest("Skip NotifyTestCase for ZMQ driver") - def test_simple(self): listener = self.useFixture( utils.NotificationFixture(self.url, ['test_simple'])) diff --git a/tools/simulator.py b/tools/simulator.py index 0a8309e17..8098fc654 100755 --- a/tools/simulator.py +++ b/tools/simulator.py @@ -25,10 +25,10 @@ import logging import sys import time -from oslo.config import cfg -from oslo import messaging -from oslo.messaging import notify -from oslo.messaging import rpc +from oslo_config import cfg +import oslo_messaging as messaging +from oslo_messaging import notify +from oslo_messaging import rpc LOG = logging.getLogger() diff --git a/tox.ini b/tox.ini index 6a92dbd23..6c86be990 100644 --- a/tox.ini +++ b/tox.ini @@ -41,7 +41,7 @@ setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123// commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-zeromq] -commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional.test_functional' +commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [flake8] show-source = True