Add retry logic on periodic_sync to stable/liberty

Change-Id: I15dadb0d90c7cbdd5fcead9e64398c75be3304b5
Closes-Bug: #1523691
This commit is contained in:
Federico Ceratto 2015-12-23 14:20:33 +00:00
parent 6c200765f1
commit 9b5f4c1f3d
7 changed files with 365 additions and 46 deletions

View File

@ -54,6 +54,10 @@ OPTS = [
cfg.IntOpt('periodic-sync-seconds', default=21600,
help='Zones Updated within last N seconds will be syncd. Use '
'None to sync all zones.'),
cfg.IntOpt('periodic-sync-max-attempts', default=3,
help='Number of attempts to update a zone during sync'),
cfg.IntOpt('periodic-sync-retry-interval', default=30,
help='Interval between zone update attempts during sync'),
cfg.StrOpt('cache-driver', default='memcache',
help='The cache driver to use'),
]

View File

@ -13,6 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from contextlib import contextmanager
from decimal import Decimal
@ -61,6 +62,20 @@ def wrap_backend_call():
raise exceptions.Backend('Unknown backend failure: %r' % e)
def _constant_retries(num_attempts, sleep_interval):
"""Generate a sequence of False terminated by a True
Sleep `sleep_interval` between cycles but not at the end.
"""
for cnt in range(num_attempts):
if cnt != 0:
LOG.debug(_LI("Executing retry n. %d"), cnt)
if cnt < num_attempts - 1:
yield False
time.sleep(sleep_interval)
else:
yield True
class Service(service.RPCService, coordination.CoordinationMixin,
service.Service):
"""
@ -91,6 +106,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
self.retry_interval = CONF['service:pool_manager'].poll_retry_interval
self.max_retries = CONF['service:pool_manager'].poll_max_retries
self.delay = CONF['service:pool_manager'].poll_delay
self._periodic_sync_max_attempts = \
CONF['service:pool_manager'].periodic_sync_max_attempts
self._periodic_sync_retry_interval = \
CONF['service:pool_manager'].periodic_sync_retry_interval
# Create the necessary Backend instances for each target
self._setup_target_backends()
@ -200,18 +219,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
LOG.exception(_LE('An unhandled exception in periodic '
'recovery occurred'))
def periodic_sync(self):
"""Periodically sync all the zones that are not in ERROR status
Runs only on the pool leader
:return: None
def _fetch_healthy_zones(self, context):
"""Fetch all zones not in error
:return: :class:`ZoneList` zones
"""
if not self._pool_election.is_leader:
return
context = DesignateContext.get_admin_context(all_tenants=True)
LOG.debug("Starting Periodic Synchronization")
criterion = {
'pool_id': CONF['service:pool_manager'].pool_id,
'status': '!%s' % ERROR_STATUS
@ -225,23 +236,51 @@ class Service(service.RPCService, coordination.CoordinationMixin,
current = utils.increment_serial()
criterion['serial'] = ">%s" % (current - periodic_sync_seconds)
domains = self.central_api.find_domains(context, criterion)
zones = self.central_api.find_domains(context, criterion)
return zones
try:
for domain in domains:
# TODO(kiall): If the zone was created within the last
# periodic_sync_seconds, attempt to recreate
# to fill in targets which may have failed.
success = self.update_domain(context, domain)
if not success:
self.central_api.update_status(
context, domain.id, ERROR_STATUS, domain.serial)
def periodic_sync(self):
"""Periodically sync all the zones that are not in ERROR status
Runs only on the pool leader
:return: None
"""
if not self._pool_election.is_leader:
return
except Exception:
LOG.exception(_LE('An unhandled exception in periodic '
'synchronization occurred.'))
self.central_api.update_status(context, domain.id, ERROR_STATUS,
domain.serial)
context = DesignateContext.get_admin_context(all_tenants=True)
LOG.debug("Starting Periodic Synchronization")
context = DesignateContext.get_admin_context(all_tenants=True)
zones = self._fetch_healthy_zones(context)
zones = set(zones)
# TODO(kiall): If the zone was created within the last
# periodic_sync_seconds, attempt to recreate
# to fill in targets which may have failed.
retry_gen = _constant_retries(
self._periodic_sync_max_attempts,
self._periodic_sync_retry_interval
)
for is_last_cycle in retry_gen:
zones_in_error = []
for zone in zones:
try:
success = self.update_domain(context, zone)
if not success:
zones_in_error.append(zone)
except Exception:
LOG.exception(_LE('An unhandled exception in periodic '
'synchronization occurred.'))
zones_in_error.append(zone)
if not zones_in_error:
return
zones = zones_in_error
for zone in zones_in_error:
self.central_api.update_status(context, zone.id, ERROR_STATUS,
zone.serial)
# Standard Create/Update/Delete Methods

View File

@ -13,6 +13,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import uuid
import oslo_messaging as messaging
@ -24,9 +25,13 @@ from mock import patch
from designate import exceptions
from designate import objects
from designate.backend import impl_fake
from designate.storage.impl_sqlalchemy import tables
from designate.central import rpcapi as central_rpcapi
from designate.mdns import rpcapi as mdns_rpcapi
from designate.tests.test_pool_manager import PoolManagerTestCase
import designate.pool_manager.service as pm_module
LOG = logging.getLogger(__name__)
class PoolManagerServiceNoopTest(PoolManagerTestCase):
@ -119,7 +124,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
def _build_domains(self, n, action, status):
return [
self._build_domain("zone%02X.example" % cnt, action,
status, id=str(uuid.uuid4()))
status, id=str(uuid.uuid4()))
for cnt in range(n)
]
@ -475,7 +480,7 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
mock_cent_update_status, *a):
self.service.update_domain = Mock()
mock_find_domains.return_value = self._build_domains(2, 'UPDATE',
'PENDING')
'PENDING')
self.service.periodic_sync()
self.assertEqual(1, mock_find_domains.call_count)
@ -484,36 +489,166 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.assertEqual(2, self.service.update_domain.call_count)
self.assertEqual(0, mock_cent_update_status.call_count)
@patch.object(pm_module.time, 'sleep')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
@patch.object(central_rpcapi.CentralAPI, 'find_domains')
def test_periodic_sync_with_failing_update(self, mock_find_domains,
mock_cent_update_status, *a):
def test_periodic_sync_with_failing_update(
self, mock_find_domains, mock_cent_update_status, *mocks):
self.service.update_domain = Mock(return_value=False) # fail update
mock_find_domains.return_value = self._build_domains(3, 'UPDATE',
'PENDING')
'PENDING')
self.service.periodic_sync()
self.assertEqual(1, mock_find_domains.call_count)
criterion = mock_find_domains.call_args_list[0][0][1]
self.assertEqual('!ERROR', criterion['status'])
# all zones are now in ERROR status
self.assertEqual(3, self.service.update_domain.call_count)
# 3 zones, all failing, with 3 attempts: 9 calls
self.assertEqual(9, self.service.update_domain.call_count)
# the zones have been put in ERROR status
self.assertEqual(3, mock_cent_update_status.call_count)
@patch.object(pm_module.time, 'sleep')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
@patch.object(central_rpcapi.CentralAPI, 'find_domains')
def test_periodic_sync_with_failing_update_with_exception(
self, mock_find_domains, mock_cent_update_status, *a):
self, mock_find_domains, mock_cent_update_status, *mocks):
self.service.update_domain = Mock(side_effect=Exception)
mock_find_domains.return_value = self._build_domains(3, 'UPDATE',
'PENDING')
'PENDING')
self.service.periodic_sync()
self.assertEqual(1, mock_find_domains.call_count)
criterion = mock_find_domains.call_args_list[0][0][1]
self.assertEqual('!ERROR', criterion['status'])
# the first updated zone is now in ERROR status
self.assertEqual(1, self.service.update_domain.call_count)
self.assertEqual(1, mock_cent_update_status.call_count)
# 3 zones, all failing, with 3 attempts: 9 calls
self.assertEqual(9, self.service.update_domain.call_count)
# the zones have been put in ERROR status
self.assertEqual(3, mock_cent_update_status.call_count)
# Periodic recovery
@patch.object(pm_module.time, 'sleep')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_periodic_recovery(self, mock_find_domains,
mock_cent_update_status, *mocks):
def mock_get_failed_domains(ctx, action):
if action == pm_module.DELETE_ACTION:
return self._build_domains(3, 'DELETE', 'ERROR')
if action == pm_module.CREATE_ACTION:
return self._build_domains(4, 'CREATE', 'ERROR')
if action == pm_module.UPDATE_ACTION:
return self._build_domains(5, 'UPDATE', 'ERROR')
self.service._get_failed_domains = mock_get_failed_domains
self.service.delete_domain = Mock()
self.service.create_domain = Mock()
self.service.update_domain = Mock()
self.service.periodic_recovery()
self.assertEqual(3, self.service.delete_domain.call_count)
self.assertEqual(4, self.service.create_domain.call_count)
self.assertEqual(5, self.service.update_domain.call_count)
class PoolManagerServiceEndToEndTest(PoolManagerServiceNoopTest):
def setUp(self):
super(PoolManagerServiceEndToEndTest, self).setUp()
def _fetch_all_domains(self):
"""Fetch all zones including deleted ones
"""
query = tables.zones.select()
return self.storage.session.execute(query).fetchall()
def _log_all_domains(self, zones, msg=None):
"""Log out a summary of zones
"""
if msg:
LOG.debug("--- %s ---" % msg)
cols = ('name', 'status', 'action', 'deleted', 'deleted_at',
'parent_domain_id')
tpl = "%-35s | %-11s | %-11s | %-32s | %-20s | %s"
LOG.debug(tpl % cols)
for z in zones:
LOG.debug(tpl % tuple(z[k] for k in cols))
def _assert_count_all_domains(self, n):
"""Assert count ALL zones including deleted ones
"""
zones = self._fetch_all_domains()
if len(zones) == n:
return
msg = "failed: %d zones expected, %d found" % (n, len(zones))
self._log_all_domains(zones, msg=msg)
raise Exception("Unexpected number of zones")
def _assert_num_failed_domains(self, action, n):
zones = self.service._get_failed_domains(
self.admin_context, action)
if len(zones) != n:
LOG.error("Expected %d failed zones, got %d", n, len(zones))
self._log_all_domains(zones, msg='listing zones')
self.assertEqual(n, len(zones))
def _assert_num_healthy_domains(self, action, n):
criterion = {
'action': action,
'pool_id': pm_module.CONF['service:pool_manager'].pool_id,
'status': '!%s' % pm_module.ERROR_STATUS
}
zones = self.service.central_api.find_domains(self.admin_context,
criterion)
if len(zones) != n:
LOG.error("Expected %d healthy zones, got %d", n, len(zones))
self._log_all_domains(zones, msg='listing zones')
self.assertEqual(n, len(zones))
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
def test_periodic_sync_and_recovery(
self, mock_cent_update_status, *a):
# Periodic sync + recovery
self.service._periodic_sync_retry_interval = 0
# Create healthy zones, run a periodic sync that will fail
self.create_domain(name='created.example.com.')
self._assert_num_healthy_domains(pm_module.CREATE_ACTION, 1)
z = self.create_domain(name='updated.example.net.')
z.email = 'info@example.net'
self.service.central_api.update_domain(self.admin_context, z)
self._assert_num_healthy_domains(pm_module.UPDATE_ACTION, 1)
with patch.object(self.service, '_update_domain_on_target',
return_value=False):
self.service.periodic_sync()
zones = self.service._fetch_healthy_zones(self.admin_context)
self.assertEqual(0, len(zones))
self._assert_num_failed_domains(pm_module.CREATE_ACTION, 1)
self._assert_num_failed_domains(pm_module.UPDATE_ACTION, 1)
# Now run a periodic_recovery that will fix the zones
backends = self.service.target_backends
for tid in self.service.target_backends:
backends[tid].create_domain = Mock()
backends[tid].update_domain = Mock()
backends[tid].delete_domain = Mock()
self.service.periodic_recovery()
# There are 2 pool targets in use
for backend in self.service.target_backends.itervalues():
self.assertEqual(1, backend.create_domain.call_count)
self.assertEqual(1, backend.update_domain.call_count)

