From 8273de7ca6264e336c8401cc1f708fa3bf94757e Mon Sep 17 00:00:00 2001 From: Jakub Wachowski Date: Mon, 7 Aug 2017 14:45:09 +0200 Subject: [PATCH] Add support for persisting events This change extends the persister to read OpenStack event objects from a Kafka topic and then store them into ElasticSearch. Story: 2001112 Task: 4843 Change-Id: I35b0db67dc088e56ad281c84fc4b50fa7a064e44 --- monasca_persister/conf/elasticsearch.py | 52 +++++++++++++ monasca_persister/conf/kafka_events.py | 54 ++++++++++++++ monasca_persister/conf/repositories.py | 5 +- monasca_persister/persister.py | 21 +++--- .../repositories/elasticsearch/__init__.py | 0 .../elasticsearch/events_repository.py | 74 +++++++++++++++++++ monasca_persister/repositories/utils.py | 12 +++ monasca_persister/tests/events.json | 54 ++++++++++++++ monasca_persister/tests/test_events.py | 64 ++++++++++++++++ .../tests/test_persister_main.py | 3 + setup.cfg | 2 + tox.ini | 2 +- 12 files changed, 331 insertions(+), 12 deletions(-) create mode 100644 monasca_persister/conf/elasticsearch.py create mode 100644 monasca_persister/conf/kafka_events.py create mode 100644 monasca_persister/repositories/elasticsearch/__init__.py create mode 100644 monasca_persister/repositories/elasticsearch/events_repository.py create mode 100644 monasca_persister/tests/events.json create mode 100644 monasca_persister/tests/test_events.py diff --git a/monasca_persister/conf/elasticsearch.py b/monasca_persister/conf/elasticsearch.py new file mode 100644 index 00000000..7054aa83 --- /dev/null +++ b/monasca_persister/conf/elasticsearch.py @@ -0,0 +1,52 @@ +# Copyright 2017 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +from monasca_persister.conf import types + +elasticsearch_opts = [ + cfg.StrOpt('index_name', + help='Index name where events are stored', + default='monevents'), + cfg.ListOpt('hosts', + help='List of Elasticsearch nodes in format host[:port]', + default=['localhost:9200'], + item_type=types.HostAddressPortType()), + cfg.BoolOpt('sniff_on_start', + help='Flag indicating whether to obtain a list of nodes from the cluster at startup time', + default=False), + cfg.BoolOpt('sniff_on_connection_fail', + help='Flag controlling if connection failure triggers a sniff', + default=False), + cfg.IntOpt('sniffer_timeout', + help='Number of seconds between automatic sniffs', + default=None), + cfg.IntOpt('max_retries', + help='Maximum number of retries before an exception is propagated', + default=3, + min=1) +] + +elasticsearch_group = cfg.OptGroup(name='elasticsearch', title='elasticsearch') + + +def register_opts(conf): + conf.register_group(elasticsearch_group) + conf.register_opts(elasticsearch_opts, elasticsearch_group) + + +def list_opts(): + return elasticsearch_group, elasticsearch_opts diff --git a/monasca_persister/conf/kafka_events.py b/monasca_persister/conf/kafka_events.py new file mode 100644 index 00000000..cec049e4 --- /dev/null +++ b/monasca_persister/conf/kafka_events.py @@ -0,0 +1,54 @@ +# Copyright 2017 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from copy import deepcopy + +from oslo_config import cfg + +from monasca_persister.conf import kafka_common +from monasca_persister.conf import types + +kafka_events_group = cfg.OptGroup(name='kafka_events', + title='kafka_events') +kafka_events_opts = [ + cfg.ListOpt('uri', + help='Comma separated list of Kafka broker host:port', + default=['127.0.0.1:9092'], + item_type=types.HostAddressPortType()), + cfg.StrOpt('group_id', + help='Kafka Group from which persister gets data', + default='1_events'), + cfg.StrOpt('topic', + help='Kafka Topic from which persister gets data', + default='monevents'), + cfg.StrOpt('zookeeper_path', + help='Path in zookeeper for kafka consumer group partitioning algorithm', + default='/persister_partitions/$kafka_events.topic'), +] + +# Replace Default OPT with reference to kafka group option +kafka_common_opts = deepcopy(kafka_common.kafka_common_opts) +for opt in kafka_common_opts: + opt.default = '$kafka.{}'.format(opt.name) + + +def register_opts(conf): + conf.register_group(kafka_events_group) + conf.register_opts(kafka_events_opts + kafka_common_opts, + kafka_events_group) + + +def list_opts(): + return kafka_events_group, kafka_events_opts diff --git a/monasca_persister/conf/repositories.py b/monasca_persister/conf/repositories.py index eac796d3..4d92172b 100644 --- a/monasca_persister/conf/repositories.py +++ b/monasca_persister/conf/repositories.py @@ -22,7 +22,10 @@ repositories_opts = [ default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository'), 
cfg.StrOpt(name='alarm_state_history_driver', help='The repository driver to use for alarm state history', - default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository')] + default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository'), + cfg.StrOpt(name='events_driver', + help='The repository driver to use for events', + default='monasca_persister.repositories.elasticsearch.events_repository:ElasticSearchEventsRepository')] repositories_group = cfg.OptGroup(name='repositories', title='repositories') diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py index 70729167..63448909 100644 --- a/monasca_persister/persister.py +++ b/monasca_persister/persister.py @@ -95,23 +95,24 @@ def start_process(respository, kafka_config): m_persister.run() +def prepare_processes(conf, repo_driver): + if conf.num_processors > 0: + repository = simport.load(repo_driver) + for proc in range(0, conf.num_processors): + processors.append(multiprocessing.Process( + target=start_process, args=(repository, conf))) + def main(): """Start persister.""" config.parse_args() - metric_repository = simport.load(cfg.CONF.repositories.metrics_driver) - alarm_state_history_repository = simport.load(cfg.CONF.repositories.alarm_state_history_driver) - # Add processors for metrics topic - for proc in range(0, cfg.CONF.kafka_metrics.num_processors): - processors.append(multiprocessing.Process( - target=start_process, args=(metric_repository, cfg.CONF.kafka_metrics))) - + prepare_processes(cfg.CONF.kafka_metrics, cfg.CONF.repositories.metrics_driver) # Add processors for alarm history topic - for proc in range(0, cfg.CONF.kafka_alarm_history.num_processors): - processors.append(multiprocessing.Process( - target=start_process, args=(alarm_state_history_repository, cfg.CONF.kafka_alarm_history))) + prepare_processes(cfg.CONF.kafka_alarm_history, cfg.CONF.repositories.alarm_state_history_driver) + # Add processors for 
events topic + prepare_processes(cfg.CONF.kafka_events, cfg.CONF.repositories.events_driver) # Start try: diff --git a/monasca_persister/repositories/elasticsearch/__init__.py b/monasca_persister/repositories/elasticsearch/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monasca_persister/repositories/elasticsearch/events_repository.py b/monasca_persister/repositories/elasticsearch/events_repository.py new file mode 100644 index 00000000..6ea2c1a2 --- /dev/null +++ b/monasca_persister/repositories/elasticsearch/events_repository.py @@ -0,0 +1,74 @@ +# Copyright 2017 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import ujson + +from datetime import datetime +from elasticsearch import Elasticsearch +from oslo_config import cfg +from oslo_log import log + +from monasca_persister.repositories import abstract_repository +from monasca_persister.repositories import utils + +LOG = log.getLogger(__name__) + + +class ElasticSearchEventsRepository(abstract_repository.AbstractRepository): + def __init__(self): + super(ElasticSearchEventsRepository, self).__init__() + self.conf = cfg.CONF.elasticsearch + self.es = Elasticsearch( + hosts=self.conf.hosts, + sniff_on_start=self.conf.sniff_on_start, + sniff_on_connection_fail=self.conf.sniff_on_connection_fail, + sniffer_timeout=self.conf.sniffer_timeout, + max_retries=self.conf.max_retries + ) + + def process_message(self, message): + return utils.parse_events_message(message) + + def write_batch(self, data_points): + for data_point in data_points: + (tenant_id, timestamp, event_type, payload) = data_point + + index = '%s-%s-%s' % (self.conf.index_name, tenant_id, + ElasticSearchEventsRepository._normalize_timestamp(timestamp)) + + body = { + 'tenant_id': tenant_id, + 'timestamp': timestamp, + 'event_type': event_type, + 'payload': payload + } + + self.es.create( + index=index, + doc_type='event', + body=ujson.dumps(body) + ) + + @staticmethod + def _normalize_timestamp(timestamp): + d = None + if timestamp and len(timestamp) >= 10: + try: + d = datetime.strptime(timestamp[0:10], '%Y-%m-%d') + except ValueError as e: + LOG.warning("Unable to parse timestamp '%s' - %s" % (timestamp, str(e))) + if not d: + d = datetime.today() + return d.strftime('%Y-%m-%d') diff --git a/monasca_persister/repositories/utils.py b/monasca_persister/repositories/utils.py index 23459ce0..fdf15e5c 100644 --- a/monasca_persister/repositories/utils.py +++ b/monasca_persister/repositories/utils.py @@ -1,4 +1,5 @@ # (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP +# Copyright 2017 FUJITSU LIMITED # # Licensed under the Apache License, Version 
2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -98,3 +99,14 @@ def parse_alarm_state_hist_message(message): return (alarm_id, metrics, new_state, old_state, link, lifecycle_state, state_change_reason, sub_alarms_json_snake_case, tenant_id, time_stamp) + + +def parse_events_message(message): + + decoded_message = json.loads(message.message.value) + event_type = decoded_message['event_type'] + timestamp = decoded_message['timestamp'] + payload = decoded_message['payload'] + tenant_id = payload['tenant_id'] + + return tenant_id, timestamp, event_type, payload diff --git a/monasca_persister/tests/events.json b/monasca_persister/tests/events.json new file mode 100644 index 00000000..edbf57f8 --- /dev/null +++ b/monasca_persister/tests/events.json @@ -0,0 +1,54 @@ +{ + "event_1": { + "event_type": "compute.instance.create.start", + "timestamp": "2017-06-01 09:15:11.494606", + "payload": { + "state_description": "", + "availability_zone": "nova", + "terminated_at": "", + "ephemeral_gb": 0, + "instance_type_id": 1, + "deleted_at": "", + "reservation_id": "r-74ndofdp", + "instance_id": "cb724671-cc36-49cd-9987-d08f2c8356b9", + "display_name": "fred", + "hostname": "fred", + "state": "building", + "progress": "", + "launched_at": "", + "metadata": { + }, + "node": null, + "ramdisk_id": "", + "access_ip_v6": null, + "disk_gb": 1, + "access_ip_v4": null, + "kernel_id": "", + "image_name": "cirros", + "host": null, + "user_id": "92e0ceb0f3d648ddabeae1bfde4071b2", + "image_ref_url": "http://d00-19-99-b3-7e-2e.st.est.fujitsu.com:9292/images/e08428a7-aa34-42bd-9e91-6fe15e0ed2ae", + "cell_name": "", + "root_gb": 1, + "tenant_id": "de98fbff448f4f278a56e9929db70b03", + "created_at": "2017-06-01 09:15:10+00:00", + "memory_mb": 512, + "instance_type": "m1.tiny", + "vcpus": 1, + "image_meta": { + "container_format": "bare", + "min_ram": "0", + "disk_format": "qcow2", + "architecture": "x86_64", + "min_disk": "1", + "base_image_ref": 
"e08428a7-aa34-42bd-9e91-6fe15e0ed2ae" + }, + "architecture": "x86_64", + "os_type": null, + "instance_flavor_id": "1" + }, + "priority": "INFO", + "publisher_id": "compute.d00-26-2d-0c-d5-64", + "message_id": "5e60c9f1-1cf9-4f5d-9826-91fa329a79c1" + } +} \ No newline at end of file diff --git a/monasca_persister/tests/test_events.py b/monasca_persister/tests/test_events.py new file mode 100644 index 00000000..85a6c344 --- /dev/null +++ b/monasca_persister/tests/test_events.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 FUJITSU LIMITED +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import os +from oslotest import base +from monasca_persister.repositories.elasticsearch import events_repository +from monasca_persister.repositories import utils +from mock import Mock +from testtools import matchers +from datetime import datetime + + +class TestEvents(base.BaseTestCase): + + def __init__(self, *args, **kwds): + super(TestEvents, self).__init__(*args, **kwds) + self.events = None + + def setUp(self): + super(TestEvents, self).setUp() + + def tearDown(self): + super(TestEvents, self).tearDown() + + def test_parse_event(self): + event = self._load_event('event_1') + tenant_id, timestamp, event_type, payload = utils.parse_events_message(event) + self.assertEqual('de98fbff448f4f278a56e9929db70b03', tenant_id) + self.assertEqual('2017-06-01 09:15:11.494606', timestamp) + self.assertEqual('compute.instance.create.start', event_type) + self.assertIsNotNone(payload) + self.assertThat(len(payload), matchers.GreaterThan(0)) + + def test_normalize_timestamp(self): + today = datetime.today().strftime('%Y-%m-%d') + normalize_timestamp = events_repository.ElasticSearchEventsRepository._normalize_timestamp + + self.assertEqual(today, normalize_timestamp(None)) + self.assertEqual(today, normalize_timestamp('')) + self.assertEqual(today, normalize_timestamp('foo')) + self.assertEqual(today, normalize_timestamp('2017-02-3')) + self.assertEqual(today, normalize_timestamp('2017-02-31')) + + self.assertEqual('2017-08-07', normalize_timestamp('2017-08-07 11:22:43')) + + def _load_event(self, event_name): + if self.events is None: + filepath = os.path.join(os.path.dirname(__file__), 'events.json') + self.events = json.load(open(filepath)) + # create a kafka message envelope + value = json.dumps(self.events[event_name]) + return Mock(message=Mock(value=value)) diff --git a/monasca_persister/tests/test_persister_main.py b/monasca_persister/tests/test_persister_main.py index 478d47b9..6ca03249 100644 --- a/monasca_persister/tests/test_persister_main.py +++ 
b/monasca_persister/tests/test_persister_main.py @@ -22,6 +22,7 @@ CONF = cfg.CONF NUMBER_OF_METRICS_PROCESSES = 2 NUMBER_OF_ALARM_HIST_PROCESSES = 3 +NUMBER_OF_EVENTS_PROCESSES = 0 class FakeException(Exception): @@ -57,6 +58,8 @@ class TestPersister(base.BaseTestCase): self.mock_cfg.CONF.kafka_metrics.num_processors = NUMBER_OF_METRICS_PROCESSES self.mock_cfg.CONF.kafka_alarm_history.num_processors = NUMBER_OF_ALARM_HIST_PROCESSES + self.mock_cfg.CONF.kafka_events.num_processors = NUMBER_OF_EVENTS_PROCESSES + self.mock_cfg.CONF.zookeeper = 'zookeeper' self.mock_sleep.side_effect = [FakeException, None] diff --git a/setup.cfg b/setup.cfg index 9b818381..611645b8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,6 +35,8 @@ influxdb = influxdb>=2.9.2 # MIT cassandra = cassandra-driver!=3.6.0,>=2.1.4 # Apache-2.0 +elasticsearch = + elasticsearch>=2.0.0,<=3.0.0 # Apache-2.0 [pbr] autodoc_index_modules = True diff --git a/tox.ini b/tox.ini index a61a5b5b..3bc58fe5 100644 --- a/tox.ini +++ b/tox.ini @@ -20,7 +20,7 @@ whitelist_externals = bash rm install_command = {toxinidir}/tools/tox_install.sh {env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt} {opts} {packages} -deps = .[influxdb,cassandra] +deps = .[influxdb,cassandra,elasticsearch] -r{toxinidir}/requirements.txt -r{toxinidir}/test-requirements.txt commands =