Merge "Add Pool Manager tests"
This commit is contained in:
commit
3649384d3b
|
@ -103,7 +103,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
|||
# Fetch an instance of the Backend class, passing in the options
|
||||
# and masters
|
||||
self.target_backends[target.id] = backend.get_backend(
|
||||
target.type, target)
|
||||
target.type, target)
|
||||
|
||||
LOG.info(_LI('%d targets setup'), len(self.pool.targets))
|
||||
|
||||
|
@ -165,35 +165,40 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
|||
def mdns_api(self):
|
||||
return mdns_api.MdnsAPI.get_instance()
|
||||
|
||||
def _get_admin_context_all_tenants(self):
|
||||
return DesignateContext.get_admin_context(all_tenants=True)
|
||||
|
||||
# Periodioc Tasks
|
||||
def periodic_recovery(self):
|
||||
"""
|
||||
Runs only on the pool leader
|
||||
:return: None
|
||||
"""
|
||||
# NOTE(kiall): Only run this periodic task on the pool leader
|
||||
if not self._pool_election.is_leader:
|
||||
return
|
||||
|
||||
context = DesignateContext.get_admin_context(all_tenants=True)
|
||||
context = self._get_admin_context_all_tenants()
|
||||
LOG.debug("Starting Periodic Recovery")
|
||||
|
||||
try:
|
||||
# Handle Deletion Failures
|
||||
zones = self._get_failed_zones(context, DELETE_ACTION)
|
||||
|
||||
LOG.info(_LI("periodic_recovery:delete_zone needed on %d zones"),
|
||||
len(zones))
|
||||
for zone in zones:
|
||||
self.delete_zone(context, zone)
|
||||
|
||||
# Handle Creation Failures
|
||||
zones = self._get_failed_zones(context, CREATE_ACTION)
|
||||
|
||||
LOG.info(_LI("periodic_recovery:create_zone needed on %d zones"),
|
||||
len(zones))
|
||||
for zone in zones:
|
||||
self.create_zone(context, zone)
|
||||
|
||||
# Handle Update Failures
|
||||
zones = self._get_failed_zones(context, UPDATE_ACTION)
|
||||
|
||||
LOG.info(_LI("periodic_recovery:update_zone needed on %d zones"),
|
||||
len(zones))
|
||||
for zone in zones:
|
||||
self.update_zone(context, zone)
|
||||
|
||||
|
@ -209,24 +214,9 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
|||
if not self._pool_election.is_leader:
|
||||
return
|
||||
|
||||
context = DesignateContext.get_admin_context(all_tenants=True)
|
||||
|
||||
LOG.debug("Starting Periodic Synchronization")
|
||||
|
||||
criterion = {
|
||||
'pool_id': CONF['service:pool_manager'].pool_id,
|
||||
'status': '!%s' % ERROR_STATUS
|
||||
}
|
||||
|
||||
periodic_sync_seconds = \
|
||||
CONF['service:pool_manager'].periodic_sync_seconds
|
||||
|
||||
if periodic_sync_seconds is not None:
|
||||
# Generate the current serial, will provide a UTC Unix TS.
|
||||
current = utils.increment_serial()
|
||||
criterion['serial'] = ">%s" % (current - periodic_sync_seconds)
|
||||
|
||||
zones = self.central_api.find_zones(context, criterion)
|
||||
context = self._get_admin_context_all_tenants()
|
||||
zones = self._fetch_healthy_zones(context)
|
||||
|
||||
try:
|
||||
for zone in zones:
|
||||
|
@ -269,6 +259,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
|||
if self._exceed_or_meet_threshold(results.count(True)):
|
||||
LOG.debug('Consensus reached for creating zone %(zone)s '
|
||||
'on pool targets' % {'zone': zone.name})
|
||||
# The zone status will be updated asyncronously by MiniDNS
|
||||
|
||||
else:
|
||||
|
||||
|
@ -276,7 +267,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
|||
' on pool targets') % {'zone': zone.name})
|
||||
|
||||
self.central_api.update_status(
|
||||
context, zone.id, ERROR_STATUS, zone.serial)
|
||||
context, zone.id, ERROR_STATUS, zone.serial)
|
||||
|
||||
return
|
||||
|
||||
|
@ -314,13 +305,14 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
|||
return True
|
||||
except Exception:
|
||||
retries += 1
|
||||
LOG.exception(_LE("Failed to create zone %(zone)s on "
|
||||
"target %(target)s on attempt %(attempt)d"),
|
||||
LOG.exception(_LE(
|
||||
"Failed to create zone %(zone)s on "
|
||||
"target %(target)s on attempt %(attempt)d"),
|
||||
{
|
||||
'zone': zone.name,
|
||||
'target': target.id,
|
||||
'attempt': retries
|
||||
})
|
||||
'zone': zone.name,
|
||||
'target': target.id,
|
||||
'attempt': retries
|
||||
}) # noqa
|
||||
time.sleep(self.retry_interval)
|
||||
|
||||
return False
|
||||
|
@ -351,6 +343,8 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
|||
LOG.debug('Consensus reached for updating zone %(zone)s '
|
||||
'on pool targets' % {'zone': zone.name})
|
||||
|
||||
# The zone status will be updated asyncronously by MiniDNS
|
||||
|
||||
# Send a NOTIFY to each also-notifies
|
||||
for also_notify in self.pool.also_notifies:
|
||||
self._update_zone_on_also_notify(context, also_notify, zone)
|
||||
|
@ -427,14 +421,14 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
|||
'on pool targets' % {'zone': zone.name})
|
||||
|
||||
self.central_api.update_status(
|
||||
context, zone.id, SUCCESS_STATUS, zone.serial)
|
||||
context, zone.id, SUCCESS_STATUS, zone.serial)
|
||||
|
||||
else:
|
||||
LOG.warning(_LW('Consensus not reached for deleting zone %(zone)s'
|
||||
' on pool targets') % {'zone': zone.name})
|
||||
|
||||
self.central_api.update_status(
|
||||
context, zone.id, ERROR_STATUS, zone.serial)
|
||||
context, zone.id, ERROR_STATUS, zone.serial)
|
||||
|
||||
def _delete_zone_on_target(self, context, target, zone):
|
||||
"""
|
||||
|
@ -455,12 +449,13 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
|||
return True
|
||||
except Exception:
|
||||
retries += 1
|
||||
LOG.exception(_LE("Failed to delete zone %(zone)s on "
|
||||
"target %(target)s on attempt %(attempt)d"),
|
||||
LOG.exception(_LE(
|
||||
"Failed to delete zone %(zone)s on "
|
||||
"target %(target)s on attempt %(attempt)d"),
|
||||
{
|
||||
'zone': zone.name,
|
||||
'target': target.id,
|
||||
'attempt': retries
|
||||
'zone': zone.name,
|
||||
'target': target.id,
|
||||
'attempt': retries
|
||||
})
|
||||
time.sleep(self.retry_interval)
|
||||
|
||||
|
@ -551,6 +546,26 @@ class Service(service.RPCService, coordination.CoordinationMixin,
|
|||
}
|
||||
return self.central_api.find_zones(context, criterion)
|
||||
|
||||
def _fetch_healthy_zones(self, context):
|
||||
"""Fetch all zones not in error
|
||||
:return: :class:`ZoneList` zones
|
||||
"""
|
||||
criterion = {
|
||||
'pool_id': CONF['service:pool_manager'].pool_id,
|
||||
'status': '!%s' % ERROR_STATUS
|
||||
}
|
||||
|
||||
periodic_sync_seconds = \
|
||||
CONF['service:pool_manager'].periodic_sync_seconds
|
||||
|
||||
if periodic_sync_seconds is not None:
|
||||
# Generate the current serial, will provide a UTC Unix TS.
|
||||
current = utils.increment_serial()
|
||||
criterion['serial'] = ">%s" % (current - periodic_sync_seconds)
|
||||
|
||||
zones = self.central_api.find_zones(context, criterion)
|
||||
return zones
|
||||
|
||||
@staticmethod
|
||||
def _get_destination(nameserver):
|
||||
return '%s:%s' % (nameserver.host, nameserver.port)
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import oslo_messaging as messaging
|
||||
|
@ -26,7 +28,11 @@ from designate import objects
|
|||
from designate.backend import impl_fake
|
||||
from designate.central import rpcapi as central_rpcapi
|
||||
from designate.mdns import rpcapi as mdns_rpcapi
|
||||
from designate.storage.impl_sqlalchemy import tables
|
||||
from designate.tests.test_pool_manager import PoolManagerTestCase
|
||||
import designate.pool_manager.service as pm_module
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PoolManagerServiceNoopTest(PoolManagerTestCase):
|
||||
|
@ -470,6 +476,8 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
|
|||
self.assertEqual(1, self.service._update_zone_on_also_notify.call_count) # noqa
|
||||
self.assertEqual(2, mock_mdns_poll.call_count)
|
||||
|
||||
# Periodic sync
|
||||
|
||||
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
|
||||
@patch.object(central_rpcapi.CentralAPI, 'update_status')
|
||||
@patch.object(central_rpcapi.CentralAPI, 'find_zones')
|
||||
|
@ -519,3 +527,123 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
|
|||
# the first updated zone is now in ERROR status
|
||||
self.assertEqual(1, self.service.update_zone.call_count)
|
||||
self.assertEqual(1, mock_cent_update_status.call_count)
|
||||
|
||||
# Periodic recovery
|
||||
|
||||
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
|
||||
@patch.object(central_rpcapi.CentralAPI, 'update_status')
|
||||
def test_periodic_recovery(self, mock_find_zones,
|
||||
mock_cent_update_status, *a):
|
||||
|
||||
def mock_get_failed_zones(ctx, action):
|
||||
if action == pm_module.DELETE_ACTION:
|
||||
return self._build_zones(3, 'DELETE', 'ERROR')
|
||||
if action == pm_module.CREATE_ACTION:
|
||||
return self._build_zones(4, 'CREATE', 'ERROR')
|
||||
if action == pm_module.UPDATE_ACTION:
|
||||
return self._build_zones(5, 'UPDATE', 'ERROR')
|
||||
|
||||
self.service._get_failed_zones = mock_get_failed_zones
|
||||
self.service.delete_zone = Mock()
|
||||
self.service.create_zone = Mock()
|
||||
self.service.update_zone = Mock()
|
||||
|
||||
self.service.periodic_recovery()
|
||||
|
||||
self.assertEqual(3, self.service.delete_zone.call_count)
|
||||
self.assertEqual(4, self.service.create_zone.call_count)
|
||||
self.assertEqual(5, self.service.update_zone.call_count)
|
||||
|
||||
|
||||
class PoolManagerServiceEndToEndTest(PoolManagerServiceNoopTest):
|
||||
|
||||
def setUp(self):
|
||||
super(PoolManagerServiceEndToEndTest, self).setUp()
|
||||
|
||||
def _fetch_all_zones(self):
|
||||
"""Fetch all zones including deleted ones
|
||||
"""
|
||||
query = tables.zones.select()
|
||||
return self.storage.session.execute(query).fetchall()
|
||||
|
||||
def _log_all_zones(self, zones, msg=None):
|
||||
"""Log out a summary of zones
|
||||
"""
|
||||
if msg:
|
||||
LOG.debug("--- %s ---" % msg)
|
||||
cols = ('name', 'status', 'action', 'deleted', 'deleted_at',
|
||||
'parent_zone_id')
|
||||
tpl = "%-35s | %-11s | %-11s | %-32s | %-20s | %s"
|
||||
LOG.debug(tpl % cols)
|
||||
for z in zones:
|
||||
LOG.debug(tpl % tuple(z[k] for k in cols))
|
||||
|
||||
def _assert_count_all_zones(self, n):
|
||||
"""Assert count ALL zones including deleted ones
|
||||
"""
|
||||
zones = self._fetch_all_zones()
|
||||
if len(zones) == n:
|
||||
return
|
||||
|
||||
msg = "failed: %d zones expected, %d found" % (n, len(zones))
|
||||
self._log_all_zones(zones, msg=msg)
|
||||
raise Exception("Unexpected number of zones")
|
||||
|
||||
def _assert_num_failed_zones(self, action, n):
|
||||
zones = self.service._get_failed_zones(
|
||||
self.admin_context, action)
|
||||
if len(zones) != n:
|
||||
LOG.error("Expected %d failed zones, got %d", n, len(zones))
|
||||
self._log_all_zones(zones, msg='listing zones')
|
||||
self.assertEqual(n, len(zones))
|
||||
|
||||
def _assert_num_healthy_zones(self, action, n):
|
||||
criterion = {
|
||||
'action': action,
|
||||
'pool_id': pm_module.CONF['service:pool_manager'].pool_id,
|
||||
'status': '!%s' % pm_module.ERROR_STATUS
|
||||
}
|
||||
zones = self.service.central_api.find_zones(self.admin_context,
|
||||
criterion)
|
||||
if len(zones) != n:
|
||||
LOG.error("Expected %d healthy zones, got %d", n, len(zones))
|
||||
self._log_all_zones(zones, msg='listing zones')
|
||||
self.assertEqual(n, len(zones))
|
||||
|
||||
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
|
||||
def test_periodic_sync_and_recovery(
|
||||
self, mock_cent_update_status, *a):
|
||||
# Periodic sync + recovery
|
||||
|
||||
# Create healthy zones, run a periodic sync that will fail
|
||||
self.create_zone(name='created.example.com.')
|
||||
self._assert_num_healthy_zones(pm_module.CREATE_ACTION, 1)
|
||||
|
||||
z = self.create_zone(name='updated.example.net.')
|
||||
z.email = 'info@example.net'
|
||||
self.service.central_api.update_zone(self.admin_context, z)
|
||||
self._assert_num_healthy_zones(pm_module.UPDATE_ACTION, 1)
|
||||
|
||||
with patch.object(self.service, '_update_zone_on_target',
|
||||
return_value=False):
|
||||
self.service.periodic_sync()
|
||||
|
||||
zones = self.service._fetch_healthy_zones(self.admin_context)
|
||||
self.assertEqual(0, len(zones))
|
||||
self._assert_num_failed_zones(pm_module.CREATE_ACTION, 1)
|
||||
self._assert_num_failed_zones(pm_module.UPDATE_ACTION, 1)
|
||||
|
||||
# Now run a periodic_recovery that will fix the zones
|
||||
|
||||
backends = self.service.target_backends
|
||||
for tid in self.service.target_backends:
|
||||
backends[tid].create_zone = Mock()
|
||||
backends[tid].update_zone = Mock()
|
||||
backends[tid].delete_zone = Mock()
|
||||
|
||||
self.service.periodic_recovery()
|
||||
|
||||
# There are 2 pool targets in use
|
||||
for backend in self.service.target_backends.itervalues():
|
||||
self.assertEqual(1, backend.create_zone.call_count)
|
||||
self.assertEqual(1, backend.update_zone.call_count)
|
||||
|
|
|
@ -14,6 +14,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Unit tests
|
||||
"""
|
||||
|
||||
from mock import Mock
|
||||
from mock import MagicMock
|
||||
from mock import patch
|
||||
|
@ -22,9 +26,11 @@ from oslotest import base as test
|
|||
from designate import exceptions
|
||||
from designate import objects
|
||||
from designate.pool_manager.service import Service
|
||||
from designate.tests.unit import RoObject
|
||||
import designate.pool_manager.service as pm_module
|
||||
|
||||
|
||||
class PoolManagerTest(test.BaseTestCase):
|
||||
class PoolManagerInitTest(test.BaseTestCase):
|
||||
def __setUp(self):
|
||||
super(PoolManagerTest, self).setUp()
|
||||
|
||||
|
@ -56,3 +62,79 @@ class PoolManagerTest(test.BaseTestCase):
|
|||
call2 = pm.tg.add_timer.call_args_list[1][0]
|
||||
self.assertEqual(1800, call2[0])
|
||||
self.assertEqual(1800, call2[-1])
|
||||
|
||||
|
||||
class PoolManagerTest(test.BaseTestCase):
|
||||
|
||||
@patch.object(pm_module.DesignateContext, 'get_admin_context')
|
||||
@patch.object(pm_module.central_api.CentralAPI, 'get_instance')
|
||||
@patch.object(objects.Pool, 'from_config')
|
||||
@patch.object(Service, '_setup_target_backends')
|
||||
def setUp(self, *mocks):
|
||||
super(PoolManagerTest, self).setUp()
|
||||
self.pm = Service()
|
||||
self.pm.pool.targets = ()
|
||||
self.pm.tg.add_timer = Mock()
|
||||
self.pm._pool_election = Mock()
|
||||
self.pm.target_backends = {}
|
||||
|
||||
def test_get_failed_zones(self, *mocks):
|
||||
with patch.object(self.pm.central_api, 'find_zones') as \
|
||||
mock_find_zones:
|
||||
self.pm._get_failed_zones('ctx', pm_module.DELETE_ACTION)
|
||||
|
||||
mock_find_zones.assert_called_once_with(
|
||||
'ctx', {'action': 'DELETE', 'status': 'ERROR', 'pool_id':
|
||||
'794ccc2c-d751-44fe-b57f-8894c9f5c842'})
|
||||
|
||||
@patch.object(pm_module.DesignateContext, 'get_admin_context')
|
||||
def test_periodic_recover(self, mock_get_ctx, *mocks):
|
||||
z = RoObject(name='a_zone')
|
||||
|
||||
def mock_get_failed_zones(ctx, action):
|
||||
if action == pm_module.DELETE_ACTION:
|
||||
return [z] * 3
|
||||
if action == pm_module.CREATE_ACTION:
|
||||
return [z] * 4
|
||||
if action == pm_module.UPDATE_ACTION:
|
||||
return [z] * 5
|
||||
|
||||
self.pm._get_failed_zones = mock_get_failed_zones
|
||||
self.pm.delete_zone = Mock()
|
||||
self.pm.create_zone = Mock()
|
||||
self.pm.update_zone = Mock()
|
||||
mock_ctx = mock_get_ctx.return_value
|
||||
|
||||
self.pm.periodic_recovery()
|
||||
|
||||
self.pm.delete_zone.assert_called_with(mock_ctx, z)
|
||||
self.assertEqual(3, self.pm.delete_zone.call_count)
|
||||
self.pm.create_zone.assert_called_with(mock_ctx, z)
|
||||
self.assertEqual(4, self.pm.create_zone.call_count)
|
||||
self.pm.update_zone.assert_called_with(mock_ctx, z)
|
||||
self.assertEqual(5, self.pm.update_zone.call_count)
|
||||
|
||||
@patch.object(pm_module.DesignateContext, 'get_admin_context')
|
||||
def test_periodic_recover_exception(self, mock_get_ctx, *mocks):
|
||||
z = RoObject(name='a_zone')
|
||||
# Raise an exception half through the recovery
|
||||
|
||||
def mock_get_failed_zones(ctx, action):
|
||||
if action == pm_module.DELETE_ACTION:
|
||||
return [z] * 3
|
||||
if action == pm_module.CREATE_ACTION:
|
||||
return [z] * 4
|
||||
|
||||
self.pm._get_failed_zones = mock_get_failed_zones
|
||||
self.pm.delete_zone = Mock()
|
||||
self.pm.create_zone = Mock(side_effect=Exception('oops'))
|
||||
self.pm.update_zone = Mock()
|
||||
mock_ctx = mock_get_ctx.return_value
|
||||
|
||||
self.pm.periodic_recovery()
|
||||
|
||||
self.pm.delete_zone.assert_called_with(mock_ctx, z)
|
||||
self.assertEqual(3, self.pm.delete_zone.call_count)
|
||||
self.pm.create_zone.assert_called_with(mock_ctx, z)
|
||||
self.assertEqual(1, self.pm.create_zone.call_count)
|
||||
self.assertEqual(0, self.pm.update_zone.call_count)
|
||||
|
|
Loading…
Reference in New Issue