Merge "Increase Persister Performance"

This commit is contained in:
Jenkins 2017-06-26 05:19:10 +00:00 committed by Gerrit Code Review
commit 97a9a7032a
6 changed files with 140 additions and 55 deletions

View File

@ -1,4 +1,4 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -34,4 +34,4 @@ class AbstractInfluxdbRepository(abstract_repository.AbstractRepository):
self.conf.influxdb.database_name)
def write_batch(self, data_points):
self._influxdb_client.write_points(data_points, 'ms')
self._influxdb_client.write_points(data_points, 'ms', protocol='line')

View File

@ -1,4 +1,4 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -12,13 +12,12 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import json
from oslo_log import log
import pytz
from monasca_persister.repositories.influxdb import abstract_repository
from monasca_persister.repositories.influxdb import line_utils
from monasca_persister.repositories.utils import parse_alarm_state_hist_message
LOG = log.getLogger(__name__)
@ -39,28 +38,27 @@ class AlarmStateHistInfluxdbRepository(
time_stamp) = parse_alarm_state_hist_message(
message)
ts = time_stamp / 1000.0
name = u'alarm_state_history'
fields = []
fields.append(u'tenant_id=' + line_utils.escape_value(tenant_id))
fields.append(u'alarm_id=' + line_utils.escape_value(alarm_id))
fields.append(u'metrics=' + line_utils.escape_value(
json.dumps(metrics, ensure_ascii=False)))
fields.append(u'new_state=' + line_utils.escape_value(new_state))
fields.append(u'old_state=' + line_utils.escape_value(old_state))
fields.append(u'link=' + line_utils.escape_value(link))
fields.append(u'lifecycle_state=' + line_utils.escape_value(
lifecycle_state))
fields.append(u'reason=' + line_utils.escape_value(
state_change_reason))
fields.append(u'reason_data=' + line_utils.escape_value("{}"))
fields.append(u'sub_alarms=' + line_utils.escape_value(
sub_alarms_json_snake_case))
data = {"measurement": 'alarm_state_history',
"time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
'%Y-%m-%dT%H:%M:%S.%fZ'),
"fields": {
"tenant_id": tenant_id.encode('utf8'),
"alarm_id": alarm_id.encode('utf8'),
"metrics": json.dumps(metrics, ensure_ascii=False).encode(
'utf8'),
"new_state": new_state.encode('utf8'),
"old_state": old_state.encode('utf8'),
"link": link.encode('utf8'),
"lifecycle_state": lifecycle_state.encode('utf8'),
"reason": state_change_reason.encode('utf8'),
"reason_data": "{}".encode('utf8'),
"sub_alarms": sub_alarms_json_snake_case.encode('utf8')
},
"tags": {
"tenant_id": tenant_id.encode('utf8')
}}
line = name + u',tenant_id=' + line_utils.escape_tag(tenant_id)
line += u' ' + u','.join(fields)
line += u' ' + str(int(time_stamp))
LOG.debug(data)
LOG.debug(line)
return data
return line

View File

@ -0,0 +1,46 @@
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from six import PY2
def escape_tag(tag):
tag = get_unicode(tag)
return tag.replace(
u"\\", u"\\\\"
).replace(
u" ", u"\\ "
).replace(
u",", u"\\,"
).replace(
u"=", u"\\="
)
def get_unicode(data):
if PY2:
return unicode(data)
else:
return str(data)
def escape_value(value):
return u"\"{0}\"".format(
get_unicode(value).replace(
u"\\", u"\\\\"
).replace(
u"\"", u"\\\""
).replace(
u"\n", u"\\n"
)
)

View File

@ -1,4 +1,4 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -12,13 +12,12 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
import json
from oslo_log import log
import pytz
from monasca_persister.repositories.influxdb import abstract_repository
from monasca_persister.repositories.influxdb import line_utils
from monasca_persister.repositories.utils import parse_measurement_message
LOG = log.getLogger(__name__)
@ -36,20 +35,27 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository):
value_meta) = parse_measurement_message(message)
tags = dimensions
tags['_tenant_id'] = tenant_id.encode('utf8')
tags['_region'] = region.encode('utf8')
tags[u'_tenant_id'] = tenant_id
tags[u'_region'] = region
ts = time_stamp / 1000.0
if not value_meta:
value_meta_str = u'"{}"'
else:
value_meta_str = line_utils.escape_value(json.dumps(value_meta, ensure_ascii=False))
data = {"measurement": metric_name.encode('utf8'),
"time": datetime.fromtimestamp(ts, tz=pytz.utc).strftime(
'%Y-%m-%dT%H:%M:%S.%fZ'),
"fields": {
"value": value,
"value_meta": json.dumps(value_meta,
ensure_ascii=False).encode('utf8')
},
"tags": tags}
key_values = [line_utils.escape_tag(metric_name)]
# tags should be sorted client-side to take load off server
for key in sorted(tags.keys()):
key_tag = line_utils.escape_tag(key)
value_tag = line_utils.escape_tag(tags[key])
key_values.append(key_tag + u'=' + value_tag)
key_values = u','.join(key_values)
value_field = u'value={}'.format(value)
value_meta_field = u'value_meta=' + value_meta_str
data = key_values + u' ' + value_field + u',' + value_meta_field + u' ' + str(int(time_stamp))
LOG.debug(data)

View File

@ -1,4 +1,4 @@
# (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -27,24 +27,17 @@ def parse_measurement_message(message):
tenant_id = decoded_message['meta']['tenantId']
dimensions = {}
if 'dimensions' in metric:
for dimension_name in metric['dimensions']:
dimensions[dimension_name.encode('utf8')] = (
metric['dimensions'][dimension_name].encode('utf8'))
time_stamp = metric['timestamp']
value = float(metric['value'])
if 'value_meta' in metric and metric['value_meta']:
value_meta = metric['value_meta']
else:
value_meta = metric.get('value_meta', {})
if 'value_meta' is None:
# Ensure value_meta is a dict
value_meta = {}
return (dimensions, metric_name, region, tenant_id, time_stamp, value,
value_meta)
return (metric.get('dimensions', {}), metric_name, region, tenant_id,
time_stamp, value, value_meta)
def parse_alarm_state_hist_message(message):

View File

@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
# (C) Copyright 2017 Hewlett Packard Enterprise Development LP
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslotest import base
from monasca_persister.repositories.influxdb import line_utils
class TestInfluxdb(base.BaseTestCase):
def setUp(self):
super(TestInfluxdb, self).setUp()
def tearDown(self):
super(TestInfluxdb, self).tearDown()
def test_line_utils_handles_utf8(self):
utf8_name = u''
self.assertEqual(u'"' + utf8_name + u'"', line_utils.escape_value(utf8_name))
self.assertEqual(utf8_name, line_utils.escape_tag(utf8_name))
def test_line_utils_escape_tag(self):
simple = u"aaaaa"
self.assertEqual(simple, line_utils.escape_tag(simple))
complex = u"a\\ b,c="
self.assertEqual("a\\\\\\ b\\,c\\=", line_utils.escape_tag(complex))
def test_line_utils_escape_value(self):
simple = u"aaaaa"
self.assertEqual(u'"' + simple + u'"', line_utils.escape_value(simple))
complex = u"a\\b\"\n"
self.assertEqual(u"\"a\\\\b\\\"\\n\"", line_utils.escape_value(complex))