Turn executors into Ansible modules
This commit is contained in:
parent
29660a67ac
commit
92d86d73ad
|
@ -23,6 +23,8 @@ from ansible.plugins import callback
|
|||
from ansible.vars import VariableManager
|
||||
from oslo_log import log as logging
|
||||
|
||||
from performa.engine import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -70,10 +72,14 @@ Options = namedtuple('Options',
|
|||
|
||||
def _run(play_source, host_list):
|
||||
|
||||
LOG.debug('Running play: %s on hosts: %s', play_source, host_list)
|
||||
|
||||
variable_manager = VariableManager()
|
||||
loader = dataloader.DataLoader()
|
||||
module_path = utils.resolve_relative_path('performa/modules')
|
||||
|
||||
options = Options(connection='smart', password='swordfish',
|
||||
module_path='/path/to/mymodules',
|
||||
module_path=module_path,
|
||||
forks=100, remote_user='developer',
|
||||
private_key_file=None,
|
||||
ssh_common_args=None, ssh_extra_args=None,
|
||||
|
@ -129,11 +135,13 @@ def run_command(command, host_list):
|
|||
return _run(play_source, hosts)
|
||||
|
||||
|
||||
def run_playbook(playbook, host_list):
|
||||
def run_playbook(playbook):
|
||||
result = []
|
||||
|
||||
for play_source in playbook:
|
||||
hosts = ','.join(host_list) + ','
|
||||
play_source['hosts'] = hosts
|
||||
hosts = play_source['hosts']
|
||||
play_source['gather_facts'] = 'no'
|
||||
|
||||
_run(play_source, hosts)
|
||||
result += (_run(play_source, hosts))
|
||||
|
||||
return result
|
||||
|
|
|
@ -17,6 +17,7 @@ import copy
|
|||
|
||||
from oslo_config import cfg
|
||||
from oslo_config import types
|
||||
import yaml
|
||||
|
||||
from performa.engine import utils
|
||||
|
||||
|
@ -35,6 +36,20 @@ class Endpoint(types.String):
|
|||
return "Endpoint host[:port]"
|
||||
|
||||
|
||||
class Yaml(types.String):
|
||||
|
||||
def __call__(self, value):
|
||||
value = str(value)
|
||||
try:
|
||||
value = yaml.safe_load(value)
|
||||
except Exception:
|
||||
raise ValueError('YAML value is expected, but got: %s' % value)
|
||||
return value
|
||||
|
||||
def __repr__(self):
|
||||
return "YAML data"
|
||||
|
||||
|
||||
MAIN_OPTS = [
|
||||
cfg.StrOpt('scenario',
|
||||
default=utils.env('PERFORMA_SCENARIO'),
|
||||
|
@ -52,10 +67,12 @@ MAIN_OPTS = [
|
|||
default=utils.env('PERFORMA_MONGO_DB'),
|
||||
required=True,
|
||||
help='Mongo DB, defaults to env[PERFORMA_MONGO_DB].'),
|
||||
cfg.ListOpt('hosts',
|
||||
default=utils.env('PERFORMA_HOSTS'),
|
||||
required=True,
|
||||
help='List of hosts, defaults to env[PERFORMA_MONGO_URL].'),
|
||||
cfg.Opt('hosts',
|
||||
type=Yaml(),
|
||||
default=utils.env('PERFORMA_HOSTS'),
|
||||
required=True,
|
||||
help='Hosts inventory definition in YAML format, '
|
||||
'Can be specified via env[PERFORMA_HOSTS].'),
|
||||
cfg.StrOpt('book',
|
||||
default=utils.env('PERFORMA_BOOK'),
|
||||
help='Generate report in ReST format and store it into the '
|
||||
|
|
|
@ -17,6 +17,7 @@ import os
|
|||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import yaml
|
||||
|
||||
from performa.engine import config
|
||||
from performa.engine import player
|
||||
|
@ -27,6 +28,13 @@ from performa.engine import utils
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def resolve_hosts(scenario, hosts):
|
||||
for k, v in hosts.items():
|
||||
scenario = scenario.replace('$%s' % k, ','.join(v) + ',')
|
||||
|
||||
return scenario
|
||||
|
||||
|
||||
def main():
|
||||
utils.init_config_and_logging(config.MAIN_OPTS)
|
||||
|
||||
|
@ -34,7 +42,10 @@ def main():
|
|||
cfg.CONF.scenario,
|
||||
alias_mapper=lambda f: config.SCENARIOS + '%s.yaml' % f)
|
||||
|
||||
scenario = utils.read_yaml_file(scenario_file_path)
|
||||
scenario_raw = utils.read_file(scenario_file_path)
|
||||
scenario_raw = resolve_hosts(scenario_raw, cfg.CONF.hosts)
|
||||
scenario = yaml.safe_load(scenario_raw)
|
||||
|
||||
base_dir = os.path.dirname(scenario_file_path)
|
||||
|
||||
tag = cfg.CONF.tag
|
||||
|
@ -44,7 +55,10 @@ def main():
|
|||
|
||||
records = player.play_scenario(scenario, tag)
|
||||
|
||||
storage.store_data(records, cfg.CONF.mongo_url, cfg.CONF.mongo_db)
|
||||
if records:
|
||||
storage.store_data(records, cfg.CONF.mongo_url, cfg.CONF.mongo_db)
|
||||
else:
|
||||
LOG.warning('Execution generated no records')
|
||||
|
||||
report.generate_report(scenario, base_dir, cfg.CONF.mongo_url,
|
||||
cfg.CONF.mongo_db, cfg.CONF.book, tag)
|
||||
|
|
|
@ -14,14 +14,12 @@
|
|||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
import re
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from performa.engine import ansible_runner
|
||||
from performa.engine import utils
|
||||
from performa import executors as executors_classes
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -30,52 +28,57 @@ def run_command(command):
|
|||
return ansible_runner.run_command(command, cfg.CONF.hosts)
|
||||
|
||||
|
||||
def _make_test_title(test, params=None):
|
||||
s = test.get('title') or test.get('class')
|
||||
if params:
|
||||
s += ' '.join([','.join(['%s=%s' % (k, v) for k, v in params.items()
|
||||
if k != 'host'])])
|
||||
return re.sub(r'[^\x20-\x7e\x80-\xff]+', '_', s)
|
||||
|
||||
|
||||
def _pick_tests(tests, matrix):
|
||||
def _pick_tasks(tasks, matrix):
|
||||
matrix = matrix or {}
|
||||
for test in tests:
|
||||
for params in utils.algebraic_product(**matrix):
|
||||
parametrized_test = copy.deepcopy(test)
|
||||
parametrized_test.update(params)
|
||||
parametrized_test['title'] = _make_test_title(test, params)
|
||||
|
||||
yield parametrized_test
|
||||
for params in utils.algebraic_product(**matrix):
|
||||
for task in tasks:
|
||||
parametrized_task = copy.deepcopy(task)
|
||||
values = parametrized_task.values()[0]
|
||||
|
||||
if isinstance(values, dict):
|
||||
values.update(params)
|
||||
|
||||
yield parametrized_task
|
||||
|
||||
|
||||
def play_preparation(preparation):
|
||||
ansible_playbook = preparation.get('ansible-playbook')
|
||||
if ansible_playbook:
|
||||
ansible_runner.run_playbook(ansible_playbook, cfg.CONF.hosts)
|
||||
def play_setup(setup):
|
||||
ansible_runner.run_playbook(setup)
|
||||
|
||||
|
||||
def play_execution(execution):
|
||||
def play_execution(execution_playbook):
|
||||
records = []
|
||||
matrix = execution.get('matrix')
|
||||
|
||||
for test in _pick_tests(execution['tests'], matrix):
|
||||
executor = executors_classes.get_executor(test)
|
||||
command = executor.get_command()
|
||||
for play in execution_playbook:
|
||||
matrix = play.get('matrix')
|
||||
|
||||
command_results = run_command(command)
|
||||
for command_result in command_results:
|
||||
for task in _pick_tasks(play['tasks'], matrix):
|
||||
|
||||
record = dict(id=utils.make_id(),
|
||||
host=command_result['host'],
|
||||
status=command_result['status'])
|
||||
record.update(test)
|
||||
task_play = {
|
||||
'hosts': play['hosts'],
|
||||
'tasks': [task],
|
||||
}
|
||||
command_results = ansible_runner.run_playbook([task_play])
|
||||
|
||||
if command_result.get('status') == 'OK':
|
||||
er = executor.process_reply(command_result['payload'])
|
||||
record.update(er)
|
||||
for command_result in command_results:
|
||||
if command_result.get('status') == 'OK':
|
||||
record = dict(id=utils.make_id(),
|
||||
host=command_result['host'],
|
||||
status=command_result['status'],
|
||||
task=command_result['task'])
|
||||
payload = command_result['payload']
|
||||
record.update(payload['invocation']['module_args'])
|
||||
record.update(payload)
|
||||
|
||||
records.append(record)
|
||||
# keep flat values only
|
||||
for k, v in record.items():
|
||||
if isinstance(v, list) or isinstance(v, dict):
|
||||
del record[k]
|
||||
|
||||
del record['stdout']
|
||||
|
||||
LOG.debug('Record: %s', record)
|
||||
records.append(record)
|
||||
|
||||
return records
|
||||
|
||||
|
@ -88,8 +91,8 @@ def tag_records(records, tag):
|
|||
def play_scenario(scenario, tag):
|
||||
records = {}
|
||||
|
||||
if 'preparation' in scenario:
|
||||
play_preparation(scenario['preparation'])
|
||||
if 'setup' in scenario:
|
||||
play_setup(scenario['setup'])
|
||||
|
||||
if 'execution' in scenario:
|
||||
execution = scenario['execution']
|
||||
|
|
|
@ -14,10 +14,8 @@
|
|||
# limitations under the License.
|
||||
|
||||
from performa.executors import shell
|
||||
from performa.executors import sysbench
|
||||
|
||||
EXECUTORS = {
|
||||
'sysbench-oltp': sysbench.SysbenchOltpExecutor,
|
||||
'_default': shell.ShellExecutor,
|
||||
}
|
||||
|
||||
|
|
|
@ -1,91 +0,0 @@
|
|||
# Copyright (c) 2016 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import re
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from performa.executors import base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
TEST_STATS = re.compile(
|
||||
'\s+queries performed:\s*\n'
|
||||
'\s+read:\s+(?P<queries_read>\d+)\s*\n'
|
||||
'\s+write:\s+(?P<queries_write>\d+).*\n'
|
||||
'\s+other:\s+(?P<queries_other>\d+).*\n'
|
||||
'\s+total:\s+(?P<queries_total>\d+).*\n',
|
||||
flags=re.MULTILINE | re.DOTALL
|
||||
)
|
||||
PATTERNS = [
|
||||
r'sysbench (?P<version>[\d\.]+)',
|
||||
TEST_STATS,
|
||||
r'\s+transactions:\s+(?P<transactions>\d+).*\n',
|
||||
r'\s+deadlocks:\s+(?P<deadlocks>\d+).*\n',
|
||||
r'\s+total time:\s+(?P<duration>[\d\.]+).*\n',
|
||||
]
|
||||
TRANSFORM_FIELDS = {
|
||||
'queries_read': int,
|
||||
'queries_write': int,
|
||||
'queries_other': int,
|
||||
'queries_total': int,
|
||||
'duration': float,
|
||||
'transactions': int,
|
||||
'deadlocks': int,
|
||||
}
|
||||
|
||||
|
||||
def parse_sysbench_oltp(raw):
|
||||
result = {}
|
||||
|
||||
for pattern in PATTERNS:
|
||||
for parsed in re.finditer(pattern, raw):
|
||||
result.update(parsed.groupdict())
|
||||
|
||||
for k in result.keys():
|
||||
if k in TRANSFORM_FIELDS:
|
||||
result[k] = TRANSFORM_FIELDS[k](result[k])
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class SysbenchOltpExecutor(base.BaseExecutor):
|
||||
def get_command(self):
|
||||
cmd = base.CommandLine('sysbench')
|
||||
|
||||
cmd.add('--test', 'oltp')
|
||||
cmd.add('--db-driver', 'mysql')
|
||||
cmd.add('--mysql-table-engine', 'innodb')
|
||||
cmd.add('--mysql-engine-trx', 'yes')
|
||||
cmd.add('--num-threads', self.test_definition.get('threads') or 10)
|
||||
cmd.add('--max-time', self.get_expected_duration())
|
||||
cmd.add('--max-requests', 0)
|
||||
cmd.add('--mysql-host', 'localhost')
|
||||
cmd.add('--mysql-db', 'sbtest')
|
||||
cmd.add('--oltp-table-name', 'sbtest')
|
||||
cmd.add('--oltp-table-size',
|
||||
self.test_definition.get('table_size') or 100000)
|
||||
# cmd.add('--oltp-num-tables',
|
||||
# self.test_definition.get('num_tables') or 10)
|
||||
# cmd.add('--oltp-auto-inc', 'off')
|
||||
# cmd.add('--oltp-read-only', 'off')
|
||||
cmd.add('run')
|
||||
|
||||
return cmd.make()
|
||||
|
||||
def process_reply(self, record):
|
||||
stdout = record.get('stdout')
|
||||
return parse_sysbench_oltp(stdout)
|
|
@ -0,0 +1,87 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
import re
|
||||
|
||||
TEST_STATS = re.compile(
|
||||
'\s+queries performed:\s*\n'
|
||||
'\s+read:\s+(?P<queries_read>\d+)\s*\n'
|
||||
'\s+write:\s+(?P<queries_write>\d+).*\n'
|
||||
'\s+other:\s+(?P<queries_other>\d+).*\n'
|
||||
'\s+total:\s+(?P<queries_total>\d+).*\n',
|
||||
flags=re.MULTILINE | re.DOTALL
|
||||
)
|
||||
PATTERNS = [
|
||||
r'sysbench (?P<version>[\d\.]+)',
|
||||
TEST_STATS,
|
||||
r'\s+transactions:\s+(?P<transactions>\d+).*\n',
|
||||
r'\s+deadlocks:\s+(?P<deadlocks>\d+).*\n',
|
||||
r'\s+total time:\s+(?P<duration>[\d\.]+).*\n',
|
||||
]
|
||||
TRANSFORM_FIELDS = {
|
||||
'queries_read': int,
|
||||
'queries_write': int,
|
||||
'queries_other': int,
|
||||
'queries_total': int,
|
||||
'duration': float,
|
||||
'transactions': int,
|
||||
'deadlocks': int,
|
||||
}
|
||||
|
||||
|
||||
def parse_sysbench_oltp(raw):
|
||||
result = {}
|
||||
|
||||
for pattern in PATTERNS:
|
||||
for parsed in re.finditer(pattern, raw):
|
||||
result.update(parsed.groupdict())
|
||||
|
||||
for k in result.keys():
|
||||
if k in TRANSFORM_FIELDS:
|
||||
result[k] = TRANSFORM_FIELDS[k](result[k])
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
module = AnsibleModule(
|
||||
argument_spec=dict(
|
||||
threads=dict(type='int', default=10),
|
||||
duration=dict(type='int', default=10),
|
||||
mysql_host=dict(default='localhost'),
|
||||
mysql_db=dict(default='sbtest'),
|
||||
oltp_table_name=dict(default='sbtest'),
|
||||
oltp_table_size=dict(type='int', default=100000),
|
||||
))
|
||||
|
||||
cmd = ('sysbench '
|
||||
'--test=oltp '
|
||||
'--db-driver=mysql '
|
||||
'--mysql-table-engine=innodb '
|
||||
'--mysql-engine-trx=yes '
|
||||
'--num-threads=%(threads)s '
|
||||
'--max-time=%(duration)s '
|
||||
'--max-requests=0 '
|
||||
'--mysql-host=%(mysql_host)s '
|
||||
'--mysql-db=%(mysql_db)s '
|
||||
'--oltp-table-name=%(oltp_table_name)s '
|
||||
'--oltp-table-size=%(oltp_table_size)s '
|
||||
'run'
|
||||
) % module.params
|
||||
|
||||
rc, stdout, stderr = module.run_command(cmd)
|
||||
|
||||
result = dict(changed=True, rc=rc, stdout=stdout, stderr=stderr, cmd=cmd)
|
||||
|
||||
try:
|
||||
result.update(parse_sysbench_oltp(stdout))
|
||||
module.exit_json(**result)
|
||||
except Exception as e:
|
||||
result['exception'] = e
|
||||
|
||||
module.fail_json(**result)
|
||||
|
||||
|
||||
from ansible.module_utils.basic import * # noqa
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
|
@ -17,7 +17,7 @@ Chart and table:
|
|||
y2: read queries per sec
|
||||
chart: line
|
||||
pipeline:
|
||||
- { $match: { class: sysbench-oltp, status: OK }}
|
||||
- { $match: { task: sysbench_oltp, status: OK }}
|
||||
- { $group: { _id: { threads: "$threads" },
|
||||
queries_total_per_sec: { $avg: { $divide: ["$queries_total", "$duration"] }},
|
||||
queries_read_per_sec: { $avg: { $divide: ["$queries_read", "$duration"] }}
|
||||
|
|
|
@ -1,25 +1,26 @@
|
|||
title: DB
|
||||
title: Sysbench DB
|
||||
|
||||
description:
|
||||
This scenario uses sysbench to execute DB test plan.
|
||||
|
||||
preparation:
|
||||
ansible-playbook:
|
||||
-
|
||||
tasks:
|
||||
- apt: name=sysbench
|
||||
become: yes
|
||||
become_user: root
|
||||
become_method: sudo
|
||||
setup:
|
||||
-
|
||||
hosts: $target
|
||||
tasks:
|
||||
- name: installing sysbench
|
||||
apt: name=sysbench
|
||||
become: yes
|
||||
become_user: root
|
||||
become_method: sudo
|
||||
|
||||
execution:
|
||||
matrix:
|
||||
threads: [ 10, 20, 30, 40, 50, 60 ]
|
||||
tests:
|
||||
-
|
||||
title: sysbench-oltp
|
||||
class: sysbench-oltp
|
||||
time: 10
|
||||
hosts: $target
|
||||
matrix:
|
||||
threads: [ 10, 20, 30 ]
|
||||
tasks:
|
||||
- sysbench_oltp:
|
||||
duration: 10
|
||||
|
||||
report:
|
||||
template: sysbench.rst
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
import testtools
|
||||
|
||||
from performa.executors import sysbench
|
||||
from performa.modules import sysbench_oltp as sysbench
|
||||
|
||||
OLTP_OUTPUT = '''
|
||||
sysbench 0.4.12: multi-threaded system evaluation benchmark
|
||||
|
|
Loading…
Reference in New Issue