Fix for coverage job showing 0% coverage for kombu

0% coverage was due to unintended deletion of one
of __init__.py files. After adding it, some of
the tests failed, so I have to also fix them.

Change-Id: Iee094ffd24cc1397e84f5d7be4977efbd986e636
Closes-bug: 1664228
Closes-bug: 1664603
(cherry picked from commit 51c784daef)
This commit is contained in:
Dawid Deja 2017-02-13 15:16:31 +01:00 committed by Renat Akhmerov
parent 638ca145b6
commit b8927d8a94
5 changed files with 35 additions and 21 deletions

View File

@ -86,6 +86,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
while True:
try:
_retry_connection = False
host = self._hosts.get_host()
self.conn = self._make_connection(
@ -139,19 +140,22 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
return
except (socket.error, amqp.exceptions.ConnectionForced) as e:
LOG.debug("Broker connection failed: %s" % e)
_retry_connection = True
finally:
self._stopped.set()
LOG.debug("Sleeping for %s seconds, than retrying connection" %
self._sleep_time
)
if _retry_connection:
LOG.debug(
"Sleeping for %s seconds, than retrying connection" %
self._sleep_time
)
time.sleep(self._sleep_time)
time.sleep(self._sleep_time)
self._sleep_time = min(
self._sleep_time * 2,
self._max_sleep_time
)
self._sleep_time = min(
self._sleep_time * 2,
self._max_sleep_time
)
def stop(self, graceful=False):
self._running.clear()

View File

@ -45,7 +45,9 @@ class KombuClientTestCase(base.KombuTestCase):
self.client._listener.get_result = mock.MagicMock(
return_value={
kombu_base.TYPE: None,
kombu_base.RESULT: self._RESPONSE
kombu_base.RESULT: self.client._serialize_message({
'body': self._RESPONSE
})
}
)
response = self.client.sync_call(self.ctx, 'method')

View File

@ -36,7 +36,7 @@ class KombuListenerTestCase(base.KombuTestCase):
super(KombuListenerTestCase, self).setUp()
self.listener = kombu_listener.KombuRPCListener(
mock.MagicMock(),
[mock.MagicMock()],
mock.MagicMock()
)
self.ctx = type('context', (object,), {'to_dict': lambda self: {}})()

View File

@ -64,12 +64,12 @@ class KombuServerTestCase(base.KombuTestCase):
self.server.publish_message(body, reply_to, corr_id, type)
enter_mock.publish.assert_called_once_with(
body=body,
exchange=self.conf['exchange'],
body={'body': '"body"'},
exchange='openstack',
routing_key=reply_to,
correlation_id=corr_id,
type=type,
serializer='mistral_serialization'
serializer='json'
)
def test_run_launch_successfully(self):
@ -84,7 +84,7 @@ class KombuServerTestCase(base.KombuTestCase):
def side_effect(*args, **kwargs):
self.assertTrue(self.server.is_running)
self.server.stop()
raise KeyboardInterrupt
acquire_mock = mock.MagicMock()
acquire_mock.drain_events.side_effect = side_effect
@ -92,13 +92,21 @@ class KombuServerTestCase(base.KombuTestCase):
self.server.run()
self.assertFalse(self.server.is_running)
self.assertEqual(self.server._sleep_time, 1)
def test_run_socket_error_reconnect(self):
def side_effect(*args, **kwargs):
if acquire_mock.drain_events.call_count == 1:
raise socket.error()
raise TestException()
def test_run_raise_mistral_exception(self):
acquire_mock = mock.MagicMock()
acquire_mock.drain_events.side_effect = socket.error()
acquire_mock.drain_events.side_effect = side_effect
fake_kombu.connection.acquire.return_value = acquire_mock
self.assertRaises(exc.MistralException, self.server.run)
self.assertRaises(TestException, self.server.run)
self.assertEqual(self.server._sleep_time, 2)
def test_run_socket_timeout_still_running(self):
@ -197,10 +205,10 @@ class KombuServerTestCase(base.KombuTestCase):
'async': True,
'rpc_ctx': {},
'rpc_method': 'found_method',
'arguments': {
'arguments': self.server._serialize_message({
'a': 1,
'b': 2
}
})
}
message = mock.MagicMock()
@ -231,10 +239,10 @@ class KombuServerTestCase(base.KombuTestCase):
'async': False,
'rpc_ctx': {},
'rpc_method': 'found_method',
'arguments': {
'arguments': self.server._serialize_message({
'a': 1,
'b': 2
}
})
}
reply_to = 'reply_to'