Merge "remove zmq tests"

This commit is contained in:
Zuul 2018-03-27 19:35:04 +00:00 committed by Gerrit Code Review
commit 415e9b991b
17 changed files with 0 additions and 1683 deletions

View File

@ -18,27 +18,6 @@
tox_envlist: py27-func-rabbit
bindep_profile: rabbit
- job:
name: oslo.messaging-tox-py27-func-zmq
parent: openstack-tox-py27
vars:
tox_envlist: py27-func-zmq
bindep_profile: zmq
- job:
name: oslo.messaging-tox-py27-func-zmq-proxy
parent: openstack-tox-py27
vars:
tox_envlist: py27-func-zmq-proxy
bindep_profile: zmq
- job:
name: oslo.messaging-tox-py27-func-zmq-pubsub
parent: openstack-tox-py27
vars:
tox_envlist: py27-func-zmq-pubsub
bindep_profile: zmq
- job:
name: oslo.messaging-tox-py35-func-amqp1
parent: openstack-tox-py35
@ -52,13 +31,6 @@
tox_envlist: py35-func-rabbit
bindep_profile: rabbit
- job:
name: oslo.messaging-tox-py35-func-zmq
parent: openstack-tox-py35
vars:
tox_envlist: py35-func-zmq
bindep_profile: zmq
- job:
name: oslo.messaging-src-dsvm-full-rabbit-default
parent: legacy-dsvm-base
@ -115,17 +87,6 @@
- openstack/devstack-plugin-kafka
- openstack/oslo.messaging
- job:
name: oslo.messaging-src-dsvm-full-zmq-default
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/run.yaml
post-run: playbooks/oslo.messaging-src-dsvm-full-zmq-default/post.yaml
timeout: 10800
required-projects:
- openstack-infra/devstack-gate
- openstack/devstack-plugin-zmq
- openstack/oslo.messaging
- job:
name: oslo.messaging-src-grenade-dsvm
parent: legacy-dsvm-base
@ -194,24 +155,6 @@
- openstack/dib-utils
- openstack/diskimage-builder
- job:
name: oslo.messaging-telemetry-dsvm-integration-zmq
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/run.yaml
post-run: playbooks/oslo.messaging-telemetry-dsvm-integration-zmq/post.yaml
timeout: 4200
required-projects:
- openstack-infra/devstack-gate
- openstack/aodh
- openstack/ceilometer
- openstack/devstack-plugin-zmq
- openstack/oslo.messaging
- openstack/panko
# following are required when DEVSTACK_GATE_HEAT, which this
# job turns on
- openstack/dib-utils
- openstack/diskimage-builder
- job:
name: oslo.messaging-telemetry-dsvm-integration-rabbit
parent: legacy-dsvm-base
@ -270,19 +213,6 @@
- openstack/oslo.messaging
- openstack/tempest
- job:
name: oslo.messaging-tempest-neutron-dsvm-src-zmq-default
parent: legacy-dsvm-base
run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/run.yaml
post-run: playbooks/oslo.messaging-tempest-neutron-dsvm-src-zmq-default/post.yaml
timeout: 7800
required-projects:
- openstack-infra/devstack-gate
- openstack/devstack-plugin-zmq
- openstack/neutron
- openstack/oslo.messaging
- openstack/tempest
- project:
check:
@ -292,18 +222,10 @@
- oslo.messaging-tox-py27-func-kafka:
voting: false
- oslo.messaging-tox-py27-func-rabbit
- oslo.messaging-tox-py27-func-zmq-proxy:
voting: false
- oslo.messaging-tox-py27-func-zmq-pubsub:
voting: false
- oslo.messaging-tox-py27-func-zmq:
voting: false
- oslo.messaging-tox-py35-func-amqp1:
voting: false
- oslo.messaging-tox-py35-func-rabbit:
voting: false
- oslo.messaging-tox-py35-func-zmq:
voting: false
- oslo.messaging-src-dsvm-full-rabbit-default
- oslo.messaging-src-dsvm-full-amqp1-hybrid:
@ -316,8 +238,6 @@
voting: false
- oslo.messaging-src-dsvm-full-kafka-default:
voting: false
- oslo.messaging-src-dsvm-full-zmq-default:
voting: false
- oslo.messaging-src-grenade-dsvm:
voting: false
@ -329,8 +249,6 @@
voting: false
- oslo.messaging-telemetry-dsvm-integration-kafka:
voting: false
- oslo.messaging-telemetry-dsvm-integration-zmq:
voting: false
- oslo.messaging-tempest-neutron-dsvm-src-rabbit-default
- oslo.messaging-tempest-neutron-dsvm-src-amqp1-hybrid:
@ -338,8 +256,6 @@
branches: ^(?!stable/ocata).*$
- oslo.messaging-tempest-neutron-dsvm-src-kafka-default:
voting: false
- oslo.messaging-tempest-neutron-dsvm-src-zmq-default:
voting: false
gate:
jobs:

View File

