enable bulk message post on persister

Change-Id: Ifbf2c5e1517c0e7a042b241657725640e8cdebec
This commit is contained in:
Tong Li 2016-02-12 00:09:57 -05:00
parent 8c25b2db61
commit b93ded41ed
4 changed files with 11 additions and 27 deletions

View File

@ -67,20 +67,11 @@ class ESConnection(object):
if self.drop_data:
return
else:
# figure out id situation
_id = ''
if self.id_field:
obj = json.loads(msg)
_id = obj.get(self.id_field)
if not _id:
LOG.error('Msg does not have required id field %s' %
self.id_field)
return 400
# index may change over the time, it has to be called for each
# request
index = self.index_strategy.get_index()
path = '%s%s%s/%s/%s' % (self.uri, self.index_prefix,
index, self.doc_type, _id)
path = '%s%s%s/%s/_bulk' % (self.uri, self.index_prefix,
index, self.doc_type)
res = requests.post(path, data=msg)
LOG.debug('Msg post target=%s' % path)
LOG.debug('Msg posted with response code: %s' % res.status_code)

View File

@ -45,7 +45,14 @@ class MetricsFixer(object):
def process_msg(self, msg):
try:
return MetricsFixer._add_hash(json.loads(msg))
data = json.loads(msg)
if not isinstance(data, list):
data = [data]
result = ''
for item in data:
result += '{"index":{}}\n' + MetricsFixer._add_hash(item)
result += '\n'
return result
except Exception:
LOG.exception('')
return ''

View File

@ -43,17 +43,3 @@ class TestESConnection(tests.BaseTestCase):
with mock.patch.object(requests, 'post', return_value=req_result):
conn.send_messages(json.dumps(msg))
self.assertTrue(requests.post.called)
def test_send_messages_without_id(self):
self.CONF.set_override('id_field', 'id', group='es_conn')
self.CONF.set_override('uri', 'http://fake', group='es_conn')
self.CONF.set_override('time_unit', 'h', group='timed_strategy')
strategy = timed_strategy.TimedStrategy()
conn = es_conn.ESConnection('alarms', strategy, 'pre_')
req_result = mock.Mock()
req_result.status_code = 204
msg = {'not_id': 'whatever'}
with mock.patch.object(requests, 'post', return_value=req_result):
res = conn.send_messages(json.dumps(msg))
self.assertFalse(requests.post.called)
self.assertEqual(res, 400)

View File

@ -48,7 +48,7 @@ class TestMetricsFixer(tests.BaseTestCase):
fixer = metrics_fixer.MetricsFixer()
result = fixer.process_msg(json.dumps(items))
self.assertTrue(isinstance(result, str))
self.assertFalse(result.startswith('{"index":{}}'))
self.assertTrue(result.startswith('{"index":{}}'))
def test_process_msg_multiple(self):
items = [{'name': 'name1', 'dimensions': {'name1': 'value1'},