From fca263c43c85c29ee9c1b57b767552c0c613a72e Mon Sep 17 00:00:00 2001 From: Doug Hellmann Date: Mon, 26 Mar 2018 11:32:49 -0400 Subject: [PATCH] remove zmq tests At the Rocky PTG the Oslo team decided to once again deprecate and remove the ZMQ driver. This is the first part of doing that, which removes the unit tests and extra functional test jobs, which are failing. Change-Id: Ia02adf122d2d4ff281e7c9fd3dff8894da241925 Signed-off-by: Doug Hellmann --- .zuul.yaml | 84 ------- oslo_messaging/tests/drivers/zmq/__init__.py | 0 .../tests/drivers/zmq/matchmaker/__init__.py | 0 .../zmq/matchmaker/test_impl_matchmaker.py | 140 ----------- .../tests/drivers/zmq/test_impl_zmq.py | 129 ---------- .../tests/drivers/zmq/test_pub_sub.py | 150 ----------- .../tests/drivers/zmq/test_routing_table.py | 80 ------ .../tests/drivers/zmq/test_zmq_ack_manager.py | 226 ----------------- .../tests/drivers/zmq/test_zmq_address.py | 67 ----- .../tests/drivers/zmq/test_zmq_async.py | 93 ------- .../drivers/zmq/test_zmq_transport_url.py | 127 ---------- .../tests/drivers/zmq/test_zmq_ttl_cache.py | 132 ---------- .../tests/drivers/zmq/test_zmq_version.py | 63 ----- .../tests/drivers/zmq/zmq_common.py | 111 --------- .../tests/functional/zmq/__init__.py | 0 .../tests/functional/zmq/multiproc_utils.py | 232 ------------------ .../tests/functional/zmq/test_startup.py | 49 ---- 17 files changed, 1683 deletions(-) delete mode 100644 oslo_messaging/tests/drivers/zmq/__init__.py delete mode 100644 oslo_messaging/tests/drivers/zmq/matchmaker/__init__.py delete mode 100755 oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py delete mode 100644 oslo_messaging/tests/drivers/zmq/test_impl_zmq.py delete mode 100755 oslo_messaging/tests/drivers/zmq/test_pub_sub.py delete mode 100644 oslo_messaging/tests/drivers/zmq/test_routing_table.py delete mode 100644 oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py delete mode 100644 oslo_messaging/tests/drivers/zmq/test_zmq_address.py delete mode 100644 oslo_messaging/tests/drivers/zmq/test_zmq_async.py delete mode 100644 oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py delete mode 100644 oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py delete mode 100644 oslo_messaging/tests/drivers/zmq/test_zmq_version.py delete mode 100644 oslo_messaging/tests/drivers/zmq/zmq_common.py delete mode 100644 oslo_messaging/tests/functional/zmq/__init__.py delete mode 100644 oslo_messaging/tests/functional/zmq/multiproc_utils.py delete mode 100644 oslo_messaging/tests/functional/zmq/test_startup.py diff --git a/.zuul.yaml b/.zuul.yaml index 11fb98b05..7014f7036 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -18,27 +18,6 @@ tox_envlist: py27-func-rabbit bindep_profile: rabbit -- job: - name: oslo.messaging-tox-py27-func-zmq - parent: openstack-tox-py27 - vars: - tox_envlist: py27-func-zmq - bindep_profile: zmq - -- job: - name: oslo.messaging-tox-py27-func-zmq-proxy - parent: openstack-tox-py27 - vars: - tox_envlist: py27-func-zmq-proxy - bindep_profile: zmq - -- job: - name: oslo.messaging-tox-py27-func-zmq-pubsub - parent: openstack-tox-py27 - vars: - tox_envlist: py27-func-zmq-pubsub - bindep_profile: zmq - - job: name: oslo.messaging-tox-py35-func-amqp1 parent: openstack-tox-py35 @@ -52,13 +31,6 @@ tox_envlist: py35-func-rabbit bindep_profile: rabbit -- job: - name: oslo.messaging-tox-py35-func-zmq - parent: openstack-tox-py35 - vars: - tox_envlist: py35-func-zmq - bindep_profile: zmq - - job: name: oslo.messaging-src-dsvm-full-rabbit-default parent: legacy-dsvm-base @@ -115,17 +87,6 @@ - openstack/devstack-plugin-kafka - openstack/oslo.messaging -- job: - name: oslo.messaging-src-dsvm-full-zmq-default - parent: legacy-dsvm-base - run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/run.yaml - post-run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/post.yaml - timeout: 10800 - required-projects: - - openstack-infra/devstack-gate - - openstack/devstack-plugin-zmq - - openstack/oslo.messaging - - job: name: oslo.messaging-src-grenade-dsvm parent: legacy-dsvm-base @@ -193,24 +154,6 @@ - openstack/dib-utils - openstack/diskimage-builder -- job: - name: oslo.messaging-telemetry-dsvm-integration-zmq - parent: legacy-dsvm-base - run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml - post-run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/post.yaml - timeout: 4200 - required-projects: - - openstack-infra/devstack-gate - - openstack/aodh - - openstack/ceilometer - - openstack/devstack-plugin-zmq - - openstack/oslo.messaging - - openstack/panko - # following are required when DEVSTACK_GATE_HEAT, which this - # job turns on - - openstack/dib-utils - - openstack/diskimage-builder - - job: name: oslo.messaging-telemetry-dsvm-integration-rabbit parent: legacy-dsvm-base @@ -268,19 +211,6 @@ - openstack/oslo.messaging - openstack/tempest -- job: - name: oslo.messaging-tempest-neutron-dsvm-src-zmq-default - parent: legacy-dsvm-base - run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/run.yaml - post-run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/post.yaml - timeout: 7800 - required-projects: - - openstack-infra/devstack-gate - - openstack/devstack-plugin-zmq - - openstack/neutron - - openstack/oslo.messaging - - openstack/tempest - - project: check: @@ -290,18 +220,10 @@ - oslo.messaging-tox-py27-func-kafka: voting: false - oslo.messaging-tox-py27-func-rabbit - - oslo.messaging-tox-py27-func-zmq-proxy: - voting: false - - oslo.messaging-tox-py27-func-zmq-pubsub: - voting: false - - oslo.messaging-tox-py27-func-zmq: - voting: false - oslo.messaging-tox-py35-func-amqp1: voting: false - oslo.messaging-tox-py35-func-rabbit: voting: false - - oslo.messaging-tox-py35-func-zmq: - voting: false - oslo.messaging-src-dsvm-full-rabbit-default - oslo.messaging-src-dsvm-full-amqp1-hybrid: @@ -314,8 +236,6 @@ voting: false - oslo.messaging-src-dsvm-full-kafka-default: voting: false - - oslo.messaging-src-dsvm-full-zmq-default: - voting: false - oslo.messaging-src-grenade-dsvm: voting: false @@ -327,8 +247,6 @@ voting: false - oslo.messaging-telemetry-dsvm-integration-kafka: voting: false - - oslo.messaging-telemetry-dsvm-integration-zmq: - voting: false - oslo.messaging-tempest-neutron-dsvm-src-rabbit-default - oslo.messaging-tempest-neutron-dsvm-src-amqp1-hybrid: @@ -336,8 +254,6 @@ branches: ^(?!stable/ocata).*$ - oslo.messaging-tempest-neutron-dsvm-src-kafka-default: voting: false - - oslo.messaging-tempest-neutron-dsvm-src-zmq-default: - voting: false gate: jobs: diff --git a/oslo_messaging/tests/drivers/zmq/__init__.py b/oslo_messaging/tests/drivers/zmq/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/__init__.py b/oslo_messaging/tests/drivers/zmq/matchmaker/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py deleted file mode 100755 index a20cdb4c7..000000000 --- a/oslo_messaging/tests/drivers/zmq/matchmaker/test_impl_matchmaker.py +++ /dev/null @@ -1,140 +0,0 @@ -# Copyright 2014 Canonical, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import inspect -from stevedore import driver -import testscenarios - -import oslo_messaging -from oslo_messaging.tests import utils as test_utils -from oslo_utils import importutils - -redis = importutils.try_import('redis') - - -def redis_available(): - '''Helper to see if local redis server is running''' - if not redis: - return False - try: - redis.StrictRedis(socket_timeout=1).ping() - return True - except redis.exceptions.ConnectionError: - return False - - -load_tests = testscenarios.load_tests_apply_scenarios - - -class TestImplMatchmaker(test_utils.BaseTestCase): - - scenarios = [ - ("dummy", {"rpc_zmq_matchmaker": "dummy"}), - ("redis", {"rpc_zmq_matchmaker": "redis"}), - ] - - def setUp(self): - super(TestImplMatchmaker, self).setUp() - - if self.rpc_zmq_matchmaker == "redis": - if not redis_available(): - self.skipTest("redis unavailable") - - self.test_matcher = driver.DriverManager( - 'oslo.messaging.zmq.matchmaker', - self.rpc_zmq_matchmaker, - ).driver(self.conf) - - if self.rpc_zmq_matchmaker == "redis": - for redis_instance in self.test_matcher._redis_instances: - self.addCleanup(redis_instance.flushdb) - - self.target = oslo_messaging.Target(topic="test_topic") - self.host1 = b"test_host1" - self.host2 = b"test_host2" - - def test_register(self): - self.test_matcher.register( - self.target, - self.host1, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - - self.assertEqual([self.host1], - self.test_matcher.get_hosts(self.target, "test")) - - def test_register_two_hosts(self): - self.test_matcher.register( - self.target, - self.host1, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - self.test_matcher.register( - self.target, - self.host2, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - - self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), - [self.host1, self.host2]) - - def test_register_unregister(self): - self.test_matcher.register( - self.target, - self.host1, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - self.test_matcher.register( - self.target, - self.host2, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - - self.test_matcher.unregister(self.target, self.host2, "test") - - self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), - [self.host1]) - - def test_register_two_same_hosts(self): - self.test_matcher.register( - self.target, - self.host1, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - self.test_matcher.register( - self.target, - self.host1, - "test", - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - - self.assertEqual([self.host1], - self.test_matcher.get_hosts(self.target, "test")) - - def test_get_hosts_wrong_topic(self): - target = oslo_messaging.Target(topic="no_such_topic") - self.assertEqual([], self.test_matcher.get_hosts(target, "test")) - - def test_handle_redis_package_error(self): - if self.rpc_zmq_matchmaker == "redis": - # move 'redis' variable to prevent this case affect others - module = inspect.getmodule(self.test_matcher) - redis_package = module.redis - - # 'redis' variable is set to None, when package importing is failed - module.redis = None - self.assertRaises(ImportError, self.test_matcher.__init__, - self.conf) - - # retrieve 'redis' variable which is set originally - module.redis = redis_package diff --git a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py b/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py deleted file mode 100644 index 5c2d7e49e..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_impl_zmq.py +++ /dev/null @@ -1,129 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import testtools - -import oslo_messaging -from oslo_messaging._drivers import impl_zmq -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_socket -from oslo_messaging.tests.drivers.zmq import zmq_common -from oslo_messaging.tests import utils as test_utils - - -zmq = zmq_async.import_zmq() - - -class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(ZmqTestPortsRange, self).setUp() - - # Set config values - kwargs = {'rpc_zmq_min_port': 5555, - 'rpc_zmq_max_port': 5560} - self.config(group='oslo_messaging_zmq', **kwargs) - - def test_ports_range(self): - listeners = [] - - for i in range(10): - try: - target = oslo_messaging.Target(topic='testtopic_' + str(i)) - new_listener = self.driver.listen(target, None, None) - listeners.append(new_listener) - except zmq_socket.ZmqPortBusy: - pass - - self.assertLessEqual(len(listeners), 5) - - for l in listeners: - l.cleanup() - - -class TestConfZmqDriverLoad(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestConfZmqDriverLoad, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - - def test_driver_load(self): - transport = oslo_messaging.get_transport(self.conf) - self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver) - - -class TestZmqBasics(zmq_common.ZmqBaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestZmqBasics, self).setUp() - self.target = oslo_messaging.Target(topic='topic') - self.ctxt = {'key': 'value'} - self.message = {'method': 'qwerty', 'args': {'int': 1, 'bool': True}} - - def test_send_call_without_method_failure(self): - self.message.pop('method') - self.listener.listen(self.target) - self.assertRaises(KeyError, self.driver.send, - self.target, self.ctxt, self.message, - wait_for_reply=True, timeout=10) - - def _check_listener_received(self): - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.ctxt, self.listener.message.ctxt) - self.assertEqual(self.message, self.listener.message.message) - - def test_send_call_success(self): - self.listener.listen(self.target) - result = self.driver.send(self.target, self.ctxt, self.message, - wait_for_reply=True, timeout=10) - self.assertTrue(result) - self._check_listener_received() - - def test_send_call_direct_success(self): - self.target.server = 'server' - self.listener.listen(self.target) - result = self.driver.send(self.target, self.ctxt, self.message, - wait_for_reply=True, timeout=10) - self.assertTrue(result) - self._check_listener_received() - - def test_send_cast_direct_success(self): - self.target.server = 'server' - self.listener.listen(self.target) - result = self.driver.send(self.target, self.ctxt, self.message, - wait_for_reply=False) - self.listener._received.wait(5) - self.assertIsNone(result) - self._check_listener_received() - - def test_send_fanout_success(self): - self.target.fanout = True - self.listener.listen(self.target) - result = self.driver.send(self.target, self.ctxt, self.message, - wait_for_reply=False) - self.listener._received.wait(5) - self.assertIsNone(result) - self._check_listener_received() - - def test_send_notify_success(self): - self.listener.listen_notifications([(self.target, 'info')]) - self.target.topic += '.info' - result = self.driver.send_notification(self.target, self.ctxt, - self.message, '3.0') - self.listener._received.wait(5) - self.assertIsNone(result) - self._check_listener_received() diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py deleted file mode 100755 index cc72608a8..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ /dev/null @@ -1,150 +0,0 @@ -# Copyright 2015-2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import json -import time - -import msgpack -import six -import testscenarios - -from oslo_config import cfg - -import oslo_messaging -from oslo_messaging._drivers.zmq_driver.proxy.central \ - import zmq_publisher_proxy -from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy -from oslo_messaging._drivers.zmq_driver import zmq_address -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._drivers.zmq_driver import zmq_version -from oslo_messaging.tests.drivers.zmq import zmq_common - -load_tests = testscenarios.load_tests_apply_scenarios - -zmq = zmq_async.import_zmq() - -opt_group = cfg.OptGroup(name='zmq_proxy_opts', - title='ZeroMQ proxy options') -cfg.CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group) - - -class TestPubSub(zmq_common.ZmqBaseTestCase): - - LISTENERS_COUNT = 3 - - scenarios = [ - ('json', {'serialization': 'json', - 'dumps': lambda obj: six.b(json.dumps(obj))}), - ('msgpack', {'serialization': 'msgpack', - 'dumps': msgpack.dumps}) - ] - - def setUp(self): - super(TestPubSub, self).setUp() - - kwargs = {'use_pub_sub': True, - 'rpc_zmq_serialization': self.serialization} - self.config(group='oslo_messaging_zmq', **kwargs) - - self.config(host="127.0.0.1", group="zmq_proxy_opts") - self.config(publisher_port=0, group="zmq_proxy_opts") - - self.publisher = zmq_publisher_proxy.PublisherProxy( - self.conf, self.driver.matchmaker) - self.driver.matchmaker.register_publisher( - (self.publisher.host, ''), - expire=self.conf.oslo_messaging_zmq.zmq_target_expire) - - self.listeners = [] - for _ in range(self.LISTENERS_COUNT): - self.listeners.append(zmq_common.TestServerListener(self.driver)) - - def tearDown(self): - super(TestPubSub, self).tearDown() - self.publisher.cleanup() - for listener in self.listeners: - listener.stop() - - def _send_request(self, target): - # Needed only in test env to give listener a chance to connect - # before request fires - time.sleep(1) - context = {} - message = {'method': 'hello-world'} - - self.publisher.send_request( - [b"reply_id", - b'', - six.b(zmq_version.MESSAGE_VERSION), - six.b(str(zmq_names.CAST_FANOUT_TYPE)), - zmq_address.target_to_subscribe_filter(target), - b"message_id", - self.dumps([context, message])] - ) - - def _check_listener(self, listener): - listener._received.wait(timeout=5) - self.assertTrue(listener._received.isSet()) - method = listener.message.message[u'method'] - self.assertEqual(u'hello-world', method) - - def _check_listener_negative(self, listener): - listener._received.wait(timeout=1) - self.assertFalse(listener._received.isSet()) - - def test_single_listener(self): - target = oslo_messaging.Target(topic='testtopic', fanout=True) - self.listener.listen(target) - - self._send_request(target) - - self._check_listener(self.listener) - - def test_all_listeners(self): - target = oslo_messaging.Target(topic='testtopic', fanout=True) - - for listener in self.listeners: - listener.listen(target) - - self._send_request(target) - - for listener in self.listeners: - self._check_listener(listener) - - def test_filtered(self): - target = oslo_messaging.Target(topic='testtopic', fanout=True) - target_wrong = oslo_messaging.Target(topic='wrong', fanout=True) - - self.listeners[0].listen(target) - self.listeners[1].listen(target) - self.listeners[2].listen(target_wrong) - - self._send_request(target) - - self._check_listener(self.listeners[0]) - self._check_listener(self.listeners[1]) - self._check_listener_negative(self.listeners[2]) - - def test_topic_part_matching(self): - target = oslo_messaging.Target(topic='testtopic', server='server') - target_part = oslo_messaging.Target(topic='testtopic', fanout=True) - - self.listeners[0].listen(target) - self.listeners[1].listen(target) - - self._send_request(target_part) - - self._check_listener(self.listeners[0]) - self._check_listener(self.listeners[1]) diff --git a/oslo_messaging/tests/drivers/zmq/test_routing_table.py b/oslo_messaging/tests/drivers/zmq/test_routing_table.py deleted file mode 100644 index 508a161e4..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_routing_table.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging.tests import utils as test_utils - - -zmq = zmq_async.import_zmq() - - -class TestRoutingTable(test_utils.BaseTestCase): - - def setUp(self): - super(TestRoutingTable, self).setUp() - - def test_get_next_while_origin_changed(self): - table = zmq_routing_table.RoutingTable(self.conf) - table.register("topic1.server1", "1") - table.register("topic1.server1", "2") - table.register("topic1.server1", "3") - - rr_gen = table.get_hosts_round_robin("topic1.server1") - - result = [] - for i in range(3): - result.append(next(rr_gen)) - - self.assertEqual(3, len(result)) - self.assertIn("1", result) - self.assertIn("2", result) - self.assertIn("3", result) - - table.register("topic1.server1", "4") - table.register("topic1.server1", "5") - table.register("topic1.server1", "6") - - result = [] - for i in range(6): - result.append(next(rr_gen)) - - self.assertEqual(6, len(result)) - self.assertIn("1", result) - self.assertIn("2", result) - self.assertIn("3", result) - self.assertIn("4", result) - self.assertIn("5", result) - self.assertIn("6", result) - - def test_no_targets(self): - table = zmq_routing_table.RoutingTable(self.conf) - rr_gen = table.get_hosts_round_robin("topic1.server1") - - result = [] - for t in rr_gen: - result.append(t) - self.assertEqual(0, len(result)) - - def test_target_unchanged(self): - table = zmq_routing_table.RoutingTable(self.conf) - table.register("topic1.server1", "1") - - rr_gen = table.get_hosts_round_robin("topic1.server1") - - result = [] - for i in range(3): - result.append(next(rr_gen)) - - self.assertEqual(["1", "1", "1"], result) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py deleted file mode 100644 index a0264cf01..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_ack_manager.py +++ /dev/null @@ -1,226 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from six.moves import mock -import testtools -import time - -import oslo_messaging -from oslo_messaging._drivers.zmq_driver.client import zmq_receivers -from oslo_messaging._drivers.zmq_driver.client import zmq_senders -from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy -from oslo_messaging._drivers.zmq_driver.server.consumers.zmq_dealer_consumer \ - import DealerConsumerWithAcks -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_options -from oslo_messaging.tests.drivers.zmq import zmq_common -from oslo_messaging.tests import utils as test_utils - -zmq = zmq_async.import_zmq() - - -class TestZmqAckManager(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestZmqAckManager, self).setUp() - - # register and set necessary config opts - self.messaging_conf.transport_driver = 'zmq' - zmq_options.register_opts(self.conf, mock.MagicMock()) - kwargs = {'rpc_zmq_matchmaker': 'dummy', - 'use_pub_sub': False, - 'use_router_proxy': True, - 'rpc_thread_pool_size': 1, - 'rpc_use_acks': True, - 'rpc_ack_timeout_base': 5, - 'rpc_ack_timeout_multiplier': 1, - 'rpc_retry_attempts': 2} - self.config(group='oslo_messaging_zmq', **kwargs) - self.conf.register_opts(zmq_proxy.zmq_proxy_opts, - group='zmq_proxy_opts') - - # mock set_result method of futures - self.set_result_patcher = mock.patch.object( - zmq_receivers.futurist.Future, 'set_result', - side_effect=zmq_receivers.futurist.Future.set_result, autospec=True - ) - self.set_result = self.set_result_patcher.start() - - # mock send method of senders - self.send_patcher = mock.patch.object( - zmq_senders.RequestSenderProxy, 'send', - side_effect=zmq_senders.RequestSenderProxy.send, autospec=True - ) - self.send = self.send_patcher.start() - - # get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - # prepare and launch proxy - self.proxy = zmq_proxy.ZmqProxy(self.conf) - vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker)) - self.executor = zmq_async.get_executor(self.proxy.run) - self.executor.execute() - - # create listener - self.listener = zmq_common.TestServerListener(self.driver) - - # create target and message - self.target = oslo_messaging.Target(topic='topic', server='server') - self.message = {'method': 'xyz', 'args': {'x': 1, 'y': 2, 'z': 3}} - - # start listening to target - self.listener.listen(self.target) - - # get ack manager - self.ack_manager = self.driver.client.get().publishers['default'] - - self.addCleanup( - zmq_common.StopRpc( - self, [('listener', 'stop'), ('executor', 'stop'), - ('proxy', 'close'), ('driver', 'cleanup'), - ('send_patcher', 'stop'), - ('set_result_patcher', 'stop')] - ) - ) - - # wait for all connections to be established - # and all parties to be ready for messaging - time.sleep(1) - - @mock.patch.object(DealerConsumerWithAcks, '_acknowledge', - side_effect=DealerConsumerWithAcks._acknowledge, - autospec=True) - def test_cast_success_without_retries(self, received_ack_mock): - result = self.driver.send( - self.target, {}, self.message, wait_for_reply=False - ) - self.assertIsNone(result) - self.ack_manager.pool.shutdown(wait=True) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(1, self.send.call_count) - self.assertEqual(1, received_ack_mock.call_count) - self.assertEqual(2, self.set_result.call_count) - - def test_cast_success_with_one_retry(self): - with mock.patch.object(DealerConsumerWithAcks, - '_acknowledge') as lost_ack_mock: - result = self.driver.send( - self.target, {}, self.message, wait_for_reply=False - ) - self.assertIsNone(result) - self.listener._received.wait(5) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(1, self.send.call_count) - self.assertEqual(1, lost_ack_mock.call_count) - self.assertEqual(0, self.set_result.call_count) - self.listener._received.clear() - with mock.patch.object(DealerConsumerWithAcks, '_acknowledge', - side_effect=DealerConsumerWithAcks._acknowledge, - autospec=True) as received_ack_mock: - self.ack_manager.pool.shutdown(wait=True) - self.assertFalse(self.listener._received.isSet()) - self.assertEqual(2, self.send.call_count) - self.assertEqual(1, received_ack_mock.call_count) - self.assertEqual(2, self.set_result.call_count) - - def test_cast_success_with_two_retries(self): - with mock.patch.object(DealerConsumerWithAcks, - '_acknowledge') as lost_ack_mock: - result = self.driver.send( - self.target, {}, self.message, wait_for_reply=False - ) - self.assertIsNone(result) - self.listener._received.wait(5) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(1, self.send.call_count) - self.assertEqual(1, lost_ack_mock.call_count) - self.assertEqual(0, self.set_result.call_count) - self.listener._received.clear() - self.listener._received.wait(7.5) - self.assertFalse(self.listener._received.isSet()) - self.assertEqual(2, self.send.call_count) - self.assertEqual(2, lost_ack_mock.call_count) - self.assertEqual(0, self.set_result.call_count) - with mock.patch.object(DealerConsumerWithAcks, '_acknowledge', - side_effect=DealerConsumerWithAcks._acknowledge, - autospec=True) as received_ack_mock: - self.ack_manager.pool.shutdown(wait=True) - self.assertFalse(self.listener._received.isSet()) - self.assertEqual(3, self.send.call_count) - self.assertEqual(1, received_ack_mock.call_count) - self.assertEqual(2, self.set_result.call_count) - - @mock.patch.object(DealerConsumerWithAcks, '_acknowledge') - def test_cast_failure_exhausted_retries(self, lost_ack_mock): - result = self.driver.send( - self.target, {}, self.message, wait_for_reply=False - ) - self.assertIsNone(result) - self.ack_manager.pool.shutdown(wait=True) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(3, self.send.call_count) - self.assertEqual(3, lost_ack_mock.call_count) - self.assertEqual(1, self.set_result.call_count) - - @mock.patch.object(DealerConsumerWithAcks, '_acknowledge', - side_effect=DealerConsumerWithAcks._acknowledge, - autospec=True) - @mock.patch.object(DealerConsumerWithAcks, '_reply', - side_effect=DealerConsumerWithAcks._reply, - autospec=True) - @mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache', - side_effect=DealerConsumerWithAcks._reply_from_cache, - autospec=True) - def test_call_success_without_retries(self, unused_reply_from_cache_mock, - received_reply_mock, - received_ack_mock): - result = self.driver.send( - self.target, {}, self.message, wait_for_reply=True, timeout=10 - ) - self.assertIsNotNone(result) - self.ack_manager.pool.shutdown(wait=True) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(1, self.send.call_count) - self.assertEqual(1, received_ack_mock.call_count) - self.assertEqual(3, self.set_result.call_count) - received_reply_mock.assert_called_once_with(mock.ANY, mock.ANY, - reply=True, failure=None) - self.assertEqual(0, unused_reply_from_cache_mock.call_count) - - @mock.patch.object(DealerConsumerWithAcks, '_acknowledge') - @mock.patch.object(DealerConsumerWithAcks, '_reply') - @mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache') - def test_call_failure_exhausted_retries(self, lost_reply_from_cache_mock, - lost_reply_mock, lost_ack_mock): - self.assertRaises(oslo_messaging.MessagingTimeout, - self.driver.send, - self.target, {}, self.message, - wait_for_reply=True, timeout=20) - self.ack_manager.pool.shutdown(wait=True) - self.assertTrue(self.listener._received.isSet()) - self.assertEqual(self.message, self.listener.message.message) - self.assertEqual(3, self.send.call_count) - self.assertEqual(3, lost_ack_mock.call_count) - self.assertEqual(2, self.set_result.call_count) - lost_reply_mock.assert_called_once_with(mock.ANY, - reply=True, failure=None) - self.assertEqual(2, lost_reply_from_cache_mock.call_count) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_address.py b/oslo_messaging/tests/drivers/zmq/test_zmq_address.py deleted file mode 100644 index 519c294cc..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_address.py +++ /dev/null @@ -1,67 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import testscenarios -import testtools - -import oslo_messaging -from oslo_messaging._drivers.zmq_driver import zmq_address -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging.tests import utils as test_utils - - -zmq = zmq_async.import_zmq() - -load_tests = testscenarios.load_tests_apply_scenarios - - -class TestZmqAddress(test_utils.BaseTestCase): - - scenarios = [ - ('router', {'listener_type': zmq_names.socket_type_str(zmq.ROUTER)}), - ('dealer', {'listener_type': zmq_names.socket_type_str(zmq.DEALER)}) - ] - - @testtools.skipIf(zmq is None, "zmq not available") - def test_target_to_key_topic_only(self): - target = oslo_messaging.Target(topic='topic') - key = zmq_address.target_to_key(target, self.listener_type) - self.assertEqual(self.listener_type + '/topic', key) - - @testtools.skipIf(zmq is None, "zmq not available") - def test_target_to_key_topic_server_round_robin(self): - target = oslo_messaging.Target(topic='topic', server='server') - key = zmq_address.target_to_key(target, self.listener_type) - self.assertEqual(self.listener_type + '/topic/server', key) - - @testtools.skipIf(zmq is None, "zmq not available") - def test_target_to_key_topic_fanout(self): - target = oslo_messaging.Target(topic='topic', fanout=True) - key = zmq_address.target_to_key(target, self.listener_type) - self.assertEqual(self.listener_type + '/topic', key) - - @testtools.skipIf(zmq is None, "zmq not available") - def test_target_to_key_topic_server_fanout(self): - target = oslo_messaging.Target(topic='topic', server='server', - fanout=True) - key = zmq_address.target_to_key(target, self.listener_type) - self.assertEqual(self.listener_type + '/topic', key) - - @testtools.skipIf(zmq is None, "zmq not available") - def test_target_to_key_topic_server_fanout_no_prefix(self): - target = oslo_messaging.Target(topic='topic', server='server', - fanout=True) - key = zmq_address.target_to_key(target) - self.assertEqual('topic', key) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py b/oslo_messaging/tests/drivers/zmq/test_zmq_async.py deleted file mode 100644 index a4dccd9ec..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_async.py +++ /dev/null @@ -1,93 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from six.moves import mock -import testtools - -from oslo_messaging._drivers.zmq_driver.poller import green_poller -from oslo_messaging._drivers.zmq_driver.poller import threading_poller -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging.tests import utils as test_utils - -zmq = zmq_async.import_zmq() - - -class TestImportZmq(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestImportZmq, self).setUp() - - def test_when_eventlet_is_available_then_load_eventlet_green_zmq(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: True - - mock_try_import = mock.Mock() - zmq_async.importutils.try_import = mock_try_import - - zmq_async.import_zmq() - - mock_try_import.assert_called_with('eventlet.green.zmq', default=None) - - def test_when_evetlet_is_unavailable_then_load_zmq(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: False - - mock_try_import = mock.Mock() - zmq_async.importutils.try_import = mock_try_import - - zmq_async.import_zmq() - - mock_try_import.assert_called_with('zmq', default=None) - - -class TestGetPoller(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestGetPoller, self).setUp() - - def test_when_eventlet_is_available_then_return_GreenPoller(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: True - - poller = zmq_async.get_poller() - - self.assertIsInstance(poller, green_poller.GreenPoller) - - def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: False - - poller = zmq_async.get_poller() - - self.assertIsInstance(poller, threading_poller.ThreadingPoller) - - -class TestGetExecutor(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestGetExecutor, self).setUp() - - def test_when_eventlet_module_is_available_then_return_GreenExecutor(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: True - - executor = zmq_async.get_executor('any method') - - self.assertIsInstance(executor, green_poller.GreenExecutor) - self.assertEqual('any method', executor._method) - - def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self): - zmq_async.eventletutils.is_monkey_patched = lambda _: False - - executor = zmq_async.get_executor('any method') - - self.assertIsInstance(executor, - threading_poller.ThreadingExecutor) - self.assertEqual('any method', executor._method) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py b/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py deleted file mode 100644 index 45df7967b..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_transport_url.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from six.moves import mock -import testtools - -import oslo_messaging -from oslo_messaging._drivers import common -from oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base \ - import MatchmakerDummy -from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging.tests import utils as test_utils - -zmq = zmq_async.import_zmq() - -redis = zmq_matchmaker_redis.redis -sentinel = zmq_matchmaker_redis.redis_sentinel - - -class TestZmqTransportUrl(test_utils.BaseTestCase): - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(TestZmqTransportUrl, self).setUp() - - def setup_url(self, url): - transport = oslo_messaging.get_transport(self.conf, url) - self.addCleanup(transport.cleanup) - driver = transport._driver - return driver, url - - def mock_redis(self): - if redis is None: - self.skipTest("redis not available") - else: - redis_patcher = mock.patch.object(redis, 'StrictRedis') - self.addCleanup(redis_patcher.stop) - return redis_patcher.start() - - def mock_sentinel(self): - if sentinel is None: - self.skipTest("sentinel not available") - else: - sentinel_patcher = mock.patch.object(sentinel, 'Sentinel') - self.addCleanup(sentinel_patcher.stop) - return sentinel_patcher.start() - - def test_empty_url(self): - self.mock_redis() - driver, url = self.setup_url("zmq:///") - self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, - driver.matchmaker.__class__) - self.assertEqual('zmq', driver.matchmaker.url.transport) - - def test_error_url(self): - self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///") - - def test_dummy_url(self): - driver, url = self.setup_url("zmq+dummy:///") - self.assertIs(MatchmakerDummy, - driver.matchmaker.__class__) - self.assertEqual('zmq+dummy', driver.matchmaker.url.transport) - - def test_redis_url(self): - self.mock_redis() - driver, url = self.setup_url("zmq+redis:///") - self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, - driver.matchmaker.__class__) - self.assertEqual('zmq+redis', driver.matchmaker.url.transport) - - def test_sentinel_url(self): - self.mock_sentinel() - driver, url = self.setup_url("zmq+sentinel:///") - self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel, - driver.matchmaker.__class__) - self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport) - - def test_host_with_credentials_url(self): - self.mock_redis() - driver, url = self.setup_url("zmq://:password@host:60000/") - self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, - driver.matchmaker.__class__) - self.assertEqual('zmq', driver.matchmaker.url.transport) - self.assertEqual( - [{"host": "host", "port": 60000, "password": "password"}], - driver.matchmaker._redis_hosts - ) - - def test_redis_multiple_hosts_url(self): - self.mock_redis() - driver, url = self.setup_url( - "zmq+redis://host1:60001,host2:60002,host3:60003/" - ) - self.assertIs(zmq_matchmaker_redis.MatchmakerRedis, - driver.matchmaker.__class__) - self.assertEqual('zmq+redis', driver.matchmaker.url.transport) - self.assertEqual( - [{"host": "host1", "port": 60001, "password": None}, - {"host": "host2", "port": 60002, "password": None}, - {"host": "host3", "port": 60003, "password": None}], - driver.matchmaker._redis_hosts - ) - - def test_sentinel_multiple_hosts_url(self): - self.mock_sentinel() - driver, url = self.setup_url( - "zmq+sentinel://host1:20001,host2:20002,host3:20003/" - ) - self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel, - driver.matchmaker.__class__) - self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport) - self.assertEqual( - [("host1", 20001), ("host2", 20002), ("host3", 20003)], - driver.matchmaker._sentinel_hosts - ) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py b/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py deleted file mode 100644 index 772a97ded..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_ttl_cache.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import time - -from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache -from oslo_messaging.tests import utils as test_utils - - -class TestZmqTTLCache(test_utils.BaseTestCase): - - def setUp(self): - super(TestZmqTTLCache, self).setUp() - - def call_count_decorator(unbound_method): - def wrapper(self, *args, **kwargs): - wrapper.call_count += 1 - return unbound_method(self, *args, **kwargs) - wrapper.call_count = 0 - return wrapper - - zmq_ttl_cache.TTLCache._update_cache = \ - call_count_decorator(zmq_ttl_cache.TTLCache._update_cache) - - self.cache = zmq_ttl_cache.TTLCache(ttl=1) - - self.addCleanup(lambda: self.cache.cleanup()) - - def _test_add_get(self): - self.cache.add('x', 'a') - - self.assertEqual(self.cache.get('x'), 'a') - self.assertEqual(self.cache.get('x', 'b'), 'a') - self.assertIsNone(self.cache.get('y')) - self.assertEqual(self.cache.get('y', 'b'), 'b') - - time.sleep(1) - - self.assertIsNone(self.cache.get('x')) - self.assertEqual(self.cache.get('x', 'b'), 'b') - - def test_add_get_with_executor(self): - self._test_add_get() - - def test_add_get_without_executor(self): - self.cache._executor.stop() - self._test_add_get() - - def _test_in_operator(self): - self.cache.add(1) - - self.assertIn(1, self.cache) - - time.sleep(0.5) - - self.cache.add(2) - - self.assertIn(1, self.cache) - self.assertIn(2, self.cache) - - time.sleep(0.75) - - self.cache.add(3) - - self.assertNotIn(1, self.cache) - self.assertIn(2, self.cache) - self.assertIn(3, self.cache) - - time.sleep(0.5) - - self.assertNotIn(2, self.cache) - self.assertIn(3, self.cache) - - def test_in_operator_with_executor(self): - self._test_in_operator() - - def test_in_operator_without_executor(self): - self.cache._executor.stop() - self._test_in_operator() - - def _is_expired(self, key): - with self.cache._lock: - _, expiration_time = self.cache._cache[key] - return self.cache._is_expired(expiration_time, time.time()) - - def test_executor(self): - self.cache.add(1) - - self.assertEqual([1], sorted(self.cache._cache.keys())) - self.assertFalse(self._is_expired(1)) - - time.sleep(0.75) - - self.assertEqual(1, self.cache._update_cache.call_count) - - self.cache.add(2) - - self.assertEqual([1, 2], sorted(self.cache._cache.keys())) - self.assertFalse(self._is_expired(1)) - self.assertFalse(self._is_expired(2)) - - time.sleep(0.75) - - self.assertEqual(2, self.cache._update_cache.call_count) - - self.cache.add(3) - - if 1 in self.cache: - self.assertEqual([1, 2, 3], sorted(self.cache._cache.keys())) - self.assertTrue(self._is_expired(1)) - else: - self.assertEqual([2, 3], sorted(self.cache._cache.keys())) - self.assertFalse(self._is_expired(2)) - self.assertFalse(self._is_expired(3)) - - time.sleep(0.75) - - self.assertEqual(3, self.cache._update_cache.call_count) - - self.assertEqual([3], sorted(self.cache._cache.keys())) - self.assertFalse(self._is_expired(3)) diff --git a/oslo_messaging/tests/drivers/zmq/test_zmq_version.py b/oslo_messaging/tests/drivers/zmq/test_zmq_version.py deleted file mode 100644 index 9b0189403..000000000 --- a/oslo_messaging/tests/drivers/zmq/test_zmq_version.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_messaging._drivers.zmq_driver import zmq_version -from oslo_messaging.tests import utils as test_utils - - -class Doer(object): - - def __init__(self): - self.x = 1 - self.y = 2 - self.z = 3 - - def _sudo(self): - pass - - def do(self): - pass - - def _do_v_1_1(self): - pass - - def _do_v_2_2(self): - pass - - def _do_v_3_3(self): - pass - - -class TestZmqVersion(test_utils.BaseTestCase): - - def setUp(self): - super(TestZmqVersion, self).setUp() - self.doer = Doer() - - def test_get_unknown_attr_versions(self): - self.assertRaises(AssertionError, zmq_version.get_method_versions, - self.doer, 'qwerty') - - def test_get_non_method_attr_versions(self): - for attr_name in vars(self.doer): - self.assertRaises(AssertionError, zmq_version.get_method_versions, - self.doer, attr_name) - - def test_get_private_method_versions(self): - self.assertRaises(AssertionError, zmq_version.get_method_versions, - self.doer, '_sudo') - - def test_get_public_method_versions(self): - do_versions = zmq_version.get_method_versions(self.doer, 'do') - self.assertEqual(['1.1', '2.2', '3.3'], sorted(do_versions.keys())) diff --git a/oslo_messaging/tests/drivers/zmq/zmq_common.py b/oslo_messaging/tests/drivers/zmq/zmq_common.py deleted file mode 100644 index 2e3699933..000000000 --- a/oslo_messaging/tests/drivers/zmq/zmq_common.py +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import threading - -import fixtures -from six.moves import mock -import testtools - -import oslo_messaging -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_options -from oslo_messaging._i18n import _LE -from oslo_messaging.tests import utils as test_utils - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class TestServerListener(object): - - def __init__(self, driver): - self.driver = driver - self.listener = None - self.executor = zmq_async.get_executor(self._run) - self._stop = threading.Event() - self._received = threading.Event() - self.message = None - - def listen(self, target): - self.listener = self.driver.listen(target, None, - None)._poll_style_listener - self.executor.execute() - - def listen_notifications(self, targets_and_priorities): - self.listener = self.driver.listen_for_notifications( - targets_and_priorities, None, None, None)._poll_style_listener - self.executor.execute() - - def _run(self): - try: - messages = self.listener.poll() - if messages: - message = messages[0] - message.acknowledge() - self._received.set() - self.message = message - message.reply(reply=True) - except Exception: - LOG.exception(_LE("Unexpected exception occurred.")) - - def stop(self): - self.executor.stop() - - -class ZmqBaseTestCase(test_utils.BaseTestCase): - """Base test case for all ZMQ tests """ - - @testtools.skipIf(zmq is None, "zmq not available") - def setUp(self): - super(ZmqBaseTestCase, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - zmq_options.register_opts(self.conf, mock.MagicMock()) - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_zmq_ipc_dir': self.internal_ipc_dir, - 'use_pub_sub': False, - 'use_router_proxy': False, - 'rpc_zmq_matchmaker': 'dummy'} - self.config(group='oslo_messaging_zmq', **kwargs) - self.config(rpc_response_timeout=5) - - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - self.listener = TestServerListener(self.driver) - - self.addCleanup( - StopRpc(self, [('listener', 'stop'), ('driver', 'cleanup')]) - ) - - -class StopRpc(object): - def __init__(self, obj, attrs_and_stops): - self.obj = obj - self.attrs_and_stops = attrs_and_stops - - def __call__(self): - for attr, stop in self.attrs_and_stops: - if hasattr(self.obj, attr): - obj_attr = getattr(self.obj, attr) - if hasattr(obj_attr, stop): - obj_attr_stop = getattr(obj_attr, stop) - obj_attr_stop() diff --git a/oslo_messaging/tests/functional/zmq/__init__.py b/oslo_messaging/tests/functional/zmq/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py deleted file mode 100644 index cf3b6e3a5..000000000 --- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py +++ /dev/null @@ -1,232 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import logging.handlers -import multiprocessing -import os -import sys -import threading -import time -import uuid - -from oslo_config import cfg - -import oslo_messaging -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging.tests.functional import utils - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class QueueHandler(logging.Handler): - """This is a logging handler which sends events to a multiprocessing queue. - - The plan is to add it to Python 3.2, but this can be copy pasted into - user code for use with earlier Python versions. - """ - - def __init__(self, queue): - """Initialise an instance, using the passed queue.""" - logging.Handler.__init__(self) - self.queue = queue - - def emit(self, record): - """Emit a record. - - Writes the LogRecord to the queue. - """ - try: - ei = record.exc_info - if ei: - # just to get traceback text into record.exc_text - dummy = self.format(record) # noqa - record.exc_info = None # not needed any more - self.queue.put_nowait(record) - except (KeyboardInterrupt, SystemExit): - raise - except Exception: - self.handleError(record) - - -def listener_configurer(conf): - root = logging.getLogger() - h = logging.StreamHandler(sys.stdout) - f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s ' - '%(levelname)-8s %(message)s') - h.setFormatter(f) - root.addHandler(h) - log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \ - "/" + "zmq_multiproc.log" - file_handler = logging.StreamHandler(open(log_path, 'w')) - file_handler.setFormatter(f) - root.addHandler(file_handler) - - -def server_configurer(queue): - h = QueueHandler(queue) - root = logging.getLogger() - root.addHandler(h) - root.setLevel(logging.DEBUG) - - -def listener_thread(queue, configurer, conf): - configurer(conf) - while True: - time.sleep(0.3) - try: - record = queue.get() - if record is None: - break - logger = logging.getLogger(record.name) - logger.handle(record) - except (KeyboardInterrupt, SystemExit): - raise - - -class Client(oslo_messaging.RPCClient): - - def __init__(self, transport, topic): - super(Client, self).__init__( - transport=transport, target=oslo_messaging.Target(topic=topic)) - self.replies = [] - - def call_a(self): - LOG.warning("call_a - client side") - rep = self.call({}, 'call_a') - LOG.warning("after call_a - client side") - self.replies.append(rep) - return rep - - -class ReplyServerEndpoint(object): - - def call_a(self, *args, **kwargs): - LOG.warning("call_a - Server endpoint reached!") - return "OK" - - -class Server(object): - - def __init__(self, conf, log_queue, transport_url, name, topic=None): - self.conf = conf - self.log_queue = log_queue - self.transport_url = transport_url - self.name = name - self.topic = topic or str(uuid.uuid4()) - self.ready = multiprocessing.Value('b', False) - self._stop = multiprocessing.Event() - - def start(self): - self.process = multiprocessing.Process(target=self._run_server, - name=self.name, - args=(self.conf, - self.transport_url, - self.log_queue, - self.ready)) - self.process.start() - LOG.debug("Server process started: pid: %d", self.process.pid) - - def _run_server(self, conf, url, log_queue, ready): - server_configurer(log_queue) - LOG.debug("Starting RPC server") - - transport = oslo_messaging.get_transport(conf, url=url) - target = oslo_messaging.Target(topic=self.topic, server=self.name) - self.rpc_server = oslo_messaging.get_rpc_server( - transport=transport, target=target, - endpoints=[ReplyServerEndpoint()], - executor='eventlet') - self.rpc_server.start() - ready.value = True - LOG.debug("RPC server being started") - while not self._stop.is_set(): - LOG.debug("Waiting for the stop signal ...") - time.sleep(1) - self.rpc_server.stop() - self.rpc_server.wait() - LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid()) - - def cleanup(self): - LOG.debug("Stopping server") - self.shutdown() - - def shutdown(self): - self._stop.set() - - def restart(self, time_for_restart=1): - pass - - def hang(self): - pass - - def crash(self): - pass - - def ping(self): - pass - - -class MultiprocTestCase(utils.SkipIfNoTransportURL): - - def setUp(self): - super(MultiprocTestCase, self).setUp(conf=cfg.ConfigOpts()) - - if not self.url.startswith("zmq"): - self.skipTest("ZeroMQ specific skipped...") - - self.transport = oslo_messaging.get_transport(self.conf, url=self.url) - - LOG.debug("Start log queue") - - self.log_queue = multiprocessing.Queue() - self.log_listener = threading.Thread(target=listener_thread, - args=(self.log_queue, - listener_configurer, - self.conf)) - self.log_listener.start() - self.spawned = [] - - self.conf.prog = "test_prog" - self.conf.project = "test_project" - - def tearDown(self): - for process in self.spawned: - process.cleanup() - super(MultiprocTestCase, self).tearDown() - - def get_client(self, topic): - return Client(self.transport, topic) - - def spawn_server(self, wait_for_server=False, topic=None): - name = "server_%d_%s" % (len(self.spawned), str(uuid.uuid4())[:8]) - server = Server(self.conf, self.log_queue, self.url, name, topic) - LOG.debug("[SPAWN] %s (starting)...", server.name) - server.start() - if wait_for_server: - while not server.ready.value: - LOG.debug("[SPAWN] %s (waiting for server ready)...", - server.name) - time.sleep(1) - LOG.debug("[SPAWN] Server %s:%d started.", - server.name, server.process.pid) - self.spawned.append(server) - return server - - def spawn_servers(self, number, wait_for_server=False, common_topic=True): - topic = str(uuid.uuid4()) if common_topic else None - for _ in range(number): - self.spawn_server(wait_for_server, topic) diff --git a/oslo_messaging/tests/functional/zmq/test_startup.py b/oslo_messaging/tests/functional/zmq/test_startup.py deleted file mode 100644 index ebea76ec7..000000000 --- a/oslo_messaging/tests/functional/zmq/test_startup.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import sys - -from oslo_messaging.tests.functional.zmq import multiproc_utils - - -class StartupOrderTestCase(multiproc_utils.MultiprocTestCase): - - def setUp(self): - super(StartupOrderTestCase, self).setUp() - - self.conf.prog = "test_prog" - self.conf.project = "test_project" - - self.config(rpc_response_timeout=10) - - log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir, - str(os.getpid()) + ".log") - sys.stdout = open(log_path, "wb", buffering=0) - - def test_call_client_wait_for_server(self): - server = self.spawn_server(wait_for_server=True) - client = self.get_client(server.topic) - for _ in range(3): - reply = client.call_a() - self.assertIsNotNone(reply) - self.assertEqual(3, len(client.replies)) - - def test_call_client_dont_wait_for_server(self): - server = self.spawn_server(wait_for_server=False) - client = self.get_client(server.topic) - for _ in range(3): - reply = client.call_a() - self.assertIsNotNone(reply) - self.assertEqual(3, len(client.replies))