diff --git a/congress/datasources/monasca_driver.py b/congress/datasources/monasca_driver.py index 1165e3263..b61727bf8 100644 --- a/congress/datasources/monasca_driver.py +++ b/congress/datasources/monasca_driver.py @@ -1,4 +1,4 @@ -# Copyright (c) 2015 Cisco. +# Copyright (c) 2015 Cisco, 2018 NEC, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -13,9 +13,13 @@ # under the License. # -import datetime +from datetime import datetime +from datetime import timedelta +import eventlet +from futurist import periodics from monascaclient import client as monasca_client +from oslo_concurrency import lockutils from oslo_log import log as logging from congress.datasources import constants @@ -24,6 +28,13 @@ from congress.datasources import datasource_utils as ds_utils LOG = logging.getLogger(__name__) +DATA = "statistics.data" +DIMENSIONS = "dimensions" +METRICS = "metrics" +NOTIFICATIONS = "alarm_notification" +STATISTICS = "statistics" +value_trans = {'translation-type': 'VALUE'} + # TODO(thinrichs): figure out how to move even more of this boilerplate # into DataSourceDriver. E.g. change all the classes to Driver instead of @@ -32,16 +43,10 @@ LOG = logging.getLogger(__name__) class MonascaDriver(datasource_driver.PollingDataSourceDriver, datasource_driver.ExecutionDriver): - METRICS = "metrics" - DIMENSIONS = "dimensions" - STATISTICS = "statistics" - DATA = "statistics.data" # TODO(fabiog): add events and logs when fully supported in Monasca # EVENTS = "events" # LOGS = "logs" - value_trans = {'translation-type': 'VALUE'} - metric_translator = { 'translation-type': 'HDICT', 'table-name': METRICS, @@ -115,9 +120,9 @@ class MonascaDriver(datasource_driver.PollingDataSourceDriver, self.add_update_method(statistics_method, self.statistics_translator) def update_statistics(self): - today = datetime.datetime.now() - yesterday = datetime.timedelta(hours=24) - start_from = datetime.datetime.isoformat(today-yesterday) + today = datetime.utcnow() + yesterday = timedelta(hours=24) + start_from = datetime.isoformat(today-yesterday) for metric in self.monasca.metrics.list_names(): LOG.debug("Monasca statistics for metric %s", metric['name']) @@ -158,3 +163,115 @@ class MonascaDriver(datasource_driver.PollingDataSourceDriver, func(action_args) else: self._execute_api(self.monasca, action, action_args) + + +class MonascaWebhookDriver(datasource_driver.PushedDataSourceDriver): + + def flatten_alarm_webhook(alarm_obj): + flattened = [] + key_to_sub_dict = 'metrics' + for alarm in alarm_obj: + sub_dict = alarm.pop(key_to_sub_dict)[0] + for k, v in sub_dict.items(): + if isinstance(v, dict): + for key, value in v.items(): + alarm[key_to_sub_dict + '_' + k + '_' + key] = value + else: + alarm[key_to_sub_dict + '_' + k] = v + flattened.append(alarm) + return flattened + + alarm_notification_translator = { + 'translation-type': 'HDICT', + 'table-name': NOTIFICATIONS, + 'selector-type': 'DICT_SELECTOR', + 'objects-extract-fn': flatten_alarm_webhook, + 'field-translators': + ({'fieldname': 'alarm_id', 'translator': value_trans}, + {'fieldname': 'alarm_definition_id', 'translator': value_trans}, + {'fieldname': 'alarm_name', 'translator': value_trans}, + {'fieldname': 'alarm_description', 'translator': value_trans}, + {'fieldname': 'alarm_timestamp', 'translator': value_trans}, + {'fieldname': 'state', 'translator': value_trans}, + {'fieldname': 'old_state', 'translator': value_trans}, + {'fieldname': 'message', 'translator': value_trans}, + {'fieldname': 'tenant_id', 'translator': value_trans}, + {'fieldname': 'metrics_id', 'translator': value_trans}, + {'fieldname': 'metrics_name', 'translator': value_trans}, + {'fieldname': 'metrics_dimensions_hostname', + 'translator': value_trans}, + {'fieldname': 'metrics_dimensions_service', + 'translator': value_trans},) + } + TRANSLATORS = [alarm_notification_translator] + + def __init__(self, name='', args=None): + super(MonascaWebhookDriver, self).__init__(name, args=args) + if args is None: + args = {} + # set default time to 10 days before deleting an active alarm + self.hours_to_keep_alarm = int(args.get('hours_to_keep_alarm', 240)) + self.set_up_periodic_tasks() + + @staticmethod + def get_datasource_info(): + result = {} + result['id'] = 'monasca_webhook_driver' + result['description'] = ('Datasource driver that accepts Monasca ' + 'webhook alarm notifications.') + result['config'] = {'persist_data': constants.OPTIONAL, + 'hours_to_keep_alarm': constants.OPTIONAL} + return result + + def _webhook_handler(self, alarm): + tablename = NOTIFICATIONS + # remove already existing same alarm row + alarm_id = alarm['alarm_id'] + column_index_number_of_alarm_id = 0 + to_remove = [row for row in self.state[tablename] + if row[column_index_number_of_alarm_id] == alarm_id] + for row in to_remove: + self.state[tablename].discard(row) + + translator = self.alarm_notification_translator + row_data = MonascaWebhookDriver.convert_objs([alarm], translator) + + # add alarm to table + for table, row in row_data: + if table == tablename: + self.state[tablename].add(row) + LOG.debug('publish a new state %s in %s', + self.state[tablename], tablename) + self.publish(tablename, self.state[tablename]) + return [tablename] + + def set_up_periodic_tasks(self): + @lockutils.synchronized('congress_monasca_webhook_ds_data') + @periodics.periodic(spacing=max(self.hours_to_keep_alarm * 3600/10, 1)) + def delete_old_alarms(): + tablename = NOTIFICATIONS + col_index_of_timestamp = 4 + # find for removal all alarms at least self.hours_to_keep_alarm old + to_remove = [ + row for row in self.state[tablename] + if (datetime.utcnow() - + datetime.utcfromtimestamp(row[col_index_of_timestamp]) + >= timedelta(hours=self.hours_to_keep_alarm))] + for row in to_remove: + self.state[tablename].discard(row) + + periodic_task_callables = [ + (delete_old_alarms, None, {}), + (delete_old_alarms, None, {})] + self.periodic_tasks = periodics.PeriodicWorker(periodic_task_callables) + self.periodic_tasks_thread = eventlet.spawn_n( + self.periodic_tasks.start) + + def __del__(self): + if self.periodic_tasks: + self.periodic_tasks.stop() + self.periodic_tasks.wait() + self.periodic_tasks = None + if self.periodic_tasks_thread: + eventlet.greenthread.kill(self.periodic_tasks_thread) + self.periodic_tasks_thread = None diff --git a/congress/tests/datasources/test_monasca_driver.py b/congress/tests/datasources/test_monasca_driver.py index f87a4ee8a..028571b00 100644 --- a/congress/tests/datasources/test_monasca_driver.py +++ b/congress/tests/datasources/test_monasca_driver.py @@ -1,4 +1,4 @@ -# Copyright (c) 2015 Cisco, Inc. All rights reserved. +# Copyright (c) 2018 NEC, Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -14,6 +14,7 @@ # import mock import sys +import time sys.modules['monascaclient.client'] = mock.Mock() sys.modules['monascaclient'] = mock.Mock() @@ -68,8 +69,8 @@ class TestMonascaDriver(base.TestCase): def test_statistics_update_from_datasource(self): self.driver._translate_statistics(self.mock_statistics['elements']) - stats_list = list(self.driver.state[self.driver.STATISTICS]) - stats_data_list = list(self.driver.state[self.driver.DATA]) + stats_list = list(self.driver.state[monasca_driver.STATISTICS]) + stats_data_list = list(self.driver.state[monasca_driver.DATA]) self.assertIsNotNone(stats_list) self.assertIsNotNone(stats_data_list) @@ -129,3 +130,73 @@ class TestMonascaDriver(base.TestCase): self.driver.execute('getStatistics', api_args) self.assertEqual(expected_ans, monasca_client.testkey) + + +class TestMonascaWebhookDriver(base.TestCase): + + def setUp(self): + super(TestMonascaWebhookDriver, self).setUp() + self.monasca = monasca_driver.MonascaWebhookDriver('test-monasca') + + @mock.patch.object(monasca_driver.MonascaWebhookDriver, 'publish') + def test_monasca_webhook_alarm(self, mocked_publish): + test_alarm = { + 'metrics': [ + {u'dimensions': {u'hostname': u'openstack-13.local.lan', + u'service': u'monitoring'}, + u'id': None, + u'name': u'load.avg_1_min'}], + 'alarm_id': u'3beb4934-053d-4f8f-9704-273bffc2441b', + 'state': u'ALARM', + 'alarm_timestamp': 1531821822, + 'tenant_id': u'3661888238874df098988deab07c599d', + 'old_state': u'UNDETERMINED', + 'alarm_description': u'', + 'message': u'Thresholds were exceeded for the sub-alarms', + 'alarm_definition_id': u'8e5d033f-28cc-459f-91d4-813307e4ca8a', + 'alarm_name': u'alarmPerHost23'} + self.monasca._webhook_handler(test_alarm) + + expected_rows = set([(u'3beb4934-053d-4f8f-9704-273bffc2441b', + u'8e5d033f-28cc-459f-91d4-813307e4ca8a', + u'alarmPerHost23', + u'', + 1531821822, + u'ALARM', + u'UNDETERMINED', + u'Thresholds were exceeded for the sub-alarms', + u'3661888238874df098988deab07c599d', + None, + u'load.avg_1_min', + u'openstack-13.local.lan', + u'monitoring')]) + self.assertEqual(self.monasca.state['alarm_notification'], + expected_rows) + + @mock.patch.object(monasca_driver.MonascaWebhookDriver, 'publish') + def test_webhook_alarm_cleanup(self, mocked_publish): + self.monasca = monasca_driver.MonascaWebhookDriver( + 'test-monasca', + args={'hours_to_keep_alarm': 1 / 3600}) # set to 1 sec for test + + test_alarm = { + 'metrics': [ + {u'dimensions': {u'hostname': u'openstack-13.local.lan', + u'service': u'monitoring'}, + u'id': None, + u'name': u'load.avg_1_min'}], + 'alarm_id': u'3beb4934-053d-4f8f-9704-273bffc2441b', + 'state': u'ALARM', + 'alarm_timestamp': 1531821822, + 'tenant_id': u'3661888238874df098988deab07c599d', + 'old_state': u'UNDETERMINED', + 'alarm_description': u'', + 'message': u'Thresholds were exceeded for the sub-alarms', + 'alarm_definition_id': u'8e5d033f-28cc-459f-91d4-813307e4ca8a', + 'alarm_name': u'alarmPerHost23'} + + self.monasca._webhook_handler(test_alarm) + + self.assertEqual(1, len(self.monasca.state['alarm_notification'])) + time.sleep(3) + self.assertEqual(0, len(self.monasca.state['alarm_notification'])) diff --git a/congress/tests/helper.py b/congress/tests/helper.py index baff43963..89c0a9f34 100644 --- a/congress/tests/helper.py +++ b/congress/tests/helper.py @@ -515,5 +515,8 @@ def supported_drivers(): "description": "Datasource driver that interfaces with Mistral."}, {"id": "vitrage", "description": "Datasource driver that accepts Vitrage " - "webhook alarm notifications."}] + "webhook alarm notifications."}, + {"id": "monasca_webhook_driver", + "description": "Datasource driver that accepts Monasca webhook " + "alarm notifications."}] return results diff --git a/setup.cfg b/setup.cfg index 6909d9f8e..7eb994e6e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -63,6 +63,7 @@ congress.datasource.drivers = keystonev3 = congress.datasources.keystonev3_driver:KeystoneV3Driver mistral = congress.datasources.mistral_driver:MistralDriver monasca = congress.datasources.monasca_driver:MonascaDriver + monasca_webhook_driver = congress.datasources.monasca_driver:MonascaWebhookDriver murano = congress.datasources.murano_driver:MuranoDriver neutronv2 = congress.datasources.neutronv2_driver:NeutronV2Driver neutronv2_qos = congress.datasources.neutronv2_qos_driver:NeutronV2QosDriver