@ -1,140 +0,0 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
from stevedore import driver
import testscenarios
import oslo_messaging
from oslo_messaging.tests import utils as test_utils
from oslo_utils import importutils
redis = importutils.try_import('redis')
def redis_available():
'''Helper to see if local redis server is running'''
if not redis:
return False
try:
redis.StrictRedis(socket_timeout=1).ping()
return True
except redis.exceptions.ConnectionError:
return False
load_tests = testscenarios.load_tests_apply_scenarios
class TestImplMatchmaker(test_utils.BaseTestCase):
scenarios = [
("dummy", {"rpc_zmq_matchmaker": "dummy"}),
("redis", {"rpc_zmq_matchmaker": "redis"}),
]
def setUp(self):
super(TestImplMatchmaker, self).setUp()
if self.rpc_zmq_matchmaker == "redis":
if not redis_available():
self.skipTest("redis unavailable")
self.test_matcher = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.rpc_zmq_matchmaker,
).driver(self.conf)
if self.rpc_zmq_matchmaker == "redis":
for redis_instance in self.test_matcher._redis_instances:
self.addCleanup(redis_instance.flushdb)
self.target = oslo_messaging.Target(topic="test_topic")
self.host1 = b"test_host1"
self.host2 = b"test_host2"
def test_register(self):
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.assertEqual([self.host1],
self.test_matcher.get_hosts(self.target, "test"))
def test_register_two_hosts(self):
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.register(
self.target,
self.host2,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1, self.host2])
def test_register_unregister(self):
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.register(
self.target,
self.host2,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.unregister(self.target, self.host2, "test")
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
def test_register_two_same_hosts(self):
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.assertEqual([self.host1],
self.test_matcher.get_hosts(self.target, "test"))
def test_get_hosts_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertEqual([], self.test_matcher.get_hosts(target, "test"))
def test_handle_redis_package_error(self):
if self.rpc_zmq_matchmaker == "redis":
# move 'redis' variable to prevent this case affect others
module = inspect.getmodule(self.test_matcher)
redis_package = module.redis
# 'redis' variable is set to None, when package importing is failed
module.redis = None
self.assertRaises(ImportError, self.test_matcher.__init__,
self.conf)
# retrieve 'redis' variable which is set originally
module.redis = redis_package

View File

@ -1,129 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
import oslo_messaging
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging.tests.drivers.zmq import zmq_common
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqTestPortsRange, self).setUp()
# Set config values
kwargs = {'rpc_zmq_min_port': 5555,
'rpc_zmq_max_port': 5560}
self.config(group='oslo_messaging_zmq', **kwargs)
def test_ports_range(self):
listeners = []
for i in range(10):
try:
target = oslo_messaging.Target(topic='testtopic_' + str(i))
new_listener = self.driver.listen(target, None, None)
listeners.append(new_listener)
except zmq_socket.ZmqPortBusy:
pass
self.assertLessEqual(len(listeners), 5)
for l in listeners:
l.cleanup()
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class TestZmqBasics(zmq_common.ZmqBaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqBasics, self).setUp()
self.target = oslo_messaging.Target(topic='topic')
self.ctxt = {'key': 'value'}
self.message = {'method': 'qwerty', 'args': {'int': 1, 'bool': True}}
def test_send_call_without_method_failure(self):
self.message.pop('method')
self.listener.listen(self.target)
self.assertRaises(KeyError, self.driver.send,
self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
def _check_listener_received(self):
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.ctxt, self.listener.message.ctxt)
self.assertEqual(self.message, self.listener.message.message)
def test_send_call_success(self):
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
self.assertTrue(result)
self._check_listener_received()
def test_send_call_direct_success(self):
self.target.server = 'server'
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=True, timeout=10)
self.assertTrue(result)
self._check_listener_received()
def test_send_cast_direct_success(self):
self.target.server = 'server'
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=False)
self.listener._received.wait(5)
self.assertIsNone(result)
self._check_listener_received()
def test_send_fanout_success(self):
self.target.fanout = True
self.listener.listen(self.target)
result = self.driver.send(self.target, self.ctxt, self.message,
wait_for_reply=False)
self.listener._received.wait(5)
self.assertIsNone(result)
self._check_listener_received()
def test_send_notify_success(self):
self.listener.listen_notifications([(self.target, 'info')])
self.target.topic += '.info'
result = self.driver.send_notification(self.target, self.ctxt,
self.message, '3.0')
self.listener._received.wait(5)
self.assertIsNone(result)
self._check_listener_received()

View File

