Store time-series data into Mongo
This commit is contained in:
parent
9a605c2a68
commit
2c4e5167ee
|
@ -53,12 +53,9 @@ def main():
|
|||
tag = utils.random_string()
|
||||
LOG.info('Using auto-generated tag "%s"', tag)
|
||||
|
||||
records = player.play_scenario(scenario, tag)
|
||||
records, series = player.play_scenario(scenario, tag)
|
||||
|
||||
if records:
|
||||
storage.store_data(records, cfg.CONF.mongo_url, cfg.CONF.mongo_db)
|
||||
else:
|
||||
LOG.warning('Execution generated no records')
|
||||
storage.store_data(cfg.CONF.mongo_url, cfg.CONF.mongo_db, records, series)
|
||||
|
||||
report.generate_report(scenario, base_dir, cfg.CONF.mongo_url,
|
||||
cfg.CONF.mongo_db, cfg.CONF.book, tag)
|
||||
|
|
|
@ -48,6 +48,7 @@ def play_setup(setup):
|
|||
|
||||
def play_execution(execution_playbook):
|
||||
records = []
|
||||
series = []
|
||||
|
||||
for play in execution_playbook:
|
||||
matrix = play.get('matrix')
|
||||
|
@ -62,35 +63,37 @@ def play_execution(execution_playbook):
|
|||
|
||||
for command_result in command_results:
|
||||
if command_result.get('status') == 'OK':
|
||||
record = dict(id=utils.make_id(),
|
||||
payload = command_result['payload']
|
||||
|
||||
common = dict(id=utils.make_id(),
|
||||
host=command_result['host'],
|
||||
status=command_result['status'],
|
||||
task=command_result['task'])
|
||||
payload = command_result['payload']
|
||||
record.update(payload['invocation']['module_args'])
|
||||
record.update(payload)
|
||||
common.update(payload['invocation']['module_args'])
|
||||
|
||||
# keep flat values only
|
||||
for k, v in record.items():
|
||||
if isinstance(v, list) or isinstance(v, dict):
|
||||
del record[k]
|
||||
if 'records' in payload:
|
||||
for rec in payload['records']:
|
||||
rec.update(common)
|
||||
records.append(rec)
|
||||
LOG.debug('New record: %s', rec)
|
||||
|
||||
if 'stdout' in record:
|
||||
del record['stdout']
|
||||
if 'series' in payload:
|
||||
for rec in payload['series']:
|
||||
rec.update(common)
|
||||
series.append(rec)
|
||||
LOG.debug('New time series: %s', rec)
|
||||
|
||||
LOG.debug('Record: %s', record)
|
||||
records.append(record)
|
||||
|
||||
return records
|
||||
return records, series
|
||||
|
||||
|
||||
def tag_records(records, tag):
|
||||
def add_tag(records, tag):
|
||||
for r in records:
|
||||
r['tag'] = tag
|
||||
|
||||
|
||||
def play_scenario(scenario, tag):
|
||||
records = {}
|
||||
records = []
|
||||
series = []
|
||||
|
||||
if 'setup' in scenario:
|
||||
play_setup(scenario['setup'])
|
||||
|
@ -98,7 +101,8 @@ def play_scenario(scenario, tag):
|
|||
if 'execution' in scenario:
|
||||
execution = scenario['execution']
|
||||
|
||||
records = play_execution(execution)
|
||||
tag_records(records, tag)
|
||||
records, series = play_execution(execution)
|
||||
add_tag(records, tag)
|
||||
add_tag(series, tag)
|
||||
|
||||
return records
|
||||
return records, series
|
||||
|
|
|
@ -21,12 +21,17 @@ from performa.engine import utils
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def store_data(records, mongo_url, mongo_db):
|
||||
def store_data(mongo_url, mongo_db, records, series):
|
||||
LOG.info('Store data to Mongo: %s', mongo_url)
|
||||
|
||||
connection_params = utils.parse_url(mongo_url)
|
||||
mongo_client = pymongo.MongoClient(**connection_params)
|
||||
db = mongo_client.get_database(mongo_db)
|
||||
|
||||
records_collection = db.get_collection('records')
|
||||
records_collection.insert_many(records)
|
||||
if records:
|
||||
records_collection = db.get_collection('records')
|
||||
records_collection.insert_many(records)
|
||||
|
||||
if series:
|
||||
series_collection = db.get_collection('series')
|
||||
series_collection.insert_many(series)
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
|
||||
ATOP_FILE_NAME = '/tmp/performa.atop'
|
||||
ATOP_FILE_NAME = os.path.join(tempfile.gettempdir(), 'performa.atop')
|
||||
UNIQUE_NAME = 'performa_atop'
|
||||
|
||||
PREFIX_PATTERN = (
|
||||
|
|
|
@ -68,17 +68,19 @@ def main():
|
|||
'run'
|
||||
) % module.params
|
||||
|
||||
start = int(time.time())
|
||||
rc, stdout, stderr = module.run_command(cmd)
|
||||
|
||||
result = dict(changed=True, rc=rc, stdout=stdout, stderr=stderr, cmd=cmd)
|
||||
end = int(time.time())
|
||||
|
||||
try:
|
||||
result.update(parse_sysbench_oltp(stdout))
|
||||
parsed = parse_sysbench_oltp(stdout)
|
||||
parsed['start'] = start
|
||||
parsed['end'] = end
|
||||
|
||||
result = dict(records=[parsed])
|
||||
module.exit_json(**result)
|
||||
except Exception as e:
|
||||
result['exception'] = e
|
||||
|
||||
module.fail_json(**result)
|
||||
module.fail_json(msg=e, rc=rc, stderr=stderr, stdout=stdout)
|
||||
|
||||
|
||||
from ansible.module_utils.basic import * # noqa
|
||||
|
|
Loading…
Reference in New Issue