From 52b38a0dfb88b0f1b8d291d8c32559516889dfa0 Mon Sep 17 00:00:00 2001 From: Deklan Dieterly Date: Tue, 29 Dec 2015 09:55:26 -0700 Subject: [PATCH] Enable Cassandra Database Allow Cassandra Database to be used as backend database. 100% backward compatible with Influxdb. Should not require any install or config changes to use the default Influxdb. Change-Id: I4cfe50a99eea0f8ae7421dcefbdd2cb2cc008b3b --- monasca_persister/persister.conf | 11 + monasca_persister/persister.py | 515 +++++++++++++++++++++++-------- requirements.txt | 1 + 3 files changed, 399 insertions(+), 128 deletions(-) diff --git a/monasca_persister/persister.conf b/monasca_persister/persister.conf index 42da185c..c3813bd1 100644 --- a/monasca_persister/persister.conf +++ b/monasca_persister/persister.conf @@ -7,6 +7,12 @@ debug = false # Show more verbose log output (sets INFO log level output) if debug is False verbose = true +[database] +# Choose database type +# database_type := 'cassandra' | 'influxdb' +#database_type = cassandra +database_type = influxdb + [zookeeper] # Comma separated list of host:port uri = 192.168.10.4:2181 @@ -53,3 +59,8 @@ port = 8086 user = mon_persister password = password +# Uncomment and set cluster_ip_addresses if database_type is 'cassandra' +#[cassandra] +# Comma separated list of Cassandra node IP addresses. No spaces. +#cluster_ip_addresses: 10.10.10.3 +#keyspace: monasca diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py index fd8614ce..bd61c919 100644 --- a/monasca_persister/persister.py +++ b/monasca_persister/persister.py @@ -27,13 +27,19 @@ """ import abc +import hashlib +import urllib from datetime import datetime import json import os + import six import sys import threading +from cassandra.cluster import Cluster +from cassandra.query import BatchStatement + from influxdb import InfluxDBClient import pytz @@ -45,11 +51,17 @@ import service from monasca_common.kafka.consumer import KafkaConsumer - LOG = log.getLogger(__name__) log.register_options(cfg.CONF) log.set_defaults() +database_opts = [cfg.StrOpt('database_type')] + +database_group = cfg.OptGroup(name='database') + +cfg.CONF.register_group(database_group) +cfg.CONF.register_opts(database_opts, database_group) + zookeeper_opts = [cfg.StrOpt('uri'), cfg.IntOpt('partition_interval_recheck_seconds')] @@ -91,22 +103,62 @@ influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb') cfg.CONF.register_group(influxdb_group) cfg.CONF.register_opts(influxdb_opts, influxdb_group) +cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'), + cfg.StrOpt('keyspace')] + +cassandra_group = cfg.OptGroup(name='cassandra') +cfg.CONF.register_group(cassandra_group) +cfg.CONF.register_opts(cassandra_opts, cassandra_group) + cfg.CONF(sys.argv[1:], project='monasca', prog='persister') log.setup(cfg.CONF, "monasca-persister") + def main(): """Start persister. Start metric persister and alarm persister in separate threads. """ - metric_persister = MetricPersister(cfg.CONF.kafka_metrics, - cfg.CONF.influxdb, - cfg.CONF.zookeeper) + database_type = cfg.CONF.database.database_type - alarm_persister = AlarmPersister(cfg.CONF.kafka_alarm_history, - cfg.CONF.influxdb, - cfg.CONF.zookeeper) + if database_type is None: + LOG.warn("Database type is not configured.") + LOG.warn("Using influxdb for default database type.") + LOG.warn("Please configure a database type using the 'database_type' " + "property in the config file.") + + # Allow None for database_type for backwards compatibility. 
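+    # Dispatch on the configured backend: a missing or 'influxdb'
+    # database_type selects the InfluxDB-backed persisters; 'cassandra'
+    # selects the Cassandra-backed persisters added below.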
+ if database_type is None or database_type.lower() == 'influxdb': + + metric_persister = MetricInfluxdbPersister(cfg.CONF.kafka_metrics, + cfg.CONF.influxdb, + cfg.CONF.zookeeper) + + alarm_persister = AlarmStateHistInfluxdbPersister( + cfg.CONF.kafka_alarm_history, + cfg.CONF.influxdb, + cfg.CONF.zookeeper) + + elif database_type.lower() == 'cassandra': + + metric_persister = MetricCassandraPersister( + cfg.CONF.kafka_metrics, + cfg.CONF.cassandra, + cfg.CONF.zookeeper) + + alarm_persister = AlarmStateHistCassandraPersister( + cfg.CONF.kafka_alarm_history, + cfg.CONF.cassandra, + cfg.CONF.zookeeper) + + else: + + LOG.error("Unknown database type [{}] is not implemented".format( + database_type)) + LOG.error("Known database types are [influxdb] and [cassandra]") + LOG.error("Please configure a known database type in the config file.") + os._exit(1) metric_persister.start() alarm_persister.start() @@ -148,7 +200,7 @@ class Persister(os_service.Service): """ def __init__(self, threads=1): - super(Persister, self).__init__(threads) + super(Persister, self).__init__(threads) def start(self): @@ -164,7 +216,8 @@ class Persister(os_service.Service): @six.add_metaclass(abc.ABCMeta) class AbstractPersister(threading.Thread): - def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf): + + def __init__(self, kafka_conf, db_conf, zookeeper_conf): super(AbstractPersister, self).__init__() @@ -174,39 +227,44 @@ class AbstractPersister(threading.Thread): self._database_batch_size = kafka_conf.database_batch_size - self._consumer = KafkaConsumer(kafka_conf.uri, - zookeeper_conf.uri, - kafka_conf.zookeeper_path, - kafka_conf.group_id, - kafka_conf.topic, - repartition_callback=self._flush, - commit_callback=self._flush, - commit_timeout=kafka_conf.max_wait_time_seconds) + self._consumer = KafkaConsumer( + kafka_conf.uri, + zookeeper_conf.uri, + kafka_conf.zookeeper_path, + kafka_conf.group_id, + kafka_conf.topic, + repartition_callback=self._flush, + commit_callback=self._flush, + commit_timeout=kafka_conf.max_wait_time_seconds) - self._influxdb_client = InfluxDBClient(influxdb_conf.ip_address, - influxdb_conf.port, - influxdb_conf.user, - influxdb_conf.password, - influxdb_conf.database_name) + self.init_db(db_conf) + + @abc.abstractmethod + def init_db(self, db_conf): + pass @abc.abstractmethod def process_message(self, message): pass + @abc.abstractmethod + def execute_batch(self, data_points): + pass + def _flush(self): if not self._data_points: return try: - self._influxdb_client.write_points(self._data_points, 'ms') + self.execute_batch(self._data_points) LOG.info("Processed {} messages from topic '{}'".format( - len(self._data_points), self._kafka_topic)) + len(self._data_points), self._kafka_topic)) self._data_points = [] self._consumer.commit() except Exception: - LOG.exception("Error writing to influxdb: {}" + LOG.exception("Error writing to database: {}" .format(self._data_points)) raise @@ -225,96 +283,222 @@ class AbstractPersister(threading.Thread): self._flush() except: LOG.exception( - 'Persister encountered fatal exception processing messages. ' - 'Shutting down all threads and exiting') + 'Persister encountered fatal exception processing ' + 'messages. ' + 'Shutting down all threads and exiting') shutdown_all_threads_and_die() -class AlarmPersister(AbstractPersister): - """Class for persisting alarms. 
- """ +@six.add_metaclass(abc.ABCMeta) +class AbstractCassandraPersister(AbstractPersister): - def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf): + def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf): - super(AlarmPersister, self).__init__(kafka_conf, - influxdb_conf, - zookeeper_conf) + super(AbstractCassandraPersister, self).__init__( + kafka_conf, cassandra_db_conf, zookeeper_conf) + + def init_db(self, cassandra_db_conf): + + self._cassandra_cluster = Cluster( + cassandra_db_conf.cluster_ip_addresses.split(',')) + + self.cassandra_session = self._cassandra_cluster.connect( + cassandra_db_conf.keyspace) + + self._batch_stmt = BatchStatement() + +class MetricMeasurementInfo(object): + + def __init__(self, tenant_id, region, metric_hash, metric_set, + measurement): + + self.tenant_id = tenant_id + self.region = region + self.metric_hash = metric_hash + self.metric_set = metric_set + self.measurement = measurement + + +class MetricCassandraPersister(AbstractCassandraPersister): + + def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf): + + super(MetricCassandraPersister, self).__init__( + kafka_conf, + cassandra_db_conf, + zookeeper_conf) + + self._insert_measurement_stmt = self.cassandra_session.prepare( + 'insert into measurements (tenant_id,' + 'region, metric_hash, time_stamp, value,' + 'value_meta) values (?, ?, ?, ?, ?, ?)') + + self._insert_metric_map_stmt = self.cassandra_session.prepare( + 'insert into metric_map (tenant_id,' + 'region, metric_hash, ' + 'metric_set) values' + '(?,?,?,?)') def process_message(self, message): - LOG.debug(message.message.value.decode('utf8')) + (dimensions, metric_name, region, tenant_id, time_stamp, value, + value_meta) = parse_measurement_message(message) - decoded = json.loads(message.message.value) - LOG.debug(json.dumps(decoded, sort_keys=True, indent=4)) + metric_hash, metric_set = create_metric_hash(metric_name, + dimensions) - alarm_transitioned = decoded['alarm-transitioned'] + measurement = (tenant_id.encode('utf8'), + region.encode('utf8'), + metric_hash, + time_stamp, + value, + json.dumps(value_meta, ensure_ascii=False).encode( + 'utf8')) - actions_enabled = alarm_transitioned['actionsEnabled'] - LOG.debug('actions enabled: %s', actions_enabled) + LOG.debug(measurement) - alarm_description = alarm_transitioned['alarmDescription'] - LOG.debug('alarm description: %s', alarm_description) + return MetricMeasurementInfo( + tenant_id.encode('utf8'), + region.encode('utf8'), + metric_hash, + metric_set, + measurement) - alarm_id = alarm_transitioned['alarmId'] - LOG.debug('alarm id: %s', alarm_id) + def execute_batch(self, metric_measurement_infos): - alarm_definition_id = alarm_transitioned[ - 'alarmDefinitionId'] - LOG.debug('alarm definition id: %s', alarm_definition_id) + for metric_measurement_info in metric_measurement_infos: - metrics = alarm_transitioned['metrics'] - LOG.debug('metrics: %s', metrics) + self._batch_stmt.add(self._insert_measurement_stmt, + metric_measurement_info.measurement) - alarm_name = alarm_transitioned['alarmName'] - LOG.debug('alarm name: %s', alarm_name) + metric_map = (metric_measurement_info.tenant_id, + metric_measurement_info.region, + metric_measurement_info.metric_hash, + metric_measurement_info.metric_set) - new_state = alarm_transitioned['newState'] - LOG.debug('new state: %s', new_state) + self._batch_stmt.add(self._insert_metric_map_stmt, + metric_map) - old_state = alarm_transitioned['oldState'] - LOG.debug('old state: %s', old_state) + 
self.cassandra_session.execute(self._batch_stmt) - state_change_reason = alarm_transitioned[ - 'stateChangeReason'] - LOG.debug('state change reason: %s', state_change_reason) + self._batch_stmt = BatchStatement() - tenant_id = alarm_transitioned['tenantId'] - LOG.debug('tenant id: %s', tenant_id) +def create_metric_hash(metric_name, dimensions): - time_stamp = alarm_transitioned['timestamp'] - LOG.debug('time stamp: %s', time_stamp) + metric_name_part = '__name__' + '=' + urllib.quote_plus(metric_name) - sub_alarms = alarm_transitioned['subAlarms'] + hash_string = metric_name_part - if sub_alarms: + metric_set = set() - sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False) + metric_set.add(metric_name_part) - sub_alarms_json_snake_case = sub_alarms_json.replace( - '"subAlarmExpression":', - '"sub_alarm_expression":') + for dim_name in sorted(dimensions.iterkeys()): + dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus( + dimensions[dim_name])) + metric_set.add(dimension) + hash_string += dimension - sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( - '"metricDefinition":', - '"metric_definition":') + sha1_hash = hashlib.sha1(hash_string).hexdigest() - sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( - '"subAlarmState":', - '"sub_alarm_state":') + return bytearray.fromhex(sha1_hash), metric_set - else: - sub_alarms_json_snake_case = "[]" +class AlarmStateHistCassandraPersister(AbstractCassandraPersister): + + def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf): + + super(AlarmStateHistCassandraPersister, self).__init__( + kafka_conf, + cassandra_db_conf, + zookeeper_conf) + + self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare( + 'insert into alarm_state_history (tenant_id, alarm_id, ' + 'metrics, new_state, ' + 'old_state, reason, reason_data, ' + 'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)') + + def process_message(self, message): + + (alarm_id, metrics, new_state, old_state, state_change_reason, + sub_alarms_json_snake_case, tenant_id, + time_stamp) = parse_alarm_state_hist_message( + message) + + alarm_state_hist = ( + tenant_id.encode('utf8'), + alarm_id.encode('utf8'), + json.dumps(metrics, ensure_ascii=False).encode( + 'utf8'), + new_state.encode('utf8'), + old_state.encode('utf8'), + state_change_reason.encode('utf8'), + "{}".encode('utf8'), + sub_alarms_json_snake_case.encode('utf8'), + time_stamp + ) + + LOG.debug(alarm_state_hist) + + return alarm_state_hist + + def execute_batch(self, alarm_state_hists): + + for alarm_state_hist in alarm_state_hists: + self._batch_stmt.add(self._insert_alarm_state_hist_stmt, + alarm_state_hist) + + self.cassandra_session.execute(self._batch_stmt) + + self._batch_stmt = BatchStatement() + + +@six.add_metaclass(abc.ABCMeta) +class AbstractInfluxdbPersister(AbstractPersister): + + def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf): + + super(AbstractInfluxdbPersister, self).__init__( + kafka_conf, influxdb_db_conf, zookeeper_conf) + + def init_db(self, influxdb_db_conf): + + self._influxdb_client = InfluxDBClient(influxdb_db_conf.ip_address, + influxdb_db_conf.port, + influxdb_db_conf.user, + influxdb_db_conf.password, + influxdb_db_conf.database_name) + + def execute_batch(self, data_points): + + self._influxdb_client.write_points(data_points, 'ms') + + +class AlarmStateHistInfluxdbPersister(AbstractInfluxdbPersister): + + def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf): + + super(AlarmStateHistInfluxdbPersister, 
self).__init__( + kafka_conf, influxdb_db_conf, zookeeper_conf) + + def process_message(self, message): + + (alarm_id, metrics, new_state, old_state, state_change_reason, + sub_alarms_json_snake_case, tenant_id, + time_stamp) = parse_alarm_state_hist_message( + message) ts = time_stamp / 1000.0 data = {"measurement": 'alarm_state_history', "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( - '%Y-%m-%dT%H:%M:%S.%fZ'), + '%Y-%m-%dT%H:%M:%S.%fZ'), "fields": { "tenant_id": tenant_id.encode('utf8'), "alarm_id": alarm_id.encode('utf8'), - "metrics": json.dumps(metrics, ensure_ascii=False).encode('utf8'), + "metrics": json.dumps(metrics, ensure_ascii=False).encode( + 'utf8'), "new_state": new_state.encode('utf8'), "old_state": old_state.encode('utf8'), "reason": state_change_reason.encode('utf8'), @@ -330,60 +514,18 @@ class AlarmPersister(AbstractPersister): return data -class MetricPersister(AbstractPersister): - """Class for persisting metrics. - """ +class MetricInfluxdbPersister(AbstractInfluxdbPersister): - def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf): + def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf): - super(MetricPersister, self).__init__(kafka_conf, - influxdb_conf, - zookeeper_conf) + super(MetricInfluxdbPersister, self).__init__(kafka_conf, + influxdb_db_conf, + zookeeper_conf) def process_message(self, message): - LOG.debug(message.message.value.decode('utf8')) - - decoded = json.loads(message.message.value) - LOG.debug(json.dumps(decoded, sort_keys=True, indent=4)) - - metric = decoded['metric'] - - metric_name = metric['name'] - LOG.debug('name: %s', metric_name) - - creation_time = decoded['creation_time'] - LOG.debug('creation time: %s', creation_time) - - region = decoded['meta']['region'] - LOG.debug('region: %s', region) - - tenant_id = decoded['meta']['tenantId'] - LOG.debug('tenant id: %s', tenant_id) - - dimensions = {} - if 'dimensions' in metric: - for dimension_name in metric['dimensions']: - dimensions[dimension_name.encode('utf8')] = ( - metric['dimensions'][dimension_name].encode('utf8')) - LOG.debug('dimension: %s : %s', dimension_name, - dimensions[dimension_name]) - - time_stamp = metric['timestamp'] - LOG.debug('timestamp %s', time_stamp) - - value = float(metric['value']) - LOG.debug('value: %s', value) - - if 'value_meta' in metric and metric['value_meta']: - - value_meta = metric['value_meta'] - - else: - - value_meta = {} - - LOG.debug('value_meta: %s', value_meta) + (dimensions, metric_name, region, tenant_id, time_stamp, value, + value_meta) = parse_measurement_message(message) tags = dimensions tags['_tenant_id'] = tenant_id.encode('utf8') @@ -393,7 +535,7 @@ class MetricPersister(AbstractPersister): data = {"measurement": metric_name.encode('utf8'), "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( - '%Y-%m-%dT%H:%M:%S.%fZ'), + '%Y-%m-%dT%H:%M:%S.%fZ'), "fields": { "value": value, "value_meta": json.dumps(value_meta, @@ -406,6 +548,122 @@ class MetricPersister(AbstractPersister): return data +def parse_measurement_message(message): + + LOG.debug(message.message.value.decode('utf8')) + + decoded_message = json.loads(message.message.value) + LOG.debug(json.dumps(decoded_message, sort_keys=True, indent=4)) + + metric = decoded_message['metric'] + + metric_name = metric['name'] + LOG.debug('name: %s', metric_name) + + creation_time = decoded_message['creation_time'] + LOG.debug('creation time: %s', creation_time) + + region = decoded_message['meta']['region'] + LOG.debug('region: %s', region) + + tenant_id = 
decoded_message['meta']['tenantId'] + LOG.debug('tenant id: %s', tenant_id) + + dimensions = {} + if 'dimensions' in metric: + for dimension_name in metric['dimensions']: + dimensions[dimension_name.encode('utf8')] = ( + metric['dimensions'][dimension_name].encode('utf8')) + LOG.debug('dimension: %s : %s', dimension_name, + dimensions[dimension_name]) + + time_stamp = metric['timestamp'] + LOG.debug('timestamp %s', time_stamp) + + value = float(metric['value']) + LOG.debug('value: %s', value) + + if 'value_meta' in metric and metric['value_meta']: + + value_meta = metric['value_meta'] + + else: + + value_meta = {} + LOG.debug('value_meta: %s', value_meta) + + return (dimensions, metric_name, region, tenant_id, time_stamp, value, + value_meta) + + +def parse_alarm_state_hist_message(message): + + LOG.debug(message.message.value.decode('utf8')) + + decoded_message = json.loads(message.message.value) + LOG.debug(json.dumps(decoded_message, sort_keys=True, indent=4)) + + alarm_transitioned = decoded_message['alarm-transitioned'] + + actions_enabled = alarm_transitioned['actionsEnabled'] + LOG.debug('actions enabled: %s', actions_enabled) + + alarm_description = alarm_transitioned['alarmDescription'] + LOG.debug('alarm description: %s', alarm_description) + + alarm_id = alarm_transitioned['alarmId'] + LOG.debug('alarm id: %s', alarm_id) + + alarm_definition_id = alarm_transitioned[ + 'alarmDefinitionId'] + LOG.debug('alarm definition id: %s', alarm_definition_id) + + metrics = alarm_transitioned['metrics'] + LOG.debug('metrics: %s', metrics) + + alarm_name = alarm_transitioned['alarmName'] + LOG.debug('alarm name: %s', alarm_name) + + new_state = alarm_transitioned['newState'] + LOG.debug('new state: %s', new_state) + + old_state = alarm_transitioned['oldState'] + LOG.debug('old state: %s', old_state) + + state_change_reason = alarm_transitioned[ + 'stateChangeReason'] + LOG.debug('state change reason: %s', state_change_reason) + + tenant_id = alarm_transitioned['tenantId'] + LOG.debug('tenant id: %s', tenant_id) + + time_stamp = alarm_transitioned['timestamp'] + LOG.debug('time stamp: %s', time_stamp) + + sub_alarms = alarm_transitioned['subAlarms'] + if sub_alarms: + + sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False) + + sub_alarms_json_snake_case = sub_alarms_json.replace( + '"subAlarmExpression":', + '"sub_alarm_expression":') + + sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( + '"metricDefinition":', + '"metric_definition":') + + sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( + '"subAlarmState":', + '"sub_alarm_state":') + + else: + + sub_alarms_json_snake_case = "[]" + + return (alarm_id, metrics, new_state, old_state, state_change_reason, + sub_alarms_json_snake_case, tenant_id, time_stamp) + def main_service(): """Method to use with Openstack service. """ @@ -415,6 +673,7 @@ def main_service(): launcher.launch_service(Persister()) launcher.wait() + # Used if run without Openstack service. if __name__ == "__main__": sys.exit(main()) diff --git a/requirements.txt b/requirements.txt index 50b1ac4b..42030066 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,5 +10,6 @@ six==1.9.0 babel eventlet influxdb==2.8.0 +cassandra-driver==3.0.0 iso8601 monasca-common
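
For reference, switching a deployment to the Cassandra backend is driven by the new sections of persister.conf; the address and keyspace shown here are the sample values from the commented template above, not shipped defaults:

[database]
database_type = cassandra

[cassandra]
# Comma separated list of Cassandra node IP addresses. No spaces.
cluster_ip_addresses = 10.10.10.3
keyspace = monasca

The Cassandra code path prepares inserts against the measurements, metric_map and alarm_state_history tables, so the configured keyspace is expected to contain those tables before the persister starts; this change does not create them.

As a standalone sketch of the metric-identity scheme used by MetricCassandraPersister (Python 2, matching the code above), the metric hash is the SHA-1 of the URL-encoded metric name followed by the sorted, URL-encoded dimension pairs, so the same metric yields the same hash regardless of dimension ordering:

import hashlib
import urllib

def metric_hash(name, dimensions):
    # Mirrors create_metric_hash(): '__name__=<name>' first, then each
    # 'dim=value' pair sorted by dimension name, all URL-encoded, and the
    # SHA-1 of the concatenation returned as raw bytes.
    parts = ['__name__' + '=' + urllib.quote_plus(name)]
    for dim_name in sorted(dimensions):
        parts.append(urllib.quote_plus(dim_name) + '=' +
                     urllib.quote_plus(dimensions[dim_name]))
    return bytearray.fromhex(hashlib.sha1(''.join(parts)).hexdigest())

assert (metric_hash('cpu.idle_perc', {'hostname': 'devstack', 'az': '1'}) ==
        metric_hash('cpu.idle_perc', {'az': '1', 'hostname': 'devstack'}))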