Store time-series data into Mongo

This commit is contained in:
Ilya Shakhat 2016-02-26 17:15:32 +03:00
parent 9a605c2a68
commit 2c4e5167ee
5 changed files with 44 additions and 34 deletions

View File

@ -53,12 +53,9 @@ def main():
tag = utils.random_string()
LOG.info('Using auto-generated tag "%s"', tag)
records = player.play_scenario(scenario, tag)
records, series = player.play_scenario(scenario, tag)
if records:
storage.store_data(records, cfg.CONF.mongo_url, cfg.CONF.mongo_db)
else:
LOG.warning('Execution generated no records')
storage.store_data(cfg.CONF.mongo_url, cfg.CONF.mongo_db, records, series)
report.generate_report(scenario, base_dir, cfg.CONF.mongo_url,
cfg.CONF.mongo_db, cfg.CONF.book, tag)

View File

@ -48,6 +48,7 @@ def play_setup(setup):
def play_execution(execution_playbook):
records = []
series = []
for play in execution_playbook:
matrix = play.get('matrix')
@ -62,35 +63,37 @@ def play_execution(execution_playbook):
for command_result in command_results:
if command_result.get('status') == 'OK':
record = dict(id=utils.make_id(),
payload = command_result['payload']
common = dict(id=utils.make_id(),
host=command_result['host'],
status=command_result['status'],
task=command_result['task'])
payload = command_result['payload']
record.update(payload['invocation']['module_args'])
record.update(payload)
common.update(payload['invocation']['module_args'])
# keep flat values only
for k, v in record.items():
if isinstance(v, list) or isinstance(v, dict):
del record[k]
if 'records' in payload:
for rec in payload['records']:
rec.update(common)
records.append(rec)
LOG.debug('New record: %s', rec)
if 'stdout' in record:
del record['stdout']
if 'series' in payload:
for rec in payload['series']:
rec.update(common)
series.append(rec)
LOG.debug('New time series: %s', rec)
LOG.debug('Record: %s', record)
records.append(record)
return records
return records, series
def tag_records(records, tag):
def add_tag(records, tag):
for r in records:
r['tag'] = tag
def play_scenario(scenario, tag):
records = {}
records = []
series = []
if 'setup' in scenario:
play_setup(scenario['setup'])
@ -98,7 +101,8 @@ def play_scenario(scenario, tag):
if 'execution' in scenario:
execution = scenario['execution']
records = play_execution(execution)
tag_records(records, tag)
records, series = play_execution(execution)
add_tag(records, tag)
add_tag(series, tag)
return records
return records, series

View File

@ -21,12 +21,17 @@ from performa.engine import utils
LOG = logging.getLogger(__name__)
def store_data(records, mongo_url, mongo_db):
def store_data(mongo_url, mongo_db, records, series):
LOG.info('Store data to Mongo: %s', mongo_url)
connection_params = utils.parse_url(mongo_url)
mongo_client = pymongo.MongoClient(**connection_params)
db = mongo_client.get_database(mongo_db)
records_collection = db.get_collection('records')
records_collection.insert_many(records)
if records:
records_collection = db.get_collection('records')
records_collection.insert_many(records)
if series:
series_collection = db.get_collection('series')
series_collection.insert_many(series)

View File

@ -1,8 +1,10 @@
#!/usr/bin/python
import os
import re
import tempfile
ATOP_FILE_NAME = '/tmp/performa.atop'
ATOP_FILE_NAME = os.path.join(tempfile.gettempdir(), 'performa.atop')
UNIQUE_NAME = 'performa_atop'
PREFIX_PATTERN = (

View File

@ -68,17 +68,19 @@ def main():
'run'
) % module.params
start = int(time.time())
rc, stdout, stderr = module.run_command(cmd)
result = dict(changed=True, rc=rc, stdout=stdout, stderr=stderr, cmd=cmd)
end = int(time.time())
try:
result.update(parse_sysbench_oltp(stdout))
parsed = parse_sysbench_oltp(stdout)
parsed['start'] = start
parsed['end'] = end
result = dict(records=[parsed])
module.exit_json(**result)
except Exception as e:
result['exception'] = e
module.fail_json(**result)
module.fail_json(msg=e, rc=rc, stderr=stderr, stdout=stdout)
from ansible.module_utils.basic import * # noqa