Merge "Add retry logic to post_lifecycle_hook_message"
This commit is contained in:
commit
55421aa987
|
@ -34,4 +34,5 @@ six>=1.10.0 # MIT
|
|||
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
|
||||
sqlalchemy-migrate>=0.11.0 # Apache-2.0
|
||||
stevedore>=1.20.0 # Apache-2.0
|
||||
tenacity>=4.9.0 # Apache-2.0
|
||||
WebOb>=1.7.1 # MIT
|
||||
|
|
|
@ -14,6 +14,7 @@ from oslo_config import cfg
|
|||
from oslo_context import context as oslo_context
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
import tenacity
|
||||
|
||||
from senlin.common import context as senlin_context
|
||||
from senlin.common import exception
|
||||
|
@ -24,6 +25,11 @@ LOG = logging.getLogger(__name__)
|
|||
|
||||
CONF = cfg.CONF
|
||||
|
||||
RETRY_ATTEMPTS = 3
|
||||
RETRY_INITIAL_DELAY = 1
|
||||
RETRY_BACKOFF = 1
|
||||
RETRY_MAX = 3
|
||||
|
||||
|
||||
class Message(object):
|
||||
"""Zaqar message type of notification."""
|
||||
|
@ -67,19 +73,23 @@ class Message(object):
|
|||
|
||||
return params
|
||||
|
||||
@tenacity.retry(
|
||||
retry=tenacity.retry_if_exception_type(exception.EResourceCreation),
|
||||
wait=tenacity.wait_incrementing(
|
||||
RETRY_INITIAL_DELAY, RETRY_BACKOFF, RETRY_MAX),
|
||||
stop=tenacity.stop_after_attempt(RETRY_ATTEMPTS))
|
||||
def post_lifecycle_hook_message(self, lifecycle_action_token, node_id,
|
||||
resource_id, lifecycle_transition_type):
|
||||
message_list = [{
|
||||
"ttl": CONF.notification.ttl,
|
||||
"body": {
|
||||
"lifecycle_action_token": lifecycle_action_token,
|
||||
"node_id": node_id,
|
||||
"resource_id": resource_id,
|
||||
"lifecycle_transition_type": lifecycle_transition_type
|
||||
}
|
||||
}]
|
||||
try:
|
||||
message_list = [{
|
||||
"ttl": CONF.notification.ttl,
|
||||
"body": {
|
||||
"lifecycle_action_token": lifecycle_action_token,
|
||||
"node_id": node_id,
|
||||
"resource_id": resource_id,
|
||||
"lifecycle_transition_type": lifecycle_transition_type
|
||||
}
|
||||
}]
|
||||
|
||||
if not self.zaqar().queue_exists(self.queue_name):
|
||||
kwargs = {
|
||||
"_max_messages_post_size":
|
||||
|
@ -91,5 +101,6 @@ class Message(object):
|
|||
|
||||
return self.zaqar().message_post(self.queue_name, message_list)
|
||||
except exception.InternalError as ex:
|
||||
raise exception.EResourceCreation(type='queue',
|
||||
message=six.text_type(ex))
|
||||
raise exception.EResourceCreation(
|
||||
type='queue',
|
||||
message=six.text_type(ex))
|
||||
|
|
|
@ -15,6 +15,7 @@ import mock
|
|||
|
||||
from oslo_config import cfg
|
||||
|
||||
from senlin.common import exception
|
||||
from senlin.drivers import base as driver_base
|
||||
from senlin.engine.notifications import message as mmod
|
||||
from senlin.tests.unit.common import base
|
||||
|
@ -121,3 +122,28 @@ class TestMessage(base.SenlinTestCase):
|
|||
}
|
||||
}]
|
||||
mock_zc.message_post.assert_called_once_with(queue_name, message_list)
|
||||
|
||||
@mock.patch.object(mmod.Message, 'zaqar')
|
||||
def test_post_lifecycle_hook_message_queue_retry(self, mock_zaqar):
|
||||
cfg.CONF.set_override('max_message_size', 8192, 'notification')
|
||||
mock_zc = mock.Mock()
|
||||
mock_zaqar.return_value = mock_zc
|
||||
queue_name = 'my_queue'
|
||||
message = mmod.Message(queue_name)
|
||||
mock_zc.queue_exists.return_value = True
|
||||
test_exception = exception.EResourceCreation(type='queue',
|
||||
message="test")
|
||||
mock_zc.message_post.side_effect = [
|
||||
test_exception, test_exception, None]
|
||||
|
||||
lifecycle_action_token = 'ACTION_ID'
|
||||
node_id = 'NODE_ID'
|
||||
resource_id = 'RESOURCE_ID'
|
||||
lifecycle_transition_type = 'TYPE'
|
||||
|
||||
message.post_lifecycle_hook_message(lifecycle_action_token, node_id,
|
||||
resource_id,
|
||||
lifecycle_transition_type)
|
||||
|
||||
mock_zc.queue_create.assert_not_called()
|
||||
self.assertEqual(3, mock_zc.message_post.call_count)
|
||||
|
|
Loading…
Reference in New Issue