From d2a845404351d14b0a05d76a741e9b19011f2b36 Mon Sep 17 00:00:00 2001 From: Vladimir Ostroverkhov Date: Thu, 23 Feb 2017 14:10:10 +0300 Subject: [PATCH] Versioned Notifications for service object Implements: blueprint service-versioned-notifications-api Change-Id: I9d601edb265ee230104f6c63a5f044869aeb3a02 --- doc/notification_samples/service-update.json | 26 ++++ watcher/api/controllers/v1/service.py | 2 + watcher/api/scheduling.py | 99 +++++++++++++++ watcher/cmd/api.py | 4 + watcher/notifications/__init__.py | 1 + watcher/notifications/service.py | 113 +++++++++++++++++ watcher/tests/api/base.py | 7 ++ watcher/tests/api/test_scheduling.py | 114 ++++++++++++++++++ .../tests/notifications/test_notification.py | 5 + .../test_service_notifications.py | 77 ++++++++++++ 10 files changed, 448 insertions(+) create mode 100644 doc/notification_samples/service-update.json create mode 100644 watcher/api/scheduling.py create mode 100644 watcher/notifications/service.py create mode 100644 watcher/tests/api/test_scheduling.py create mode 100644 watcher/tests/notifications/test_service_notifications.py diff --git a/doc/notification_samples/service-update.json b/doc/notification_samples/service-update.json new file mode 100644 index 000000000..1f61e588e --- /dev/null +++ b/doc/notification_samples/service-update.json @@ -0,0 +1,26 @@ +{ + "payload": { + "watcher_object.name": "ServiceUpdatePayload", + "watcher_object.namespace": "watcher", + "watcher_object.data": { + "status_update": { + "watcher_object.name": "ServiceStatusUpdatePayload", + "watcher_object.namespace": "watcher", + "watcher_object.data": { + "old_state": "ACTIVE", + "state": "FAILED" + }, + "watcher_object.version": "1.0" + }, + "last_seen_up": "2016-09-22T08:32:06Z", + "name": "watcher-service", + "sevice_host": "controller" + }, + "watcher_object.version": "1.0" + }, + "event_type": "service.update", + "priority": "INFO", + "message_id": "3984dc2b-8aef-462b-a220-8ae04237a56e", + "timestamp": "2016-10-18 
09:52:05.219414", + "publisher_id": "infra-optim:node0" +} \ No newline at end of file diff --git a/watcher/api/controllers/v1/service.py b/watcher/api/controllers/v1/service.py index 822beb9cd..63ea1794a 100644 --- a/watcher/api/controllers/v1/service.py +++ b/watcher/api/controllers/v1/service.py @@ -34,6 +34,7 @@ from watcher.api.controllers import base from watcher.api.controllers import link from watcher.api.controllers.v1 import collection from watcher.api.controllers.v1 import utils as api_utils +from watcher.common import context from watcher.common import exception from watcher.common import policy from watcher import objects @@ -51,6 +52,7 @@ class Service(base.APIBase): """ _status = None + _context = context.RequestContext(is_admin=True) def _get_status(self): return self._status diff --git a/watcher/api/scheduling.py b/watcher/api/scheduling.py new file mode 100644 index 000000000..035c6d2d4 --- /dev/null +++ b/watcher/api/scheduling.py @@ -0,0 +1,99 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2017 Servionica +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+
+import datetime
+from oslo_config import cfg
+from oslo_log import log
+from oslo_utils import timeutils
+import six
+
+from watcher._i18n import _LW
+from watcher.common import context as watcher_context
+from watcher.common import scheduling
+from watcher import notifications
+
+from watcher import objects
+
+CONF = cfg.CONF
+LOG = log.getLogger(__name__)
+
+
+class APISchedulingService(scheduling.BackgroundSchedulerService):
+    """Background scheduler polling the status of the Watcher services."""
+
+    def __init__(self, gconfig=None, **options):
+        # Last known status per service id, used to detect transitions.
+        self.services_status = {}
+        super(APISchedulingService, self).__init__(gconfig or {}, **options)
+
+    def get_services_status(self, context):
+        """Refresh every service status; notify when a known one changes."""
+        for service in objects.service.Service.list(context):
+            result = self.get_service_status(context, service.id)
+            # A first sighting only records the baseline: nothing to compare.
+            changed = self.services_status.get(service.id, result) != result
+            self.services_status[service.id] = result
+            if changed:
+                notifications.service.send_service_update(context, service,
+                                                          state=result)
+
+    def get_service_status(self, context, service_id):
+        """Return ServiceStatus.ACTIVE or FAILED from the heartbeat age."""
+        service = objects.Service.get(context, service_id)
+        last_heartbeat = (service.last_seen_up or service.updated_at
+                          or service.created_at)
+        if isinstance(last_heartbeat, six.string_types):
+            # NOTE(russellb) If this service came in over rpc via
+            # conductor, then the timestamp will be a string and needs to be
+            # converted back to a datetime.
+            last_heartbeat = timeutils.parse_strtime(last_heartbeat)
+        else:
+            # Objects have proper UTC timezones, but the timeutils comparison
+            # below does not (and will fail)
+            last_heartbeat = last_heartbeat.replace(tzinfo=None)
+        elapsed = timeutils.delta_seconds(last_heartbeat, timeutils.utcnow())
+        if abs(elapsed) <= CONF.service_down_time:
+            return objects.service.ServiceStatus.ACTIVE
+
+        LOG.warning(_LW('Seems service %(name)s on host %(host)s is down. '
+                        'Last heartbeat was %(lhb)s. '
+                        'Elapsed time is %(el)s'),
+                    {'name': service.name,
+                     'host': service.host,
+                     'lhb': str(last_heartbeat), 'el': str(elapsed)})
+        return objects.service.ServiceStatus.FAILED
+
+    def start(self):
+        """Schedule the status poll (every 60s, first run now) and start."""
+        context = watcher_context.make_context(is_admin=True)
+        self.add_job(self.get_services_status, name='service_status',
+                     trigger='interval', jobstore='default', args=[context],
+                     next_run_time=datetime.datetime.now(), seconds=60)
+        super(APISchedulingService, self).start()
+
+    def stop(self):
+        """Stop service by shutting the scheduler down."""
+        self.shutdown()
+
+    def wait(self):
+        """Wait for service to complete (nothing to wait for here)."""
+
+    def reset(self):
+        """Reset service.
+
+        Called in case service running in daemon mode receives SIGHUP.
+        """
diff --git a/watcher/cmd/api.py b/watcher/cmd/api.py
index c354ffc70..58c27e28a 100644
--- a/watcher/cmd/api.py
+++ b/watcher/cmd/api.py
@@ -22,6 +22,7 @@ import sys
 from oslo_config import cfg
 from oslo_log import log as logging
 
+from watcher.api import scheduling
 from watcher.common import service
 from watcher import conf
 
@@ -45,5 +46,8 @@ def main():
     LOG.info('serving on %(protocol)s://%(host)s:%(port)s' %
              dict(protocol=protocol, host=host, port=port))
 
+    api_schedule = scheduling.APISchedulingService()
+    api_schedule.start()
+
     launcher = service.launch(CONF, server, workers=server.workers)
     launcher.wait()
diff --git a/watcher/notifications/__init__.py b/watcher/notifications/__init__.py
index c2add648e..cfed437bc 100644
--- a/watcher/notifications/__init__.py
+++ b/watcher/notifications/__init__.py
@@ -25,4 +25,5 @@ from watcher.notifications import action_plan  # noqa
 from watcher.notifications import audit  # noqa
 from watcher.notifications import exception  # noqa
 from watcher.notifications import goal  # noqa
+from watcher.notifications import service  # noqa
 from watcher.notifications import strategy  # noqa
diff --git a/watcher/notifications/service.py b/watcher/notifications/service.py
new file mode
100644
index 000000000..1d2ab8a8e
--- /dev/null
+++ b/watcher/notifications/service.py
@@ -0,0 +1,113 @@
+# -*- encoding: utf-8 -*-
+# Copyright (c) 2017 Servionica
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from oslo_config import cfg
+
+from watcher.notifications import base as notificationbase
+from watcher.objects import base
+from watcher.objects import fields as wfields
+from watcher.objects import service as o_service
+
+CONF = cfg.CONF
+
+
+@base.WatcherObjectRegistry.register_notification
+class ServicePayload(notificationbase.NotificationPayloadBase):
+    """Payload fields identifying the service a notification is about."""
+    SCHEMA = {  # presumably: field -> (source kwarg, its attribute)
+        'sevice_host': ('failed_service', 'host'),
+        'name': ('failed_service', 'name'),
+        'last_seen_up': ('failed_service', 'last_seen_up'),
+    }
+    # Version 1.0: Initial version
+    VERSION = '1.0'
+    fields = {
+        'sevice_host': wfields.StringField(),
+        'name': wfields.StringField(),
+        'last_seen_up': wfields.DateTimeField(nullable=True),
+    }
+
+    def __init__(self, failed_service, status_update, **kwargs):
+        super(ServicePayload, self).__init__(
+            failed_service=failed_service,
+            status_update=status_update, **kwargs)
+        self.populate_schema(failed_service=failed_service)
+
+
+@base.WatcherObjectRegistry.register_notification
+class ServiceStatusUpdatePayload(notificationbase.NotificationPayloadBase):
+    # Version 1.0: Initial version
+    VERSION = '1.0'
+    fields = {
+        'old_state': wfields.StringField(nullable=True),  # previous status
+        'state': wfields.StringField(nullable=True),  # status being reported
+    }
+
+
+@base.WatcherObjectRegistry.register_notification
+class ServiceUpdatePayload(ServicePayload):  # service info + transition
+    # Version 1.0: Initial version
+    VERSION = '1.0'
+    fields = {
+        'status_update': wfields.ObjectField('ServiceStatusUpdatePayload'),
+    }
+
+    def __init__(self, failed_service, status_update):
+        super(ServiceUpdatePayload, self).__init__(
+            failed_service=failed_service,
+            status_update=status_update)
+
+
+@notificationbase.notification_sample('service-update.json')
+@base.WatcherObjectRegistry.register_notification
+class ServiceUpdateNotification(notificationbase.NotificationBase):
+    # Version 1.0: Initial version
+    VERSION = '1.0'
+    # The payload of this notification is a ServiceUpdatePayload object.
+    fields = {
+        'payload': wfields.ObjectField('ServiceUpdatePayload')
+    }
+
+
+def send_service_update(context, failed_service, state,
+                        service='infra-optim',
+                        host=None):
+    """Emit a service.update notification for a service status change."""
+    if state == o_service.ServiceStatus.FAILED:
+        priority = wfields.NotificationPriority.WARNING
+        status_update = ServiceStatusUpdatePayload(
+            old_state=o_service.ServiceStatus.ACTIVE,
+            state=o_service.ServiceStatus.FAILED)
+    else:  # any other state is reported as a FAILED -> ACTIVE recovery
+        priority = wfields.NotificationPriority.INFO
+        status_update = ServiceStatusUpdatePayload(
+            old_state=o_service.ServiceStatus.FAILED,
+            state=o_service.ServiceStatus.ACTIVE)
+    versioned_payload = ServiceUpdatePayload(
+        failed_service=failed_service,
+        status_update=status_update
+    )
+    # Publisher = binary name + host (falls back to CONF.host).
+    notification = ServiceUpdateNotification(
+        priority=priority,
+        event_type=notificationbase.EventType(
+            object='service',
+            action=wfields.NotificationAction.UPDATE),
+        publisher=notificationbase.NotificationPublisher(
+            host=host or CONF.host,
+            binary=service),
+        payload=versioned_payload)
+
+    notification.emit(context)
diff --git a/watcher/tests/api/base.py b/watcher/tests/api/base.py
index b5eb158ad..9b858a307 100644
--- a/watcher/tests/api/base.py
+++ b/watcher/tests/api/base.py
@@ -32,6 +32,7 @@ from six.moves.urllib import parse as urlparse
 
 from watcher.api import hooks
 from watcher.common import
context as watcher_context +from watcher.notifications import service as n_service from watcher.tests.db import base PATH_PREFIX = '/v1' @@ -55,6 +56,12 @@ class FunctionalTest(base.DbTestCase): cfg.CONF.set_override("admin_user", "admin", group='keystone_authtoken', enforce_type=True) + + p_services = mock.patch.object(n_service, "send_service_update", + new_callable=mock.PropertyMock) + self.m_services = p_services.start() + self.addCleanup(p_services.stop) + self.app = self._make_app() def reset_pecan(): diff --git a/watcher/tests/api/test_scheduling.py b/watcher/tests/api/test_scheduling.py new file mode 100644 index 000000000..b857afa46 --- /dev/null +++ b/watcher/tests/api/test_scheduling.py @@ -0,0 +1,114 @@ +# -*- encoding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+
+from apscheduler.schedulers import background
+import datetime
+import freezegun
+import mock
+
+from watcher.api import scheduling
+from watcher.notifications import service
+from watcher import objects
+from watcher.tests import base
+from watcher.tests.db import base as db_base
+from watcher.tests.db import utils
+
+
+class TestSchedulingService(base.TestCase):
+    """Check that starting the API scheduler registers its single job."""
+    @mock.patch.object(background.BackgroundScheduler, 'start')
+    def test_start_scheduling_service(self, m_start):
+        scheduler = scheduling.APISchedulingService()
+        scheduler.start()
+        m_start.assert_called_once_with(scheduler)
+        jobs = scheduler.get_jobs()
+        self.assertEqual(1, len(jobs))
+
+
+class TestSchedulingServiceFunctions(db_base.DbTestCase):
+    """Check status polling and heartbeat evaluation with a fake service."""
+    def setUp(self):
+        super(TestSchedulingServiceFunctions, self).setUp()
+        fake_service = utils.get_test_service(
+            created_at=datetime.datetime.utcnow())
+        self.fake_service = objects.Service(**fake_service)
+
+    @mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
+    @mock.patch.object(objects.Service, 'list')
+    @mock.patch.object(service, 'send_service_update')
+    def test_get_services_status_without_services_in_list(
+            self, mock_service_update, mock_get_list, mock_service_status):
+        scheduler = scheduling.APISchedulingService()
+        mock_get_list.return_value = [self.fake_service]
+        mock_service_status.return_value = 'ACTIVE'
+        scheduler.get_services_status(mock.ANY)
+        mock_service_status.assert_called_once_with(mock.ANY,
+                                                    self.fake_service.id)
+        # First sighting only records a baseline: no notification.
+        mock_service_update.assert_not_called()
+
+    @mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
+    @mock.patch.object(objects.Service, 'list')
+    @mock.patch.object(service, 'send_service_update')
+    def test_get_services_status_with_services_in_list_same_status(
+            self, mock_service_update, mock_get_list, mock_service_status):
+        scheduler = scheduling.APISchedulingService()
+        mock_get_list.return_value = [self.fake_service]
+        scheduler.services_status = {self.fake_service.id: 'ACTIVE'}
+        mock_service_status.return_value = 'ACTIVE'
+        scheduler.get_services_status(mock.ANY)
+        mock_service_status.assert_called_once_with(mock.ANY,
+                                                    self.fake_service.id)
+        # Same status as the recorded one: no notification.
+        mock_service_update.assert_not_called()
+
+    @mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
+    @mock.patch.object(objects.Service, 'list')
+    @mock.patch.object(service, 'send_service_update')
+    def test_get_services_status_with_services_in_list_diff_status(
+            self, mock_service_update, mock_get_list, mock_service_status):
+        scheduler = scheduling.APISchedulingService()
+        mock_get_list.return_value = [self.fake_service]
+        scheduler.services_status = {self.fake_service.id: 'FAILED'}
+        mock_service_status.return_value = 'ACTIVE'
+        scheduler.get_services_status(mock.ANY)
+        mock_service_status.assert_called_once_with(mock.ANY,
+                                                    self.fake_service.id)
+        # FAILED -> ACTIVE transition: exactly one notification.
+        mock_service_update.assert_called_once_with(mock.ANY,
+                                                    self.fake_service,
+                                                    state='ACTIVE')
+
+    @mock.patch.object(objects.Service, 'get')  # clock is NOT frozen here
+    def test_get_service_status_failed_service(
+            self, mock_get):
+        scheduler = scheduling.APISchedulingService()
+        mock_get.return_value = self.fake_service
+        service_status = scheduler.get_service_status(mock.ANY,
+                                                      self.fake_service.id)
+        mock_get.assert_called_once_with(mock.ANY,
+                                         self.fake_service.id)
+        self.assertEqual('FAILED', service_status)
+
+    @freezegun.freeze_time('2016-09-22T08:32:26.219414')
+    @mock.patch.object(objects.Service, 'get')
+    def test_get_service_status_failed_active(
+            self, mock_get):
+        scheduler = scheduling.APISchedulingService()
+        mock_get.return_value = self.fake_service
+        service_status = scheduler.get_service_status(mock.ANY,
+                                                      self.fake_service.id)
+        mock_get.assert_called_once_with(mock.ANY,
+                                         self.fake_service.id)
+        self.assertEqual('ACTIVE', service_status)
diff --git a/watcher/tests/notifications/test_notification.py b/watcher/tests/notifications/test_notification.py
index 0f5b91139..f4fe6595f 100644
--- a/watcher/tests/notifications/test_notification.py
+++
b/watcher/tests/notifications/test_notification.py @@ -288,6 +288,11 @@ expected_notification_fingerprints = { 'ActionUpdateNotification': '1.0-9b69de0724fda8310d05e18418178866', 'ActionUpdatePayload': '1.0-03306c7e7f4d49ac328c261eff6b30b8', 'TerseActionPlanPayload': '1.0-42bf7a5585cc111a9a4dbc008a04c67e', + 'ServiceUpdateNotification': '1.0-9b69de0724fda8310d05e18418178866', + 'ServicePayload': '1.0-9c5a9bc51e6606e0ec3cf95baf698f4f', + 'ServiceStatusUpdatePayload': '1.0-1a1b606bf14a2c468800c2b010801ce5', + 'ServiceUpdatePayload': '1.0-e0e9812a45958974693a723a2c820c3f' + } diff --git a/watcher/tests/notifications/test_service_notifications.py b/watcher/tests/notifications/test_service_notifications.py new file mode 100644 index 000000000..538fed081 --- /dev/null +++ b/watcher/tests/notifications/test_service_notifications.py @@ -0,0 +1,77 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2017 Servionica +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import datetime
+
+import freezegun
+import mock
+import oslo_messaging as om
+
+from watcher.common import rpc
+from watcher import notifications
+from watcher.objects import service as w_service
+from watcher.tests.db import base
+from watcher.tests.objects import utils
+
+
+@freezegun.freeze_time('2016-10-18T09:52:05.219414')
+class TestActionPlanNotification(base.DbTestCase):
+    """Tests for service notifications (NOTE: the class name is misleading)."""
+    def setUp(self):
+        super(TestActionPlanNotification, self).setUp()
+        p_get_notifier = mock.patch.object(rpc, 'get_notifier')
+        m_get_notifier = p_get_notifier.start()
+        self.addCleanup(p_get_notifier.stop)
+        self.m_notifier = mock.Mock(spec=om.Notifier)
+
+        def fake_get_notifier(publisher_id):
+            self.m_notifier.publisher_id = publisher_id
+            return self.m_notifier
+
+        m_get_notifier.side_effect = fake_get_notifier
+
+    def test_service_failed(self):
+        service = utils.get_test_service(mock.Mock(),
+                                         created_at=datetime.datetime.utcnow())
+        state = w_service.ServiceStatus.FAILED
+        notifications.service.send_service_update(mock.MagicMock(),
+                                                  service,
+                                                  state,
+                                                  host='node0')
+        notification = self.m_notifier.warning.call_args[1]  # call kwargs
+        payload = notification['payload']
+        self.assertEqual("infra-optim:node0", self.m_notifier.publisher_id)
+        self.assertDictEqual({
+            'watcher_object.data': {
+                'last_seen_up': '2016-09-22T08:32:06Z',
+                'name': 'watcher-service',
+                'sevice_host': 'controller',
+                'status_update': {
+                    'watcher_object.data': {
+                        'old_state': 'ACTIVE',
+                        'state': 'FAILED'
+                    },
+                    'watcher_object.name': 'ServiceStatusUpdatePayload',
+                    'watcher_object.namespace': 'watcher',
+                    'watcher_object.version': '1.0'
+                }
+            },
+            'watcher_object.name': 'ServiceUpdatePayload',
+            'watcher_object.namespace': 'watcher',
+            'watcher_object.version': '1.0'
+        },
+            payload
+        )