enable bulk message post on persister
Change-Id: Ifbf2c5e1517c0e7a042b241657725640e8cdebec
This commit is contained in:
parent
8c25b2db61
commit
b93ded41ed
|
@ -67,20 +67,11 @@ class ESConnection(object):
|
|||
if self.drop_data:
|
||||
return
|
||||
else:
|
||||
# figure out id situation
|
||||
_id = ''
|
||||
if self.id_field:
|
||||
obj = json.loads(msg)
|
||||
_id = obj.get(self.id_field)
|
||||
if not _id:
|
||||
LOG.error('Msg does not have required id field %s' %
|
||||
self.id_field)
|
||||
return 400
|
||||
# index may change over the time, it has to be called for each
|
||||
# request
|
||||
index = self.index_strategy.get_index()
|
||||
path = '%s%s%s/%s/%s' % (self.uri, self.index_prefix,
|
||||
index, self.doc_type, _id)
|
||||
path = '%s%s%s/%s/_bulk' % (self.uri, self.index_prefix,
|
||||
index, self.doc_type)
|
||||
res = requests.post(path, data=msg)
|
||||
LOG.debug('Msg post target=%s' % path)
|
||||
LOG.debug('Msg posted with response code: %s' % res.status_code)
|
||||
|
|
|
@ -45,7 +45,14 @@ class MetricsFixer(object):
|
|||
|
||||
def process_msg(self, msg):
|
||||
try:
|
||||
return MetricsFixer._add_hash(json.loads(msg))
|
||||
data = json.loads(msg)
|
||||
if not isinstance(data, list):
|
||||
data = [data]
|
||||
result = ''
|
||||
for item in data:
|
||||
result += '{"index":{}}\n' + MetricsFixer._add_hash(item)
|
||||
result += '\n'
|
||||
return result
|
||||
except Exception:
|
||||
LOG.exception('')
|
||||
return ''
|
||||
|
|
|
@ -43,17 +43,3 @@ class TestESConnection(tests.BaseTestCase):
|
|||
with mock.patch.object(requests, 'post', return_value=req_result):
|
||||
conn.send_messages(json.dumps(msg))
|
||||
self.assertTrue(requests.post.called)
|
||||
|
||||
def test_send_messages_without_id(self):
|
||||
self.CONF.set_override('id_field', 'id', group='es_conn')
|
||||
self.CONF.set_override('uri', 'http://fake', group='es_conn')
|
||||
self.CONF.set_override('time_unit', 'h', group='timed_strategy')
|
||||
strategy = timed_strategy.TimedStrategy()
|
||||
conn = es_conn.ESConnection('alarms', strategy, 'pre_')
|
||||
req_result = mock.Mock()
|
||||
req_result.status_code = 204
|
||||
msg = {'not_id': 'whatever'}
|
||||
with mock.patch.object(requests, 'post', return_value=req_result):
|
||||
res = conn.send_messages(json.dumps(msg))
|
||||
self.assertFalse(requests.post.called)
|
||||
self.assertEqual(res, 400)
|
||||
|
|
|
@ -48,7 +48,7 @@ class TestMetricsFixer(tests.BaseTestCase):
|
|||
fixer = metrics_fixer.MetricsFixer()
|
||||
result = fixer.process_msg(json.dumps(items))
|
||||
self.assertTrue(isinstance(result, str))
|
||||
self.assertFalse(result.startswith('{"index":{}}'))
|
||||
self.assertTrue(result.startswith('{"index":{}}'))
|
||||
|
||||
def test_process_msg_multiple(self):
|
||||
items = [{'name': 'name1', 'dimensions': {'name1': 'value1'},
|
||||
|
|
Loading…
Reference in New Issue