summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTong Li <litong01@us.ibm.com>2016-02-12 00:09:57 -0500
committerTong Li <litong01@us.ibm.com>2016-02-12 00:10:24 -0500
commitb93ded41ed4150468f210154c569b258411685d4 (patch)
tree0d213d7ff1e860c003ecc5987453e6b1525c5eb6
parent8c25b2db61bd3b1bff126707411a70633f83e1e5 (diff)
enable bulk message post on persister
Notes
Notes (review): Code-Review+2: litong01 <litong01@us.ibm.com> Workflow+1: litong01 <litong01@us.ibm.com> Verified+2: Jenkins Submitted-by: Jenkins Submitted-at: Fri, 12 Feb 2016 05:38:27 +0000 Reviewed-on: https://review.openstack.org/279243 Project: openstack/kiloeyes Branch: refs/heads/master
-rwxr-xr-xkiloeyes/common/es_conn.py13
-rwxr-xr-xkiloeyes/microservice/metrics_fixer.py9
-rwxr-xr-xkiloeyes/tests/common/test_es_conn.py14
-rwxr-xr-xkiloeyes/tests/microservice/test_metrics_fixer.py2
4 files changed, 11 insertions, 27 deletions
diff --git a/kiloeyes/common/es_conn.py b/kiloeyes/common/es_conn.py
index 181b7b7..7604892 100755
--- a/kiloeyes/common/es_conn.py
+++ b/kiloeyes/common/es_conn.py
@@ -67,20 +67,11 @@ class ESConnection(object):
67 if self.drop_data: 67 if self.drop_data:
68 return 68 return
69 else: 69 else:
70 # figure out id situation
71 _id = ''
72 if self.id_field:
73 obj = json.loads(msg)
74 _id = obj.get(self.id_field)
75 if not _id:
76 LOG.error('Msg does not have required id field %s' %
77 self.id_field)
78 return 400
79 # index may change over the time, it has to be called for each 70 # index may change over the time, it has to be called for each
80 # request 71 # request
81 index = self.index_strategy.get_index() 72 index = self.index_strategy.get_index()
82 path = '%s%s%s/%s/%s' % (self.uri, self.index_prefix, 73 path = '%s%s%s/%s/_bulk' % (self.uri, self.index_prefix,
83 index, self.doc_type, _id) 74 index, self.doc_type)
84 res = requests.post(path, data=msg) 75 res = requests.post(path, data=msg)
85 LOG.debug('Msg post target=%s' % path) 76 LOG.debug('Msg post target=%s' % path)
86 LOG.debug('Msg posted with response code: %s' % res.status_code) 77 LOG.debug('Msg posted with response code: %s' % res.status_code)
diff --git a/kiloeyes/microservice/metrics_fixer.py b/kiloeyes/microservice/metrics_fixer.py
index cbff6cf..8738b22 100755
--- a/kiloeyes/microservice/metrics_fixer.py
+++ b/kiloeyes/microservice/metrics_fixer.py
@@ -45,7 +45,14 @@ class MetricsFixer(object):
45 45
46 def process_msg(self, msg): 46 def process_msg(self, msg):
47 try: 47 try:
48 return MetricsFixer._add_hash(json.loads(msg)) 48 data = json.loads(msg)
49 if not isinstance(data, list):
50 data = [data]
51 result = ''
52 for item in data:
53 result += '{"index":{}}\n' + MetricsFixer._add_hash(item)
54 result += '\n'
55 return result
49 except Exception: 56 except Exception:
50 LOG.exception('') 57 LOG.exception('')
51 return '' 58 return ''
diff --git a/kiloeyes/tests/common/test_es_conn.py b/kiloeyes/tests/common/test_es_conn.py
index ef7b0c4..2038e23 100755
--- a/kiloeyes/tests/common/test_es_conn.py
+++ b/kiloeyes/tests/common/test_es_conn.py
@@ -43,17 +43,3 @@ class TestESConnection(tests.BaseTestCase):
43 with mock.patch.object(requests, 'post', return_value=req_result): 43 with mock.patch.object(requests, 'post', return_value=req_result):
44 conn.send_messages(json.dumps(msg)) 44 conn.send_messages(json.dumps(msg))
45 self.assertTrue(requests.post.called) 45 self.assertTrue(requests.post.called)
46
47 def test_send_messages_without_id(self):
48 self.CONF.set_override('id_field', 'id', group='es_conn')
49 self.CONF.set_override('uri', 'http://fake', group='es_conn')
50 self.CONF.set_override('time_unit', 'h', group='timed_strategy')
51 strategy = timed_strategy.TimedStrategy()
52 conn = es_conn.ESConnection('alarms', strategy, 'pre_')
53 req_result = mock.Mock()
54 req_result.status_code = 204
55 msg = {'not_id': 'whatever'}
56 with mock.patch.object(requests, 'post', return_value=req_result):
57 res = conn.send_messages(json.dumps(msg))
58 self.assertFalse(requests.post.called)
59 self.assertEqual(res, 400)
diff --git a/kiloeyes/tests/microservice/test_metrics_fixer.py b/kiloeyes/tests/microservice/test_metrics_fixer.py
index 1daf4c5..5fce241 100755
--- a/kiloeyes/tests/microservice/test_metrics_fixer.py
+++ b/kiloeyes/tests/microservice/test_metrics_fixer.py
@@ -48,7 +48,7 @@ class TestMetricsFixer(tests.BaseTestCase):
48 fixer = metrics_fixer.MetricsFixer() 48 fixer = metrics_fixer.MetricsFixer()
49 result = fixer.process_msg(json.dumps(items)) 49 result = fixer.process_msg(json.dumps(items))
50 self.assertTrue(isinstance(result, str)) 50 self.assertTrue(isinstance(result, str))
51 self.assertFalse(result.startswith('{"index":{}}')) 51 self.assertTrue(result.startswith('{"index":{}}'))
52 52
53 def test_process_msg_multiple(self): 53 def test_process_msg_multiple(self):
54 items = [{'name': 'name1', 'dimensions': {'name1': 'value1'}, 54 items = [{'name': 'name1', 'dimensions': {'name1': 'value1'},