@ -1,150 +0,0 @@
# Copyright 2015-2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import time
import msgpack
import six
import testscenarios
from oslo_config import cfg
import oslo_messaging
from oslo_messaging._drivers.zmq_driver.proxy.central \
import zmq_publisher_proxy
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_version
from oslo_messaging.tests.drivers.zmq import zmq_common
load_tests = testscenarios.load_tests_apply_scenarios
zmq = zmq_async.import_zmq()
opt_group = cfg.OptGroup(name='zmq_proxy_opts',
title='ZeroMQ proxy options')
cfg.CONF.register_opts(zmq_proxy.zmq_proxy_opts, group=opt_group)
class TestPubSub(zmq_common.ZmqBaseTestCase):
LISTENERS_COUNT = 3
scenarios = [
('json', {'serialization': 'json',
'dumps': lambda obj: six.b(json.dumps(obj))}),
('msgpack', {'serialization': 'msgpack',
'dumps': msgpack.dumps})
]
def setUp(self):
super(TestPubSub, self).setUp()
kwargs = {'use_pub_sub': True,
'rpc_zmq_serialization': self.serialization}
self.config(group='oslo_messaging_zmq', **kwargs)
self.config(host="127.0.0.1", group="zmq_proxy_opts")
self.config(publisher_port=0, group="zmq_proxy_opts")
self.publisher = zmq_publisher_proxy.PublisherProxy(
self.conf, self.driver.matchmaker)
self.driver.matchmaker.register_publisher(
(self.publisher.host, ''),
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.listeners = []
for _ in range(self.LISTENERS_COUNT):
self.listeners.append(zmq_common.TestServerListener(self.driver))
def tearDown(self):
super(TestPubSub, self).tearDown()
self.publisher.cleanup()
for listener in self.listeners:
listener.stop()
def _send_request(self, target):
# Needed only in test env to give listener a chance to connect
# before request fires
time.sleep(1)
context = {}
message = {'method': 'hello-world'}
self.publisher.send_request(
[b"reply_id",
b'',
six.b(zmq_version.MESSAGE_VERSION),
six.b(str(zmq_names.CAST_FANOUT_TYPE)),
zmq_address.target_to_subscribe_filter(target),
b"message_id",
self.dumps([context, message])]
)
def _check_listener(self, listener):
listener._received.wait(timeout=5)
self.assertTrue(listener._received.isSet())
method = listener.message.message[u'method']
self.assertEqual(u'hello-world', method)
def _check_listener_negative(self, listener):
listener._received.wait(timeout=1)
self.assertFalse(listener._received.isSet())
def test_single_listener(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
self.listener.listen(target)
self._send_request(target)
self._check_listener(self.listener)
def test_all_listeners(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
for listener in self.listeners:
listener.listen(target)
self._send_request(target)
for listener in self.listeners:
self._check_listener(listener)
def test_filtered(self):
target = oslo_messaging.Target(topic='testtopic', fanout=True)
target_wrong = oslo_messaging.Target(topic='wrong', fanout=True)
self.listeners[0].listen(target)
self.listeners[1].listen(target)
self.listeners[2].listen(target_wrong)
self._send_request(target)
self._check_listener(self.listeners[0])
self._check_listener(self.listeners[1])
self._check_listener_negative(self.listeners[2])
def test_topic_part_matching(self):
target = oslo_messaging.Target(topic='testtopic', server='server')
target_part = oslo_messaging.Target(topic='testtopic', fanout=True)
self.listeners[0].listen(target)
self.listeners[1].listen(target)
self._send_request(target_part)
self._check_listener(self.listeners[0])
self._check_listener(self.listeners[1])

View File

@ -1,80 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
class TestRoutingTable(test_utils.BaseTestCase):
def setUp(self):
super(TestRoutingTable, self).setUp()
def test_get_next_while_origin_changed(self):
table = zmq_routing_table.RoutingTable(self.conf)
table.register("topic1.server1", "1")
table.register("topic1.server1", "2")
table.register("topic1.server1", "3")
rr_gen = table.get_hosts_round_robin("topic1.server1")
result = []
for i in range(3):
result.append(next(rr_gen))
self.assertEqual(3, len(result))
self.assertIn("1", result)
self.assertIn("2", result)
self.assertIn("3", result)
table.register("topic1.server1", "4")
table.register("topic1.server1", "5")
table.register("topic1.server1", "6")
result = []
for i in range(6):
result.append(next(rr_gen))
self.assertEqual(6, len(result))
self.assertIn("1", result)
self.assertIn("2", result)
self.assertIn("3", result)
self.assertIn("4", result)
self.assertIn("5", result)
self.assertIn("6", result)
def test_no_targets(self):
table = zmq_routing_table.RoutingTable(self.conf)
rr_gen = table.get_hosts_round_robin("topic1.server1")
result = []
for t in rr_gen:
result.append(t)
self.assertEqual(0, len(result))
def test_target_unchanged(self):
table = zmq_routing_table.RoutingTable(self.conf)
table.register("topic1.server1", "1")
rr_gen = table.get_hosts_round_robin("topic1.server1")
result = []
for i in range(3):
result.append(next(rr_gen))
self.assertEqual(["1", "1", "1"], result)

View File

@ -1,226 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import mock
import testtools
import time
import oslo_messaging
from oslo_messaging._drivers.zmq_driver.client import zmq_receivers
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver.proxy import zmq_proxy
from oslo_messaging._drivers.zmq_driver.server.consumers.zmq_dealer_consumer \
import DealerConsumerWithAcks
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging.tests.drivers.zmq import zmq_common
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
class TestZmqAckManager(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqAckManager, self).setUp()
# register and set necessary config opts
self.messaging_conf.transport_driver = 'zmq'
zmq_options.register_opts(self.conf, mock.MagicMock())
kwargs = {'rpc_zmq_matchmaker': 'dummy',
'use_pub_sub': False,
'use_router_proxy': True,
'rpc_thread_pool_size': 1,
'rpc_use_acks': True,
'rpc_ack_timeout_base': 5,
'rpc_ack_timeout_multiplier': 1,
'rpc_retry_attempts': 2}
self.config(group='oslo_messaging_zmq', **kwargs)
self.conf.register_opts(zmq_proxy.zmq_proxy_opts,
group='zmq_proxy_opts')
# mock set_result method of futures
self.set_result_patcher = mock.patch.object(
zmq_receivers.futurist.Future, 'set_result',
side_effect=zmq_receivers.futurist.Future.set_result, autospec=True
)
self.set_result = self.set_result_patcher.start()
# mock send method of senders
self.send_patcher = mock.patch.object(
zmq_senders.RequestSenderProxy, 'send',
side_effect=zmq_senders.RequestSenderProxy.send, autospec=True
)
self.send = self.send_patcher.start()
# get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
# prepare and launch proxy
self.proxy = zmq_proxy.ZmqProxy(self.conf)
vars(self.driver.matchmaker).update(vars(self.proxy.matchmaker))
self.executor = zmq_async.get_executor(self.proxy.run)
self.executor.execute()
# create listener
self.listener = zmq_common.TestServerListener(self.driver)
# create target and message
self.target = oslo_messaging.Target(topic='topic', server='server')
self.message = {'method': 'xyz', 'args': {'x': 1, 'y': 2, 'z': 3}}
# start listening to target
self.listener.listen(self.target)
# get ack manager
self.ack_manager = self.driver.client.get().publishers['default']
self.addCleanup(
zmq_common.StopRpc(
self, [('listener', 'stop'), ('executor', 'stop'),
('proxy', 'close'), ('driver', 'cleanup'),
('send_patcher', 'stop'),
('set_result_patcher', 'stop')]
)
)
# wait for all connections to be established
# and all parties to be ready for messaging
time.sleep(1)
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
side_effect=DealerConsumerWithAcks._acknowledge,
autospec=True)
def test_cast_success_without_retries(self, received_ack_mock):
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
)
self.assertIsNone(result)
self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(1, self.send.call_count)
self.assertEqual(1, received_ack_mock.call_count)
self.assertEqual(2, self.set_result.call_count)
def test_cast_success_with_one_retry(self):
with mock.patch.object(DealerConsumerWithAcks,
'_acknowledge') as lost_ack_mock:
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
)
self.assertIsNone(result)
self.listener._received.wait(5)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(1, self.send.call_count)
self.assertEqual(1, lost_ack_mock.call_count)
self.assertEqual(0, self.set_result.call_count)
self.listener._received.clear()
with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
side_effect=DealerConsumerWithAcks._acknowledge,
autospec=True) as received_ack_mock:
self.ack_manager.pool.shutdown(wait=True)
self.assertFalse(self.listener._received.isSet())
self.assertEqual(2, self.send.call_count)
self.assertEqual(1, received_ack_mock.call_count)
self.assertEqual(2, self.set_result.call_count)
def test_cast_success_with_two_retries(self):
with mock.patch.object(DealerConsumerWithAcks,
'_acknowledge') as lost_ack_mock:
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
)
self.assertIsNone(result)
self.listener._received.wait(5)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(1, self.send.call_count)
self.assertEqual(1, lost_ack_mock.call_count)
self.assertEqual(0, self.set_result.call_count)
self.listener._received.clear()
self.listener._received.wait(7.5)
self.assertFalse(self.listener._received.isSet())
self.assertEqual(2, self.send.call_count)
self.assertEqual(2, lost_ack_mock.call_count)
self.assertEqual(0, self.set_result.call_count)
with mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
side_effect=DealerConsumerWithAcks._acknowledge,
autospec=True) as received_ack_mock:
self.ack_manager.pool.shutdown(wait=True)
self.assertFalse(self.listener._received.isSet())
self.assertEqual(3, self.send.call_count)
self.assertEqual(1, received_ack_mock.call_count)
self.assertEqual(2, self.set_result.call_count)
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge')
def test_cast_failure_exhausted_retries(self, lost_ack_mock):
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=False
)
self.assertIsNone(result)
self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(3, self.send.call_count)
self.assertEqual(3, lost_ack_mock.call_count)
self.assertEqual(1, self.set_result.call_count)
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge',
side_effect=DealerConsumerWithAcks._acknowledge,
autospec=True)
@mock.patch.object(DealerConsumerWithAcks, '_reply',
side_effect=DealerConsumerWithAcks._reply,
autospec=True)
@mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache',
side_effect=DealerConsumerWithAcks._reply_from_cache,
autospec=True)
def test_call_success_without_retries(self, unused_reply_from_cache_mock,
received_reply_mock,
received_ack_mock):
result = self.driver.send(
self.target, {}, self.message, wait_for_reply=True, timeout=10
)
self.assertIsNotNone(result)
self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(1, self.send.call_count)
self.assertEqual(1, received_ack_mock.call_count)
self.assertEqual(3, self.set_result.call_count)
received_reply_mock.assert_called_once_with(mock.ANY, mock.ANY,
reply=True, failure=None)
self.assertEqual(0, unused_reply_from_cache_mock.call_count)
@mock.patch.object(DealerConsumerWithAcks, '_acknowledge')
@mock.patch.object(DealerConsumerWithAcks, '_reply')
@mock.patch.object(DealerConsumerWithAcks, '_reply_from_cache')
def test_call_failure_exhausted_retries(self, lost_reply_from_cache_mock,
lost_reply_mock, lost_ack_mock):
self.assertRaises(oslo_messaging.MessagingTimeout,
self.driver.send,
self.target, {}, self.message,
wait_for_reply=True, timeout=20)
self.ack_manager.pool.shutdown(wait=True)
self.assertTrue(self.listener._received.isSet())
self.assertEqual(self.message, self.listener.message.message)
self.assertEqual(3, self.send.call_count)
self.assertEqual(3, lost_ack_mock.call_count)
self.assertEqual(2, self.set_result.call_count)
lost_reply_mock.assert_called_once_with(mock.ANY,
reply=True, failure=None)
self.assertEqual(2, lost_reply_from_cache_mock.call_count)

