Enable Cassandra Database

Allow Cassandra to be used as the backend database.

100% backward compatible with InfluxDB.

No install or config changes are required to keep using
the default InfluxDB.

Change-Id: I4cfe50a99eea0f8ae7421dcefbdd2cb2cc008b3b
Deklan Dieterly 2015-12-29 09:55:26 -07:00
parent 852b42f3c7
commit 52b38a0dfb
3 changed files with 399 additions and 128 deletions


@@ -7,6 +7,12 @@ debug = false
# Show more verbose log output (sets INFO log level output) if debug is False
verbose = true
[database]
# Choose database type
# database_type := 'cassandra' | 'influxdb'
#database_type = cassandra
database_type = influxdb
[zookeeper]
# Comma separated list of host:port
uri = 192.168.10.4:2181
@@ -53,3 +59,8 @@ port = 8086
user = mon_persister
password = password
# Uncomment and set cluster_ip_addresses if database_type is 'cassandra'
#[cassandra]
# Comma separated list of Cassandra node IP addresses. No spaces.
#cluster_ip_addresses: 10.10.10.3
#keyspace: monasca
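
For illustration, a minimal sketch of the settings that would switch the persister from the default InfluxDB to Cassandra, based on the sample config above (the node addresses and keyspace are placeholder values):

[database]
database_type = cassandra

[cassandra]
# Comma separated list of Cassandra node IP addresses. No spaces.
cluster_ip_addresses: 10.10.10.3,10.10.10.4
keyspace: monasca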


@@ -27,13 +27,19 @@
"""
import abc
import hashlib
import urllib
from datetime import datetime
import json
import os
import six
import sys
import threading
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
from influxdb import InfluxDBClient
import pytz
@@ -45,11 +51,17 @@ import service
from monasca_common.kafka.consumer import KafkaConsumer

LOG = log.getLogger(__name__)
log.register_options(cfg.CONF)
log.set_defaults()

database_opts = [cfg.StrOpt('database_type')]
database_group = cfg.OptGroup(name='database')
cfg.CONF.register_group(database_group)
cfg.CONF.register_opts(database_opts, database_group)

zookeeper_opts = [cfg.StrOpt('uri'),
                  cfg.IntOpt('partition_interval_recheck_seconds')]
@@ -91,22 +103,62 @@ influxdb_group = cfg.OptGroup(name='influxdb', title='influxdb')
cfg.CONF.register_group(influxdb_group)
cfg.CONF.register_opts(influxdb_opts, influxdb_group)

cassandra_opts = [cfg.StrOpt('cluster_ip_addresses'),
                  cfg.StrOpt('keyspace')]
cassandra_group = cfg.OptGroup(name='cassandra')
cfg.CONF.register_group(cassandra_group)
cfg.CONF.register_opts(cassandra_opts, cassandra_group)

cfg.CONF(sys.argv[1:], project='monasca', prog='persister')
log.setup(cfg.CONF, "monasca-persister")
def main():
    """Start persister.
    Start metric persister and alarm persister in separate threads.
    """
-    metric_persister = MetricPersister(cfg.CONF.kafka_metrics,
-                                       cfg.CONF.influxdb,
-                                       cfg.CONF.zookeeper)
-    alarm_persister = AlarmPersister(cfg.CONF.kafka_alarm_history,
-                                     cfg.CONF.influxdb,
-                                     cfg.CONF.zookeeper)
    database_type = cfg.CONF.database.database_type

    if database_type is None:
        LOG.warn("Database type is not configured.")
        LOG.warn("Using influxdb for default database type.")
        LOG.warn("Please configure a database type using the 'database_type' "
                 "property in the config file.")

    # Allow None for database_type for backwards compatibility.
    if database_type is None or database_type.lower() == 'influxdb':
        metric_persister = MetricInfluxdbPersister(cfg.CONF.kafka_metrics,
                                                   cfg.CONF.influxdb,
                                                   cfg.CONF.zookeeper)
        alarm_persister = AlarmStateHistInfluxdbPersister(
            cfg.CONF.kafka_alarm_history,
            cfg.CONF.influxdb,
            cfg.CONF.zookeeper)
    elif database_type.lower() == 'cassandra':
        metric_persister = MetricCassandraPersister(
            cfg.CONF.kafka_metrics,
            cfg.CONF.cassandra,
            cfg.CONF.zookeeper)
        alarm_persister = AlarmStateHistCassandraPersister(
            cfg.CONF.kafka_alarm_history,
            cfg.CONF.cassandra,
            cfg.CONF.zookeeper)
    else:
        LOG.error("Unknown database type [{}] is not implemented".format(
            database_type))
        LOG.error("Known database types are [influxdb] and [cassandra]")
        LOG.error("Please configure a known database type in the config file.")
        os._exit(1)

    metric_persister.start()
    alarm_persister.start()
@@ -148,7 +200,7 @@ class Persister(os_service.Service):
    """
    def __init__(self, threads=1):
        super(Persister, self).__init__(threads)

    def start(self):
@@ -164,7 +216,8 @@ class Persister(os_service.Service):
@six.add_metaclass(abc.ABCMeta)
class AbstractPersister(threading.Thread):
-    def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):
    def __init__(self, kafka_conf, db_conf, zookeeper_conf):
        super(AbstractPersister, self).__init__()
@@ -174,39 +227,44 @@ class AbstractPersister(threading.Thread):
        self._database_batch_size = kafka_conf.database_batch_size

-        self._consumer = KafkaConsumer(kafka_conf.uri,
-                                       zookeeper_conf.uri,
-                                       kafka_conf.zookeeper_path,
-                                       kafka_conf.group_id,
-                                       kafka_conf.topic,
-                                       repartition_callback=self._flush,
-                                       commit_callback=self._flush,
-                                       commit_timeout=kafka_conf.max_wait_time_seconds)
        self._consumer = KafkaConsumer(
            kafka_conf.uri,
            zookeeper_conf.uri,
            kafka_conf.zookeeper_path,
            kafka_conf.group_id,
            kafka_conf.topic,
            repartition_callback=self._flush,
            commit_callback=self._flush,
            commit_timeout=kafka_conf.max_wait_time_seconds)

-        self._influxdb_client = InfluxDBClient(influxdb_conf.ip_address,
-                                               influxdb_conf.port,
-                                               influxdb_conf.user,
-                                               influxdb_conf.password,
-                                               influxdb_conf.database_name)
        self.init_db(db_conf)

    @abc.abstractmethod
    def init_db(self, db_conf):
        pass

    @abc.abstractmethod
    def process_message(self, message):
        pass

    @abc.abstractmethod
    def execute_batch(self, data_points):
        pass

    def _flush(self):
        if not self._data_points:
            return

        try:
-            self._influxdb_client.write_points(self._data_points, 'ms')
            self.execute_batch(self._data_points)

            LOG.info("Processed {} messages from topic '{}'".format(
                len(self._data_points), self._kafka_topic))

            self._data_points = []
            self._consumer.commit()
        except Exception:
-            LOG.exception("Error writing to influxdb: {}"
            LOG.exception("Error writing to database: {}"
                          .format(self._data_points))
            raise
@@ -225,96 +283,222 @@ class AbstractPersister(threading.Thread):
                    self._flush()
        except:
            LOG.exception(
-                'Persister encountered fatal exception processing messages. '
-                'Shutting down all threads and exiting')
                'Persister encountered fatal exception processing '
                'messages. '
                'Shutting down all threads and exiting')
            shutdown_all_threads_and_die()
-class AlarmPersister(AbstractPersister):
-    """Class for persisting alarms.
-    """
-    def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):
-        super(AlarmPersister, self).__init__(kafka_conf,
-                                             influxdb_conf,
-                                             zookeeper_conf)
-    def process_message(self, message):
-        LOG.debug(message.message.value.decode('utf8'))
-        decoded = json.loads(message.message.value)
-        LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))
-        alarm_transitioned = decoded['alarm-transitioned']
-        actions_enabled = alarm_transitioned['actionsEnabled']
-        LOG.debug('actions enabled: %s', actions_enabled)
-        alarm_description = alarm_transitioned['alarmDescription']
-        LOG.debug('alarm description: %s', alarm_description)
-        alarm_id = alarm_transitioned['alarmId']
-        LOG.debug('alarm id: %s', alarm_id)
-        alarm_definition_id = alarm_transitioned[
-            'alarmDefinitionId']
-        LOG.debug('alarm definition id: %s', alarm_definition_id)
-        metrics = alarm_transitioned['metrics']
-        LOG.debug('metrics: %s', metrics)
-        alarm_name = alarm_transitioned['alarmName']
-        LOG.debug('alarm name: %s', alarm_name)
-        new_state = alarm_transitioned['newState']
-        LOG.debug('new state: %s', new_state)
-        old_state = alarm_transitioned['oldState']
-        LOG.debug('old state: %s', old_state)
-        state_change_reason = alarm_transitioned[
-            'stateChangeReason']
-        LOG.debug('state change reason: %s', state_change_reason)
-        tenant_id = alarm_transitioned['tenantId']
-        LOG.debug('tenant id: %s', tenant_id)
-        time_stamp = alarm_transitioned['timestamp']
-        LOG.debug('time stamp: %s', time_stamp)
-        sub_alarms = alarm_transitioned['subAlarms']
-        if sub_alarms:
-            sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False)
-            sub_alarms_json_snake_case = sub_alarms_json.replace(
-                '"subAlarmExpression":',
-                '"sub_alarm_expression":')
-            sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
-                '"metricDefinition":',
-                '"metric_definition":')
-            sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
-                '"subAlarmState":',
-                '"sub_alarm_state":')
-        else:
-            sub_alarms_json_snake_case = "[]"
@six.add_metaclass(abc.ABCMeta)
class AbstractCassandraPersister(AbstractPersister):

    def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf):
        super(AbstractCassandraPersister, self).__init__(
            kafka_conf, cassandra_db_conf, zookeeper_conf)

    def init_db(self, cassandra_db_conf):
        self._cassandra_cluster = Cluster(
            cassandra_db_conf.cluster_ip_addresses.split(','))
        self.cassandra_session = self._cassandra_cluster.connect(
            cassandra_db_conf.keyspace)
        self._batch_stmt = BatchStatement()


class MetricMeasurementInfo(object):

    def __init__(self, tenant_id, region, metric_hash, metric_set,
                 measurement):
        self.tenant_id = tenant_id
        self.region = region
        self.metric_hash = metric_hash
        self.metric_set = metric_set
        self.measurement = measurement


class MetricCassandraPersister(AbstractCassandraPersister):

    def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf):
        super(MetricCassandraPersister, self).__init__(
            kafka_conf,
            cassandra_db_conf,
            zookeeper_conf)

        self._insert_measurement_stmt = self.cassandra_session.prepare(
            'insert into measurements (tenant_id,'
            'region, metric_hash, time_stamp, value,'
            'value_meta) values (?, ?, ?, ?, ?, ?)')

        self._insert_metric_map_stmt = self.cassandra_session.prepare(
            'insert into metric_map (tenant_id,'
            'region, metric_hash, '
            'metric_set) values'
            '(?,?,?,?)')

    def process_message(self, message):
        (dimensions, metric_name, region, tenant_id, time_stamp, value,
         value_meta) = parse_measurement_message(message)

        metric_hash, metric_set = create_metric_hash(metric_name,
                                                     dimensions)

        measurement = (tenant_id.encode('utf8'),
                       region.encode('utf8'),
                       metric_hash,
                       time_stamp,
                       value,
                       json.dumps(value_meta, ensure_ascii=False).encode(
                           'utf8'))

        LOG.debug(measurement)

        return MetricMeasurementInfo(
            tenant_id.encode('utf8'),
            region.encode('utf8'),
            metric_hash,
            metric_set,
            measurement)

    def execute_batch(self, metric_measurement_infos):
        for metric_measurement_info in metric_measurement_infos:
            self._batch_stmt.add(self._insert_measurement_stmt,
                                 metric_measurement_info.measurement)

            metric_map = (metric_measurement_info.tenant_id,
                          metric_measurement_info.region,
                          metric_measurement_info.metric_hash,
                          metric_measurement_info.metric_set)

            self._batch_stmt.add(self._insert_metric_map_stmt,
                                 metric_map)

        self.cassandra_session.execute(self._batch_stmt)
        self._batch_stmt = BatchStatement()


def create_metric_hash(metric_name, dimensions):
    metric_name_part = '__name__' + '=' + urllib.quote_plus(metric_name)
    hash_string = metric_name_part

    metric_set = set()
    metric_set.add(metric_name_part)

    for dim_name in sorted(dimensions.iterkeys()):
        dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus(
            dimensions[dim_name]))
        metric_set.add(dimension)
        hash_string += dimension

    sha1_hash = hashlib.sha1(hash_string).hexdigest()

    return bytearray.fromhex(sha1_hash), metric_set


class AlarmStateHistCassandraPersister(AbstractCassandraPersister):

    def __init__(self, kafka_conf, cassandra_db_conf, zookeeper_conf):
        super(AlarmStateHistCassandraPersister, self).__init__(
            kafka_conf,
            cassandra_db_conf,
            zookeeper_conf)

        self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare(
            'insert into alarm_state_history (tenant_id, alarm_id, '
            'metrics, new_state, '
            'old_state, reason, reason_data, '
            'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)')

    def process_message(self, message):
        (alarm_id, metrics, new_state, old_state, state_change_reason,
         sub_alarms_json_snake_case, tenant_id,
         time_stamp) = parse_alarm_state_hist_message(
            message)

        alarm_state_hist = (
            tenant_id.encode('utf8'),
            alarm_id.encode('utf8'),
            json.dumps(metrics, ensure_ascii=False).encode(
                'utf8'),
            new_state.encode('utf8'),
            old_state.encode('utf8'),
            state_change_reason.encode('utf8'),
            "{}".encode('utf8'),
            sub_alarms_json_snake_case.encode('utf8'),
            time_stamp
        )

        LOG.debug(alarm_state_hist)

        return alarm_state_hist

    def execute_batch(self, alarm_state_hists):
        for alarm_state_hist in alarm_state_hists:
            self._batch_stmt.add(self._insert_alarm_state_hist_stmt,
                                 alarm_state_hist)

        self.cassandra_session.execute(self._batch_stmt)
        self._batch_stmt = BatchStatement()
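
For illustration, a small standalone sketch (assuming Python 2, to match the urllib.quote_plus and iterkeys calls above, and using a made-up metric name and dimensions) of how create_metric_hash derives the metric hash and metric set that MetricCassandraPersister writes to the measurements and metric_map tables:

import hashlib
import urllib

# Hypothetical sample metric; the name and dimension values are made up.
name = 'cpu.idle_perc'
dimensions = {'hostname': 'devstack', 'service': 'monitoring'}

# Same scheme as create_metric_hash above: the URL-quoted name part comes
# first, then each dimension is appended in sorted dimension-name order.
metric_name_part = '__name__' + '=' + urllib.quote_plus(name)
hash_string = metric_name_part
metric_set = set([metric_name_part])
for dim_name in sorted(dimensions):
    dimension = urllib.quote_plus(dim_name) + '=' + urllib.quote_plus(
        dimensions[dim_name])
    metric_set.add(dimension)
    hash_string += dimension

# The SHA-1 digest of the concatenated string is the metric_hash key used
# by both the measurements insert and the metric_map insert.
metric_hash = bytearray.fromhex(hashlib.sha1(hash_string).hexdigest())

print metric_set
print repr(metric_hash)

A metric with the same name and dimensions always produces the same hash, so its measurement rows and its metric_map entry share one key.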
@six.add_metaclass(abc.ABCMeta)
class AbstractInfluxdbPersister(AbstractPersister):

    def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf):
        super(AbstractInfluxdbPersister, self).__init__(
            kafka_conf, influxdb_db_conf, zookeeper_conf)

    def init_db(self, influxdb_db_conf):
        self._influxdb_client = InfluxDBClient(influxdb_db_conf.ip_address,
                                               influxdb_db_conf.port,
                                               influxdb_db_conf.user,
                                               influxdb_db_conf.password,
                                               influxdb_db_conf.database_name)

    def execute_batch(self, data_points):
        self._influxdb_client.write_points(data_points, 'ms')


class AlarmStateHistInfluxdbPersister(AbstractInfluxdbPersister):

    def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf):
        super(AlarmStateHistInfluxdbPersister, self).__init__(
            kafka_conf, influxdb_db_conf, zookeeper_conf)

    def process_message(self, message):
        (alarm_id, metrics, new_state, old_state, state_change_reason,
         sub_alarms_json_snake_case, tenant_id,
         time_stamp) = parse_alarm_state_hist_message(
            message)

        ts = time_stamp / 1000.0

        data = {"measurement": 'alarm_state_history',
                "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
                    '%Y-%m-%dT%H:%M:%S.%fZ'),
                "fields": {
                    "tenant_id": tenant_id.encode('utf8'),
                    "alarm_id": alarm_id.encode('utf8'),
                    "metrics": json.dumps(metrics, ensure_ascii=False).encode(
                        'utf8'),
                    "new_state": new_state.encode('utf8'),
                    "old_state": old_state.encode('utf8'),
                    "reason": state_change_reason.encode('utf8'),
@@ -330,60 +514,18 @@ class AlarmPersister(AbstractPersister):
        return data


-class MetricPersister(AbstractPersister):
-    """Class for persisting metrics.
-    """
-    def __init__(self, kafka_conf, influxdb_conf, zookeeper_conf):
-        super(MetricPersister, self).__init__(kafka_conf,
-                                              influxdb_conf,
-                                              zookeeper_conf)
class MetricInfluxdbPersister(AbstractInfluxdbPersister):

    def __init__(self, kafka_conf, influxdb_db_conf, zookeeper_conf):
        super(MetricInfluxdbPersister, self).__init__(kafka_conf,
                                                      influxdb_db_conf,
                                                      zookeeper_conf)

    def process_message(self, message):
-        LOG.debug(message.message.value.decode('utf8'))
-        decoded = json.loads(message.message.value)
-        LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))
-        metric = decoded['metric']
-        metric_name = metric['name']
-        LOG.debug('name: %s', metric_name)
-        creation_time = decoded['creation_time']
-        LOG.debug('creation time: %s', creation_time)
-        region = decoded['meta']['region']
-        LOG.debug('region: %s', region)
-        tenant_id = decoded['meta']['tenantId']
-        LOG.debug('tenant id: %s', tenant_id)
-        dimensions = {}
-        if 'dimensions' in metric:
-            for dimension_name in metric['dimensions']:
-                dimensions[dimension_name.encode('utf8')] = (
-                    metric['dimensions'][dimension_name].encode('utf8'))
-                LOG.debug('dimension: %s : %s', dimension_name,
-                          dimensions[dimension_name])
-        time_stamp = metric['timestamp']
-        LOG.debug('timestamp %s', time_stamp)
-        value = float(metric['value'])
-        LOG.debug('value: %s', value)
-        if 'value_meta' in metric and metric['value_meta']:
-            value_meta = metric['value_meta']
-        else:
-            value_meta = {}
-        LOG.debug('value_meta: %s', value_meta)
        (dimensions, metric_name, region, tenant_id, time_stamp, value,
         value_meta) = parse_measurement_message(message)

        tags = dimensions
        tags['_tenant_id'] = tenant_id.encode('utf8')
@@ -393,7 +535,7 @@ class MetricPersister(AbstractPersister):
        data = {"measurement": metric_name.encode('utf8'),
                "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
                    '%Y-%m-%dT%H:%M:%S.%fZ'),
                "fields": {
                    "value": value,
                    "value_meta": json.dumps(value_meta,
@@ -406,6 +548,122 @@ class MetricPersister(AbstractPersister):
        return data


def parse_measurement_message(message):
    LOG.debug(message.message.value.decode('utf8'))

    decoded_message = json.loads(message.message.value)
    LOG.debug(json.dumps(decoded_message, sort_keys=True, indent=4))

    metric = decoded_message['metric']

    metric_name = metric['name']
    LOG.debug('name: %s', metric_name)

    creation_time = decoded_message['creation_time']
    LOG.debug('creation time: %s', creation_time)

    region = decoded_message['meta']['region']
    LOG.debug('region: %s', region)

    tenant_id = decoded_message['meta']['tenantId']
    LOG.debug('tenant id: %s', tenant_id)

    dimensions = {}
    if 'dimensions' in metric:
        for dimension_name in metric['dimensions']:
            dimensions[dimension_name.encode('utf8')] = (
                metric['dimensions'][dimension_name].encode('utf8'))
            LOG.debug('dimension: %s : %s', dimension_name,
                      dimensions[dimension_name])

    time_stamp = metric['timestamp']
    LOG.debug('timestamp %s', time_stamp)

    value = float(metric['value'])
    LOG.debug('value: %s', value)

    if 'value_meta' in metric and metric['value_meta']:
        value_meta = metric['value_meta']
    else:
        value_meta = {}
    LOG.debug('value_meta: %s', value_meta)

    return (dimensions, metric_name, region, tenant_id, time_stamp, value,
            value_meta)
def parse_alarm_state_hist_message(message):
    LOG.debug(message.message.value.decode('utf8'))

    decoded_message = json.loads(message.message.value)
    LOG.debug(json.dumps(decoded_message, sort_keys=True, indent=4))

    alarm_transitioned = decoded_message['alarm-transitioned']

    actions_enabled = alarm_transitioned['actionsEnabled']
    LOG.debug('actions enabled: %s', actions_enabled)

    alarm_description = alarm_transitioned['alarmDescription']
    LOG.debug('alarm description: %s', alarm_description)

    alarm_id = alarm_transitioned['alarmId']
    LOG.debug('alarm id: %s', alarm_id)

    alarm_definition_id = alarm_transitioned[
        'alarmDefinitionId']
    LOG.debug('alarm definition id: %s', alarm_definition_id)

    metrics = alarm_transitioned['metrics']
    LOG.debug('metrics: %s', metrics)

    alarm_name = alarm_transitioned['alarmName']
    LOG.debug('alarm name: %s', alarm_name)

    new_state = alarm_transitioned['newState']
    LOG.debug('new state: %s', new_state)

    old_state = alarm_transitioned['oldState']
    LOG.debug('old state: %s', old_state)

    state_change_reason = alarm_transitioned[
        'stateChangeReason']
    LOG.debug('state change reason: %s', state_change_reason)

    tenant_id = alarm_transitioned['tenantId']
    LOG.debug('tenant id: %s', tenant_id)

    time_stamp = alarm_transitioned['timestamp']
    LOG.debug('time stamp: %s', time_stamp)

    sub_alarms = alarm_transitioned['subAlarms']

    if sub_alarms:
        sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False)

        sub_alarms_json_snake_case = sub_alarms_json.replace(
            '"subAlarmExpression":',
            '"sub_alarm_expression":')
        sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
            '"metricDefinition":',
            '"metric_definition":')
        sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace(
            '"subAlarmState":',
            '"sub_alarm_state":')
    else:
        sub_alarms_json_snake_case = "[]"

    return (alarm_id, metrics, new_state, old_state, state_change_reason,
            sub_alarms_json_snake_case, tenant_id, time_stamp)


def main_service():
    """Method to use with Openstack service.
    """
@@ -415,6 +673,7 @@ def main_service():
    launcher.launch_service(Persister())
    launcher.wait()


# Used if run without Openstack service.
if __name__ == "__main__":
    sys.exit(main())


@@ -10,5 +10,6 @@ six==1.9.0
babel
eventlet
influxdb==2.8.0
cassandra-driver==3.0.0
iso8601
monasca-common