diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py index c6edddd..4e6c9d5 100644 --- a/oslo_messaging/conffixture.py +++ b/oslo_messaging/conffixture.py @@ -49,11 +49,11 @@ class ConfFixture(fixtures.Fixture): 'oslo_messaging._drivers.impl_rabbit', 'rabbit_opts', 'oslo_messaging_rabbit') _import_opts(self.conf, - 'oslo_messaging._drivers.amqp', 'amqp_opts', + 'oslo_messaging._drivers.base', 'base_opts', 'oslo_messaging_rabbit') _import_opts(self.conf, 'oslo_messaging._drivers.amqp', 'amqp_opts', - 'oslo_messaging_qpid') + 'oslo_messaging_rabbit') _import_opts(self.conf, 'oslo_messaging._drivers.amqp1_driver.opts', 'amqp1_opts', 'oslo_messaging_amqp') diff --git a/oslo_messaging/tests/functional/test_rabbitmq.py b/oslo_messaging/tests/functional/test_rabbitmq.py new file mode 100644 index 0000000..74fb73e --- /dev/null +++ b/oslo_messaging/tests/functional/test_rabbitmq.py @@ -0,0 +1,140 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import os +import signal +import time + +import fixtures +from pifpaf.drivers import rabbitmq + +from oslo_messaging.tests.functional import utils +from oslo_messaging.tests import utils as test_utils + + +class ConnectedPortMatcher(object): + def __init__(self, port): + self.port = port + + def __eq__(self, data): + return data.get("port") == self.port + + def __repr__(self): + return "" % self.port + + +class RabbitMQFailoverTests(test_utils.BaseTestCase): + def test_failover_scenario(self): + # NOTE(sileht): run this test only if functionnal suite run of a driver + # that use rabbitmq as backend + self.driver = os.environ.get('TRANSPORT_DRIVER') + if self.driver not in ["pika", "rabbit"]: + self.skipTest("TRANSPORT_DRIVER is not set to a rabbit driver") + + # NOTE(sileht): Allow only one response at a time, to + # have only one tcp connection for reply and ensure it will failover + # correctly + self.config(heartbeat_timeout_threshold=1, + rpc_conn_pool_size=1, + kombu_reconnect_delay=0, + rabbit_retry_interval=0, + rabbit_retry_backoff=0, + group='oslo_messaging_rabbit') + + #  + self.pifpaf = self.useFixture(rabbitmq.RabbitMQDriver(cluster=True, + port=5692)) + + self.url = self.pifpaf.env["PIFPAF_URL"] + self.n1 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME1"] + self.n2 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME2"] + self.n3 = self.pifpaf.env["PIFPAF_RABBITMQ_NODENAME3"] + + # ensure connections will be establish to the first node + self.pifpaf.stop_node(self.n2) + self.pifpaf.stop_node(self.n3) + + self.servers = self.useFixture(utils.RpcServerGroupFixture( + self.conf, self.url, endpoint=self, names=["server"])) + + # Don't randomize rabbit hosts + self.useFixture(fixtures.MockPatch( + 'oslo_messaging._drivers.impl_rabbit.random', + side_effect=lambda x: x)) + + # NOTE(sileht): this connects server connections and reply + # connection to nodename n1 + self.client = self.servers.client(0) + self.client.ping() + self._check_ports(self.pifpaf.port) + + # Switch to node n2 + self.pifpaf.start_node(self.n2) + self.assertEqual("callback done", self.client.kill_and_process()) + self.assertEqual("callback done", self.client.just_process()) + self._check_ports(self.pifpaf.get_port(self.n2)) + + # Switch to node n3 + self.pifpaf.start_node(self.n3) + time.sleep(0.1) + self.pifpaf.kill_node(self.n2, signal=signal.SIGKILL) + time.sleep(0.1) + self.assertEqual("callback done", self.client.just_process()) + self._check_ports(self.pifpaf.get_port(self.n3)) + + self.pifpaf.start_node(self.n1) + time.sleep(0.1) + self.pifpaf.kill_node(self.n3, signal=signal.SIGKILL) + time.sleep(0.1) + self.assertEqual("callback done", self.client.just_process()) + self._check_ports(self.pifpaf.get_port(self.n1)) + + def kill_and_process(self, *args, **kargs): + self.pifpaf.kill_node(self.n1, signal=signal.SIGKILL) + time.sleep(0.1) + return "callback done" + + def just_process(self, *args, **kargs): + return "callback done" + + def _get_log_call_startswith(self, filter): + return [call for call in self.logger.debug.mock_calls + if call[1][0].startswith(filter)] + + def _check_ports(self, port): + getattr(self, '_check_ports_%s_driver' % self.driver)(port) + + def _check_ports_pika_driver(self, port): + rpc_server = self.servers.servers[0].server + # FIXME(sileht): Check other connections + connections = [ + rpc_server.listener._poll_style_listener._connection + ] + for conn in connections: + self.assertEqual( + port, conn._impl.socket.getpeername()[1]) + + def _check_ports_rabbit_driver(self, port): + rpc_server = self.servers.servers[0].server + connection_contexts = [ + # rpc server + rpc_server.listener._poll_style_listener.conn, + # rpc client + self.client.client.transport._driver._get_connection(), + # rpc client replies waiter + self.client.client.transport._driver._reply_q_conn, + ] + + for cctxt in connection_contexts: + socket = cctxt.connection.channel.connection.sock + self.assertEqual(port, socket.getpeername()[1]) diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 0580cbd..e489522 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -122,7 +122,7 @@ class RpcServerFixture(fixtures.Fixture): class RpcServerGroupFixture(fixtures.Fixture): def __init__(self, conf, url, topic=None, names=None, exchange=None, - use_fanout_ctrl=False): + use_fanout_ctrl=False, endpoint=None): self.conf = conf self.url = url # NOTE(sileht): topic and servier_name must be uniq @@ -133,6 +133,7 @@ class RpcServerGroupFixture(fixtures.Fixture): self.exchange = exchange self.targets = [self._target(server=n) for n in self.names] self.use_fanout_ctrl = use_fanout_ctrl + self.endpoint = endpoint def setUp(self): super(RpcServerGroupFixture, self).setUp() @@ -149,6 +150,7 @@ class RpcServerGroupFixture(fixtures.Fixture): if self.use_fanout_ctrl: ctrl = self._target(fanout=True) server = RpcServerFixture(self.conf, self.url, target, + endpoint=self.endpoint, ctrl_target=ctrl) return server @@ -277,7 +279,15 @@ class IsValidDistributionOf(object): class SkipIfNoTransportURL(test_utils.BaseTestCase): def setUp(self, conf=cfg.CONF): super(SkipIfNoTransportURL, self).setUp(conf=conf) - self.url = os.environ.get('TRANSPORT_URL') + + driver = os.environ.get("TRANSPORT_DRIVER") + if driver: + self.url = os.environ.get('PIFPAF_URL') + if driver == "pika" and self.url: + self.url = self.url.replace("rabbit://", "pika://") + else: + self.url = os.environ.get('TRANSPORT_URL') + if not self.url: self.skipTest("No transport url configured") diff --git a/setup-test-env-pika.sh b/setup-test-env-pika.sh deleted file mode 100755 index 5fe1895..0000000 --- a/setup-test-env-pika.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/bash -set -e - -. tools/functions.sh - -DATADIR=$(mktemp -d /tmp/OSLOMSG-RABBIT.XXXXX) -trap "clean_exit $DATADIR" EXIT - -export RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 -export RABBITMQ_NODE_PORT=65123 -export RABBITMQ_NODENAME=oslomsg-test@localhost -export RABBITMQ_LOG_BASE=$DATADIR -export RABBITMQ_MNESIA_BASE=$DATADIR -export RABBITMQ_PID_FILE=$DATADIR/pid -export HOME=$DATADIR - -# NOTE(sileht): We directly use the rabbitmq scripts -# to avoid distribution check, like running as root/rabbitmq -# enforcing. -export PATH=/usr/lib/rabbitmq/bin/:$PATH - - -mkfifo ${DATADIR}/out -rabbitmq-server &> ${DATADIR}/out & -wait_for_line "Starting broker... completed" "ERROR:" ${DATADIR}/out - -rabbitmqctl add_user oslomsg oslosecret -rabbitmqctl set_permissions "oslomsg" ".*" ".*" ".*" - - -export TRANSPORT_URL=pika://oslomsg:oslosecret@127.0.0.1:65123// -$* diff --git a/setup-test-env-rabbit.sh b/setup-test-env-rabbit.sh deleted file mode 100755 index aaf6603..0000000 --- a/setup-test-env-rabbit.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/bash -set -e - -. tools/functions.sh - -DATADIR=$(mktemp -d /tmp/OSLOMSG-RABBIT.XXXXX) -trap "clean_exit $DATADIR" EXIT - -export RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 -export RABBITMQ_NODE_PORT=65123 -export RABBITMQ_NODENAME=oslomsg-test@localhost -export RABBITMQ_LOG_BASE=$DATADIR -export RABBITMQ_MNESIA_BASE=$DATADIR -export RABBITMQ_PID_FILE=$DATADIR/pid -export HOME=$DATADIR - -# NOTE(sileht): We directly use the rabbitmq scripts -# to avoid distribution check, like running as root/rabbitmq -# enforcing. -export PATH=/usr/lib/rabbitmq/bin/:$PATH - - -mkfifo ${DATADIR}/out -rabbitmq-server &> ${DATADIR}/out & -wait_for_line "Starting broker... completed" "ERROR:" ${DATADIR}/out - -rabbitmqctl add_user oslomsg oslosecret -rabbitmqctl set_permissions "oslomsg" ".*" ".*" ".*" - - -export TRANSPORT_URL=rabbit://oslomsg:oslosecret@127.0.0.1:65123// -$* diff --git a/test-requirements.txt b/test-requirements.txt index c50aa10..a9aad5a 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -14,7 +14,7 @@ testrepository>=0.0.18 # Apache-2.0/BSD testscenarios>=0.4 # Apache-2.0/BSD testtools>=1.4.0 # MIT oslotest>=1.10.0 # Apache-2.0 - +pifpaf>=0.4.0 # Apache-2.0 # for test_matchmaker_redis redis>=2.10.0 # MIT diff --git a/tox.ini b/tox.ini index e39daba..044fb44 100644 --- a/tox.ini +++ b/tox.ini @@ -25,14 +25,17 @@ commands = {posargs} commands = python setup.py build_sphinx [testenv:py27-func-rabbit] -commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' +setenv = TRANSPORT_DRIVER=rabbit +commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py34-func-rabbit] +setenv = TRANSPORT_DRIVER=rabbit basepython = python3.4 -commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' +commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-pika] -commands = {toxinidir}/setup-test-env-pika.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' +setenv = TRANSPORT_DRIVER=pika +commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-amqp1] setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123//