Implement aggregation stage
This commit is contained in:
parent
4ab6709723
commit
abab0404cd
|
@ -0,0 +1,70 @@
|
|||
# Copyright (c) 2016 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_log import log as logging
|
||||
import pymongo
|
||||
|
||||
from performa.engine import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def aggregate(scenario, mongo_url, db_name, tag):
|
||||
if 'aggregation' not in scenario:
|
||||
return # nothing to do
|
||||
|
||||
LOG.info('Running aggregation')
|
||||
|
||||
connection_params = utils.parse_url(mongo_url)
|
||||
mongo_client = pymongo.MongoClient(**connection_params)
|
||||
db = mongo_client.get_database(db_name)
|
||||
|
||||
aggregation = scenario['aggregation']
|
||||
|
||||
records_collection = db.get_collection('records')
|
||||
series_collection = db.get_collection('series')
|
||||
|
||||
for op_group in aggregation:
|
||||
for op, op_params in op_group.items():
|
||||
if op == 'update':
|
||||
|
||||
select_query = op_params['query']
|
||||
values_query = op_params['values']
|
||||
values_pipeline = values_query['pipeline']
|
||||
|
||||
select_query['tag'] = tag
|
||||
select_query['status'] = 'OK'
|
||||
|
||||
for rec in records_collection.find(select_query):
|
||||
start = rec['start']
|
||||
stop = rec['end'] # todo rename field into 'stop'
|
||||
|
||||
series_pipeline = [
|
||||
{'$match': {'$and': [
|
||||
{'tag': tag},
|
||||
{'timestamp': {'$gte': start}},
|
||||
{'timestamp': {'$lte': stop}}
|
||||
]}}
|
||||
]
|
||||
series_pipeline.extend(values_pipeline)
|
||||
|
||||
point = next(series_collection.aggregate(series_pipeline))
|
||||
del point['_id'] # to avoid overwriting
|
||||
rec.update(point)
|
||||
|
||||
records_collection.update_one({'_id': rec['_id']},
|
||||
{'$set': point})
|
||||
|
||||
LOG.debug('Updated record: %s', rec)
|
|
@ -19,6 +19,7 @@ from oslo_config import cfg
|
|||
from oslo_log import log as logging
|
||||
import yaml
|
||||
|
||||
from performa.engine import aggregator
|
||||
from performa.engine import ansible_runner
|
||||
from performa.engine import config
|
||||
from performa.engine import player
|
||||
|
@ -60,6 +61,8 @@ def main():
|
|||
|
||||
storage.store_data(cfg.CONF.mongo_url, cfg.CONF.mongo_db, records, series)
|
||||
|
||||
aggregator.aggregate(scenario, cfg.CONF.mongo_url, cfg.CONF.mongo_db, tag)
|
||||
|
||||
report.generate_report(scenario, base_dir, cfg.CONF.mongo_url,
|
||||
cfg.CONF.mongo_db, cfg.CONF.book, tag)
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ This is the report of execution test plan
|
|||
Results
|
||||
^^^^^^^
|
||||
|
||||
Chart and table:
|
||||
Queries per second depending on threads count:
|
||||
|
||||
{{'''
|
||||
title: Queries per second
|
||||
|
@ -30,6 +30,29 @@ Chart and table:
|
|||
''' | chart
|
||||
}}
|
||||
|
||||
Queries per second and mysqld CPU consumption depending on threads count:
|
||||
|
||||
{{'''
|
||||
title: Queries and and CPU util per second
|
||||
axes:
|
||||
x: threads
|
||||
y: queries per sec
|
||||
y2: mysqld CPU consumption, %
|
||||
chart: line
|
||||
pipeline:
|
||||
- { $match: { task: sysbench_oltp, status: OK }}
|
||||
- { $group: { _id: { threads: "$threads" },
|
||||
queries_total_per_sec: { $avg: { $divide: ["$queries_total", "$duration"] }},
|
||||
mysqld_total: { $avg: "$mysqld_total" }
|
||||
}}
|
||||
- { $project: { x: "$_id.threads",
|
||||
y: "$queries_total_per_sec",
|
||||
y2: { $multiply: [ "$mysqld_total", 100 ] }
|
||||
}}
|
||||
- { $sort: { x: 1 }}
|
||||
''' | chart
|
||||
}}
|
||||
|
||||
.. references:
|
||||
|
||||
.. _Sysbench: https://github.com/akopytov/sysbench
|
||||
|
|
|
@ -23,7 +23,7 @@ execution:
|
|||
-
|
||||
hosts: $target
|
||||
matrix:
|
||||
threads: [ 10, 20, 30 ]
|
||||
threads: [ 10, 20, 30, 40, 50, 60 ]
|
||||
tasks:
|
||||
- sysbench_oltp:
|
||||
duration: 10
|
||||
|
@ -34,5 +34,15 @@ execution:
|
|||
command: stop
|
||||
labels: [ CPU, PRC, PRM ]
|
||||
|
||||
aggregation:
|
||||
-
|
||||
update:
|
||||
query:
|
||||
{ task: sysbench_oltp }
|
||||
values:
|
||||
pipeline:
|
||||
- { $match: { task: atop, status: OK, label: PRC, name: mysqld }}
|
||||
- { $group: { _id: null, mysqld_sys: { $avg: "$sys" }, mysqld_user: { $avg: "$user" }, mysqld_total: { $avg: { $add: [ "$sys", "$user" ] }} }}
|
||||
|
||||
report:
|
||||
template: sysbench.rst
|
||||
|
|
Loading…
Reference in New Issue