View File

@ -1,67 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
import testtools
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
load_tests = testscenarios.load_tests_apply_scenarios
class TestZmqAddress(test_utils.BaseTestCase):
scenarios = [
('router', {'listener_type': zmq_names.socket_type_str(zmq.ROUTER)}),
('dealer', {'listener_type': zmq_names.socket_type_str(zmq.DEALER)})
]
@testtools.skipIf(zmq is None, "zmq not available")
def test_target_to_key_topic_only(self):
target = oslo_messaging.Target(topic='topic')
key = zmq_address.target_to_key(target, self.listener_type)
self.assertEqual(self.listener_type + '/topic', key)
@testtools.skipIf(zmq is None, "zmq not available")
def test_target_to_key_topic_server_round_robin(self):
target = oslo_messaging.Target(topic='topic', server='server')
key = zmq_address.target_to_key(target, self.listener_type)
self.assertEqual(self.listener_type + '/topic/server', key)
@testtools.skipIf(zmq is None, "zmq not available")
def test_target_to_key_topic_fanout(self):
target = oslo_messaging.Target(topic='topic', fanout=True)
key = zmq_address.target_to_key(target, self.listener_type)
self.assertEqual(self.listener_type + '/topic', key)
@testtools.skipIf(zmq is None, "zmq not available")
def test_target_to_key_topic_server_fanout(self):
target = oslo_messaging.Target(topic='topic', server='server',
fanout=True)
key = zmq_address.target_to_key(target, self.listener_type)
self.assertEqual(self.listener_type + '/topic', key)
@testtools.skipIf(zmq is None, "zmq not available")
def test_target_to_key_topic_server_fanout_no_prefix(self):
target = oslo_messaging.Target(topic='topic', server='server',
fanout=True)
key = zmq_address.target_to_key(target)
self.assertEqual('topic', key)

