diff --git a/monasca_persister/repositories/influxdb/abstract_repository.py b/monasca_persister/repositories/influxdb/abstract_repository.py index 3b9a8f22..49354266 100644 --- a/monasca_persister/repositories/influxdb/abstract_repository.py +++ b/monasca_persister/repositories/influxdb/abstract_repository.py @@ -1,4 +1,4 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -34,4 +34,4 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository): self.conf.influxdb.database_name) def write_batch(self, data_points): - self._influxdb_client.write_points(data_points, 'ms') + self._influxdb_client.write_points(data_points, 'ms', protocol='line') diff --git a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py index bc93204d..fea37bb6 100644 --- a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py +++ b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py @@ -1,4 +1,4 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,13 +12,12 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime import json from oslo_log import log -import pytz from monasca_persister.repositories.influxdb import abstract_repository +from monasca_persister.repositories.influxdb import line_utils from monasca_persister.repositories.utils import parse_alarm_state_hist_message LOG = log.getLogger(__name__) @@ -39,28 +38,27 @@ class AlarmStateHistInfluxdbRepository( time_stamp) = parse_alarm_state_hist_message( message) - ts = time_stamp / 1000.0 + name = u'alarm_state_history' + fields = [] + fields.append(u'tenant_id=' + line_utils.escape_value(tenant_id)) + fields.append(u'alarm_id=' + line_utils.escape_value(alarm_id)) + fields.append(u'metrics=' + line_utils.escape_value( + json.dumps(metrics, ensure_ascii=False))) + fields.append(u'new_state=' + line_utils.escape_value(new_state)) + fields.append(u'old_state=' + line_utils.escape_value(old_state)) + fields.append(u'link=' + line_utils.escape_value(link)) + fields.append(u'lifecycle_state=' + line_utils.escape_value( + lifecycle_state)) + fields.append(u'reason=' + line_utils.escape_value( + state_change_reason)) + fields.append(u'reason_data=' + line_utils.escape_value("{}")) + fields.append(u'sub_alarms=' + line_utils.escape_value( + sub_alarms_json_snake_case)) - data = {"measurement": 'alarm_state_history', - "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( - '%Y-%m-%dT%H:%M:%S.%fZ'), - "fields": { - "tenant_id": tenant_id.encode('utf8'), - "alarm_id": alarm_id.encode('utf8'), - "metrics": json.dumps(metrics, ensure_ascii=False).encode( - 'utf8'), - "new_state": new_state.encode('utf8'), - "old_state": old_state.encode('utf8'), - "link": link.encode('utf8'), - "lifecycle_state": lifecycle_state.encode('utf8'), - "reason": state_change_reason.encode('utf8'), - "reason_data": "{}".encode('utf8'), - "sub_alarms": sub_alarms_json_snake_case.encode('utf8') - }, - "tags": { - "tenant_id": tenant_id.encode('utf8') - }} + line = name + u',tenant_id=' + line_utils.escape_tag(tenant_id) + line += u' ' + u','.join(fields) + line += u' ' + str(int(time_stamp)) - LOG.debug(data) + LOG.debug(line) - return data + return line diff --git a/monasca_persister/repositories/influxdb/line_utils.py b/monasca_persister/repositories/influxdb/line_utils.py new file mode 100644 index 00000000..367e0c71 --- /dev/null +++ b/monasca_persister/repositories/influxdb/line_utils.py @@ -0,0 +1,46 @@ +# (C) Copyright 2017 Hewlett Packard Enterprise Development LP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from six import PY2 + + +def escape_tag(tag): + tag = get_unicode(tag) + return tag.replace( + u"\\", u"\\\\" + ).replace( + u" ", u"\\ " + ).replace( + u",", u"\\," + ).replace( + u"=", u"\\=" + ) + +def get_unicode(data): + if PY2: + return unicode(data) + else: + return str(data) + +def escape_value(value): + return u"\"{0}\"".format( + get_unicode(value).replace( + u"\\", u"\\\\" + ).replace( + u"\"", u"\\\"" + ).replace( + u"\n", u"\\n" + ) + ) diff --git a/monasca_persister/repositories/influxdb/metrics_repository.py b/monasca_persister/repositories/influxdb/metrics_repository.py index 7afc4184..0ab342b3 100644 --- a/monasca_persister/repositories/influxdb/metrics_repository.py +++ b/monasca_persister/repositories/influxdb/metrics_repository.py @@ -1,4 +1,4 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,13 +12,12 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime import json from oslo_log import log -import pytz from monasca_persister.repositories.influxdb import abstract_repository +from monasca_persister.repositories.influxdb import line_utils from monasca_persister.repositories.utils import parse_measurement_message LOG = log.getLogger(__name__) @@ -36,20 +35,27 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository): value_meta) = parse_measurement_message(message) tags = dimensions - tags['_tenant_id'] = tenant_id.encode('utf8') - tags['_region'] = region.encode('utf8') + tags[u'_tenant_id'] = tenant_id + tags[u'_region'] = region - ts = time_stamp / 1000.0 + if not value_meta: + value_meta_str = u'"{}"' + else: + value_meta_str = line_utils.escape_value(json.dumps(value_meta, ensure_ascii=False)) - data = {"measurement": metric_name.encode('utf8'), - "time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime( - '%Y-%m-%dT%H:%M:%S.%fZ'), - "fields": { - "value": value, - "value_meta": json.dumps(value_meta, - ensure_ascii=False).encode('utf8') - }, - "tags": tags} + key_values = [line_utils.escape_tag(metric_name)] + + # tags should be sorted client-side to take load off server + for key in sorted(tags.keys()): + key_tag = line_utils.escape_tag(key) + value_tag = line_utils.escape_tag(tags[key]) + key_values.append(key_tag + u'=' + value_tag) + key_values = u','.join(key_values) + + value_field = u'value={}'.format(value) + value_meta_field = u'value_meta=' + value_meta_str + + data = key_values + u' ' + value_field + u',' + value_meta_field + u' ' + str(int(time_stamp)) LOG.debug(data) diff --git a/monasca_persister/repositories/utils.py b/monasca_persister/repositories/utils.py index e55c7816..6a78fa1b 100644 --- a/monasca_persister/repositories/utils.py +++ b/monasca_persister/repositories/utils.py @@ -1,4 +1,4 @@ -# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP +# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -27,24 +27,17 @@ def parse_measurement_message(message): tenant_id = decoded_message['meta']['tenantId'] - dimensions = {} - if 'dimensions' in metric: - for dimension_name in metric['dimensions']: - dimensions[dimension_name.encode('utf8')] = ( - metric['dimensions'][dimension_name].encode('utf8')) - time_stamp = metric['timestamp'] value = float(metric['value']) - if 'value_meta' in metric and metric['value_meta']: - value_meta = metric['value_meta'] - - else: + value_meta = metric.get('value_meta', {}) + if 'value_meta' is None: + # Ensure value_meta is a dict value_meta = {} - return (dimensions, metric_name, region, tenant_id, time_stamp, value, - value_meta) + return (metric.get('dimensions', {}), metric_name, region, tenant_id, + time_stamp, value, value_meta) def parse_alarm_state_hist_message(message): diff --git a/monasca_persister/tests/test_influxdb.py b/monasca_persister/tests/test_influxdb.py new file mode 100644 index 00000000..eebb0183 --- /dev/null +++ b/monasca_persister/tests/test_influxdb.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# (C) Copyright 2017 Hewlett Packard Enterprise Development LP +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslotest import base + +from monasca_persister.repositories.influxdb import line_utils + +class TestInfluxdb(base.BaseTestCase): + + def setUp(self): + super(TestInfluxdb, self).setUp() + + def tearDown(self): + super(TestInfluxdb, self).tearDown() + + def test_line_utils_handles_utf8(self): + utf8_name = u'name' + self.assertEqual(u'"' + utf8_name + u'"', line_utils.escape_value(utf8_name)) + self.assertEqual(utf8_name, line_utils.escape_tag(utf8_name)) + + def test_line_utils_escape_tag(self): + simple = u"aaaaa" + self.assertEqual(simple, line_utils.escape_tag(simple)) + complex = u"a\\ b,c=" + self.assertEqual("a\\\\\\ b\\,c\\=", line_utils.escape_tag(complex)) + + def test_line_utils_escape_value(self): + simple = u"aaaaa" + self.assertEqual(u'"' + simple + u'"', line_utils.escape_value(simple)) + complex = u"a\\b\"\n" + self.assertEqual(u"\"a\\\\b\\\"\\n\"", line_utils.escape_value(complex))