From 97d457f0afa94e21254148cc749c103943f16954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Herv=C3=A9=20Beraud?= Date: Thu, 24 Sep 2020 11:24:14 +0200 Subject: [PATCH] Display the reply queue's name in timeout logs It would be helpful if "Timed out waiting for " log messages at least specified on which `reply_q` it was waited for. Example without the reply_q: ``` 12228 2020-09-14 14:56:37.187 7 WARNING nova.conductor.api [req-1e081db6-808b-4af1-afc1-b87db7839394 - - - - -] Timed out waiting for nova-conductor. Is it running? Or did this service start before nova-conductor? Reattempting establishment of nova-conductor connection...: oslo_messaging.exceptions.MessagingTimeout: Timed out waiting for a reply to message ID 1640e7ef6f314451ba9a75d9ff6136ad ``` Example after adding the reply_q: ``` 12228 2020-09-14 14:56:37.187 7 WARNING nova.conductor.api [req-1e081db6-808b-4af1-afc1-b87db7839394 - - - - -] Timed out waiting for nova-conductor. Is it running? Or did this service start before nova-conductor? Reattempting establishment of nova-conductor connection...: oslo_messaging.exceptions.MessagingTimeout: Timed out waiting for a reply (reply_2882766a63b540dabaf7d019cf0c0cda) to message ID 1640e7ef6f314451ba9a75d9ff6136ad ``` It could help us to more merely debug and observe if something went wrong with a reply queue. Change-Id: Ied2c881c71930dc631919113adc00112648f9d72 Closes-Bug: #1896925 --- oslo_messaging/_drivers/amqpdriver.py | 33 +++++++++++++------ .../tests/drivers/test_impl_rabbit.py | 4 +-- .../reply_q-timeout-e3c3bae636e8bc74.yaml | 4 +++ 3 files changed, 29 insertions(+), 12 deletions(-) create mode 100644 releasenotes/notes/reply_q-timeout-e3c3bae636e8bc74.yaml diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index fa932bca4..1db25e7fa 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -583,9 +583,11 @@ class ReplyWaiter(object): self.waiters.remove(msg_id) @staticmethod - def _raise_timeout_exception(msg_id): + def _raise_timeout_exception(msg_id, reply_q): raise oslo_messaging.MessagingTimeout( - 'Timed out waiting for a reply to message ID %s.', msg_id) + 'Timed out waiting for a reply %(reply_q)s ' + 'to message ID %(msg_id)s.', + {'msg_id': msg_id, 'reply_q': reply_q}) def _process_reply(self, data): self.msg_id_cache.check_duplicate_message(data) @@ -599,7 +601,7 @@ class ReplyWaiter(object): ending = data.get('ending', False) return result, ending - def wait(self, msg_id, timeout, call_monitor_timeout): + def wait(self, msg_id, timeout, call_monitor_timeout, reply_q): # NOTE(sileht): for each msg_id we receive two amqp message # first one with the payload, a second one to ensure the other # have finish to send the payload @@ -617,16 +619,26 @@ class ReplyWaiter(object): final_reply = None ending = False while not ending: - timeout = timer.check_return(self._raise_timeout_exception, msg_id) + timeout = timer.check_return( + self._raise_timeout_exception, + msg_id, + reply_q + ) if call_monitor_timer and timeout > 0: cm_timeout = call_monitor_timer.check_return( - self._raise_timeout_exception, msg_id) + self._raise_timeout_exception, + msg_id, + reply_q + ) if cm_timeout < timeout: timeout = cm_timeout try: message = self.waiters.get(msg_id, timeout=timeout) except queue.Empty: - self._raise_timeout_exception(msg_id) + self._raise_timeout_exception( + msg_id, + reply_q + ) reply, ending = self._process_reply(message) if reply is not None: @@ -700,6 +712,7 @@ class AMQPDriverBase(base.BaseDriver): envelope=True, notify=False, retry=None, transport_options=None): msg = message + reply_q = None if 'method' in msg: LOG.debug('Calling RPC method %s on target %s', msg.get('method'), target.topic) @@ -707,13 +720,13 @@ class AMQPDriverBase(base.BaseDriver): LOG.debug('Sending message to topic %s', target.topic) if wait_for_reply: - _reply_q = self._get_reply_q() + reply_q = self._get_reply_q() msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) - msg.update({'_reply_q': _reply_q}) + msg.update({'_reply_q': reply_q}) msg.update({'_timeout': call_monitor_timeout}) LOG.info('Expecting reply to msg %s in queue %s', msg_id, - _reply_q) + reply_q) rpc_amqp._add_unique_id(msg) unique_id = msg[rpc_amqp.UNIQUE_ID] @@ -756,7 +769,7 @@ class AMQPDriverBase(base.BaseDriver): if wait_for_reply: result = self._waiter.wait(msg_id, timeout, - call_monitor_timeout) + call_monitor_timeout, reply_q) if isinstance(result, Exception): raise result return result diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index dbbf33cd8..eb44b7142 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -668,7 +668,7 @@ class TestRacyWaitForReply(test_utils.BaseTestCase): wait_conditions = [] orig_reply_waiter = amqpdriver.ReplyWaiter.wait - def reply_waiter(self, msg_id, timeout, call_monitor_timeout): + def reply_waiter(self, msg_id, timeout, call_monitor_timeout, reply_q): if wait_conditions: cond = wait_conditions.pop() with cond: @@ -676,7 +676,7 @@ class TestRacyWaitForReply(test_utils.BaseTestCase): with cond: cond.wait() return orig_reply_waiter(self, msg_id, timeout, - call_monitor_timeout) + call_monitor_timeout, reply_q) self.useFixture(fixtures.MockPatchObject( amqpdriver.ReplyWaiter, 'wait', reply_waiter)) diff --git a/releasenotes/notes/reply_q-timeout-e3c3bae636e8bc74.yaml b/releasenotes/notes/reply_q-timeout-e3c3bae636e8bc74.yaml new file mode 100644 index 000000000..7b0147bf8 --- /dev/null +++ b/releasenotes/notes/reply_q-timeout-e3c3bae636e8bc74.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + The name of the ``reply_q`` is now logged when a timeout occurs while waiting for a reply.