Add support for persisting events

This change extends the persister to read OpenStack
event objects from a Kafka topic and store them
in Elasticsearch.

Story: 2001112
Task: 4843

Change-Id: I35b0db67dc088e56ad281c84fc4b50fa7a064e44
Jakub Wachowski 2017-08-07 14:45:09 +02:00 committed by Adrian Czarnecki
parent ca5d223611
commit 8273de7ca6
12 changed files with 331 additions and 12 deletions

View File

@@ -0,0 +1,52 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg

from monasca_persister.conf import types

elasticsearch_opts = [
    cfg.StrOpt('index_name',
               help='Index name where events are stored',
               default='monevents'),
    cfg.ListOpt('hosts',
                help='List of Elasticsearch nodes in format host[:port]',
                default=['localhost:9200'],
                item_type=types.HostAddressPortType()),
    cfg.BoolOpt('sniff_on_start',
                help='Flag indicating whether to obtain a list of nodes from the cluster at startup time',
                default=False),
    cfg.BoolOpt('sniff_on_connection_fail',
                help='Flag controlling if connection failure triggers a sniff',
                default=False),
    cfg.IntOpt('sniffer_timeout',
               help='Number of seconds between automatic sniffs',
               default=None),
    cfg.IntOpt('max_retries',
               help='Maximum number of retries before an exception is propagated',
               default=3,
               min=1)
]

elasticsearch_group = cfg.OptGroup(name='elasticsearch', title='elasticsearch')


def register_opts(conf):
    conf.register_group(elasticsearch_group)
    conf.register_opts(elasticsearch_opts, elasticsearch_group)


def list_opts():
    return elasticsearch_group, elasticsearch_opts
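
For reference, a minimal sketch of the resulting configuration section, with every value spelled out at its default (sniffer_timeout is unset by default; the value below is illustrative only):

[elasticsearch]
index_name = monevents
hosts = localhost:9200
sniff_on_start = false
sniff_on_connection_fail = false
# sniffer_timeout = 10
max_retries = 3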

View File

@@ -0,0 +1,54 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import deepcopy

from oslo_config import cfg

from monasca_persister.conf import kafka_common
from monasca_persister.conf import types

kafka_events_group = cfg.OptGroup(name='kafka_events',
                                  title='kafka_events')

kafka_events_opts = [
    cfg.ListOpt('uri',
                help='Comma-separated list of Kafka brokers in the format host:port',
                default=['127.0.0.1:9092'],
                item_type=types.HostAddressPortType()),
    cfg.StrOpt('group_id',
               help='Kafka consumer group from which the persister reads data',
               default='1_events'),
    cfg.StrOpt('topic',
               help='Kafka topic from which the persister reads data',
               default='monevents'),
    cfg.StrOpt('zookeeper_path',
               help='Path in ZooKeeper for the Kafka consumer group partitioning algorithm',
               default='/persister_partitions/$kafka_events.topic'),
]

# Replace each copied option's default with a reference to the
# corresponding option in the [kafka] group
kafka_common_opts = deepcopy(kafka_common.kafka_common_opts)
for opt in kafka_common_opts:
    opt.default = '$kafka.{}'.format(opt.name)


def register_opts(conf):
    conf.register_group(kafka_events_group)
    conf.register_opts(kafka_events_opts + kafka_common_opts,
                       kafka_events_group)


def list_opts():
    return kafka_events_group, kafka_events_opts
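
Because of the deepcopy loop above, any option inherited from kafka_common that is left unset under [kafka_events] resolves to its [kafka] counterpart through oslo.config interpolation ($kafka.<option>). A sketch, assuming num_processors is one of the inherited common options:

[kafka]
num_processors = 4

[kafka_events]
topic = monevents
group_id = 1_events
# inherited options such as num_processors fall back to the [kafka] values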

View File

@@ -22,7 +22,10 @@ repositories_opts = [
                default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository'),
     cfg.StrOpt(name='alarm_state_history_driver',
                help='The repository driver to use for alarm state history',
-               default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository')]
+               default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository'),
+    cfg.StrOpt(name='events_driver',
+               help='The repository driver to use for events',
+               default='monasca_persister.repositories.elasticsearch.events_repository:ElasticSearchEventsRepository')]

 repositories_group = cfg.OptGroup(name='repositories',
                                   title='repositories')
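
The default already points at the new Elasticsearch implementation, so no configuration change is strictly required; overriding the driver explicitly would look like this sketch:

[repositories]
events_driver = monasca_persister.repositories.elasticsearch.events_repository:ElasticSearchEventsRepository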

View File

@@ -95,23 +95,24 @@ def start_process(respository, kafka_config):
     m_persister.run()


+def prepare_processes(conf, repo_driver):
+    if conf.num_processors > 0:
+        repository = simport.load(repo_driver)
+        for proc in range(0, conf.num_processors):
+            processors.append(multiprocessing.Process(
+                target=start_process, args=(repository, conf)))
+
+
 def main():
     """Start persister."""
     config.parse_args()
-    metric_repository = simport.load(cfg.CONF.repositories.metrics_driver)
-    alarm_state_history_repository = simport.load(cfg.CONF.repositories.alarm_state_history_driver)
     # Add processors for metrics topic
-    for proc in range(0, cfg.CONF.kafka_metrics.num_processors):
-        processors.append(multiprocessing.Process(
-            target=start_process, args=(metric_repository, cfg.CONF.kafka_metrics)))
+    prepare_processes(cfg.CONF.kafka_metrics, cfg.CONF.repositories.metrics_driver)
     # Add processors for alarm history topic
-    for proc in range(0, cfg.CONF.kafka_alarm_history.num_processors):
-        processors.append(multiprocessing.Process(
-            target=start_process, args=(alarm_state_history_repository, cfg.CONF.kafka_alarm_history)))
+    prepare_processes(cfg.CONF.kafka_alarm_history, cfg.CONF.repositories.alarm_state_history_driver)
+    # Add processors for events topic
+    prepare_processes(cfg.CONF.kafka_events, cfg.CONF.repositories.events_driver)
     # Start
     try:
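
Note that prepare_processes spawns workers only when num_processors > 0, so event persisting can be switched off per deployment; a sketch, relying on the num_processors option the kafka_events group inherits from kafka_common:

[kafka_events]
num_processors = 0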

View File

@@ -0,0 +1,74 @@
# Copyright 2017 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ujson

from datetime import datetime
from elasticsearch import Elasticsearch
from oslo_config import cfg
from oslo_log import log

from monasca_persister.repositories import abstract_repository
from monasca_persister.repositories import utils

LOG = log.getLogger(__name__)


class ElasticSearchEventsRepository(abstract_repository.AbstractRepository):
    def __init__(self):
        super(ElasticSearchEventsRepository, self).__init__()
        self.conf = cfg.CONF.elasticsearch
        self.es = Elasticsearch(
            hosts=self.conf.hosts,
            sniff_on_start=self.conf.sniff_on_start,
            sniff_on_connection_fail=self.conf.sniff_on_connection_fail,
            sniffer_timeout=self.conf.sniffer_timeout,
            max_retries=self.conf.max_retries
        )

    def process_message(self, message):
        return utils.parse_events_message(message)

    def write_batch(self, data_points):
        for data_point in data_points:
            (tenant_id, timestamp, event_type, payload) = data_point

            index = '%s-%s-%s' % (self.conf.index_name, tenant_id,
                                  ElasticSearchEventsRepository._normalize_timestamp(timestamp))

            body = {
                'tenant_id': tenant_id,
                'timestamp': timestamp,
                'event_type': event_type,
                'payload': payload
            }

            self.es.create(
                index=index,
                doc_type='event',
                body=ujson.dumps(body)
            )

    @staticmethod
    def _normalize_timestamp(timestamp):
        d = None
        if timestamp and len(timestamp) >= 10:
            try:
                d = datetime.strptime(timestamp[0:10], '%Y-%m-%d')
            except ValueError as e:
                LOG.warning("Unable to parse timestamp '%s' - %s" % (timestamp, str(e)))
        if not d:
            d = datetime.today()
        return d.strftime('%Y-%m-%d')
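
As a quick smoke test during development, the per-tenant, per-day index built in write_batch ('<index_name>-<tenant_id>-<YYYY-MM-DD>') can be queried directly; a sketch assuming a local node on the default port and the sample event from events.json below:

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=['localhost:9200'])  # assumption: local dev node
# index pattern: <index_name>-<tenant_id>-<date>
result = es.search(index='monevents-de98fbff448f4f278a56e9929db70b03-2017-06-01')
print(result['hits']['total'])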

View File

@@ -1,4 +1,5 @@
 # (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
+# Copyright 2017 FUJITSU LIMITED
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.

@@ -98,3 +99,14 @@ def parse_alarm_state_hist_message(message):
     return (alarm_id, metrics, new_state, old_state, link,
             lifecycle_state, state_change_reason,
             sub_alarms_json_snake_case, tenant_id, time_stamp)
+
+
+def parse_events_message(message):
+    decoded_message = json.loads(message.message.value)
+
+    event_type = decoded_message['event_type']
+    timestamp = decoded_message['timestamp']
+    payload = decoded_message['payload']
+    tenant_id = payload['tenant_id']
+
+    return tenant_id, timestamp, event_type, payload
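
parse_events_message expects the raw consumer envelope (message.message.value holding the JSON-encoded notification body), not the decoded dict; a minimal sketch mirroring the Mock the tests below use, with a trimmed-down hypothetical notification:

import json

from mock import Mock

from monasca_persister.repositories import utils

notification = {
    'event_type': 'compute.instance.create.start',
    'timestamp': '2017-06-01 09:15:11.494606',
    'payload': {'tenant_id': 'de98fbff448f4f278a56e9929db70b03'},
}
envelope = Mock(message=Mock(value=json.dumps(notification)))
print(utils.parse_events_message(envelope))
# -> ('de98fbff448f4f278a56e9929db70b03', '2017-06-01 09:15:11.494606',
#     'compute.instance.create.start', {...})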

View File

@@ -0,0 +1,54 @@
{
  "event_1": {
    "event_type": "compute.instance.create.start",
    "timestamp": "2017-06-01 09:15:11.494606",
    "payload": {
      "state_description": "",
      "availability_zone": "nova",
      "terminated_at": "",
      "ephemeral_gb": 0,
      "instance_type_id": 1,
      "deleted_at": "",
      "reservation_id": "r-74ndofdp",
      "instance_id": "cb724671-cc36-49cd-9987-d08f2c8356b9",
      "display_name": "fred",
      "hostname": "fred",
      "state": "building",
      "progress": "",
      "launched_at": "",
      "metadata": {},
      "node": null,
      "ramdisk_id": "",
      "access_ip_v6": null,
      "disk_gb": 1,
      "access_ip_v4": null,
      "kernel_id": "",
      "image_name": "cirros",
      "host": null,
      "user_id": "92e0ceb0f3d648ddabeae1bfde4071b2",
      "image_ref_url": "http://d00-19-99-b3-7e-2e.st.est.fujitsu.com:9292/images/e08428a7-aa34-42bd-9e91-6fe15e0ed2ae",
      "cell_name": "",
      "root_gb": 1,
      "tenant_id": "de98fbff448f4f278a56e9929db70b03",
      "created_at": "2017-06-01 09:15:10+00:00",
      "memory_mb": 512,
      "instance_type": "m1.tiny",
      "vcpus": 1,
      "image_meta": {
        "container_format": "bare",
        "min_ram": "0",
        "disk_format": "qcow2",
        "architecture": "x86_64",
        "min_disk": "1",
        "base_image_ref": "e08428a7-aa34-42bd-9e91-6fe15e0ed2ae"
      },
      "architecture": "x86_64",
      "os_type": null,
      "instance_flavor_id": "1"
    },
    "priority": "INFO",
    "publisher_id": "compute.d00-26-2d-0c-d5-64",
    "message_id": "5e60c9f1-1cf9-4f5d-9826-91fa329a79c1"
  }
}

View File

@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
# Copyright 2017 FUJITSU LIMITED
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from datetime import datetime

from mock import Mock
from oslotest import base
from testtools import matchers

from monasca_persister.repositories.elasticsearch import events_repository
from monasca_persister.repositories import utils


class TestEvents(base.BaseTestCase):
    def __init__(self, *args, **kwds):
        super(TestEvents, self).__init__(*args, **kwds)
        self.events = None

    def setUp(self):
        super(TestEvents, self).setUp()

    def tearDown(self):
        super(TestEvents, self).tearDown()

    def test_parse_event(self):
        event = self._load_event('event_1')
        tenant_id, timestamp, event_type, payload = utils.parse_events_message(event)
        self.assertEqual('de98fbff448f4f278a56e9929db70b03', tenant_id)
        self.assertEqual('2017-06-01 09:15:11.494606', timestamp)
        self.assertEqual('compute.instance.create.start', event_type)
        self.assertIsNotNone(payload)
        self.assertThat(len(payload), matchers.GreaterThan(0))

    def test_normalize_timestamp(self):
        today = datetime.today().strftime('%Y-%m-%d')
        normalize_timestamp = events_repository.ElasticSearchEventsRepository._normalize_timestamp

        self.assertEqual(today, normalize_timestamp(None))
        self.assertEqual(today, normalize_timestamp(''))
        self.assertEqual(today, normalize_timestamp('foo'))
        self.assertEqual(today, normalize_timestamp('2017-02-3'))
        self.assertEqual(today, normalize_timestamp('2017-02-31'))

        self.assertEqual('2017-08-07', normalize_timestamp('2017-08-07 11:22:43'))

    def _load_event(self, event_name):
        if self.events is None:
            filepath = os.path.join(os.path.dirname(__file__), 'events.json')
            self.events = json.load(open(filepath))
        # create a kafka message envelope
        value = json.dumps(self.events[event_name])
        return Mock(message=Mock(value=value))

View File

@@ -22,6 +22,7 @@ CONF = cfg.CONF
 NUMBER_OF_METRICS_PROCESSES = 2
 NUMBER_OF_ALARM_HIST_PROCESSES = 3
+NUMBER_OF_EVENTS_PROCESSES = 0


 class FakeException(Exception):

@@ -57,6 +58,8 @@ class TestPersister(base.BaseTestCase):
         self.mock_cfg.CONF.kafka_metrics.num_processors = NUMBER_OF_METRICS_PROCESSES
         self.mock_cfg.CONF.kafka_alarm_history.num_processors = NUMBER_OF_ALARM_HIST_PROCESSES
+        self.mock_cfg.CONF.kafka_events.num_processors = NUMBER_OF_EVENTS_PROCESSES
+
         self.mock_cfg.CONF.zookeeper = 'zookeeper'
         self.mock_sleep.side_effect = [FakeException, None]

View File

@@ -35,6 +35,8 @@ influxdb =
     influxdb>=2.9.2 # MIT
 cassandra =
     cassandra-driver!=3.6.0,>=2.1.4 # Apache-2.0
+elasticsearch =
+    elasticsearch>=2.0.0,<=3.0.0 # Apache-2.0

 [pbr]
 autodoc_index_modules = True
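
Keeping the client library behind an optional extra means deployments that do not persist events pull in nothing new; an install that opts in might look like:

pip install .[influxdb,elasticsearch]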

View File

@@ -20,7 +20,7 @@ whitelist_externals = bash
                       rm
 install_command =
     {toxinidir}/tools/tox_install.sh {env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt} {opts} {packages}
-deps = .[influxdb,cassandra]
+deps = .[influxdb,cassandra,elasticsearch]
        -r{toxinidir}/requirements.txt
        -r{toxinidir}/test-requirements.txt
 commands =
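
With the extra wired into the tox deps, the new unit tests run under the usual targets; for example (env name assumed, filtering on the new test module):

tox -e py27 -- test_events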