diff --git a/mistral/engine/rpc_backend/kombu/kombu_server.py b/mistral/engine/rpc_backend/kombu/kombu_server.py index 99339eca4..6a5386887 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_server.py +++ b/mistral/engine/rpc_backend/kombu/kombu_server.py @@ -86,6 +86,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): while True: try: + _retry_connection = False host = self._hosts.get_host() self.conn = self._make_connection( @@ -139,19 +140,22 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): return except (socket.error, amqp.exceptions.ConnectionForced) as e: LOG.debug("Broker connection failed: %s" % e) + _retry_connection = True finally: self._stopped.set() - LOG.debug("Sleeping for %s seconds, than retrying connection" % - self._sleep_time - ) + if _retry_connection: + LOG.debug( + "Sleeping for %s seconds, than retrying connection" % + self._sleep_time + ) - time.sleep(self._sleep_time) + time.sleep(self._sleep_time) - self._sleep_time = min( - self._sleep_time * 2, - self._max_sleep_time - ) + self._sleep_time = min( + self._sleep_time * 2, + self._max_sleep_time + ) def stop(self, graceful=False): self._running.clear() diff --git a/mistral/tests/unit/engine/rpc_backend/__init__.py b/mistral/tests/unit/engine/rpc_backend/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_client.py b/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_client.py index 40777169e..f3e70627f 100644 --- a/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_client.py +++ b/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_client.py @@ -45,7 +45,9 @@ class KombuClientTestCase(base.KombuTestCase): self.client._listener.get_result = mock.MagicMock( return_value={ kombu_base.TYPE: None, - kombu_base.RESULT: self._RESPONSE + kombu_base.RESULT: self.client._serialize_message({ + 'body': self._RESPONSE + }) } ) response = self.client.sync_call(self.ctx, 'method') diff --git a/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_listener.py b/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_listener.py index 8ee1a0e71..96abe426c 100644 --- a/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_listener.py +++ b/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_listener.py @@ -36,7 +36,7 @@ class KombuListenerTestCase(base.KombuTestCase): super(KombuListenerTestCase, self).setUp() self.listener = kombu_listener.KombuRPCListener( - mock.MagicMock(), + [mock.MagicMock()], mock.MagicMock() ) self.ctx = type('context', (object,), {'to_dict': lambda self: {}})() diff --git a/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py b/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py index 19391b428..35f31e4f7 100644 --- a/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py +++ b/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py @@ -64,12 +64,12 @@ class KombuServerTestCase(base.KombuTestCase): self.server.publish_message(body, reply_to, corr_id, type) enter_mock.publish.assert_called_once_with( - body=body, - exchange=self.conf['exchange'], + body={'body': '"body"'}, + exchange='openstack', routing_key=reply_to, correlation_id=corr_id, type=type, - serializer='mistral_serialization' + serializer='json' ) def test_run_launch_successfully(self): @@ -84,7 +84,7 @@ class KombuServerTestCase(base.KombuTestCase): def side_effect(*args, **kwargs): self.assertTrue(self.server.is_running) - self.server.stop() + raise KeyboardInterrupt acquire_mock = mock.MagicMock() acquire_mock.drain_events.side_effect = side_effect @@ -92,13 +92,21 @@ class KombuServerTestCase(base.KombuTestCase): self.server.run() self.assertFalse(self.server.is_running) + self.assertEqual(self.server._sleep_time, 1) + + def test_run_socket_error_reconnect(self): + + def side_effect(*args, **kwargs): + if acquire_mock.drain_events.call_count == 1: + raise socket.error() + raise TestException() - def test_run_raise_mistral_exception(self): acquire_mock = mock.MagicMock() - acquire_mock.drain_events.side_effect = socket.error() + acquire_mock.drain_events.side_effect = side_effect fake_kombu.connection.acquire.return_value = acquire_mock - self.assertRaises(exc.MistralException, self.server.run) + self.assertRaises(TestException, self.server.run) + self.assertEqual(self.server._sleep_time, 2) def test_run_socket_timeout_still_running(self): @@ -197,10 +205,10 @@ class KombuServerTestCase(base.KombuTestCase): 'async': True, 'rpc_ctx': {}, 'rpc_method': 'found_method', - 'arguments': { + 'arguments': self.server._serialize_message({ 'a': 1, 'b': 2 - } + }) } message = mock.MagicMock() @@ -231,10 +239,10 @@ class KombuServerTestCase(base.KombuTestCase): 'async': False, 'rpc_ctx': {}, 'rpc_method': 'found_method', - 'arguments': { + 'arguments': self.server._serialize_message({ 'a': 1, 'b': 2 - } + }) } reply_to = 'reply_to'