View File

@ -1,93 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import mock
import testtools
from oslo_messaging._drivers.zmq_driver.poller import green_poller
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
class TestImportZmq(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestImportZmq, self).setUp()
def test_when_eventlet_is_available_then_load_eventlet_green_zmq(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
zmq_async.import_zmq()
mock_try_import.assert_called_with('eventlet.green.zmq', default=None)
def test_when_evetlet_is_unavailable_then_load_zmq(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: False
mock_try_import = mock.Mock()
zmq_async.importutils.try_import = mock_try_import
zmq_async.import_zmq()
mock_try_import.assert_called_with('zmq', default=None)
class TestGetPoller(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestGetPoller, self).setUp()
def test_when_eventlet_is_available_then_return_GreenPoller(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
poller = zmq_async.get_poller()
self.assertIsInstance(poller, green_poller.GreenPoller)
def test_when_eventlet_is_unavailable_then_return_ThreadingPoller(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: False
poller = zmq_async.get_poller()
self.assertIsInstance(poller, threading_poller.ThreadingPoller)
class TestGetExecutor(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestGetExecutor, self).setUp()
def test_when_eventlet_module_is_available_then_return_GreenExecutor(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: True
executor = zmq_async.get_executor('any method')
self.assertIsInstance(executor, green_poller.GreenExecutor)
self.assertEqual('any method', executor._method)
def test_when_eventlet_is_unavailable_then_return_ThreadingExecutor(self):
zmq_async.eventletutils.is_monkey_patched = lambda _: False
executor = zmq_async.get_executor('any method')
self.assertIsInstance(executor,
threading_poller.ThreadingExecutor)
self.assertEqual('any method', executor._method)

View File

@ -1,127 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from six.moves import mock
import testtools
import oslo_messaging
from oslo_messaging._drivers import common
from oslo_messaging._drivers.zmq_driver.matchmaker.zmq_matchmaker_base \
import MatchmakerDummy
from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_redis
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
redis = zmq_matchmaker_redis.redis
sentinel = zmq_matchmaker_redis.redis_sentinel
class TestZmqTransportUrl(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestZmqTransportUrl, self).setUp()
def setup_url(self, url):
transport = oslo_messaging.get_transport(self.conf, url)
self.addCleanup(transport.cleanup)
driver = transport._driver
return driver, url
def mock_redis(self):
if redis is None:
self.skipTest("redis not available")
else:
redis_patcher = mock.patch.object(redis, 'StrictRedis')
self.addCleanup(redis_patcher.stop)
return redis_patcher.start()
def mock_sentinel(self):
if sentinel is None:
self.skipTest("sentinel not available")
else:
sentinel_patcher = mock.patch.object(sentinel, 'Sentinel')
self.addCleanup(sentinel_patcher.stop)
return sentinel_patcher.start()
def test_empty_url(self):
self.mock_redis()
driver, url = self.setup_url("zmq:///")
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
driver.matchmaker.__class__)
self.assertEqual('zmq', driver.matchmaker.url.transport)
def test_error_url(self):
self.assertRaises(common.RPCException, self.setup_url, "zmq+error:///")
def test_dummy_url(self):
driver, url = self.setup_url("zmq+dummy:///")
self.assertIs(MatchmakerDummy,
driver.matchmaker.__class__)
self.assertEqual('zmq+dummy', driver.matchmaker.url.transport)
def test_redis_url(self):
self.mock_redis()
driver, url = self.setup_url("zmq+redis:///")
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
driver.matchmaker.__class__)
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
def test_sentinel_url(self):
self.mock_sentinel()
driver, url = self.setup_url("zmq+sentinel:///")
self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
driver.matchmaker.__class__)
self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
def test_host_with_credentials_url(self):
self.mock_redis()
driver, url = self.setup_url("zmq://:password@host:60000/")
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
driver.matchmaker.__class__)
self.assertEqual('zmq', driver.matchmaker.url.transport)
self.assertEqual(
[{"host": "host", "port": 60000, "password": "password"}],
driver.matchmaker._redis_hosts
)
def test_redis_multiple_hosts_url(self):
self.mock_redis()
driver, url = self.setup_url(
"zmq+redis://host1:60001,host2:60002,host3:60003/"
)
self.assertIs(zmq_matchmaker_redis.MatchmakerRedis,
driver.matchmaker.__class__)
self.assertEqual('zmq+redis', driver.matchmaker.url.transport)
self.assertEqual(
[{"host": "host1", "port": 60001, "password": None},
{"host": "host2", "port": 60002, "password": None},
{"host": "host3", "port": 60003, "password": None}],
driver.matchmaker._redis_hosts
)
def test_sentinel_multiple_hosts_url(self):
self.mock_sentinel()
driver, url = self.setup_url(
"zmq+sentinel://host1:20001,host2:20002,host3:20003/"
)
self.assertIs(zmq_matchmaker_redis.MatchmakerSentinel,
driver.matchmaker.__class__)
self.assertEqual('zmq+sentinel', driver.matchmaker.url.transport)
self.assertEqual(
[("host1", 20001), ("host2", 20002), ("host3", 20003)],
driver.matchmaker._sentinel_hosts
)

View File

@ -1,132 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache
from oslo_messaging.tests import utils as test_utils
class TestZmqTTLCache(test_utils.BaseTestCase):
def setUp(self):
super(TestZmqTTLCache, self).setUp()
def call_count_decorator(unbound_method):
def wrapper(self, *args, **kwargs):
wrapper.call_count += 1
return unbound_method(self, *args, **kwargs)
wrapper.call_count = 0
return wrapper
zmq_ttl_cache.TTLCache._update_cache = \
call_count_decorator(zmq_ttl_cache.TTLCache._update_cache)
self.cache = zmq_ttl_cache.TTLCache(ttl=1)
self.addCleanup(lambda: self.cache.cleanup())
def _test_add_get(self):
self.cache.add('x', 'a')
self.assertEqual(self.cache.get('x'), 'a')
self.assertEqual(self.cache.get('x', 'b'), 'a')
self.assertIsNone(self.cache.get('y'))
self.assertEqual(self.cache.get('y', 'b'), 'b')
time.sleep(1)
self.assertIsNone(self.cache.get('x'))
self.assertEqual(self.cache.get('x', 'b'), 'b')
def test_add_get_with_executor(self):
self._test_add_get()
def test_add_get_without_executor(self):
self.cache._executor.stop()
self._test_add_get()
def _test_in_operator(self):
self.cache.add(1)
self.assertIn(1, self.cache)
time.sleep(0.5)
self.cache.add(2)
self.assertIn(1, self.cache)
self.assertIn(2, self.cache)
time.sleep(0.75)
self.cache.add(3)
self.assertNotIn(1, self.cache)
self.assertIn(2, self.cache)
self.assertIn(3, self.cache)
time.sleep(0.5)
self.assertNotIn(2, self.cache)
self.assertIn(3, self.cache)
def test_in_operator_with_executor(self):
self._test_in_operator()
def test_in_operator_without_executor(self):
self.cache._executor.stop()
self._test_in_operator()
def _is_expired(self, key):
with self.cache._lock:
_, expiration_time = self.cache._cache[key]
return self.cache._is_expired(expiration_time, time.time())
def test_executor(self):
self.cache.add(1)
self.assertEqual([1], sorted(self.cache._cache.keys()))
self.assertFalse(self._is_expired(1))
time.sleep(0.75)
self.assertEqual(1, self.cache._update_cache.call_count)
self.cache.add(2)
self.assertEqual([1, 2], sorted(self.cache._cache.keys()))
self.assertFalse(self._is_expired(1))
self.assertFalse(self._is_expired(2))
time.sleep(0.75)
self.assertEqual(2, self.cache._update_cache.call_count)
self.cache.add(3)
if 1 in self.cache:
self.assertEqual([1, 2, 3], sorted(self.cache._cache.keys()))
self.assertTrue(self._is_expired(1))
else:
self.assertEqual([2, 3], sorted(self.cache._cache.keys()))
self.assertFalse(self._is_expired(2))
self.assertFalse(self._is_expired(3))
time.sleep(0.75)
self.assertEqual(3, self.cache._update_cache.call_count)
self.assertEqual([3], sorted(self.cache._cache.keys()))
self.assertFalse(self._is_expired(3))

View File

@ -1,63 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers.zmq_driver import zmq_version
from oslo_messaging.tests import utils as test_utils
class Doer(object):
def __init__(self):
self.x = 1
self.y = 2
self.z = 3
def _sudo(self):
pass
def do(self):
pass
def _do_v_1_1(self):
pass
def _do_v_2_2(self):
pass
def _do_v_3_3(self):
pass
class TestZmqVersion(test_utils.BaseTestCase):
def setUp(self):
super(TestZmqVersion, self).setUp()
self.doer = Doer()
def test_get_unknown_attr_versions(self):
self.assertRaises(AssertionError, zmq_version.get_method_versions,
self.doer, 'qwerty')
def test_get_non_method_attr_versions(self):
for attr_name in vars(self.doer):
self.assertRaises(AssertionError, zmq_version.get_method_versions,
self.doer, attr_name)
def test_get_private_method_versions(self):
self.assertRaises(AssertionError, zmq_version.get_method_versions,
self.doer, '_sudo')
def test_get_public_method_versions(self):
do_versions = zmq_version.get_method_versions(self.doer, 'do')
self.assertEqual(['1.1', '2.2', '3.3'], sorted(do_versions.keys()))

View File

@ -1,111 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
import fixtures
from six.moves import mock
import testtools
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_options
from oslo_messaging._i18n import _LE
from oslo_messaging.tests import utils as test_utils
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class TestServerListener(object):
def __init__(self, driver):
self.driver = driver
self.listener = None
self.executor = zmq_async.get_executor(self._run)
self._stop = threading.Event()
self._received = threading.Event()
self.message = None
def listen(self, target):
self.listener = self.driver.listen(target, None,
None)._poll_style_listener
self.executor.execute()
def listen_notifications(self, targets_and_priorities):
self.listener = self.driver.listen_for_notifications(
targets_and_priorities, None, None, None)._poll_style_listener
self.executor.execute()
def _run(self):
try:
messages = self.listener.poll()
if messages:
message = messages[0]
message.acknowledge()
self._received.set()
self.message = message
message.reply(reply=True)
except Exception:
LOG.exception(_LE("Unexpected exception occurred."))
def stop(self):
self.executor.stop()
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests """
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
zmq_options.register_opts(self.conf, mock.MagicMock())
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_zmq_ipc_dir': self.internal_ipc_dir,
'use_pub_sub': False,
'use_router_proxy': False,
'rpc_zmq_matchmaker': 'dummy'}
self.config(group='oslo_messaging_zmq', **kwargs)
self.config(rpc_response_timeout=5)
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
self.listener = TestServerListener(self.driver)
self.addCleanup(
StopRpc(self, [('listener', 'stop'), ('driver', 'cleanup')])
)
class StopRpc(object):
def __init__(self, obj, attrs_and_stops):
self.obj = obj
self.attrs_and_stops = attrs_and_stops
def __call__(self):
for attr, stop in self.attrs_and_stops:
if hasattr(self.obj, attr):
obj_attr = getattr(self.obj, attr)
if hasattr(obj_attr, stop):
obj_attr_stop = getattr(obj_attr, stop)
obj_attr_stop()

View File

@ -1,232 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import logging.handlers
import multiprocessing
import os
import sys
import threading
import time
import uuid
from oslo_config import cfg
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests.functional import utils
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class QueueHandler(logging.Handler):
"""This is a logging handler which sends events to a multiprocessing queue.
The plan is to add it to Python 3.2, but this can be copy pasted into
user code for use with earlier Python versions.
"""
def __init__(self, queue):
"""Initialise an instance, using the passed queue."""
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
"""Emit a record.
Writes the LogRecord to the queue.
"""
try:
ei = record.exc_info
if ei:
# just to get traceback text into record.exc_text
dummy = self.format(record) # noqa
record.exc_info = None # not needed any more
self.queue.put_nowait(record)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
self.handleError(record)
def listener_configurer(conf):
root = logging.getLogger()
h = logging.StreamHandler(sys.stdout)
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s '
'%(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
log_path = conf.oslo_messaging_zmq.rpc_zmq_ipc_dir + \
"/" + "zmq_multiproc.log"
file_handler = logging.StreamHandler(open(log_path, 'w'))
file_handler.setFormatter(f)
root.addHandler(file_handler)
def server_configurer(queue):
h = QueueHandler(queue)
root = logging.getLogger()
root.addHandler(h)
root.setLevel(logging.DEBUG)
def listener_thread(queue, configurer, conf):
configurer(conf)
while True:
time.sleep(0.3)
try:
record = queue.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
except (KeyboardInterrupt, SystemExit):
raise
class Client(oslo_messaging.RPCClient):
def __init__(self, transport, topic):
super(Client, self).__init__(
transport=transport, target=oslo_messaging.Target(topic=topic))
self.replies = []
def call_a(self):
LOG.warning("call_a - client side")
rep = self.call({}, 'call_a')
LOG.warning("after call_a - client side")
self.replies.append(rep)
return rep
class ReplyServerEndpoint(object):
def call_a(self, *args, **kwargs):
LOG.warning("call_a - Server endpoint reached!")
return "OK"
class Server(object):
def __init__(self, conf, log_queue, transport_url, name, topic=None):
self.conf = conf
self.log_queue = log_queue
self.transport_url = transport_url
self.name = name
self.topic = topic or str(uuid.uuid4())
self.ready = multiprocessing.Value('b', False)
self._stop = multiprocessing.Event()
def start(self):
self.process = multiprocessing.Process(target=self._run_server,
name=self.name,
args=(self.conf,
self.transport_url,
self.log_queue,
self.ready))
self.process.start()
LOG.debug("Server process started: pid: %d", self.process.pid)
def _run_server(self, conf, url, log_queue, ready):
server_configurer(log_queue)
LOG.debug("Starting RPC server")
transport = oslo_messaging.get_transport(conf, url=url)
target = oslo_messaging.Target(topic=self.topic, server=self.name)
self.rpc_server = oslo_messaging.get_rpc_server(
transport=transport, target=target,
endpoints=[ReplyServerEndpoint()],
executor='eventlet')
self.rpc_server.start()
ready.value = True
LOG.debug("RPC server being started")
while not self._stop.is_set():
LOG.debug("Waiting for the stop signal ...")
time.sleep(1)
self.rpc_server.stop()
self.rpc_server.wait()
LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid())
def cleanup(self):
LOG.debug("Stopping server")
self.shutdown()
def shutdown(self):
self._stop.set()
def restart(self, time_for_restart=1):
pass
def hang(self):
pass
def crash(self):
pass
def ping(self):
pass
class MultiprocTestCase(utils.SkipIfNoTransportURL):
def setUp(self):
super(MultiprocTestCase, self).setUp(conf=cfg.ConfigOpts())
if not self.url.startswith("zmq"):
self.skipTest("ZeroMQ specific skipped...")
self.transport = oslo_messaging.get_transport(self.conf, url=self.url)
LOG.debug("Start log queue")
self.log_queue = multiprocessing.Queue()
self.log_listener = threading.Thread(target=listener_thread,
args=(self.log_queue,
listener_configurer,
self.conf))
self.log_listener.start()
self.spawned = []
self.conf.prog = "test_prog"
self.conf.project = "test_project"
def tearDown(self):
for process in self.spawned:
process.cleanup()
super(MultiprocTestCase, self).tearDown()
def get_client(self, topic):
return Client(self.transport, topic)
def spawn_server(self, wait_for_server=False, topic=None):
name = "server_%d_%s" % (len(self.spawned), str(uuid.uuid4())[:8])
server = Server(self.conf, self.log_queue, self.url, name, topic)
LOG.debug("[SPAWN] %s (starting)...", server.name)
server.start()
if wait_for_server:
while not server.ready.value:
LOG.debug("[SPAWN] %s (waiting for server ready)...",
server.name)
time.sleep(1)
LOG.debug("[SPAWN] Server %s:%d started.",
server.name, server.process.pid)
self.spawned.append(server)
return server
def spawn_servers(self, number, wait_for_server=False, common_topic=True):
topic = str(uuid.uuid4()) if common_topic else None
for _ in range(number):
self.spawn_server(wait_for_server, topic)

View File

@ -1,49 +0,0 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
from oslo_messaging.tests.functional.zmq import multiproc_utils
class StartupOrderTestCase(multiproc_utils.MultiprocTestCase):
def setUp(self):
super(StartupOrderTestCase, self).setUp()
self.conf.prog = "test_prog"
self.conf.project = "test_project"
self.config(rpc_response_timeout=10)
log_path = os.path.join(self.conf.oslo_messaging_zmq.rpc_zmq_ipc_dir,
str(os.getpid()) + ".log")
sys.stdout = open(log_path, "wb", buffering=0)
def test_call_client_wait_for_server(self):
server = self.spawn_server(wait_for_server=True)
client = self.get_client(server.topic)
for _ in range(3):
reply = client.call_a()
self.assertIsNotNone(reply)
self.assertEqual(3, len(client.replies))
def test_call_client_dont_wait_for_server(self):
server = self.spawn_server(wait_for_server=False)
client = self.get_client(server.topic)
for _ in range(3):
reply = client.call_a()
self.assertIsNotNone(reply)
self.assertEqual(3, len(client.replies))