Merge "Adding monasca push driver to receive webhook notifications"

This commit is contained in:
Zuul 2018-07-26 20:27:01 +00:00 committed by Gerrit Code Review
commit 814eac046e
4 changed files with 207 additions and 15 deletions

View File

@ -1,4 +1,4 @@
# Copyright (c) 2015 Cisco.
# Copyright (c) 2015 Cisco, 2018 NEC, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -13,9 +13,13 @@
# under the License.
#
import datetime
from datetime import datetime
from datetime import timedelta
import eventlet
from futurist import periodics
from monascaclient import client as monasca_client
from oslo_concurrency import lockutils
from oslo_log import log as logging
from congress.datasources import constants
@ -24,6 +28,13 @@ from congress.datasources import datasource_utils as ds_utils
LOG = logging.getLogger(__name__)
DATA = "statistics.data"
DIMENSIONS = "dimensions"
METRICS = "metrics"
NOTIFICATIONS = "alarm_notification"
STATISTICS = "statistics"
value_trans = {'translation-type': 'VALUE'}
# TODO(thinrichs): figure out how to move even more of this boilerplate
# into DataSourceDriver. E.g. change all the classes to Driver instead of
@ -32,16 +43,10 @@ LOG = logging.getLogger(__name__)
class MonascaDriver(datasource_driver.PollingDataSourceDriver,
datasource_driver.ExecutionDriver):
METRICS = "metrics"
DIMENSIONS = "dimensions"
STATISTICS = "statistics"
DATA = "statistics.data"
# TODO(fabiog): add events and logs when fully supported in Monasca
# EVENTS = "events"
# LOGS = "logs"
value_trans = {'translation-type': 'VALUE'}
metric_translator = {
'translation-type': 'HDICT',
'table-name': METRICS,
@ -115,9 +120,9 @@ class MonascaDriver(datasource_driver.PollingDataSourceDriver,
self.add_update_method(statistics_method, self.statistics_translator)
def update_statistics(self):
today = datetime.datetime.now()
yesterday = datetime.timedelta(hours=24)
start_from = datetime.datetime.isoformat(today-yesterday)
today = datetime.utcnow()
yesterday = timedelta(hours=24)
start_from = datetime.isoformat(today-yesterday)
for metric in self.monasca.metrics.list_names():
LOG.debug("Monasca statistics for metric %s", metric['name'])
@ -158,3 +163,115 @@ class MonascaDriver(datasource_driver.PollingDataSourceDriver,
func(action_args)
else:
self._execute_api(self.monasca, action, action_args)
class MonascaWebhookDriver(datasource_driver.PushedDataSourceDriver):
def flatten_alarm_webhook(alarm_obj):
flattened = []
key_to_sub_dict = 'metrics'
for alarm in alarm_obj:
sub_dict = alarm.pop(key_to_sub_dict)[0]
for k, v in sub_dict.items():
if isinstance(v, dict):
for key, value in v.items():
alarm[key_to_sub_dict + '_' + k + '_' + key] = value
else:
alarm[key_to_sub_dict + '_' + k] = v
flattened.append(alarm)
return flattened
alarm_notification_translator = {
'translation-type': 'HDICT',
'table-name': NOTIFICATIONS,
'selector-type': 'DICT_SELECTOR',
'objects-extract-fn': flatten_alarm_webhook,
'field-translators':
({'fieldname': 'alarm_id', 'translator': value_trans},
{'fieldname': 'alarm_definition_id', 'translator': value_trans},
{'fieldname': 'alarm_name', 'translator': value_trans},
{'fieldname': 'alarm_description', 'translator': value_trans},
{'fieldname': 'alarm_timestamp', 'translator': value_trans},
{'fieldname': 'state', 'translator': value_trans},
{'fieldname': 'old_state', 'translator': value_trans},
{'fieldname': 'message', 'translator': value_trans},
{'fieldname': 'tenant_id', 'translator': value_trans},
{'fieldname': 'metrics_id', 'translator': value_trans},
{'fieldname': 'metrics_name', 'translator': value_trans},
{'fieldname': 'metrics_dimensions_hostname',
'translator': value_trans},
{'fieldname': 'metrics_dimensions_service',
'translator': value_trans},)
}
TRANSLATORS = [alarm_notification_translator]
def __init__(self, name='', args=None):
super(MonascaWebhookDriver, self).__init__(name, args=args)
if args is None:
args = {}
# set default time to 10 days before deleting an active alarm
self.hours_to_keep_alarm = int(args.get('hours_to_keep_alarm', 240))
self.set_up_periodic_tasks()
@staticmethod
def get_datasource_info():
result = {}
result['id'] = 'monasca_webhook_driver'
result['description'] = ('Datasource driver that accepts Monasca '
'webhook alarm notifications.')
result['config'] = {'persist_data': constants.OPTIONAL,
'hours_to_keep_alarm': constants.OPTIONAL}
return result
def _webhook_handler(self, alarm):
tablename = NOTIFICATIONS
# remove already existing same alarm row
alarm_id = alarm['alarm_id']
column_index_number_of_alarm_id = 0
to_remove = [row for row in self.state[tablename]
if row[column_index_number_of_alarm_id] == alarm_id]
for row in to_remove:
self.state[tablename].discard(row)
translator = self.alarm_notification_translator
row_data = MonascaWebhookDriver.convert_objs([alarm], translator)
# add alarm to table
for table, row in row_data:
if table == tablename:
self.state[tablename].add(row)
LOG.debug('publish a new state %s in %s',
self.state[tablename], tablename)
self.publish(tablename, self.state[tablename])
return [tablename]
def set_up_periodic_tasks(self):
@lockutils.synchronized('congress_monasca_webhook_ds_data')
@periodics.periodic(spacing=max(self.hours_to_keep_alarm * 3600/10, 1))
def delete_old_alarms():
tablename = NOTIFICATIONS
col_index_of_timestamp = 4
# find for removal all alarms at least self.hours_to_keep_alarm old
to_remove = [
row for row in self.state[tablename]
if (datetime.utcnow() -
datetime.utcfromtimestamp(row[col_index_of_timestamp])
>= timedelta(hours=self.hours_to_keep_alarm))]
for row in to_remove:
self.state[tablename].discard(row)
periodic_task_callables = [
(delete_old_alarms, None, {}),
(delete_old_alarms, None, {})]
self.periodic_tasks = periodics.PeriodicWorker(periodic_task_callables)
self.periodic_tasks_thread = eventlet.spawn_n(
self.periodic_tasks.start)
def __del__(self):
if self.periodic_tasks:
self.periodic_tasks.stop()
self.periodic_tasks.wait()
self.periodic_tasks = None
if self.periodic_tasks_thread:
eventlet.greenthread.kill(self.periodic_tasks_thread)
self.periodic_tasks_thread = None

View File

@ -1,4 +1,4 @@
# Copyright (c) 2015 Cisco, Inc. All rights reserved.
# Copyright (c) 2018 NEC, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -14,6 +14,7 @@
#
import mock
import sys
import time
sys.modules['monascaclient.client'] = mock.Mock()
sys.modules['monascaclient'] = mock.Mock()
@ -68,8 +69,8 @@ class TestMonascaDriver(base.TestCase):
def test_statistics_update_from_datasource(self):
self.driver._translate_statistics(self.mock_statistics['elements'])
stats_list = list(self.driver.state[self.driver.STATISTICS])
stats_data_list = list(self.driver.state[self.driver.DATA])
stats_list = list(self.driver.state[monasca_driver.STATISTICS])
stats_data_list = list(self.driver.state[monasca_driver.DATA])
self.assertIsNotNone(stats_list)
self.assertIsNotNone(stats_data_list)
@ -129,3 +130,73 @@ class TestMonascaDriver(base.TestCase):
self.driver.execute('getStatistics', api_args)
self.assertEqual(expected_ans, monasca_client.testkey)
class TestMonascaWebhookDriver(base.TestCase):
def setUp(self):
super(TestMonascaWebhookDriver, self).setUp()
self.monasca = monasca_driver.MonascaWebhookDriver('test-monasca')
@mock.patch.object(monasca_driver.MonascaWebhookDriver, 'publish')
def test_monasca_webhook_alarm(self, mocked_publish):
test_alarm = {
'metrics': [
{u'dimensions': {u'hostname': u'openstack-13.local.lan',
u'service': u'monitoring'},
u'id': None,
u'name': u'load.avg_1_min'}],
'alarm_id': u'3beb4934-053d-4f8f-9704-273bffc2441b',
'state': u'ALARM',
'alarm_timestamp': 1531821822,
'tenant_id': u'3661888238874df098988deab07c599d',
'old_state': u'UNDETERMINED',
'alarm_description': u'',
'message': u'Thresholds were exceeded for the sub-alarms',
'alarm_definition_id': u'8e5d033f-28cc-459f-91d4-813307e4ca8a',
'alarm_name': u'alarmPerHost23'}
self.monasca._webhook_handler(test_alarm)
expected_rows = set([(u'3beb4934-053d-4f8f-9704-273bffc2441b',
u'8e5d033f-28cc-459f-91d4-813307e4ca8a',
u'alarmPerHost23',
u'',
1531821822,
u'ALARM',
u'UNDETERMINED',
u'Thresholds were exceeded for the sub-alarms',
u'3661888238874df098988deab07c599d',
None,
u'load.avg_1_min',
u'openstack-13.local.lan',
u'monitoring')])
self.assertEqual(self.monasca.state['alarm_notification'],
expected_rows)
@mock.patch.object(monasca_driver.MonascaWebhookDriver, 'publish')
def test_webhook_alarm_cleanup(self, mocked_publish):
self.monasca = monasca_driver.MonascaWebhookDriver(
'test-monasca',
args={'hours_to_keep_alarm': 1 / 3600}) # set to 1 sec for test
test_alarm = {
'metrics': [
{u'dimensions': {u'hostname': u'openstack-13.local.lan',
u'service': u'monitoring'},
u'id': None,
u'name': u'load.avg_1_min'}],
'alarm_id': u'3beb4934-053d-4f8f-9704-273bffc2441b',
'state': u'ALARM',
'alarm_timestamp': 1531821822,
'tenant_id': u'3661888238874df098988deab07c599d',
'old_state': u'UNDETERMINED',
'alarm_description': u'',
'message': u'Thresholds were exceeded for the sub-alarms',
'alarm_definition_id': u'8e5d033f-28cc-459f-91d4-813307e4ca8a',
'alarm_name': u'alarmPerHost23'}
self.monasca._webhook_handler(test_alarm)
self.assertEqual(1, len(self.monasca.state['alarm_notification']))
time.sleep(3)
self.assertEqual(0, len(self.monasca.state['alarm_notification']))

View File

@ -515,5 +515,8 @@ def supported_drivers():
"description": "Datasource driver that interfaces with Mistral."},
{"id": "vitrage",
"description": "Datasource driver that accepts Vitrage "
"webhook alarm notifications."}]
"webhook alarm notifications."},
{"id": "monasca_webhook_driver",
"description": "Datasource driver that accepts Monasca webhook "
"alarm notifications."}]
return results

View File

@ -63,6 +63,7 @@ congress.datasource.drivers =
keystonev3 = congress.datasources.keystonev3_driver:KeystoneV3Driver
mistral = congress.datasources.mistral_driver:MistralDriver
monasca = congress.datasources.monasca_driver:MonascaDriver
monasca_webhook_driver = congress.datasources.monasca_driver:MonascaWebhookDriver
murano = congress.datasources.murano_driver:MuranoDriver
neutronv2 = congress.datasources.neutronv2_driver:NeutronV2Driver
neutronv2_qos = congress.datasources.neutronv2_qos_driver:NeutronV2QosDriver