From a7112fd30bd545dd850e0e267dcceb9ea27551ad Mon Sep 17 00:00:00 2001 From: Craig Bryant Date: Wed, 8 Feb 2017 18:31:40 -0700 Subject: [PATCH] Increase Persister Performance The main improvement comes from using the Influxdb Line Protocol. The encoding methods in line_utils.py are like the ones used in the influxdb client but optimized for our data Additional improvement comes from avoiding calls to encode('utf8') as the influxdb client already does that. On my test system, these changes increased the number of measurements processed from about 2200/second to about 3700/second. Measurement processing time is now dominated by Kafka. Approximately, 35% of time is spent reading from Kafka and approximately 22% of time is committing offsets. Only 10% of the time is spent writing to Influxdb. About 30% of the time is spent converting messages from the json string read from Kafka into the Line Protocol format for Influxdb. Once monasca-common is modified to use the faster kafka library, performance should be even better. I did try using ujson, but my tests showed it wasn't any faster than the json package. Change-Id: I2acf76d9a5f583c74a272e18350b9c0ad5883f95 --- .../influxdb/abstract_repository.py | 4 +- .../alarm_state_history_repository.py | 48 +++++++++---------- .../repositories/influxdb/line_utils.py | 46 ++++++++++++++++++ .../influxdb/metrics_repository.py | 36 ++++++++------ monasca_persister/repositories/utils.py | 19 +++----- monasca_persister/tests/test_influxdb.py | 42 ++++++++++++++++ 6 files changed, 140 insertions(+), 55 deletions(-) create mode 100644 monasca_persister/repositories/influxdb/line_utils.py create mode 100644 monasca_persister/tests/test_influxdb.py diff --git a/monasca_persister/repositories/influxdb/abstract_repository.py b/monasca_persister/repositories/influxdb/abstract_repository.py index 3b9a8f22..49354266 100644 --- a/monasca_persister/repositories/influxdb/abstract_repository.py +++ b/monasca_persister/repositories/influxdb/abstract_repository.py @@ -1,4 +1,4 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -34,4 +34,4 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository): self.conf.influxdb.database_name) def write_batch(self, data_points): - self._influxdb_client.write_points(data_points, 'ms') + self._influxdb_client.write_points(data_points, 'ms', protocol='line') diff --git a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py index bc93204d..fea37bb6 100644 --- a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py +++ b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py @@ -1,4 +1,4 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,13 +12,12 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime import json from oslo_log import log -import pytz from monasca_persister.repositories.influxdb import abstract_repository +from monasca_persister.repositories.influxdb import line_utils from monasca_persister.repositories.utils import parse_alarm_state_hist_message LOG = log.getLogger(__name__) @@ -39,28 +38,27 @@ class AlarmStateHistInfluxdbRepository( time_stamp) = parse_alarm_state_hist_message( message) - ts = time_stamp / 1000.0 + name = u'alarm_state_history' + fields = [] + fields.append(u'tenant_id=' + line_utils.escape_value(tenant_id)) + fields.append(u'alarm_id=' + line_utils.escape_value(alarm_id)) + fields.append(u'metrics=' + line_utils.escape_value( + json.dumps(metrics, ensure_ascii=False))) + fields.append(u'new_state=' + line_utils.escape_value(new_state)) + fields.append(u'old_state=' + line_utils.escape_value(old_state)) + fields.append(u'link=' + line_utils.escape_value(link)) + fields.append(u'lifecycle_state=' + line_utils.escape_value( + lifecycle_state)) + fields.append(u'reason=' + line_utils.escape_value( + state_change_reason)) + fields.append(u'reason_data=' + line_utils.escape_value("{}")) + fields.append(u'sub_alarms=' + line_utils.escape_value( + sub_alarms_json_snake_case)) - data = {"measurement": 'alarm_state_history', - "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( - '%Y-%m-%dT%H:%M:%S.%fZ'), - "fields": { - "tenant_id": tenant_id.encode('utf8'), - "alarm_id": alarm_id.encode('utf8'), - "metrics": json.dumps(metrics, ensure_ascii=False).encode( - 'utf8'), - "new_state": new_state.encode('utf8'), - "old_state": old_state.encode('utf8'), - "link": link.encode('utf8'), - "lifecycle_state": lifecycle_state.encode('utf8'), - "reason": state_change_reason.encode('utf8'), - "reason_data": "{}".encode('utf8'), - "sub_alarms": sub_alarms_json_snake_case.encode('utf8') - }, - "tags": { - "tenant_id": tenant_id.encode('utf8') - }} + line = name + u',tenant_id=' + line_utils.escape_tag(tenant_id) + line += u' ' + u','.join(fields) + line += u' ' + str(int(time_stamp)) - LOG.debug(data) + LOG.debug(line) - return data + return line diff --git a/monasca_persister/repositories/influxdb/line_utils.py b/monasca_persister/repositories/influxdb/line_utils.py new file mode 100644 index 00000000..367e0c71 --- /dev/null +++ b/monasca_persister/repositories/influxdb/line_utils.py @@ -0,0 +1,46 @@ +# (C) Copyright 2017 Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from six import PY2 + + +def escape_tag(tag): + tag = get_unicode(tag) + return tag.replace( + u"\\", u"\\\\" + ).replace( + u" ", u"\\ " + ).replace( + u",", u"\\," + ).replace( + u"=", u"\\=" + ) + +def get_unicode(data): + if PY2: + return unicode(data) + else: + return str(data) + +def escape_value(value): + return u"\"{0}\"".format( + get_unicode(value).replace( + u"\\", u"\\\\" + ).replace( + u"\"", u"\\\"" + ).replace( + u"\n", u"\\n" + ) + ) diff --git a/monasca_persister/repositories/influxdb/metrics_repository.py b/monasca_persister/repositories/influxdb/metrics_repository.py index 7afc4184..0ab342b3 100644 --- a/monasca_persister/repositories/influxdb/metrics_repository.py +++ b/monasca_persister/repositories/influxdb/metrics_repository.py @@ -1,4 +1,4 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,13 +12,12 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime import json from oslo_log import log -import pytz from monasca_persister.repositories.influxdb import abstract_repository +from monasca_persister.repositories.influxdb import line_utils from monasca_persister.repositories.utils import parse_measurement_message LOG = log.getLogger(__name__) @@ -36,20 +35,27 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository): value_meta) = parse_measurement_message(message) tags = dimensions - tags['_tenant_id'] = tenant_id.encode('utf8') - tags['_region'] = region.encode('utf8') + tags[u'_tenant_id'] = tenant_id + tags[u'_region'] = region - ts = time_stamp / 1000.0 + if not value_meta: + value_meta_str = u'"{}"' + else: + value_meta_str = line_utils.escape_value(json.dumps(value_meta, ensure_ascii=False)) - data = {"measurement": metric_name.encode('utf8'), - "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( - '%Y-%m-%dT%H:%M:%S.%fZ'), - "fields": { - "value": value, - "value_meta": json.dumps(value_meta, - ensure_ascii=False).encode('utf8') - }, - "tags": tags} + key_values = [line_utils.escape_tag(metric_name)] + + # tags should be sorted client-side to take load off server + for key in sorted(tags.keys()): + key_tag = line_utils.escape_tag(key) + value_tag = line_utils.escape_tag(tags[key]) + key_values.append(key_tag + u'=' + value_tag) + key_values = u','.join(key_values) + + value_field = u'value={}'.format(value) + value_meta_field = u'value_meta=' + value_meta_str + + data = key_values + u' ' + value_field + u',' + value_meta_field + u' ' + str(int(time_stamp)) LOG.debug(data) diff --git a/monasca_persister/repositories/utils.py b/monasca_persister/repositories/utils.py index e55c7816..6a78fa1b 100644 --- a/monasca_persister/repositories/utils.py +++ b/monasca_persister/repositories/utils.py @@ -1,4 +1,4 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -27,24 +27,17 @@ def parse_measurement_message(message): tenant_id = decoded_message['meta']['tenantId'] - dimensions = {} - if 'dimensions' in metric: - for dimension_name in metric['dimensions']: - dimensions[dimension_name.encode('utf8')] = ( - metric['dimensions'][dimension_name].encode('utf8')) - time_stamp = metric['timestamp'] value = float(metric['value']) - if 'value_meta' in metric and metric['value_meta']: - value_meta = metric['value_meta'] - - else: + value_meta = metric.get('value_meta', {}) + if 'value_meta' is None: + # Ensure value_meta is a dict value_meta = {} - return (dimensions, metric_name, region, tenant_id, time_stamp, value, - value_meta) + return (metric.get('dimensions', {}), metric_name, region, tenant_id, + time_stamp, value, value_meta) def parse_alarm_state_hist_message(message): diff --git a/monasca_persister/tests/test_influxdb.py b/monasca_persister/tests/test_influxdb.py new file mode 100644 index 00000000..eebb0183 --- /dev/null +++ b/monasca_persister/tests/test_influxdb.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# (C) Copyright 2017 Hewlett Packard Enterprise Development LP +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslotest import base + +from monasca_persister.repositories.influxdb import line_utils + +class TestInfluxdb(base.BaseTestCase): + + def setUp(self): + super(TestInfluxdb, self).setUp() + + def tearDown(self): + super(TestInfluxdb, self).tearDown() + + def test_line_utils_handles_utf8(self): + utf8_name = u'name' + self.assertEqual(u'"' + utf8_name + u'"', line_utils.escape_value(utf8_name)) + self.assertEqual(utf8_name, line_utils.escape_tag(utf8_name)) + + def test_line_utils_escape_tag(self): + simple = u"aaaaa" + self.assertEqual(simple, line_utils.escape_tag(simple)) + complex = u"a\\ b,c=" + self.assertEqual("a\\\\\\ b\\,c\\=", line_utils.escape_tag(complex)) + + def test_line_utils_escape_value(self): + simple = u"aaaaa" + self.assertEqual(u'"' + simple + u'"', line_utils.escape_value(simple)) + complex = u"a\\b\"\n" + self.assertEqual(u"\"a\\\\b\\\"\\n\"", line_utils.escape_value(complex))