View File

@ -0,0 +1,54 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Federico Ceratto <federico.ceratto@hpe.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit test utilities
"""
import six
class RoObject(object):
"""Read-only object: raise exception on unexpected
__setitem__ or __setattr__
"""
def __init__(self, d=None, **kw):
if d:
kw.update(d)
self.__dict__.update(kw)
def __getitem__(self, k):
try:
return self.__dict__[k]
except KeyError:
raise NotImplementedError(
"Attempt to perform __getitem__"
" %r on RoObject %r" % (k, self.__dict__)
)
def __setitem__(self, k, v):
raise NotImplementedError(
"Attempt to perform __setitem__ or __setattr__"
" %r on RoObject %r" % (k, self.__dict__)
)
def __setattr__(self, k, v):
self.__setitem__(k, v)
def __iter__(self):
for k in six.iterkeys(self.__dict__):
yield k, self.__dict__[k]

View File

@ -15,24 +15,19 @@
# under the License.
from mock import Mock
from mock import MagicMock
from mock import patch
from oslotest import base as test
from designate import exceptions
from designate import objects
from designate.pool_manager.service import Service
import designate.pool_manager.service as pm_module
from designate.tests.unit import RoObject
class PoolManagerTest(test.BaseTestCase):
class PoolManagerInitTest(test.BaseTestCase):
def __setUp(self):
super(PoolManagerTest, self).setUp()
def test_init_no_pool_targets(self):
with patch.object(objects.Pool, 'from_config',
return_value=MagicMock()):
self.assertRaises(exceptions.NoPoolTargetsConfigured, Service)
def test_init(self):
with patch.object(objects.Pool, 'from_config',
return_value=Mock()):
@ -56,3 +51,90 @@ class PoolManagerTest(test.BaseTestCase):
call2 = pm.tg.add_timer.call_args_list[1][0]
self.assertEqual(1800, call2[0])
self.assertEqual(1800, call2[-1])
def test_constant_retries(self):
with patch.object(pm_module.time, 'sleep') as mock_zzz:
gen = pm_module._constant_retries(5, 2)
out = list(gen)
self.assertEqual(
[False, False, False, False, True],
out
)
self.assertEqual(4, mock_zzz.call_count)
mock_zzz.assert_called_with(2)
class PoolManagerTest(test.BaseTestCase):
@patch.object(pm_module.DesignateContext, 'get_admin_context')
@patch.object(pm_module.central_api.CentralAPI, 'get_instance')
@patch.object(objects.Pool, 'from_config')
@patch.object(Service, '_setup_target_backends')
def setUp(self, *mocks):
super(PoolManagerTest, self).setUp()
self.pm = Service()
self.pm.pool.targets = ()
self.pm.tg.add_timer = Mock()
self.pm._pool_election = Mock()
self.pm.target_backends = {}
def test_get_failed_domains(self, *mocks):
with patch.object(self.pm.central_api, 'find_domains') as \
mock_find_domains:
self.pm._get_failed_domains('ctx', pm_module.DELETE_ACTION)
mock_find_domains.assert_called_once_with(
'ctx', {'action': 'DELETE', 'status': 'ERROR', 'pool_id':
'794ccc2c-d751-44fe-b57f-8894c9f5c842'})
@patch.object(pm_module.DesignateContext, 'get_admin_context')
def test_periodic_recover(self, mock_get_ctx, *mocks):
z = RoObject(name='a_domain')
def mock_get_failed_domains(ctx, action):
if action == pm_module.DELETE_ACTION:
return [z] * 3
if action == pm_module.CREATE_ACTION:
return [z] * 4
if action == pm_module.UPDATE_ACTION:
return [z] * 5
self.pm._get_failed_domains = mock_get_failed_domains
self.pm.delete_domain = Mock()
self.pm.create_domain = Mock()
self.pm.update_domain = Mock()
mock_ctx = mock_get_ctx.return_value
self.pm.periodic_recovery()
self.pm.delete_domain.assert_called_with(mock_ctx, z)
self.assertEqual(3, self.pm.delete_domain.call_count)
self.pm.create_domain.assert_called_with(mock_ctx, z)
self.assertEqual(4, self.pm.create_domain.call_count)
self.pm.update_domain.assert_called_with(mock_ctx, z)
self.assertEqual(5, self.pm.update_domain.call_count)
@patch.object(pm_module.DesignateContext, 'get_admin_context')
def test_periodic_recover_exception(self, mock_get_ctx, *mocks):
z = RoObject(name='a_domain')
# Raise an exception half through the recovery
def mock_get_failed_domains(ctx, action):
if action == pm_module.DELETE_ACTION:
return [z] * 3
if action == pm_module.CREATE_ACTION:
return [z] * 4
self.pm._get_failed_domains = mock_get_failed_domains
self.pm.delete_domain = Mock()
self.pm.create_domain = Mock(side_effect=Exception('oops'))
self.pm.update_domain = Mock()
mock_ctx = mock_get_ctx.return_value
self.pm.periodic_recovery()
self.pm.delete_domain.assert_called_with(mock_ctx, z)
self.assertEqual(3, self.pm.delete_domain.call_count)
self.pm.create_domain.assert_called_with(mock_ctx, z)
self.assertEqual(1, self.pm.create_domain.call_count)
self.assertEqual(0, self.pm.update_domain.call_count)

View File

@ -285,6 +285,11 @@ debug = False
# Zones Updated within last N seconds will be syncd. Use None to sync all zones
#periodic_sync_seconds = None
# Perform multiple update attempts during periodic_sync
#periodic_sync_max_attempts = 3
#periodic_sync_retry_interval = 30
# The cache driver to use
#cache_driver = memcache