From d3acf120346937e751695179717f2517a895c267 Mon Sep 17 00:00:00 2001
From: Deklan Dieterly
Date: Thu, 2 Oct 2014 08:33:19 -0600
Subject: [PATCH] Increase throughput of persister

Add batching to InfluxDB writes to improve throughput.

Change-Id: Ia173f55726cb11245f0bcf4580f1af8129c23aa3
---
 .gitignore                         |   1 +
 monasca_persister/persister.conf   |   4 +
 monasca_persister/persister.py     | 239 +++++++++++++++++++----------
 monasca_persister/servicerunner.py |   4 +-
 setup.cfg                          |   3 +-
 tox.ini                            |   2 +-
 6 files changed, 164 insertions(+), 89 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8d642998..64f56a22 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ target/
 debs/
 logs/
 .idea/
+*.pyc

diff --git a/monasca_persister/persister.conf b/monasca_persister/persister.conf
index f57a74cd..08aba1bf 100644
--- a/monasca_persister/persister.conf
+++ b/monasca_persister/persister.conf
@@ -10,10 +10,14 @@ alarm_history_group_id = 1_alarm-state-transitions
 alarm_history_topic = alarm-state-transitions
 alarm_history_consumer_id = 1
 alarm_history_client_id = 1
+alarm_batch_size = 1000
+alarm_max_wait_time_seconds = 30
 metrics_group_id = 1_metrics
 metrics_topic = metrics
 metrics_consumer_id = 1
 metrics_client_id = 1
+metrics_batch_size = 1000
+metrics_max_wait_time_seconds = 30
 
 [influxdb]
 database_name = test
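
The two new settings in each section work as a pair: a batch is flushed to InfluxDB once it reaches *_batch_size messages, or once *_max_wait_time_seconds has passed since the last flush, whichever comes first. A minimal sketch of that policy in isolation (the BatchPolicy name and shape are illustrative only, not part of this patch):

    from datetime import datetime

    class BatchPolicy(object):
        """Flush when the batch is full or has waited too long."""

        def __init__(self, batch_size, max_wait_seconds):
            self.batch_size = batch_size
            self.max_wait_seconds = max_wait_seconds
            self.last_flush = datetime.now()

        def should_flush(self, pending_count):
            # .seconds is the wrapped seconds field of the timedelta,
            # which is fine here because waits are far below one day.
            elapsed = (datetime.now() - self.last_flush).seconds
            return (pending_count >= self.batch_size or
                    (pending_count > 0 and
                     elapsed > self.max_wait_seconds))

        def mark_flushed(self):
            self.last_flush = datetime.now()
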
diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py
index f36e8ce4..f2089ebb 100644
--- a/monasca_persister/persister.py
+++ b/monasca_persister/persister.py
@@ -23,19 +23,22 @@
 Also able to use Openstack service to start the persister.
 """
 
-import threading
-from kafka import KafkaClient, SimpleConsumer
-from influxdb import InfluxDBClient
+from datetime import datetime
 import json
-import urllib
 import sys
+import threading
+import urllib
+
+from influxdb import InfluxDBClient
+from kafka import KafkaClient
+from kafka import SimpleConsumer
 
 from oslo.config import cfg
 from openstack.common import log
 from openstack.common import service as os_service
-
 import service
 
+
 LOG = log.getLogger(__name__)
 
 kafka_opts = [
@@ -44,10 +47,14 @@ kafka_opts = [
     cfg.StrOpt('alarm_history_topic'),
     cfg.StrOpt('alarm_history_consumer_id'),
     cfg.StrOpt('alarm_history_client_id'),
+    cfg.IntOpt('alarm_batch_size'),
+    cfg.IntOpt('alarm_max_wait_time_seconds'),
     cfg.StrOpt('metrics_group_id'),
     cfg.StrOpt('metrics_topic'),
     cfg.StrOpt('metrics_consumer_id'),
-    cfg.StrOpt('metrics_client_id')
+    cfg.StrOpt('metrics_client_id'),
+    cfg.IntOpt('metrics_batch_size'),
+    cfg.IntOpt('metrics_max_wait_time_seconds')
 ]
 
 kafka_group = cfg.OptGroup(name='kafka',
@@ -108,68 +115,104 @@ class AlarmPersister(threading.Thread):
 
     def run(self):
 
+        count = 0
+        json_body = []
+        last_flush = datetime.now()
+
         try:
             kafka = KafkaClient(self.conf.kafka.uri)
             consumer = SimpleConsumer(kafka,
                                       self.conf.kafka.alarm_history_group_id,
                                       self.conf.kafka.alarm_history_topic,
-                                      auto_commit=True)
+                                      auto_commit=False, iter_timeout=1)
 
             influxdb_client = InfluxDBClient(self.conf.influxdb.ip_address,
                                              self.conf.influxdb.port,
                                              self.conf.influxdb.user,
                                              self.conf.influxdb.password,
                                              self.conf.influxdb.database_name)
 
+            while (True):
 
-            for message in consumer:
-                LOG.debug(message.message.value.decode('utf8'))
+                delta_time = datetime.now() - last_flush
+                if (delta_time.seconds >
+                        self.conf.kafka.alarm_max_wait_time_seconds):
+                    if json_body:
+                        influxdb_client.write_points(json_body)
+                        consumer.commit()
+                        last_flush = datetime.now()
+                        count = 0
+                        json_body = []
 
-                decoded = json.loads(message.message.value)
-                LOG.debug(json.dumps(decoded, sort_keys=True,
-                                     indent=4))
+                for message in consumer:
+                    LOG.debug(message.message.value.decode('utf8'))
 
-                actions_enabled = decoded['alarm-transitioned'][
-                    'actionsEnabled']
-                LOG.debug('actions enabled: %s', actions_enabled)
+                    decoded = json.loads(message.message.value)
+                    LOG.debug(
+                        json.dumps(decoded, sort_keys=True, indent=4))
 
-                alarm_description = decoded['alarm-transitioned'][
-                    'alarmDescription']
-                LOG.debug('alarm description: %s', alarm_description)
+                    actions_enabled = decoded['alarm-transitioned'][
+                        'actionsEnabled']
+                    LOG.debug('actions enabled: %s', actions_enabled)
 
-                alarm_id = decoded['alarm-transitioned']['alarmId']
-                LOG.debug('alarm id: %s', alarm_id)
+                    alarm_description = decoded['alarm-transitioned'][
+                        'alarmDescription']
+                    LOG.debug('alarm description: %s', alarm_description)
 
-                alarm_name = decoded['alarm-transitioned']['alarmName']
-                LOG.debug('alarm name: %s', alarm_name)
+                    alarm_id = decoded['alarm-transitioned']['alarmId']
+                    LOG.debug('alarm id: %s', alarm_id)
 
-                new_state = decoded['alarm-transitioned']['newState']
-                LOG.debug('new state: %s', new_state)
+                    alarm_definition_id = decoded['alarm-transitioned'][
+                        'alarmDefinitionId']
+                    LOG.debug('alarm definition id: %s', alarm_definition_id)
 
-                old_state = decoded['alarm-transitioned']['oldState']
-                LOG.debug('old state: %s', old_state)
+                    metrics = decoded['alarm-transitioned']['metrics']
+                    LOG.debug('metrics: %s', metrics)
 
-                state_changeReason = decoded['alarm-transitioned'][
-                    'stateChangeReason']
-                LOG.debug('state change reason: %s', state_changeReason)
+                    alarm_name = decoded['alarm-transitioned']['alarmName']
+                    LOG.debug('alarm name: %s', alarm_name)
 
-                tenant_id = decoded['alarm-transitioned']['tenantId']
-                LOG.debug('tenant id: %s', tenant_id)
+                    new_state = decoded['alarm-transitioned']['newState']
+                    LOG.debug('new state: %s', new_state)
 
-                time_stamp = decoded['alarm-transitioned']['timestamp']
-                LOG.debug('time stamp: %s', time_stamp)
+                    old_state = decoded['alarm-transitioned']['oldState']
+                    LOG.debug('old state: %s', old_state)
 
-                json_body = [
-                    {"points": [
+                    state_change_reason = decoded['alarm-transitioned'][
+                        'stateChangeReason']
+                    LOG.debug('state change reason: %s',
+                              state_change_reason)
+
+                    tenant_id = decoded['alarm-transitioned']['tenantId']
+                    LOG.debug('tenant id: %s', tenant_id)
+
+                    time_stamp = decoded['alarm-transitioned']['timestamp']
+                    LOG.debug('time stamp: %s', time_stamp)
+
+                    data = {"points": [
                         [time_stamp, '{}', tenant_id.encode('utf8'),
-                         alarm_id.encode('utf8'), old_state.encode('utf8'),
+                         alarm_id.encode('utf8'),
+                         alarm_definition_id.encode('utf8'),
+                         json.dumps(metrics).encode('utf8'),
+                         old_state.encode('utf8'),
                          new_state.encode('utf8'),
-                         state_changeReason.encode('utf8')]],
-                        "name": 'alarm_state_history',
-                        "columns": ["time", "reason_data", "tenant_id",
-                                    "alarm_id", "old_state", "new_state",
-                                    "reason"]}]
+                         state_change_reason.encode('utf8')]],
+                        "name": 'alarm_state_history',
+                        "columns": ["time", "reason_data", "tenant_id",
+                                    "alarm_id", "alarm_definition_id",
+                                    "metrics", "old_state",
+                                    "new_state",
+                                    "reason"]}
 
-                influxdb_client.write_points(json_body)
+                    LOG.debug(data)
+                    json_body.append(data)
+
+                    count += 1
+                    if count % self.conf.kafka.alarm_batch_size == 0:
+                        influxdb_client.write_points(json_body)
+                        consumer.commit()
+                        last_flush = datetime.now()
+                        count = 0
+                        json_body = []
 
         except Exception:
             LOG.exception(
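
The write-commit-reset sequence above now appears twice per thread, once on the timeout path and once on the batch-size path. Because auto_commit is off, Kafka offsets only advance after a successful InfluxDB write, which gives at-least-once delivery: a crash between write_points() and commit() replays the uncommitted messages on restart. The repeated sequence would also factor out cleanly; a hypothetical helper along these lines, not something this patch adds:

    def _flush(influxdb_client, consumer, json_body):
        # Write the accumulated points, then commit the Kafka offsets.
        # Returns fresh values for last_flush, count, and json_body.
        if json_body:
            influxdb_client.write_points(json_body)
        consumer.commit()
        return datetime.now(), 0, []
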
@@ -187,13 +230,17 @@ class MetricPersister(threading.Thread):
 
     def run(self):
 
+        count = 0
+        json_body = []
+        last_flush = datetime.now()
+
         try:
             kafka = KafkaClient(self.conf.kafka.uri)
             consumer = SimpleConsumer(kafka,
                                       self.conf.kafka.metrics_group_id,
                                       self.conf.kafka.metrics_topic,
-                                      auto_commit=True)
+                                      auto_commit=False, iter_timeout=1)
 
             influxdb_client = InfluxDBClient(self.conf.influxdb.ip_address,
                                              self.conf.influxdb.port,
@@ -201,63 +248,86 @@ class MetricPersister(threading.Thread):
                                              self.conf.influxdb.password,
                                              self.conf.influxdb.database_name)
 
-            for message in consumer:
-                LOG.debug(message.message.value.decode('utf8'))
+            while (True):
 
-                decoded = json.loads(message.message.value)
-                LOG.debug(json.dumps(decoded, sort_keys=True, indent=4))
+                delta_time = datetime.now() - last_flush
+                if (delta_time.seconds >
+                        self.conf.kafka.metrics_max_wait_time_seconds):
+                    if json_body:
+                        influxdb_client.write_points(json_body)
+                        consumer.commit()
+                        last_flush = datetime.now()
+                        count = 0
+                        json_body = []
 
-                metric_name = decoded['metric']['name']
-                LOG.debug('name: %s', metric_name)
+                for message in consumer:
+                    LOG.debug(message.message.value.decode('utf8'))
 
-                creation_time = decoded['creation_time']
-                LOG.debug('creation time: %s', creation_time)
+                    decoded = json.loads(message.message.value)
+                    LOG.debug(
+                        json.dumps(decoded, sort_keys=True, indent=4))
 
-                region = decoded['meta']['region']
-                LOG.debug('region: %s', region)
+                    metric_name = decoded['metric']['name']
+                    LOG.debug('name: %s', metric_name)
 
-                tenant_id = decoded['meta']['tenantId']
-                LOG.debug('tenant id: %s', tenant_id)
+                    creation_time = decoded['creation_time']
+                    LOG.debug('creation time: %s', creation_time)
 
-                dimensions = {}
-                if 'dimensions' in decoded['metric']:
-                    for dimension_name in decoded['metric']['dimensions']:
-                        dimensions[dimension_name] = (
-                            decoded['metric']['dimensions'][dimension_name])
-                        LOG.debug('dimension: %s : %s', dimension_name,
-                                  dimensions[dimension_name])
+                    region = decoded['meta']['region']
+                    LOG.debug('region: %s', region)
 
-                time_stamp = decoded['metric']['timestamp']
-                LOG.debug('timestamp %s', time_stamp)
+                    tenant_id = decoded['meta']['tenantId']
+                    LOG.debug('tenant id: %s', tenant_id)
 
-                value = decoded['metric']['value']
-                LOG.debug('value: %s', value)
+                    dimensions = {}
+                    if 'dimensions' in decoded['metric']:
+                        for dimension_name in decoded['metric'][
+                                'dimensions']:
+                            dimensions[dimension_name] = (
+                                decoded['metric']['dimensions'][
+                                    dimension_name])
+                            LOG.debug('dimension: %s : %s', dimension_name,
+                                      dimensions[dimension_name])
 
-                url_encoded_serie_name = (
-                    urllib.quote(metric_name.encode('utf8'), safe='')
-                    + '?' + urllib.quote(tenant_id.encode('utf8'), safe='')
-                    + '&' + urllib.quote(region.encode('utf8'), safe=''))
+                    time_stamp = decoded['metric']['timestamp']
+                    LOG.debug('timestamp %s', time_stamp)
 
-                for dimension_name in dimensions:
-                    url_encoded_serie_name += ('&'
-                        + urllib.quote(
-                            dimension_name.encode('utf8'), safe='')
-                        + '='
-                        + urllib.quote(
-                            dimensions[dimension_name].encode('utf8'),
-                            safe=''))
+                    value = decoded['metric']['value']
+                    LOG.debug('value: %s', value)
 
-                LOG.debug("url_encoded_serie_name: %s",
-                          url_encoded_serie_name)
+                    url_encoded_serie_name = (
+                        urllib.quote(metric_name.encode('utf8'), safe='')
+                        + '?' + urllib.quote(tenant_id.encode('utf8'),
+                                             safe='')
+                        + '&' + urllib.quote(region.encode('utf8'),
+                                             safe=''))
 
-                json_body = [
+                    for dimension_name in dimensions:
+                        url_encoded_serie_name += ('&'
+                            + urllib.quote(
+                                dimension_name.encode('utf8'), safe='')
+                            + '='
+                            + urllib.quote(
+                                dimensions[dimension_name].encode('utf8'),
+                                safe=''))
 
-                    {"points": [[value, time_stamp]],
-                     "name": url_encoded_serie_name,
-                     "columns": ["value", "time"]}]
+                    LOG.debug("url_encoded_serie_name: %s",
+                              url_encoded_serie_name)
 
-                LOG.debug(json_body)
+                    data = {"points": [[value, time_stamp]],
+                            "name": url_encoded_serie_name,
+                            "columns": ["value", "time"]}
 
-                influxdb_client.write_points(json_body)
+                    LOG.debug(data)
+                    json_body.append(data)
+
+                    count += 1
+                    if count % self.conf.kafka.metrics_batch_size == 0:
+                        influxdb_client.write_points(json_body)
+                        consumer.commit()
+                        last_flush = datetime.now()
+                        count = 0
+                        json_body = []
 
         except Exception:
             LOG.exception(
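
For reference, the serie name built above folds the metric's whole scope into a single URL-encoded string of the form name?tenant&region&dim1=value1&... A standalone Python 2 sketch with made-up values, mirroring the patch's urllib.quote usage:

    import urllib

    metric_name = 'cpu.idle_perc'
    tenant_id = 'abc123'
    region = 'uswest'
    dimensions = {'hostname': 'host 1'}

    serie_name = (urllib.quote(metric_name, safe='')
                  + '?' + urllib.quote(tenant_id, safe='')
                  + '&' + urllib.quote(region, safe=''))
    for name in dimensions:
        serie_name += ('&' + urllib.quote(name, safe='')
                       + '=' + urllib.quote(dimensions[name], safe=''))

    # Prints: cpu.idle_perc?abc123&uswest&hostname=host%201
    print(serie_name)
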
@@ -276,5 +346,4 @@ def main_service():
 
 # Used if run without Openstack service.
 if __name__ == "__main__":
-    sys.exit(main())
-
+    sys.exit(main())
\ No newline at end of file

diff --git a/monasca_persister/servicerunner.py b/monasca_persister/servicerunner.py
index ee22cbb2..a124e68f 100644
--- a/monasca_persister/servicerunner.py
+++ b/monasca_persister/servicerunner.py
@@ -18,6 +18,7 @@
 """
 
 import sys
+
 from persister import main_service
 
 
@@ -26,5 +27,4 @@ def main():
 
 
 if __name__ == "__main__":
-    sys.exit(main())
-
+    sys.exit(main())
\ No newline at end of file

diff --git a/setup.cfg b/setup.cfg
index d42da26c..5ae2e921 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -28,4 +28,5 @@ autodoc_index_modules = True
 max-line-length = 120
 
 [wheel]
-universal = 1
\ No newline at end of file
+universal = 1
+

diff --git a/tox.ini b/tox.ini
index f28847ec..87d82b43 100644
--- a/tox.ini
+++ b/tox.ini
@@ -28,5 +28,5 @@ max-line-length = 120
 # H307 like imports should be grouped together
 # H405 multi line docstring summary not separated with an empty line
 # H904 Wrap long lines in parentheses instead of a backslash
-ignore = F821,H201,H302,H305,H307,H405,H904
+ignore = F821,H201,H302,H305,H307,H405,H904,E126,E125,H306,E302,E122
 exclude=.venv,.git,.tox,dist,*openstack/common*,*egg,build
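
For anyone trying the patch locally: the payload persisted here is the pre-0.9 InfluxDB list-of-series format, written through the same InfluxDBClient constructor the persister itself uses. A rough sketch of a manual write against a test database (connection values are placeholders; the real ones come from the [influxdb] section of persister.conf):

    from influxdb import InfluxDBClient

    client = InfluxDBClient('127.0.0.1', 8086, 'user', 'password', 'test')

    # One series per entry; columns name the positions in each point.
    json_body = [{"points": [[42.0, 1412262799000]],
                  "name": "cpu.idle_perc?abc123&uswest",
                  "columns": ["value", "time"]}]
    client.write_points(json_body)
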