Enable Cassandra Database

Allow Cassandra to be used as the backend database.

100% backward compatible with InfluxDB.

Should not require any install or config changes to
keep using the default InfluxDB.

Change-Id: I4cfe50a99eea0f8ae7421dcefbdd2cb2cc008b3b
Deklan Dieterly 2015-12-29 09:55:26 -07:00
parent 852b42f3c7
commit 52b38a0dfb
3 changed files with 399 additions and 128 deletions


@@ -7,6 +7,12 @@ debug = false
 # Show more verbose log output (sets INFO log level output) if debug is False
 verbose = true
 
+[database]
+# Choose database type
+# database_type := 'cassandra' | 'influxdb'
+#database_type = cassandra
+database_type = influxdb
+
 [zookeeper]
 # Comma separated list of host:port
 uri = 192.168.10.4:2181
@@ -53,3 +59,8 @@ port = 8086
 user = mon_persister
 password = password
 
+# Uncomment and set cluster_ip_addresses if database_type is 'cassandra'
+#[cassandra]
+# Comma separated list of Cassandra node IP addresses. No spaces.
+#cluster_ip_addresses: 10.10.10.3
+#keyspace: monasca


@@ -27,13 +27,19 @@
 """
 import abc
+import hashlib
+import urllib
 from datetime import datetime
 import json
 import os
 import six
 import sys
 import threading
 
+from cassandra.cluster import Cluster
+from cassandra.query import BatchStatement
 from influxdb import InfluxDBClient
 import pytz
@@ -45,11 +51,17 @@ import service
 from monasca_common.kafka.consumer import KafkaConsumer
 
 LOG = log.getLogger(__name__)
 log.register_options(cfg.CONF)
 log.set_defaults()
 
+database_opts = [cfg.StrOpt('database_type')]
+
+database_group = cfg.OptGroup(name='database')
+
+cfg.CONF.register_group(database_group)
+cfg.CONF.register_opts(database_opts, database_group)
+
 zookeeper_opts = [cfg.StrOpt('uri'),
                   cfg.IntOpt('partition_interval_recheck_seconds')]
@@ -91,22 +103,62 @@ influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb')
 cfg.CONF.register_group(influxdb_group)
 cfg.CONF.register_opts(influxdb_opts, influxdb_group)
 
+cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'),
+                  cfg.StrOpt('keyspace')]
+
+cassandra_group = cfg.OptGroup(name='cassandra')
+
+cfg.CONF.register_group(cassandra_group)
+cfg.CONF.register_opts(cassandra_opts, cassandra_group)
+
 cfg.CONF(sys.argv[1:], project='monasca', prog='persister')
 log.setup(cfg.CONF, "monasca-persister")
 
 
 def main():
     """Start persister.
 
     Start metric persister and alarm persister in separate threads.
     """
 
-    metric_persister = MetricPersister(cfg.CONF.kafka_metrics,
-                                       cfg.CONF.influxdb,
-                                       cfg.CONF.zookeeper)
-
-    alarm_persister = AlarmPersister(cfg.CONF.kafka_alarm_history,
-                                     cfg.CONF.influxdb,
-                                     cfg.CONF.zookeeper)
+    database_type = cfg.CONF.database.database_type
+
+    if database_type is None:
+        LOG.warn("Database type is not configured.")
+        LOG.warn("Using influxdb for default database type.")
+        LOG.warn("Please configure a database type using the 'database_type' "
+                 "property in the config file.")
+
+    # Allow None for database_type for backwards compatibility.
+    if database_type is None or database_type.lower() == 'influxdb':
+
+        metric_persister = MetricInfluxdbPersister(cfg.CONF.kafka_metrics,
+                                                   cfg.CONF.influxdb,
+                                                   cfg.CONF.zookeeper)
+
+        alarm_persister = AlarmStateHistInfluxdbPersister(
+            cfg.CONF.kafka_alarm_history,
+            cfg.CONF.influxdb,
+            cfg.CONF.zookeeper)
+
+    elif database_type.lower() == 'cassandra':
+
+        metric_persister = MetricCassandraPersister(
+            cfg.CONF.kafka_metrics,
+            cfg.CONF.cassandra,
+            cfg.CONF.zookeeper)
+
+        alarm_persister = AlarmStateHistCassandraPersister(
+            cfg.CONF.kafka_alarm_history,
+            cfg.CONF.cassandra,
+            cfg.CONF.zookeeper)
+
+    else:
+
+        LOG.error("Unknown database type [{}] is not implemented".format(
+            database_type))
+        LOG.error("Known database types are [influxdb] and [cassandra]")
+        LOG.error("Please configure a known database type in the config file.")
+        os._exit(1)
 
     metric_persister.start()
     alarm_persister.start()
@@ -148,7 +200,7 @@ class Persister(os_service.Service):
     """
 
     def __init__(self, threads=1):
         super(Persister, self).__init__(threads)
 
     def start(self):
@@ -164,7 +216,8 @@ class Persister(os_service.Service):
 
 @six.add_metaclass(abc.ABCMeta)
 class AbstractPersister(threading.Thread):
 
-    def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):
+    def __init__(self, kafka_conf, db_conf, zookeeper_conf):
 
         super(AbstractPersister, self).__init__()
@@ -174,39 +227,44 @@ class AbstractPersister(threading.Thread):
         self._database_batch_size = kafka_conf.database_batch_size
 
-        self._consumer = KafkaConsumer(kafka_conf.uri,
-                                       zookeeper_conf.uri,
-                                       kafka_conf.zookeeper_path,
-                                       kafka_conf.group_id,
-                                       kafka_conf.topic,
-                                       repartition_callback=self._flush,
-                                       commit_callback=self._flush,
-                                       commit_timeout=kafka_conf.max_wait_time_seconds)
+        self._consumer = KafkaConsumer(
+            kafka_conf.uri,
+            zookeeper_conf.uri,
+            kafka_conf.zookeeper_path,
+            kafka_conf.group_id,
+            kafka_conf.topic,
+            repartition_callback=self._flush,
+            commit_callback=self._flush,
+            commit_timeout=kafka_conf.max_wait_time_seconds)
 
-        self._influxdb_client = InfluxDBClient(influxdb_conf.ip_address,
-                                               influxdb_conf.port,
-                                               influxdb_conf.user,
-                                               influxdb_conf.password,
-                                               influxdb_conf.database_name)
+        self.init_db(db_conf)
+
+    @abc.abstractmethod
+    def init_db(self, db_conf):
+        pass
 
     @abc.abstractmethod
     def process_message(self, message):
         pass
 
+    @abc.abstractmethod
+    def execute_batch(self, data_points):
+        pass
+
     def _flush(self):
         if not self._data_points:
             return
 
         try:
-            self._influxdb_client.write_points(self._data_points, 'ms')
+            self.execute_batch(self._data_points)
             LOG.info("Processed {} messages from topic '{}'".format(
                 len(self._data_points), self._kafka_topic))
             self._data_points = []
             self._consumer.commit()
         except Exception:
-            LOG.exception("Error writing to influxdb: {}"
+            LOG.exception("Error writing to database: {}"
                           .format(self._data_points))
             raise
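With init_db(), process_message(), and execute_batch() now abstract, AbstractPersister acts as a template: the Kafka consumption, batching, and flush/commit logic stay in the base class, and each backend supplies only those three hooks. A minimal sketch of what an additional, purely hypothetical backend would have to implement (class name and behaviour invented for illustration; constructor signature taken from the code above):

    class StdoutPersister(AbstractPersister):
        """Hypothetical backend that just prints batches; illustrative only."""

        def __init__(self, kafka_conf, db_conf, zookeeper_conf):
            super(StdoutPersister, self).__init__(kafka_conf, db_conf,
                                                  zookeeper_conf)

        def init_db(self, db_conf):
            # Nothing to connect to for this sketch.
            pass

        def process_message(self, message):
            # Turn one raw Kafka message into the item that execute_batch()
            # will later receive as part of a list.
            return message.message.value

        def execute_batch(self, data_points):
            # Called from _flush() with everything accumulated since the
            # last Kafka commit.
            for data_point in data_points:
                print(data_point)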
@@ -225,96 +283,222 @@ class AbstractPersister(threading.Thread):
 
             self._flush()
         except:
             LOG.exception(
-                'Persister encountered fatal exception processing messages. '
+                'Persister encountered fatal exception processing '
+                'messages. '
                 'Shutting down all threads and exiting')
             shutdown_all_threads_and_die()
 
 
-class AlarmPersister(AbstractPersister):
-    """Class for persisting alarms.
-    """
-
-    def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):
-        super(AlarmPersister, self).__init__(kafka_conf,
-                                             influxdb_conf,
-                                             zookeeper_conf)
-
-    def process_message(self, message):
-        LOG.debug(message.message.value.decode('utf8'))
-
-        decoded = json.loads(message.message.value)
-        LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))
-
-        alarm_transitioned = decoded['alarm-transitioned']
-
-        actions_enabled = alarm_transitioned['actionsEnabled']
-        LOG.debug('actions enabled: %s', actions_enabled)
-
-        alarm_description = alarm_transitioned['alarmDescription']
-        LOG.debug('alarm description: %s', alarm_description)
-
-        alarm_id = alarm_transitioned['alarmId']
-        LOG.debug('alarm id: %s', alarm_id)
-
-        alarm_definition_id = alarm_transitioned[
-            'alarmDefinitionId']
-        LOG.debug('alarm definition id: %s', alarm_definition_id)
-
-        metrics = alarm_transitioned['metrics']
-        LOG.debug('metrics: %s', metrics)
-
-        alarm_name = alarm_transitioned['alarmName']
-        LOG.debug('alarm name: %s', alarm_name)
-
-        new_state = alarm_transitioned['newState']
-        LOG.debug('new state: %s', new_state)
-
-        old_state = alarm_transitioned['oldState']
-        LOG.debug('old state: %s', old_state)
-
-        state_change_reason = alarm_transitioned[
-            'stateChangeReason']
-        LOG.debug('state change reason: %s', state_change_reason)
-
-        tenant_id = alarm_transitioned['tenantId']
-        LOG.debug('tenant id: %s', tenant_id)
-
-        time_stamp = alarm_transitioned['timestamp']
-        LOG.debug('time stamp: %s', time_stamp)
-
-        sub_alarms = alarm_transitioned['subAlarms']
-        if sub_alarms:
-            sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False)
-            sub_alarms_json_snake_case = sub_alarms_json.replace(
-                '"subAlarmExpression":',
-                '"sub_alarm_expression":')
-            sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
-                '"metricDefinition":',
-                '"metric_definition":')
-            sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
-                '"subAlarmState":',
-                '"sub_alarm_state":')
-        else:
-            sub_alarms_json_snake_case = "[]"
+@six.add_metaclass(abc.ABCMeta)
+class AbstractCassandraPersister(AbstractPersister):
+
+    def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf):
+        super(AbstractCassandraPersister, self).__init__(
+            kafka_conf, cassandra_db_conf, zookeeper_conf)
+
+    def init_db(self, cassandra_db_conf):
+        self._cassandra_cluster = Cluster(
+            cassandra_db_conf.cluster_ip_addresses.split(','))
+        self.cassandra_session = self._cassandra_cluster.connect(
+            cassandra_db_conf.keyspace)
+        self._batch_stmt = BatchStatement()
+
+
+class MetricMeasurementInfo(object):
+
+    def __init__(self, tenant_id, region, metric_hash, metric_set,
+                 measurement):
+        self.tenant_id = tenant_id
+        self.region = region
+        self.metric_hash = metric_hash
+        self.metric_set = metric_set
+        self.measurement = measurement
+
+
+class MetricCassandraPersister(AbstractCassandraPersister):
+
+    def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf):
+        super(MetricCassandraPersister, self).__init__(
+            kafka_conf,
+            cassandra_db_conf,
+            zookeeper_conf)
+
+        self._insert_measurement_stmt = self.cassandra_session.prepare(
+            'insert into measurements (tenant_id,'
+            'region, metric_hash, time_stamp, value,'
+            'value_meta) values (?, ?, ?, ?, ?, ?)')
+
+        self._insert_metric_map_stmt = self.cassandra_session.prepare(
+            'insert into metric_map (tenant_id,'
+            'region, metric_hash, '
+            'metric_set) values'
+            '(?,?,?,?)')
+
+    def process_message(self, message):
+        (dimensions, metric_name, region, tenant_id, time_stamp, value,
+         value_meta) = parse_measurement_message(message)
+
+        metric_hash, metric_set = create_metric_hash(metric_name,
+                                                     dimensions)
+
+        measurement = (tenant_id.encode('utf8'),
+                       region.encode('utf8'),
+                       metric_hash,
+                       time_stamp,
+                       value,
+                       json.dumps(value_meta, ensure_ascii=False).encode(
+                           'utf8'))
+
+        LOG.debug(measurement)
+
+        return MetricMeasurementInfo(
+            tenant_id.encode('utf8'),
+            region.encode('utf8'),
+            metric_hash,
+            metric_set,
+            measurement)
+
+    def execute_batch(self, metric_measurement_infos):
+        for metric_measurement_info in metric_measurement_infos:
+            self._batch_stmt.add(self._insert_measurement_stmt,
+                                 metric_measurement_info.measurement)
+
+            metric_map = (metric_measurement_info.tenant_id,
+                          metric_measurement_info.region,
+                          metric_measurement_info.metric_hash,
+                          metric_measurement_info.metric_set)
+
+            self._batch_stmt.add(self._insert_metric_map_stmt,
+                                 metric_map)
+
+        self.cassandra_session.execute(self._batch_stmt)
+        self._batch_stmt = BatchStatement()
+
+
+def create_metric_hash(metric_name, dimensions):
+    metric_name_part = '__name__' + '=' + urllib.quote_plus(metric_name)
+    hash_string = metric_name_part
+
+    metric_set = set()
+    metric_set.add(metric_name_part)
+
+    for dim_name in sorted(dimensions.iterkeys()):
+        dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus(
+            dimensions[dim_name]))
+        metric_set.add(dimension)
+        hash_string += dimension
+
+    sha1_hash = hashlib.sha1(hash_string).hexdigest()
+
+    return bytearray.fromhex(sha1_hash), metric_set
+
+
+class AlarmStateHistCassandraPersister(AbstractCassandraPersister):
+
+    def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf):
+        super(AlarmStateHistCassandraPersister, self).__init__(
+            kafka_conf,
+            cassandra_db_conf,
+            zookeeper_conf)
+
+        self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare(
+            'insert into alarm_state_history (tenant_id, alarm_id, '
+            'metrics, new_state, '
+            'old_state, reason, reason_data, '
+            'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)')
+
+    def process_message(self, message):
+        (alarm_id, metrics, new_state, old_state, state_change_reason,
+         sub_alarms_json_snake_case, tenant_id,
+         time_stamp) = parse_alarm_state_hist_message(message)
+
+        alarm_state_hist = (
+            tenant_id.encode('utf8'),
+            alarm_id.encode('utf8'),
+            json.dumps(metrics, ensure_ascii=False).encode(
+                'utf8'),
+            new_state.encode('utf8'),
+            old_state.encode('utf8'),
+            state_change_reason.encode('utf8'),
+            "{}".encode('utf8'),
+            sub_alarms_json_snake_case.encode('utf8'),
+            time_stamp
+        )
+
+        LOG.debug(alarm_state_hist)
+
+        return alarm_state_hist
+
+    def execute_batch(self, alarm_state_hists):
+        for alarm_state_hist in alarm_state_hists:
+            self._batch_stmt.add(self._insert_alarm_state_hist_stmt,
+                                 alarm_state_hist)
+
+        self.cassandra_session.execute(self._batch_stmt)
+        self._batch_stmt = BatchStatement()
+
+
+@six.add_metaclass(abc.ABCMeta)
+class AbstractInfluxdbPersister(AbstractPersister):
+
+    def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf):
+        super(AbstractInfluxdbPersister, self).__init__(
+            kafka_conf, influxdb_db_conf, zookeeper_conf)
+
+    def init_db(self, influxdb_db_conf):
+        self._influxdb_client = InfluxDBClient(influxdb_db_conf.ip_address,
+                                               influxdb_db_conf.port,
+                                               influxdb_db_conf.user,
+                                               influxdb_db_conf.password,
+                                               influxdb_db_conf.database_name)
+
+    def execute_batch(self, data_points):
+        self._influxdb_client.write_points(data_points, 'ms')
+
+
+class AlarmStateHistInfluxdbPersister(AbstractInfluxdbPersister):
+
+    def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf):
+        super(AlarmStateHistInfluxdbPersister, self).__init__(
+            kafka_conf, influxdb_db_conf, zookeeper_conf)
+
+    def process_message(self, message):
+        (alarm_id, metrics, new_state, old_state, state_change_reason,
+         sub_alarms_json_snake_case, tenant_id,
+         time_stamp) = parse_alarm_state_hist_message(message)
 
         ts = time_stamp / 1000.0
 
         data = {"measurement": 'alarm_state_history',
                 "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
                     '%Y-%m-%dT%H:%M:%S.%fZ'),
                 "fields": {
                     "tenant_id": tenant_id.encode('utf8'),
                     "alarm_id": alarm_id.encode('utf8'),
-                    "metrics": json.dumps(metrics, ensure_ascii=False).encode('utf8'),
+                    "metrics": json.dumps(metrics, ensure_ascii=False).encode(
+                        'utf8'),
                     "new_state": new_state.encode('utf8'),
                     "old_state": old_state.encode('utf8'),
                     "reason": state_change_reason.encode('utf8'),
@@ -330,60 +514,18 @@ class AlarmPersister(AbstractPersister):
 
         return data
 
 
-class MetricPersister(AbstractPersister):
-    """Class for persisting metrics.
-    """
-
-    def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):
-        super(MetricPersister, self).__init__(kafka_conf,
-                                              influxdb_conf,
-                                              zookeeper_conf)
-
-    def process_message(self, message):
-        LOG.debug(message.message.value.decode('utf8'))
-
-        decoded = json.loads(message.message.value)
-        LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))
-
-        metric = decoded['metric']
-
-        metric_name = metric['name']
-        LOG.debug('name: %s', metric_name)
-
-        creation_time = decoded['creation_time']
-        LOG.debug('creation time: %s', creation_time)
-
-        region = decoded['meta']['region']
-        LOG.debug('region: %s', region)
-
-        tenant_id = decoded['meta']['tenantId']
-        LOG.debug('tenant id: %s', tenant_id)
-
-        dimensions = {}
-        if 'dimensions' in metric:
-            for dimension_name in metric['dimensions']:
-                dimensions[dimension_name.encode('utf8')] = (
-                    metric['dimensions'][dimension_name].encode('utf8'))
-                LOG.debug('dimension: %s : %s', dimension_name,
-                          dimensions[dimension_name])
-
-        time_stamp = metric['timestamp']
-        LOG.debug('timestamp %s', time_stamp)
-
-        value = float(metric['value'])
-        LOG.debug('value: %s', value)
-
-        if 'value_meta' in metric and metric['value_meta']:
-            value_meta = metric['value_meta']
-        else:
-            value_meta = {}
-        LOG.debug('value_meta: %s', value_meta)
+class MetricInfluxdbPersister(AbstractInfluxdbPersister):
+
+    def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf):
+        super(MetricInfluxdbPersister, self).__init__(kafka_conf,
+                                                      influxdb_db_conf,
+                                                      zookeeper_conf)
+
+    def process_message(self, message):
+        (dimensions, metric_name, region, tenant_id, time_stamp, value,
+         value_meta) = parse_measurement_message(message)
 
         tags = dimensions
         tags['_tenant_id'] = tenant_id.encode('utf8')
@@ -393,7 +535,7 @@ class MetricPersister(AbstractPersister):
 
         data = {"measurement": metric_name.encode('utf8'),
                 "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
                     '%Y-%m-%dT%H:%M:%S.%fZ'),
                 "fields": {
                     "value": value,
                     "value_meta": json.dumps(value_meta,
@@ -406,6 +548,122 @@ class MetricPersister(AbstractPersister):
 
         return data
 
 
+def parse_measurement_message(message):
+    LOG.debug(message.message.value.decode('utf8'))
+
+    decoded_message = json.loads(message.message.value)
+    LOG.debug(json.dumps(decoded_message, sort_keys=True, indent=4))
+
+    metric = decoded_message['metric']
+
+    metric_name = metric['name']
+    LOG.debug('name: %s', metric_name)
+
+    creation_time = decoded_message['creation_time']
+    LOG.debug('creation time: %s', creation_time)
+
+    region = decoded_message['meta']['region']
+    LOG.debug('region: %s', region)
+
+    tenant_id = decoded_message['meta']['tenantId']
+    LOG.debug('tenant id: %s', tenant_id)
+
+    dimensions = {}
+    if 'dimensions' in metric:
+        for dimension_name in metric['dimensions']:
+            dimensions[dimension_name.encode('utf8')] = (
+                metric['dimensions'][dimension_name].encode('utf8'))
+            LOG.debug('dimension: %s : %s', dimension_name,
+                      dimensions[dimension_name])
+
+    time_stamp = metric['timestamp']
+    LOG.debug('timestamp %s', time_stamp)
+
+    value = float(metric['value'])
+    LOG.debug('value: %s', value)
+
+    if 'value_meta' in metric and metric['value_meta']:
+        value_meta = metric['value_meta']
+    else:
+        value_meta = {}
+    LOG.debug('value_meta: %s', value_meta)
+
+    return (dimensions, metric_name, region, tenant_id, time_stamp, value,
+            value_meta)
+
+
+def parse_alarm_state_hist_message(message):
+    LOG.debug(message.message.value.decode('utf8'))
+
+    decoded_message = json.loads(message.message.value)
+    LOG.debug(json.dumps(decoded_message, sort_keys=True, indent=4))
+
+    alarm_transitioned = decoded_message['alarm-transitioned']
+
+    actions_enabled = alarm_transitioned['actionsEnabled']
+    LOG.debug('actions enabled: %s', actions_enabled)
+
+    alarm_description = alarm_transitioned['alarmDescription']
+    LOG.debug('alarm description: %s', alarm_description)
+
+    alarm_id = alarm_transitioned['alarmId']
+    LOG.debug('alarm id: %s', alarm_id)
+
+    alarm_definition_id = alarm_transitioned[
+        'alarmDefinitionId']
+    LOG.debug('alarm definition id: %s', alarm_definition_id)
+
+    metrics = alarm_transitioned['metrics']
+    LOG.debug('metrics: %s', metrics)
+
+    alarm_name = alarm_transitioned['alarmName']
+    LOG.debug('alarm name: %s', alarm_name)
+
+    new_state = alarm_transitioned['newState']
+    LOG.debug('new state: %s', new_state)
+
+    old_state = alarm_transitioned['oldState']
+    LOG.debug('old state: %s', old_state)
+
+    state_change_reason = alarm_transitioned[
+        'stateChangeReason']
+    LOG.debug('state change reason: %s', state_change_reason)
+
+    tenant_id = alarm_transitioned['tenantId']
+    LOG.debug('tenant id: %s', tenant_id)
+
+    time_stamp = alarm_transitioned['timestamp']
+    LOG.debug('time stamp: %s', time_stamp)
+
+    sub_alarms = alarm_transitioned['subAlarms']
+
+    if sub_alarms:
+        sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False)
+
+        sub_alarms_json_snake_case = sub_alarms_json.replace(
+            '"subAlarmExpression":',
+            '"sub_alarm_expression":')
+        sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
+            '"metricDefinition":',
+            '"metric_definition":')
+        sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
+            '"subAlarmState":',
+            '"sub_alarm_state":')
+    else:
+        sub_alarms_json_snake_case = "[]"
+
+    return (alarm_id, metrics, new_state, old_state, state_change_reason,
+            sub_alarms_json_snake_case, tenant_id, time_stamp)
+
+
 def main_service():
     """Method to use with Openstack service.
     """
@@ -415,6 +673,7 @@ def main_service():
 
     launcher.launch_service(Persister())
     launcher.wait()
 
 
 # Used if run without Openstack service.
 if __name__ == "__main__":
     sys.exit(main())
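Note that the change does not ship the CQL schema itself; only the keyspace option ('monasca' in the sample config) and the column names in the three prepared INSERT statements are visible. A hypothetical sketch of tables that would satisfy those statements, with guessed types and primary keys, might look like the following:

    # Guessed DDL -- not part of this change; types and keys are assumptions
    # derived only from the prepared INSERT statements above.
    CASSANDRA_SCHEMA_SKETCH = """
    CREATE TABLE measurements (
        tenant_id text,
        region text,
        metric_hash blob,
        time_stamp timestamp,
        value double,
        value_meta text,
        PRIMARY KEY ((tenant_id, region, metric_hash), time_stamp)
    );

    CREATE TABLE metric_map (
        tenant_id text,
        region text,
        metric_hash blob,
        metric_set set<text>,
        PRIMARY KEY ((tenant_id, region), metric_hash)
    );

    CREATE TABLE alarm_state_history (
        tenant_id text,
        alarm_id text,
        time_stamp timestamp,
        metrics text,
        new_state text,
        old_state text,
        reason text,
        reason_data text,
        sub_alarms text,
        PRIMARY KEY ((tenant_id, alarm_id), time_stamp)
    );
    """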


@@ -10,5 +10,6 @@ six==1.9.0
 babel
 eventlet
 influxdb==2.8.0
+cassandra-driver==3.0.0
 iso8601
 monasca-common