From 8481674b116f65eac6c5dd9cc344564b266c7b3f Mon Sep 17 00:00:00 2001 From: Endre Karlson Date: Tue, 30 Jun 2015 15:25:06 +0200 Subject: [PATCH] Add tasks and periodic emits Change-Id: I645abc6634442a842f2af01667e97951c2cc4809 --- designate/tests/test_zone_manager/__init__.py | 0 .../tests/test_zone_manager/test_service.py | 27 +++++ .../tests/test_zone_manager/test_tasks.py | 109 +++++++++++++++++ designate/utils.py | 3 + designate/zone_manager/__init__.py | 2 + designate/zone_manager/service.py | 20 +++- designate/zone_manager/tasks.py | 111 ++++++++++++++++++ setup.cfg | 3 + 8 files changed, 270 insertions(+), 5 deletions(-) create mode 100644 designate/tests/test_zone_manager/__init__.py create mode 100644 designate/tests/test_zone_manager/test_service.py create mode 100644 designate/tests/test_zone_manager/test_tasks.py create mode 100644 designate/zone_manager/tasks.py diff --git a/designate/tests/test_zone_manager/__init__.py b/designate/tests/test_zone_manager/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/designate/tests/test_zone_manager/test_service.py b/designate/tests/test_zone_manager/test_service.py new file mode 100644 index 000000000..4bbe23afb --- /dev/null +++ b/designate/tests/test_zone_manager/test_service.py @@ -0,0 +1,27 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Endre Karlson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_log import log as logging + +from designate.tests import TestCase + +LOG = logging.getLogger(__name__) + + +class ZoneManagerServiceTest(TestCase): + def test_stop(self): + # Test stopping the service + service = self.start_service("zone_manager") + service.stop() diff --git a/designate/tests/test_zone_manager/test_tasks.py b/designate/tests/test_zone_manager/test_tasks.py new file mode 100644 index 000000000..b7ba263b8 --- /dev/null +++ b/designate/tests/test_zone_manager/test_tasks.py @@ -0,0 +1,109 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Endre Karlson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import datetime +import time + +import mock +from oslo_messaging.notify import notifier + +from designate import objects +from designate import utils +from designate.zone_manager import tasks +from designate.tests import TestCase + + +class TaskTest(TestCase): + def setUp(self): + super(TaskTest, self).setUp() + utils.register_plugin_opts() + + def _enable_tasks(self, tasks): + self.config( + enabled_tasks=",".join(tasks), + group="service:zone_manager") + + +class PeriodicExistsTest(TaskTest): + def setUp(self): + super(PeriodicExistsTest, self).setUp() + self.config( + interval=2, + group="zone_manager_task:periodic_exists") + self._enable_tasks("periodic_exists") + + def _wait_for_cond(self, condition, interval=0.5, max_attempts=20): + attempts = 0 + while attempts < max_attempts: + result = condition() + if result: + return result + time.sleep(interval) + attempts += 1 + raise ValueError + + @mock.patch.object(notifier.Notifier, 'info') + def test_emit_exists(self, mock_notifier): + domain = self.create_domain() + # Clear the create domain notification + mock_notifier.reset_mock() + + # Install our own period results + start, end = tasks.PeriodicExistsTask._get_period(2) + with mock.patch.object(tasks.PeriodicExistsTask, "_get_period", + return_value=(start, end,)): + + svc = self.start_service("zone_manager") + result = self._wait_for_cond( + lambda: mock_notifier.called is True, .5, 3) + self.assertEqual(True, result) + svc.stop() + + # Make some notification data in the same format that the task does + data = dict(domain) + del data["attributes"] + # For some reason domain.created when doing dict(domain) is a datetime + data["created_at"] = datetime.datetime.isoformat(domain.created_at) + data["audit_period_start"] = str(start) + data["audit_period_end"] = str(end) + + # .info(ctxt, event, payload) + mock_notifier.assert_called_with(mock.ANY, "dns.domain.exists", data) + + @mock.patch.object(notifier.Notifier, 'info') + def test_emit_exists_no_zones(self, mock_notifier): + self.start_service("zone_manager") + # Since the interval is 2 seconds we wait for the call to have been + # executed for 3 seconds + time.sleep(2) + self.assertEqual(False, mock_notifier.called) + + @mock.patch.object(notifier.Notifier, 'info') + def test_emit_exists_multiple_zones(self, mock_notifier): + zones = [] + for i in range(0, 10): + z = self.central_service.create_domain( + self.admin_context, + objects.Domain( + name="example%s.net." % i, + email="foo@example.com")) + zones.append(z) + + # Clear any notifications from create etc. + mock_notifier.reset_mock() + + # Start ZM so that the periodic task fires + self.start_service("zone_manager") + self._wait_for_cond(lambda: mock_notifier.call_count is 10) diff --git a/designate/utils.py b/designate/utils.py index 7e5f053e2..0c1ffd84c 100644 --- a/designate/utils.py +++ b/designate/utils.py @@ -106,6 +106,9 @@ def register_plugin_opts(): # Avoid circular dependency imports from designate import plugin + plugin.Plugin.register_cfg_opts('designate.zone_manager_tasks') + plugin.Plugin.register_extra_cfg_opts('designate.zone_manager_tasks') + # Register Backend Plugin Config Options plugin.Plugin.register_cfg_opts('designate.backend') plugin.Plugin.register_extra_cfg_opts('designate.backend') diff --git a/designate/zone_manager/__init__.py b/designate/zone_manager/__init__.py index da7036315..da117578c 100644 --- a/designate/zone_manager/__init__.py +++ b/designate/zone_manager/__init__.py @@ -26,6 +26,8 @@ OPTS = [ help='Number of Zone Manager worker processes to spawn'), cfg.IntOpt('threads', default=1000, help='Number of Zone Manager greenthreads to spawn'), + cfg.ListOpt('enabled_tasks', default=None, + help='Enabled tasks to run') ] CONF.register_opts(OPTS, group='service:zone_manager') diff --git a/designate/zone_manager/service.py b/designate/zone_manager/service.py index 699af2fb0..2b90c6ceb 100644 --- a/designate/zone_manager/service.py +++ b/designate/zone_manager/service.py @@ -19,12 +19,14 @@ from oslo_log import log as logging from designate.i18n import _LI from designate import coordination from designate import service -from designate.central import rpcapi as central_api +from designate.zone_manager import tasks LOG = logging.getLogger(__name__) CONF = cfg.CONF +NS = 'designate.periodic_tasks' + class Service(coordination.CoordinationMixin, service.Service): def __init__(self, threads=None): @@ -43,10 +45,18 @@ class Service(coordination.CoordinationMixin, service.Service): self._partitioner.start() self._partitioner.watch_partition_change(self._rebalance) + for task in tasks.PeriodicTask.get_extensions(): + LOG.debug("Registering task %s" % task) + + # Instantiate the task + task = task() + + # Subscribe for partition size updates. + self._partitioner.watch_partition_change(task.on_partition_change) + + interval = CONF[task.get_canonical_name()].interval + self.tg.add_timer(interval, task) + @property def service_name(self): return 'zone_manager' - - @property - def central_api(self): - return central_api.CentralAPI.get_instance() diff --git a/designate/zone_manager/tasks.py b/designate/zone_manager/tasks.py new file mode 100644 index 000000000..90185e403 --- /dev/null +++ b/designate/zone_manager/tasks.py @@ -0,0 +1,111 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Endre Karlson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import datetime + +from designate import context +from designate import plugin +from designate import rpc +from designate.central import rpcapi +from designate.i18n import _LI + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import timeutils + +LOG = logging.getLogger(__name__) + + +class PeriodicTask(plugin.ExtensionPlugin): + __plugin_ns__ = 'designate.zone_manager_tasks' + __plugin_type__ = 'zone_manager_task' + __interval__ = None + + def __init__(self): + self.my_partitions = None + self.options = cfg.CONF[self.get_canonical_name()] + + @classmethod + def get_base_opts(cls): + return [cfg.IntOpt('interval', default=cls.__interval__)] + + def on_partition_change(self, my_partitions, members, event): + self.my_partitions = my_partitions + + def _my_range(self): + return self.my_partitions[0], self.my_partitions[-1] + + def _filter_between(self, col): + return {col: "BETWEEN %s,%s" % self._my_range()} + + +class PeriodicExistsTask(PeriodicTask): + __plugin_name__ = 'periodic_exists' + __interval__ = 3600 + + def __init__(self): + super(PeriodicExistsTask, self).__init__() + self.notifier = rpc.get_notifier('zone_manager') + + @classmethod + def get_cfg_opts(cls): + group = cfg.OptGroup(cls.get_canonical_name()) + options = cls.get_base_opts() + [ + cfg.IntOpt('per_page', default=100) + ] + return [(group, options)] + + @property + def central_api(self): + return rpcapi.CentralAPI.get_instance() + + @staticmethod + def _get_period(seconds): + interval = datetime.timedelta(seconds=seconds) + end = timeutils.utcnow() + return end - interval, end + + def __call__(self): + pstart, pend = self._my_range() + msg = _LI("Emitting zone exist events for %(start)s to %(end)s") + LOG.info(msg % {"start": pstart, "end": pend}) + + ctxt = context.DesignateContext.get_admin_context() + ctxt.all_tenants = True + criterion = self._filter_between('shard') + + start, end = self._get_period(self.options.interval) + + data = { + "audit_period_start": str(start), + "audit_period_end": str(end) + } + + marker = None + while True: + zones = self.central_api.find_domains(ctxt, criterion, + marker=marker, + limit=self.options.per_page) + if len(zones) == 0: + LOG.info(_LI("Finished emitting events.")) + break + else: + marker = zones.objects[-1].id + + for zone in zones: + zone_data = dict(zone) + zone_data.update(data) + + self.notifier.info(ctxt, 'dns.domain.exists', zone_data) diff --git a/setup.cfg b/setup.cfg index f0d2273c3..a107d4cc0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -110,6 +110,9 @@ designate.manage = powerdns = designate.manage.powerdns:DatabaseCommands tlds = designate.manage.tlds:TLDCommands +designate.zone_manager_tasks = + periodic_exists = designate.zone_manager.tasks:PeriodicExistsTask + [build_sphinx] all_files = 1 build-dir = doc/build