Turn executors into Ansible modules

Ilya Shakhat 2016-02-25 19:26:18 +03:00
parent 29660a67ac
commit 92d86d73ad
12 changed files with 197 additions and 160 deletions

View File

@@ -23,6 +23,8 @@ from ansible.plugins import callback
 from ansible.vars import VariableManager
 from oslo_log import log as logging
+
+from performa.engine import utils
 
 LOG = logging.getLogger(__name__)
@@ -70,10 +72,14 @@ Options = namedtuple('Options',
 def _run(play_source, host_list):
+    LOG.debug('Running play: %s on hosts: %s', play_source, host_list)
+
     variable_manager = VariableManager()
     loader = dataloader.DataLoader()
+    module_path = utils.resolve_relative_path('performa/modules')
+
     options = Options(connection='smart', password='swordfish',
-                      module_path='/path/to/mymodules',
+                      module_path=module_path,
                       forks=100, remote_user='developer',
                       private_key_file=None,
                       ssh_common_args=None, ssh_extra_args=None,
@@ -129,11 +135,13 @@ def run_command(command, host_list):
     return _run(play_source, hosts)
 
 
-def run_playbook(playbook, host_list):
+def run_playbook(playbook):
+    result = []
     for play_source in playbook:
-        hosts = ','.join(host_list) + ','
-        play_source['hosts'] = hosts
+        hosts = play_source['hosts']
         play_source['gather_facts'] = 'no'
-        _run(play_source, hosts)
+        result += (_run(play_source, hosts))
+    return result

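With this change run_playbook no longer takes a separate host list: each play carries its own hosts entry, and the results of all plays are aggregated and returned. A minimal caller sketch (the play contents and host names below are illustrative, not from this commit):

from performa.engine import ansible_runner

play = {
    'hosts': '10.0.1.1,10.0.1.2,',  # comma-terminated ad-hoc inventory pattern
    'tasks': [{'command': 'hostname'}],
}

# Each play is run via _run() and the per-task results are concatenated.
records = ansible_runner.run_playbook([play])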
View File

@@ -17,6 +17,7 @@ import copy
 from oslo_config import cfg
 from oslo_config import types
+import yaml
 
 from performa.engine import utils
@@ -35,6 +36,20 @@ class Endpoint(types.String):
         return "Endpoint host[:port]"
 
 
+class Yaml(types.String):
+    def __call__(self, value):
+        value = str(value)
+        try:
+            value = yaml.safe_load(value)
+        except Exception:
+            raise ValueError('YAML value is expected, but got: %s' % value)
+        return value
+
+    def __repr__(self):
+        return "YAML data"
+
+
 MAIN_OPTS = [
@@ -52,10 +67,12 @@ MAIN_OPTS = [
                default=utils.env('PERFORMA_MONGO_DB'),
                required=True,
                help='Mongo DB, defaults to env[PERFORMA_MONGO_DB].'),
-    cfg.ListOpt('hosts',
-                default=utils.env('PERFORMA_HOSTS'),
-                required=True,
-                help='List of hosts, defaults to env[PERFORMA_MONGO_URL].'),
+    cfg.Opt('hosts',
+            type=Yaml(),
+            default=utils.env('PERFORMA_HOSTS'),
+            required=True,
+            help='Hosts inventory definition in YAML format. '
+                 'Can be specified via env[PERFORMA_HOSTS].'),
     cfg.StrOpt('book',
                default=utils.env('PERFORMA_BOOK'),
                help='Generate report in ReST format and store it into the '

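The new Yaml option type lets the hosts option hold a structured mapping instead of a flat list. A quick sketch of what its __call__ does to a typical value (the exact inventory shape is an assumption, based on the $target placeholder used in the scenario file below):

import yaml

# Equivalent of Yaml()(...) for a value such as
# PERFORMA_HOSTS='{target: [host-1, host-2]}'
value = yaml.safe_load('{target: [host-1, host-2]}')
print(value)  # -> {'target': ['host-1', 'host-2']}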
View File

@@ -17,6 +17,7 @@ import os
 from oslo_config import cfg
 from oslo_log import log as logging
+import yaml
 
 from performa.engine import config
 from performa.engine import player
@@ -27,6 +28,13 @@ from performa.engine import utils
 LOG = logging.getLogger(__name__)
 
 
+def resolve_hosts(scenario, hosts):
+    for k, v in hosts.items():
+        scenario = scenario.replace('$%s' % k, ','.join(v) + ',')
+    return scenario
+
+
 def main():
     utils.init_config_and_logging(config.MAIN_OPTS)
@@ -34,7 +42,10 @@ def main():
         cfg.CONF.scenario,
         alias_mapper=lambda f: config.SCENARIOS + '%s.yaml' % f)
-    scenario = utils.read_yaml_file(scenario_file_path)
+    scenario_raw = utils.read_file(scenario_file_path)
+    scenario_raw = resolve_hosts(scenario_raw, cfg.CONF.hosts)
+    scenario = yaml.safe_load(scenario_raw)
 
     base_dir = os.path.dirname(scenario_file_path)
     tag = cfg.CONF.tag
@@ -44,7 +55,10 @@ def main():
     records = player.play_scenario(scenario, tag)
-    storage.store_data(records, cfg.CONF.mongo_url, cfg.CONF.mongo_db)
+    if records:
+        storage.store_data(records, cfg.CONF.mongo_url, cfg.CONF.mongo_db)
+    else:
+        LOG.warning('Execution generated no records')
 
     report.generate_report(scenario, base_dir, cfg.CONF.mongo_url,
                            cfg.CONF.mongo_db, cfg.CONF.book, tag)

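resolve_hosts performs plain text substitution on the raw scenario before YAML parsing, so $name placeholders expand into comma-terminated host lists. A self-contained example using the same function body as above:

def resolve_hosts(scenario, hosts):
    # identical to the function added in this commit
    for k, v in hosts.items():
        scenario = scenario.replace('$%s' % k, ','.join(v) + ',')
    return scenario

print(resolve_hosts('hosts: $target', {'target': ['db-1', 'db-2']}))
# -> hosts: db-1,db-2,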
View File

@@ -14,14 +14,12 @@
 # limitations under the License.
 
 import copy
-import re
 
 from oslo_config import cfg
 from oslo_log import log as logging
 
 from performa.engine import ansible_runner
 from performa.engine import utils
-from performa import executors as executors_classes
 
 LOG = logging.getLogger(__name__)
@@ -30,52 +28,57 @@ def run_command(command):
     return ansible_runner.run_command(command, cfg.CONF.hosts)
 
 
-def _make_test_title(test, params=None):
-    s = test.get('title') or test.get('class')
-    if params:
-        s += ' '.join([','.join(['%s=%s' % (k, v) for k, v in params.items()
-                                 if k != 'host'])])
-    return re.sub(r'[^\x20-\x7e\x80-\xff]+', '_', s)
-
-
-def _pick_tests(tests, matrix):
+def _pick_tasks(tasks, matrix):
     matrix = matrix or {}
-    for test in tests:
-        for params in utils.algebraic_product(**matrix):
-            parametrized_test = copy.deepcopy(test)
-            parametrized_test.update(params)
-            parametrized_test['title'] = _make_test_title(test, params)
-            yield parametrized_test
+    for params in utils.algebraic_product(**matrix):
+        for task in tasks:
+            parametrized_task = copy.deepcopy(task)
+            values = parametrized_task.values()[0]
+            if isinstance(values, dict):
+                values.update(params)
+            yield parametrized_task
 
 
-def play_preparation(preparation):
-    ansible_playbook = preparation.get('ansible-playbook')
-    if ansible_playbook:
-        ansible_runner.run_playbook(ansible_playbook, cfg.CONF.hosts)
+def play_setup(setup):
+    ansible_runner.run_playbook(setup)
 
 
-def play_execution(execution):
+def play_execution(execution_playbook):
     records = []
-    matrix = execution.get('matrix')
-    for test in _pick_tests(execution['tests'], matrix):
-        executor = executors_classes.get_executor(test)
-        command = executor.get_command()
 
-        command_results = run_command(command)
-        for command_result in command_results:
-            record = dict(id=utils.make_id(),
-                          host=command_result['host'],
-                          status=command_result['status'])
-            record.update(test)
+    for play in execution_playbook:
+        matrix = play.get('matrix')
 
-            if command_result.get('status') == 'OK':
-                er = executor.process_reply(command_result['payload'])
-                record.update(er)
+        for task in _pick_tasks(play['tasks'], matrix):
+            task_play = {
+                'hosts': play['hosts'],
+                'tasks': [task],
+            }
+            command_results = ansible_runner.run_playbook([task_play])
 
-            records.append(record)
+            for command_result in command_results:
+                if command_result.get('status') == 'OK':
+                    record = dict(id=utils.make_id(),
+                                  host=command_result['host'],
+                                  status=command_result['status'],
+                                  task=command_result['task'])
+                    payload = command_result['payload']
+                    record.update(payload['invocation']['module_args'])
+                    record.update(payload)
+
+                    # keep flat values only
+                    for k, v in record.items():
+                        if isinstance(v, list) or isinstance(v, dict):
+                            del record[k]
+
+                    del record['stdout']
+
+                    LOG.debug('Record: %s', record)
+                    records.append(record)
 
     return records
@@ -88,8 +91,8 @@ def tag_records(records, tag):
 def play_scenario(scenario, tag):
     records = {}
 
-    if 'preparation' in scenario:
-        play_preparation(scenario['preparation'])
+    if 'setup' in scenario:
+        play_setup(scenario['setup'])
 
     if 'execution' in scenario:
         execution = scenario['execution']

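_pick_tasks crosses the play's tasks with every point of the matrix, pushing the matrix parameters into each task's argument dict. A runnable sketch; algebraic_product is reimplemented here on the assumption that utils.algebraic_product yields one dict per combination of its keyword arguments:

import copy
import itertools


def algebraic_product(**kwargs):
    # assumed behaviour of utils.algebraic_product
    keys = list(kwargs)
    for combo in itertools.product(*(kwargs[k] for k in keys)):
        yield dict(zip(keys, combo))


def _pick_tasks(tasks, matrix):
    matrix = matrix or {}
    for params in algebraic_product(**matrix):
        for task in tasks:
            parametrized_task = copy.deepcopy(task)
            # list(...) added for Python 3; the commit targets Python 2,
            # where dict.values() already returns a list
            values = list(parametrized_task.values())[0]
            if isinstance(values, dict):
                values.update(params)
            yield parametrized_task


for t in _pick_tasks([{'sysbench_oltp': {'duration': 10}}],
                     {'threads': [10, 20]}):
    print(t)
# -> {'sysbench_oltp': {'duration': 10, 'threads': 10}}
# -> {'sysbench_oltp': {'duration': 10, 'threads': 20}}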
View File

@@ -14,10 +14,8 @@
 # limitations under the License.
 
 from performa.executors import shell
-from performa.executors import sysbench
 
 
 EXECUTORS = {
-    'sysbench-oltp': sysbench.SysbenchOltpExecutor,
    '_default': shell.ShellExecutor,
 }

View File

@@ -1,91 +0,0 @@
-# Copyright (c) 2016 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import re
-
-from oslo_log import log as logging
-
-from performa.executors import base
-
-LOG = logging.getLogger(__name__)
-
-TEST_STATS = re.compile(
-    '\s+queries performed:\s*\n'
-    '\s+read:\s+(?P<queries_read>\d+)\s*\n'
-    '\s+write:\s+(?P<queries_write>\d+).*\n'
-    '\s+other:\s+(?P<queries_other>\d+).*\n'
-    '\s+total:\s+(?P<queries_total>\d+).*\n',
-    flags=re.MULTILINE | re.DOTALL
-)
-
-PATTERNS = [
-    r'sysbench (?P<version>[\d\.]+)',
-    TEST_STATS,
-    r'\s+transactions:\s+(?P<transactions>\d+).*\n',
-    r'\s+deadlocks:\s+(?P<deadlocks>\d+).*\n',
-    r'\s+total time:\s+(?P<duration>[\d\.]+).*\n',
-]
-
-TRANSFORM_FIELDS = {
-    'queries_read': int,
-    'queries_write': int,
-    'queries_other': int,
-    'queries_total': int,
-    'duration': float,
-    'transactions': int,
-    'deadlocks': int,
-}
-
-
-def parse_sysbench_oltp(raw):
-    result = {}
-
-    for pattern in PATTERNS:
-        for parsed in re.finditer(pattern, raw):
-            result.update(parsed.groupdict())
-
-    for k in result.keys():
-        if k in TRANSFORM_FIELDS:
-            result[k] = TRANSFORM_FIELDS[k](result[k])
-
-    return result
-
-
-class SysbenchOltpExecutor(base.BaseExecutor):
-    def get_command(self):
-        cmd = base.CommandLine('sysbench')
-        cmd.add('--test', 'oltp')
-        cmd.add('--db-driver', 'mysql')
-        cmd.add('--mysql-table-engine', 'innodb')
-        cmd.add('--mysql-engine-trx', 'yes')
-        cmd.add('--num-threads', self.test_definition.get('threads') or 10)
-        cmd.add('--max-time', self.get_expected_duration())
-        cmd.add('--max-requests', 0)
-        cmd.add('--mysql-host', 'localhost')
-        cmd.add('--mysql-db', 'sbtest')
-        cmd.add('--oltp-table-name', 'sbtest')
-        cmd.add('--oltp-table-size',
-                self.test_definition.get('table_size') or 100000)
-        # cmd.add('--oltp-num-tables',
-        #         self.test_definition.get('num_tables') or 10)
-        # cmd.add('--oltp-auto-inc', 'off')
-        # cmd.add('--oltp-read-only', 'off')
-        cmd.add('run')
-        return cmd.make()
-
-    def process_reply(self, record):
-        stdout = record.get('stdout')
-        return parse_sysbench_oltp(stdout)

View File

View File

@@ -0,0 +1,87 @@
+#!/usr/bin/python
+
+import re
+
+TEST_STATS = re.compile(
+    '\s+queries performed:\s*\n'
+    '\s+read:\s+(?P<queries_read>\d+)\s*\n'
+    '\s+write:\s+(?P<queries_write>\d+).*\n'
+    '\s+other:\s+(?P<queries_other>\d+).*\n'
+    '\s+total:\s+(?P<queries_total>\d+).*\n',
+    flags=re.MULTILINE | re.DOTALL
+)
+
+PATTERNS = [
+    r'sysbench (?P<version>[\d\.]+)',
+    TEST_STATS,
+    r'\s+transactions:\s+(?P<transactions>\d+).*\n',
+    r'\s+deadlocks:\s+(?P<deadlocks>\d+).*\n',
+    r'\s+total time:\s+(?P<duration>[\d\.]+).*\n',
+]
+
+TRANSFORM_FIELDS = {
+    'queries_read': int,
+    'queries_write': int,
+    'queries_other': int,
+    'queries_total': int,
+    'duration': float,
+    'transactions': int,
+    'deadlocks': int,
+}
+
+
+def parse_sysbench_oltp(raw):
+    result = {}
+
+    for pattern in PATTERNS:
+        for parsed in re.finditer(pattern, raw):
+            result.update(parsed.groupdict())
+
+    for k in result.keys():
+        if k in TRANSFORM_FIELDS:
+            result[k] = TRANSFORM_FIELDS[k](result[k])
+
+    return result
+
+
+def main():
+    module = AnsibleModule(
+        argument_spec=dict(
+            threads=dict(type='int', default=10),
+            duration=dict(type='int', default=10),
+            mysql_host=dict(default='localhost'),
+            mysql_db=dict(default='sbtest'),
+            oltp_table_name=dict(default='sbtest'),
+            oltp_table_size=dict(type='int', default=100000),
+        ))
+
+    cmd = ('sysbench '
+           '--test=oltp '
+           '--db-driver=mysql '
+           '--mysql-table-engine=innodb '
+           '--mysql-engine-trx=yes '
+           '--num-threads=%(threads)s '
+           '--max-time=%(duration)s '
+           '--max-requests=0 '
+           '--mysql-host=%(mysql_host)s '
+           '--mysql-db=%(mysql_db)s '
+           '--oltp-table-name=%(oltp_table_name)s '
+           '--oltp-table-size=%(oltp_table_size)s '
+           'run'
+           ) % module.params
+
+    rc, stdout, stderr = module.run_command(cmd)
+
+    result = dict(changed=True, rc=rc, stdout=stdout, stderr=stderr, cmd=cmd)
+
+    try:
+        result.update(parse_sysbench_oltp(stdout))
+        module.exit_json(**result)
+    except Exception as e:
+        result['exception'] = e
+        module.fail_json(**result)
+
+
+from ansible.module_utils.basic import *  # noqa
+
+if __name__ == '__main__':
+    main()

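The parsing half of the new module can be exercised without Ansible. A sketch using a trimmed, illustrative fragment of sysbench 0.4 output (the numbers are made up, and the import path assumes the module file shown above):

from performa.modules.sysbench_oltp import parse_sysbench_oltp

SAMPLE = '''
sysbench 0.4.12:  multi-threaded system evaluation benchmark
    queries performed:
        read:                            1400
        write:                           500
        other:                           200
        total:                           2100
    transactions:                        100    (9.99 per sec.)
    deadlocks:                           0      (0.00 per sec.)
    total time:                          10.0021s
'''

print(parse_sysbench_oltp(SAMPLE))
# -> {'version': '0.4.12', 'queries_read': 1400, ..., 'duration': 10.0021}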
View File

@@ -17,7 +17,7 @@ Chart and table:
     y2: read queries per sec
     chart: line
     pipeline:
-    - { $match: { class: sysbench-oltp, status: OK }}
+    - { $match: { task: sysbench_oltp, status: OK }}
     - { $group: { _id: { threads: "$threads" },
                   queries_total_per_sec: { $avg: { $divide: ["$queries_total", "$duration"] }},
                   queries_read_per_sec: { $avg: { $divide: ["$queries_read", "$duration"] }}

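The pipeline is a MongoDB aggregation over the stored records; renaming class: sysbench-oltp to task: sysbench_oltp matches the record layout now built in play_execution. A sketch of evaluating it directly (pymongo assumed; the database and collection names are placeholders, not taken from this commit):

import pymongo

records = pymongo.MongoClient('mongodb://localhost')['performa']['records']
pipeline = [
    {'$match': {'task': 'sysbench_oltp', 'status': 'OK'}},
    {'$group': {
        '_id': {'threads': '$threads'},
        'queries_total_per_sec': {
            '$avg': {'$divide': ['$queries_total', '$duration']}},
        'queries_read_per_sec': {
            '$avg': {'$divide': ['$queries_read', '$duration']}},
    }},
]
for row in records.aggregate(pipeline):
    print(row)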
View File

@@ -1,25 +1,26 @@
-title: DB
+title: Sysbench DB
 
 description:
     This scenario uses sysbench to execute DB test plan.
 
-preparation:
-  ansible-playbook:
-    -
-      tasks:
-        - apt: name=sysbench
-          become: yes
-          become_user: root
-          become_method: sudo
+setup:
+  -
+    hosts: $target
+    tasks:
+      - name: installing sysbench
+        apt: name=sysbench
+        become: yes
+        become_user: root
+        become_method: sudo
 
 execution:
-  matrix:
-    threads: [ 10, 20, 30, 40, 50, 60 ]
-  tests:
-    -
-      title: sysbench-oltp
-      class: sysbench-oltp
-      time: 10
+  -
+    hosts: $target
+    matrix:
+      threads: [ 10, 20, 30 ]
+    tasks:
+      - sysbench_oltp:
+          duration: 10
 
 report:
   template: sysbench.rst

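After host resolution (say, hosts = {'target': ['db-1']}), each parametrized task from the execution play above is wrapped into a one-task play, matching the task_play construction in play_execution. Roughly:

# One of the plays handed to ansible_runner.run_playbook();
# the host name is illustrative.
task_play = {
    'hosts': 'db-1,',
    'tasks': [{'sysbench_oltp': {'duration': 10, 'threads': 10}}],
}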
View File

@@ -15,7 +15,7 @@
 import testtools
 
-from performa.executors import sysbench
+from performa.modules import sysbench_oltp as sysbench
 
 OLTP_OUTPUT = '''
 sysbench 0.4.12:  multi-threaded system evaluation benchmark

View File

@@ -25,6 +25,6 @@ commands = python setup.py build_sphinx
 [flake8]
 # E123, E125 skipped as they are invalid PEP-8.
 show-source = True
-ignore = E123,E125
+ignore = E123,E125,H102
 builtins = _
 exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build