Versioned Notifications for service object

Implements: blueprint service-versioned-notifications-api

Change-Id: I9d601edb265ee230104f6c63a5f044869aeb3a02
This commit is contained in:
Vladimir Ostroverkhov 2017-02-23 14:10:10 +03:00
parent 40f6eea637
commit d2a8454043
10 changed files with 448 additions and 0 deletions

View File

@ -0,0 +1,26 @@
{
"payload": {
"watcher_object.name": "ServiceUpdatePayload",
"watcher_object.namespace": "watcher",
"watcher_object.data": {
"status_update": {
"watcher_object.name": "ServiceStatusUpdatePayload",
"watcher_object.namespace": "watcher",
"watcher_object.data": {
"old_state": "ACTIVE",
"state": "FAILED"
},
"watcher_object.version": "1.0"
},
"last_seen_up": "2016-09-22T08:32:06Z",
"name": "watcher-service",
"sevice_host": "controller"
},
"watcher_object.version": "1.0"
},
"event_type": "service.update",
"priority": "INFO",
"message_id": "3984dc2b-8aef-462b-a220-8ae04237a56e",
"timestamp": "2016-10-18 09:52:05.219414",
"publisher_id": "infra-optim:node0"
}

View File

@ -34,6 +34,7 @@ from watcher.api.controllers import base
from watcher.api.controllers import link
from watcher.api.controllers.v1 import collection
from watcher.api.controllers.v1 import utils as api_utils
from watcher.common import context
from watcher.common import exception
from watcher.common import policy
from watcher import objects
@ -51,6 +52,7 @@ class Service(base.APIBase):
"""
_status = None
_context = context.RequestContext(is_admin=True)
def _get_status(self):
return self._status

99
watcher/api/scheduling.py Normal file
View File

@ -0,0 +1,99 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2017 Servionica
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import six
from watcher._i18n import _LW
from watcher.common import context as watcher_context
from watcher.common import scheduling
from watcher import notifications
from watcher import objects
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class APISchedulingService(scheduling.BackgroundSchedulerService):
def __init__(self, gconfig=None, **options):
self.services_status = {}
gconfig = None or {}
super(APISchedulingService, self).__init__(gconfig, **options)
def get_services_status(self, context):
services = objects.service.Service.list(context)
for service in services:
result = self.get_service_status(context, service.id)
if service.id not in self.services_status.keys():
self.services_status[service.id] = result
continue
if self.services_status[service.id] != result:
self.services_status[service.id] = result
notifications.service.send_service_update(context, service,
state=result)
def get_service_status(self, context, service_id):
service = objects.Service.get(context, service_id)
last_heartbeat = (service.last_seen_up or service.updated_at
or service.created_at)
if isinstance(last_heartbeat, six.string_types):
# NOTE(russellb) If this service came in over rpc via
# conductor, then the timestamp will be a string and needs to be
# converted back to a datetime.
last_heartbeat = timeutils.parse_strtime(last_heartbeat)
else:
# Objects have proper UTC timezones, but the timeutils comparison
# below does not (and will fail)
last_heartbeat = last_heartbeat.replace(tzinfo=None)
elapsed = timeutils.delta_seconds(last_heartbeat, timeutils.utcnow())
is_up = abs(elapsed) <= CONF.service_down_time
if not is_up:
LOG.warning(_LW('Seems service %(name)s on host %(host)s is down. '
'Last heartbeat was %(lhb)s.'
'Elapsed time is %(el)s'),
{'name': service.name,
'host': service.host,
'lhb': str(last_heartbeat), 'el': str(elapsed)})
return objects.service.ServiceStatus.FAILED
return objects.service.ServiceStatus.ACTIVE
def start(self):
"""Start service."""
context = watcher_context.make_context(is_admin=True)
self.add_job(self.get_services_status, name='service_status',
trigger='interval', jobstore='default', args=[context],
next_run_time=datetime.datetime.now(), seconds=60)
super(APISchedulingService, self).start()
def stop(self):
"""Stop service."""
self.shutdown()
def wait(self):
"""Wait for service to complete."""
def reset(self):
"""Reset service.
Called in case service running in daemon mode receives SIGHUP.
"""

View File

@ -22,6 +22,7 @@ import sys
from oslo_config import cfg
from oslo_log import log as logging
from watcher.api import scheduling
from watcher.common import service
from watcher import conf
@ -45,5 +46,8 @@ def main():
LOG.info('serving on %(protocol)s://%(host)s:%(port)s' %
dict(protocol=protocol, host=host, port=port))
api_schedule = scheduling.APISchedulingService()
api_schedule.start()
launcher = service.launch(CONF, server, workers=server.workers)
launcher.wait()

View File

@ -25,4 +25,5 @@ from watcher.notifications import action_plan # noqa
from watcher.notifications import audit # noqa
from watcher.notifications import exception # noqa
from watcher.notifications import goal # noqa
from watcher.notifications import service # noqa
from watcher.notifications import strategy # noqa

View File

@ -0,0 +1,113 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2017 Servionica
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from watcher.notifications import base as notificationbase
from watcher.objects import base
from watcher.objects import fields as wfields
from watcher.objects import service as o_service
CONF = cfg.CONF
@base.WatcherObjectRegistry.register_notification
class ServicePayload(notificationbase.NotificationPayloadBase):
SCHEMA = {
'sevice_host': ('failed_service', 'host'),
'name': ('failed_service', 'name'),
'last_seen_up': ('failed_service', 'last_seen_up'),
}
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'sevice_host': wfields.StringField(),
'name': wfields.StringField(),
'last_seen_up': wfields.DateTimeField(nullable=True),
}
def __init__(self, failed_service, status_update, **kwargs):
super(ServicePayload, self).__init__(
failed_service=failed_service,
status_update=status_update, **kwargs)
self.populate_schema(failed_service=failed_service)
@base.WatcherObjectRegistry.register_notification
class ServiceStatusUpdatePayload(notificationbase.NotificationPayloadBase):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'old_state': wfields.StringField(nullable=True),
'state': wfields.StringField(nullable=True),
}
@base.WatcherObjectRegistry.register_notification
class ServiceUpdatePayload(ServicePayload):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'status_update': wfields.ObjectField('ServiceStatusUpdatePayload'),
}
def __init__(self, failed_service, status_update):
super(ServiceUpdatePayload, self).__init__(
failed_service=failed_service,
status_update=status_update)
@notificationbase.notification_sample('service-update.json')
@base.WatcherObjectRegistry.register_notification
class ServiceUpdateNotification(notificationbase.NotificationBase):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'payload': wfields.ObjectField('ServiceUpdatePayload')
}
def send_service_update(context, failed_service, state,
service='infra-optim',
host=None):
"""Emit an service failed notification."""
if state == o_service.ServiceStatus.FAILED:
priority = wfields.NotificationPriority.WARNING
status_update = ServiceStatusUpdatePayload(
old_state=o_service.ServiceStatus.ACTIVE,
state=o_service.ServiceStatus.FAILED)
else:
priority = wfields.NotificationPriority.INFO
status_update = ServiceStatusUpdatePayload(
old_state=o_service.ServiceStatus.FAILED,
state=o_service.ServiceStatus.ACTIVE)
versioned_payload = ServiceUpdatePayload(
failed_service=failed_service,
status_update=status_update
)
notification = ServiceUpdateNotification(
priority=priority,
event_type=notificationbase.EventType(
object='service',
action=wfields.NotificationAction.UPDATE),
publisher=notificationbase.NotificationPublisher(
host=host or CONF.host,
binary=service),
payload=versioned_payload)
notification.emit(context)

View File

@ -32,6 +32,7 @@ from six.moves.urllib import parse as urlparse
from watcher.api import hooks
from watcher.common import context as watcher_context
from watcher.notifications import service as n_service
from watcher.tests.db import base
PATH_PREFIX = '/v1'
@ -55,6 +56,12 @@ class FunctionalTest(base.DbTestCase):
cfg.CONF.set_override("admin_user", "admin",
group='keystone_authtoken',
enforce_type=True)
p_services = mock.patch.object(n_service, "send_service_update",
new_callable=mock.PropertyMock)
self.m_services = p_services.start()
self.addCleanup(p_services.stop)
self.app = self._make_app()
def reset_pecan():

View File

@ -0,0 +1,114 @@
# -*- encoding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from apscheduler.schedulers import background
import datetime
import freezegun
import mock
from watcher.api import scheduling
from watcher.notifications import service
from watcher import objects
from watcher.tests import base
from watcher.tests.db import base as db_base
from watcher.tests.db import utils
class TestSchedulingService(base.TestCase):
@mock.patch.object(background.BackgroundScheduler, 'start')
def test_start_scheduling_service(self, m_start):
scheduler = scheduling.APISchedulingService()
scheduler.start()
m_start.assert_called_once_with(scheduler)
jobs = scheduler.get_jobs()
self.assertEqual(1, len(jobs))
class TestSchedulingServiceFunctions(db_base.DbTestCase):
def setUp(self):
super(TestSchedulingServiceFunctions, self).setUp()
fake_service = utils.get_test_service(
created_at=datetime.datetime.utcnow())
self.fake_service = objects.Service(**fake_service)
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
@mock.patch.object(objects.Service, 'list')
@mock.patch.object(service, 'send_service_update')
def test_get_services_status_without_services_in_list(
self, mock_service_update, mock_get_list, mock_service_status):
scheduler = scheduling.APISchedulingService()
mock_get_list.return_value = [self.fake_service]
mock_service_status.return_value = 'ACTIVE'
scheduler.get_services_status(mock.ANY)
mock_service_status.assert_called_once_with(mock.ANY,
self.fake_service.id)
mock_service_update.assert_not_called()
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
@mock.patch.object(objects.Service, 'list')
@mock.patch.object(service, 'send_service_update')
def test_get_services_status_with_services_in_list_same_status(
self, mock_service_update, mock_get_list, mock_service_status):
scheduler = scheduling.APISchedulingService()
mock_get_list.return_value = [self.fake_service]
scheduler.services_status = {1: 'ACTIVE'}
mock_service_status.return_value = 'ACTIVE'
scheduler.get_services_status(mock.ANY)
mock_service_status.assert_called_once_with(mock.ANY,
self.fake_service.id)
mock_service_update.assert_not_called()
@mock.patch.object(scheduling.APISchedulingService, 'get_service_status')
@mock.patch.object(objects.Service, 'list')
@mock.patch.object(service, 'send_service_update')
def test_get_services_status_with_services_in_list_diff_status(
self, mock_service_update, mock_get_list, mock_service_status):
scheduler = scheduling.APISchedulingService()
mock_get_list.return_value = [self.fake_service]
scheduler.services_status = {1: 'FAILED'}
mock_service_status.return_value = 'ACTIVE'
scheduler.get_services_status(mock.ANY)
mock_service_status.assert_called_once_with(mock.ANY,
self.fake_service.id)
mock_service_update.assert_called_once_with(mock.ANY,
self.fake_service,
state='ACTIVE')
@mock.patch.object(objects.Service, 'get')
def test_get_service_status_failed_service(
self, mock_get):
scheduler = scheduling.APISchedulingService()
mock_get.return_value = self.fake_service
service_status = scheduler.get_service_status(mock.ANY,
self.fake_service.id)
mock_get.assert_called_once_with(mock.ANY,
self.fake_service.id)
self.assertEqual('FAILED', service_status)
@freezegun.freeze_time('2016-09-22T08:32:26.219414')
@mock.patch.object(objects.Service, 'get')
def test_get_service_status_failed_active(
self, mock_get):
scheduler = scheduling.APISchedulingService()
mock_get.return_value = self.fake_service
service_status = scheduler.get_service_status(mock.ANY,
self.fake_service.id)
mock_get.assert_called_once_with(mock.ANY,
self.fake_service.id)
self.assertEqual('ACTIVE', service_status)

View File

@ -288,6 +288,11 @@ expected_notification_fingerprints = {
'ActionUpdateNotification': '1.0-9b69de0724fda8310d05e18418178866',
'ActionUpdatePayload': '1.0-03306c7e7f4d49ac328c261eff6b30b8',
'TerseActionPlanPayload': '1.0-42bf7a5585cc111a9a4dbc008a04c67e',
'ServiceUpdateNotification': '1.0-9b69de0724fda8310d05e18418178866',
'ServicePayload': '1.0-9c5a9bc51e6606e0ec3cf95baf698f4f',
'ServiceStatusUpdatePayload': '1.0-1a1b606bf14a2c468800c2b010801ce5',
'ServiceUpdatePayload': '1.0-e0e9812a45958974693a723a2c820c3f'
}

View File

@ -0,0 +1,77 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2017 Servionica
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import freezegun
import mock
import oslo_messaging as om
from watcher.common import rpc
from watcher import notifications
from watcher.objects import service as w_service
from watcher.tests.db import base
from watcher.tests.objects import utils
@freezegun.freeze_time('2016-10-18T09:52:05.219414')
class TestActionPlanNotification(base.DbTestCase):
def setUp(self):
super(TestActionPlanNotification, self).setUp()
p_get_notifier = mock.patch.object(rpc, 'get_notifier')
m_get_notifier = p_get_notifier.start()
self.addCleanup(p_get_notifier.stop)
self.m_notifier = mock.Mock(spec=om.Notifier)
def fake_get_notifier(publisher_id):
self.m_notifier.publisher_id = publisher_id
return self.m_notifier
m_get_notifier.side_effect = fake_get_notifier
def test_service_failed(self):
service = utils.get_test_service(mock.Mock(),
created_at=datetime.datetime.utcnow())
state = w_service.ServiceStatus.FAILED
notifications.service.send_service_update(mock.MagicMock(),
service,
state,
host='node0')
notification = self.m_notifier.warning.call_args[1]
payload = notification['payload']
self.assertEqual("infra-optim:node0", self.m_notifier.publisher_id)
self.assertDictEqual({
'watcher_object.data': {
'last_seen_up': '2016-09-22T08:32:06Z',
'name': 'watcher-service',
'sevice_host': 'controller',
'status_update': {
'watcher_object.data': {
'old_state': 'ACTIVE',
'state': 'FAILED'
},
'watcher_object.name': 'ServiceStatusUpdatePayload',
'watcher_object.namespace': 'watcher',
'watcher_object.version': '1.0'
}
},
'watcher_object.name': 'ServiceUpdatePayload',
'watcher_object.namespace': 'watcher',
'watcher_object.version': '1.0'
},
payload
)