Merge "Add Pool Manager tests"

This commit is contained in:
Jenkins 2016-01-11 18:11:05 +00:00 committed by Gerrit Code Review
commit 3649384d3b
4 changed files with 263 additions and 38 deletions

View File

@ -103,7 +103,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
# Fetch an instance of the Backend class, passing in the options
# and masters
self.target_backends[target.id] = backend.get_backend(
target.type, target)
target.type, target)
LOG.info(_LI('%d targets setup'), len(self.pool.targets))
@ -165,35 +165,40 @@ class Service(service.RPCService, coordination.CoordinationMixin,
def mdns_api(self):
return mdns_api.MdnsAPI.get_instance()
def _get_admin_context_all_tenants(self):
return DesignateContext.get_admin_context(all_tenants=True)
# Periodioc Tasks
def periodic_recovery(self):
"""
Runs only on the pool leader
:return: None
"""
# NOTE(kiall): Only run this periodic task on the pool leader
if not self._pool_election.is_leader:
return
context = DesignateContext.get_admin_context(all_tenants=True)
context = self._get_admin_context_all_tenants()
LOG.debug("Starting Periodic Recovery")
try:
# Handle Deletion Failures
zones = self._get_failed_zones(context, DELETE_ACTION)
LOG.info(_LI("periodic_recovery:delete_zone needed on %d zones"),
len(zones))
for zone in zones:
self.delete_zone(context, zone)
# Handle Creation Failures
zones = self._get_failed_zones(context, CREATE_ACTION)
LOG.info(_LI("periodic_recovery:create_zone needed on %d zones"),
len(zones))
for zone in zones:
self.create_zone(context, zone)
# Handle Update Failures
zones = self._get_failed_zones(context, UPDATE_ACTION)
LOG.info(_LI("periodic_recovery:update_zone needed on %d zones"),
len(zones))
for zone in zones:
self.update_zone(context, zone)
@ -209,24 +214,9 @@ class Service(service.RPCService, coordination.CoordinationMixin,
if not self._pool_election.is_leader:
return
context = DesignateContext.get_admin_context(all_tenants=True)
LOG.debug("Starting Periodic Synchronization")
criterion = {
'pool_id': CONF['service:pool_manager'].pool_id,
'status': '!%s' % ERROR_STATUS
}
periodic_sync_seconds = \
CONF['service:pool_manager'].periodic_sync_seconds
if periodic_sync_seconds is not None:
# Generate the current serial, will provide a UTC Unix TS.
current = utils.increment_serial()
criterion['serial'] = ">%s" % (current - periodic_sync_seconds)
zones = self.central_api.find_zones(context, criterion)
context = self._get_admin_context_all_tenants()
zones = self._fetch_healthy_zones(context)
try:
for zone in zones:
@ -269,6 +259,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
if self._exceed_or_meet_threshold(results.count(True)):
LOG.debug('Consensus reached for creating zone %(zone)s '
'on pool targets' % {'zone': zone.name})
# The zone status will be updated asyncronously by MiniDNS
else:
@ -276,7 +267,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
' on pool targets') % {'zone': zone.name})
self.central_api.update_status(
context, zone.id, ERROR_STATUS, zone.serial)
context, zone.id, ERROR_STATUS, zone.serial)
return
@ -314,13 +305,14 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return True
except Exception:
retries += 1
LOG.exception(_LE("Failed to create zone %(zone)s on "
"target %(target)s on attempt %(attempt)d"),
LOG.exception(_LE(
"Failed to create zone %(zone)s on "
"target %(target)s on attempt %(attempt)d"),
{
'zone': zone.name,
'target': target.id,
'attempt': retries
})
'zone': zone.name,
'target': target.id,
'attempt': retries
}) # noqa
time.sleep(self.retry_interval)
return False
@ -351,6 +343,8 @@ class Service(service.RPCService, coordination.CoordinationMixin,
LOG.debug('Consensus reached for updating zone %(zone)s '
'on pool targets' % {'zone': zone.name})
# The zone status will be updated asyncronously by MiniDNS
# Send a NOTIFY to each also-notifies
for also_notify in self.pool.also_notifies:
self._update_zone_on_also_notify(context, also_notify, zone)
@ -427,14 +421,14 @@ class Service(service.RPCService, coordination.CoordinationMixin,
'on pool targets' % {'zone': zone.name})
self.central_api.update_status(
context, zone.id, SUCCESS_STATUS, zone.serial)
context, zone.id, SUCCESS_STATUS, zone.serial)
else:
LOG.warning(_LW('Consensus not reached for deleting zone %(zone)s'
' on pool targets') % {'zone': zone.name})
self.central_api.update_status(
context, zone.id, ERROR_STATUS, zone.serial)
context, zone.id, ERROR_STATUS, zone.serial)
def _delete_zone_on_target(self, context, target, zone):
"""
@ -455,12 +449,13 @@ class Service(service.RPCService, coordination.CoordinationMixin,
return True
except Exception:
retries += 1
LOG.exception(_LE("Failed to delete zone %(zone)s on "
"target %(target)s on attempt %(attempt)d"),
LOG.exception(_LE(
"Failed to delete zone %(zone)s on "
"target %(target)s on attempt %(attempt)d"),
{
'zone': zone.name,
'target': target.id,
'attempt': retries
'zone': zone.name,
'target': target.id,
'attempt': retries
})
time.sleep(self.retry_interval)
@ -551,6 +546,26 @@ class Service(service.RPCService, coordination.CoordinationMixin,
}
return self.central_api.find_zones(context, criterion)
def _fetch_healthy_zones(self, context):
"""Fetch all zones not in error
:return: :class:`ZoneList` zones
"""
criterion = {
'pool_id': CONF['service:pool_manager'].pool_id,
'status': '!%s' % ERROR_STATUS
}
periodic_sync_seconds = \
CONF['service:pool_manager'].periodic_sync_seconds
if periodic_sync_seconds is not None:
# Generate the current serial, will provide a UTC Unix TS.
current = utils.increment_serial()
criterion['serial'] = ">%s" % (current - periodic_sync_seconds)
zones = self.central_api.find_zones(context, criterion)
return zones
@staticmethod
def _get_destination(nameserver):
return '%s:%s' % (nameserver.host, nameserver.port)

View File

@ -13,6 +13,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import uuid
import oslo_messaging as messaging
@ -26,7 +28,11 @@ from designate import objects
from designate.backend import impl_fake
from designate.central import rpcapi as central_rpcapi
from designate.mdns import rpcapi as mdns_rpcapi
from designate.storage.impl_sqlalchemy import tables
from designate.tests.test_pool_manager import PoolManagerTestCase
import designate.pool_manager.service as pm_module
LOG = logging.getLogger(__name__)
class PoolManagerServiceNoopTest(PoolManagerTestCase):
@ -470,6 +476,8 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
self.assertEqual(1, self.service._update_zone_on_also_notify.call_count) # noqa
self.assertEqual(2, mock_mdns_poll.call_count)
# Periodic sync
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
@patch.object(central_rpcapi.CentralAPI, 'find_zones')
@ -519,3 +527,123 @@ class PoolManagerServiceNoopTest(PoolManagerTestCase):
# the first updated zone is now in ERROR status
self.assertEqual(1, self.service.update_zone.call_count)
self.assertEqual(1, mock_cent_update_status.call_count)
# Periodic recovery
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
@patch.object(central_rpcapi.CentralAPI, 'update_status')
def test_periodic_recovery(self, mock_find_zones,
mock_cent_update_status, *a):
def mock_get_failed_zones(ctx, action):
if action == pm_module.DELETE_ACTION:
return self._build_zones(3, 'DELETE', 'ERROR')
if action == pm_module.CREATE_ACTION:
return self._build_zones(4, 'CREATE', 'ERROR')
if action == pm_module.UPDATE_ACTION:
return self._build_zones(5, 'UPDATE', 'ERROR')
self.service._get_failed_zones = mock_get_failed_zones
self.service.delete_zone = Mock()
self.service.create_zone = Mock()
self.service.update_zone = Mock()
self.service.periodic_recovery()
self.assertEqual(3, self.service.delete_zone.call_count)
self.assertEqual(4, self.service.create_zone.call_count)
self.assertEqual(5, self.service.update_zone.call_count)
class PoolManagerServiceEndToEndTest(PoolManagerServiceNoopTest):
def setUp(self):
super(PoolManagerServiceEndToEndTest, self).setUp()
def _fetch_all_zones(self):
"""Fetch all zones including deleted ones
"""
query = tables.zones.select()
return self.storage.session.execute(query).fetchall()
def _log_all_zones(self, zones, msg=None):
"""Log out a summary of zones
"""
if msg:
LOG.debug("--- %s ---" % msg)
cols = ('name', 'status', 'action', 'deleted', 'deleted_at',
'parent_zone_id')
tpl = "%-35s | %-11s | %-11s | %-32s | %-20s | %s"
LOG.debug(tpl % cols)
for z in zones:
LOG.debug(tpl % tuple(z[k] for k in cols))
def _assert_count_all_zones(self, n):
"""Assert count ALL zones including deleted ones
"""
zones = self._fetch_all_zones()
if len(zones) == n:
return
msg = "failed: %d zones expected, %d found" % (n, len(zones))
self._log_all_zones(zones, msg=msg)
raise Exception("Unexpected number of zones")
def _assert_num_failed_zones(self, action, n):
zones = self.service._get_failed_zones(
self.admin_context, action)
if len(zones) != n:
LOG.error("Expected %d failed zones, got %d", n, len(zones))
self._log_all_zones(zones, msg='listing zones')
self.assertEqual(n, len(zones))
def _assert_num_healthy_zones(self, action, n):
criterion = {
'action': action,
'pool_id': pm_module.CONF['service:pool_manager'].pool_id,
'status': '!%s' % pm_module.ERROR_STATUS
}
zones = self.service.central_api.find_zones(self.admin_context,
criterion)
if len(zones) != n:
LOG.error("Expected %d healthy zones, got %d", n, len(zones))
self._log_all_zones(zones, msg='listing zones')
self.assertEqual(n, len(zones))
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')
def test_periodic_sync_and_recovery(
self, mock_cent_update_status, *a):
# Periodic sync + recovery
# Create healthy zones, run a periodic sync that will fail
self.create_zone(name='created.example.com.')
self._assert_num_healthy_zones(pm_module.CREATE_ACTION, 1)
z = self.create_zone(name='updated.example.net.')
z.email = 'info@example.net'
self.service.central_api.update_zone(self.admin_context, z)
self._assert_num_healthy_zones(pm_module.UPDATE_ACTION, 1)
with patch.object(self.service, '_update_zone_on_target',
return_value=False):
self.service.periodic_sync()
zones = self.service._fetch_healthy_zones(self.admin_context)
self.assertEqual(0, len(zones))
self._assert_num_failed_zones(pm_module.CREATE_ACTION, 1)
self._assert_num_failed_zones(pm_module.UPDATE_ACTION, 1)
# Now run a periodic_recovery that will fix the zones
backends = self.service.target_backends
for tid in self.service.target_backends:
backends[tid].create_zone = Mock()
backends[tid].update_zone = Mock()
backends[tid].delete_zone = Mock()
self.service.periodic_recovery()
# There are 2 pool targets in use
for backend in self.service.target_backends.itervalues():
self.assertEqual(1, backend.create_zone.call_count)
self.assertEqual(1, backend.update_zone.call_count)

View File

@ -14,6 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests
"""
from mock import Mock
from mock import MagicMock
from mock import patch
@ -22,9 +26,11 @@ from oslotest import base as test
from designate import exceptions
from designate import objects
from designate.pool_manager.service import Service
from designate.tests.unit import RoObject
import designate.pool_manager.service as pm_module
class PoolManagerTest(test.BaseTestCase):
class PoolManagerInitTest(test.BaseTestCase):
def __setUp(self):
super(PoolManagerTest, self).setUp()
@ -56,3 +62,79 @@ class PoolManagerTest(test.BaseTestCase):
call2 = pm.tg.add_timer.call_args_list[1][0]
self.assertEqual(1800, call2[0])
self.assertEqual(1800, call2[-1])
class PoolManagerTest(test.BaseTestCase):
@patch.object(pm_module.DesignateContext, 'get_admin_context')
@patch.object(pm_module.central_api.CentralAPI, 'get_instance')
@patch.object(objects.Pool, 'from_config')
@patch.object(Service, '_setup_target_backends')
def setUp(self, *mocks):
super(PoolManagerTest, self).setUp()
self.pm = Service()
self.pm.pool.targets = ()
self.pm.tg.add_timer = Mock()
self.pm._pool_election = Mock()
self.pm.target_backends = {}
def test_get_failed_zones(self, *mocks):
with patch.object(self.pm.central_api, 'find_zones') as \
mock_find_zones:
self.pm._get_failed_zones('ctx', pm_module.DELETE_ACTION)
mock_find_zones.assert_called_once_with(
'ctx', {'action': 'DELETE', 'status': 'ERROR', 'pool_id':
'794ccc2c-d751-44fe-b57f-8894c9f5c842'})
@patch.object(pm_module.DesignateContext, 'get_admin_context')
def test_periodic_recover(self, mock_get_ctx, *mocks):
z = RoObject(name='a_zone')
def mock_get_failed_zones(ctx, action):
if action == pm_module.DELETE_ACTION:
return [z] * 3
if action == pm_module.CREATE_ACTION:
return [z] * 4
if action == pm_module.UPDATE_ACTION:
return [z] * 5
self.pm._get_failed_zones = mock_get_failed_zones
self.pm.delete_zone = Mock()
self.pm.create_zone = Mock()
self.pm.update_zone = Mock()
mock_ctx = mock_get_ctx.return_value
self.pm.periodic_recovery()
self.pm.delete_zone.assert_called_with(mock_ctx, z)
self.assertEqual(3, self.pm.delete_zone.call_count)
self.pm.create_zone.assert_called_with(mock_ctx, z)
self.assertEqual(4, self.pm.create_zone.call_count)
self.pm.update_zone.assert_called_with(mock_ctx, z)
self.assertEqual(5, self.pm.update_zone.call_count)
@patch.object(pm_module.DesignateContext, 'get_admin_context')
def test_periodic_recover_exception(self, mock_get_ctx, *mocks):
z = RoObject(name='a_zone')
# Raise an exception half through the recovery
def mock_get_failed_zones(ctx, action):
if action == pm_module.DELETE_ACTION:
return [z] * 3
if action == pm_module.CREATE_ACTION:
return [z] * 4
self.pm._get_failed_zones = mock_get_failed_zones
self.pm.delete_zone = Mock()
self.pm.create_zone = Mock(side_effect=Exception('oops'))
self.pm.update_zone = Mock()
mock_ctx = mock_get_ctx.return_value
self.pm.periodic_recovery()
self.pm.delete_zone.assert_called_with(mock_ctx, z)
self.assertEqual(3, self.pm.delete_zone.call_count)
self.pm.create_zone.assert_called_with(mock_ctx, z)
self.assertEqual(1, self.pm.create_zone.call_count)
self.assertEqual(0, self.pm.update_zone.call_count)