diff --git a/requirements.txt b/requirements.txt index cc4c878d1..c1c207aa1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,4 +34,5 @@ six>=1.10.0 # MIT SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT sqlalchemy-migrate>=0.11.0 # Apache-2.0 stevedore>=1.20.0 # Apache-2.0 +tenacity>=4.9.0 # Apache-2.0 WebOb>=1.7.1 # MIT diff --git a/senlin/engine/notifications/message.py b/senlin/engine/notifications/message.py index 7045f6ce8..a63d74926 100755 --- a/senlin/engine/notifications/message.py +++ b/senlin/engine/notifications/message.py @@ -14,6 +14,7 @@ from oslo_config import cfg from oslo_context import context as oslo_context from oslo_log import log as logging import six +import tenacity from senlin.common import context as senlin_context from senlin.common import exception @@ -24,6 +25,11 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF +RETRY_ATTEMPTS = 3 +RETRY_INITIAL_DELAY = 1 +RETRY_BACKOFF = 1 +RETRY_MAX = 3 + class Message(object): """Zaqar message type of notification.""" @@ -67,19 +73,23 @@ class Message(object): return params + @tenacity.retry( + retry=tenacity.retry_if_exception_type(exception.EResourceCreation), + wait=tenacity.wait_incrementing( + RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX), + stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS)) def post_lifecycle_hook_message(self, lifecycle_action_token, node_id, resource_id, lifecycle_transition_type): + message_list = [{ + "ttl": CONF.notification.ttl, + "body": { + "lifecycle_action_token": lifecycle_action_token, + "node_id": node_id, + "resource_id": resource_id, + "lifecycle_transition_type": lifecycle_transition_type + } + }] try: - message_list = [{ - "ttl": CONF.notification.ttl, - "body": { - "lifecycle_action_token": lifecycle_action_token, - "node_id": node_id, - "resource_id": resource_id, - "lifecycle_transition_type": lifecycle_transition_type - } - }] - if not self.zaqar().queue_exists(self.queue_name): kwargs = { "_max_messages_post_size": @@ -91,5 +101,6 @@ class Message(object): return self.zaqar().message_post(self.queue_name, message_list) except exception.InternalError as ex: - raise exception.EResourceCreation(type='queue', - message=six.text_type(ex)) + raise exception.EResourceCreation( + type='queue', + message=six.text_type(ex)) diff --git a/senlin/tests/unit/engine/notifications/test_message.py b/senlin/tests/unit/engine/notifications/test_message.py index 4180bfef4..57ef7e5fa 100755 --- a/senlin/tests/unit/engine/notifications/test_message.py +++ b/senlin/tests/unit/engine/notifications/test_message.py @@ -15,6 +15,7 @@ import mock from oslo_config import cfg +from senlin.common import exception from senlin.drivers import base as driver_base from senlin.engine.notifications import message as mmod from senlin.tests.unit.common import base @@ -121,3 +122,28 @@ class TestMessage(base.SenlinTestCase): } }] mock_zc.message_post.assert_called_once_with(queue_name, message_list) + + @mock.patch.object(mmod.Message, 'zaqar') + def test_post_lifecycle_hook_message_queue_retry(self, mock_zaqar): + cfg.CONF.set_override('max_message_size', 8192, 'notification') + mock_zc = mock.Mock() + mock_zaqar.return_value = mock_zc + queue_name = 'my_queue' + message = mmod.Message(queue_name) + mock_zc.queue_exists.return_value = True + test_exception = exception.EResourceCreation(type='queue', + message="test") + mock_zc.message_post.side_effect = [ + test_exception, test_exception, None] + + lifecycle_action_token = 'ACTION_ID' + node_id = 'NODE_ID' + resource_id = 'RESOURCE_ID' + lifecycle_transition_type = 'TYPE' + + message.post_lifecycle_hook_message(lifecycle_action_token, node_id, + resource_id, + lifecycle_transition_type) + + mock_zc.queue_create.assert_not_called() + self.assertEqual(3, mock_zc.message_post.call_count)