Enhance omsimulator runner

Change-Id: I8adb7dae86f43aaf539648d93e27d0fae0b987ed
This commit is contained in:
Ilya Shakhat 2016-04-19 14:52:49 +03:00
parent 83aeb55e60
commit c5d91c68ab
2 changed files with 34 additions and 36 deletions

View File

@ -2,7 +2,7 @@
import copy
import multiprocessing
import os
import signal
import random
import tempfile
SERVER_PID = os.path.join(tempfile.gettempdir(), 'performa.oms.pid')
@ -49,10 +49,15 @@ def make_client_cmd(params, i):
params['client_file'] = make_file_name(CLIENT_FILE_NAME, i)
params['url'] = params['client_url'] or params['url']
params['this_topic'] = params['topic']
if params['unique_topic_per_pair']:
params['this_topic'] += '_%s' % i
client = ('%(python)s simulator.py '
'--url %(url)s '
'--json %(client_file)s '
'-l %(duration)s '
'--topic %(this_topic)s '
'%(client_tool)s '
'--timeout %(timeout)s '
'-w %(sending_delay)s '
@ -70,62 +75,49 @@ def make_client_cmd(params, i):
def make_server_cmd(params, i):
params['server_file'] = make_file_name(SERVER_FILE_NAME, i)
params['url'] = params['server_url'] or params['url']
params['server_duration'] = (params['duration'] +
params['server_teardown_duration'])
params['this_topic'] = params['topic']
if params['unique_topic_per_pair']:
params['this_topic'] += '_%s' % i
server = ('%(python)s simulator.py '
'--url %(url)s '
'--json %(server_file)s '
'-l %(server_duration)s '
'--topic %(this_topic)s '
'%(server_tool)s ') % params
return server
def run_client(module, command):
module.run_command(command)
def run_client_pool(module, params):
def run_pool(module, params):
processes = []
for i in range(params['processes']):
cmd = make_client_cmd(params, i)
p = multiprocessing.Process(target=run_client, args=(module, cmd))
p = multiprocessing.Process(target=module.run_command, args=(cmd,))
processes.append(p)
p.start()
for p in processes:
p.join()
def run_server(module, command):
module.run_command(command)
def start_server_pool(module, params):
processes = []
for i in range(params['processes']):
cmd = make_server_cmd(params, i)
p = multiprocessing.Process(target=run_client, args=(module, cmd))
p = multiprocessing.Process(target=module.run_command, args=(cmd,))
processes.append(p)
p.start()
return processes
def stop_server_pool(module, processes):
for p in processes:
rc, stdout, stderr = module.run_command('pgrep -P %s' % p.pid)
for child in (int(p) for p in stdout.split('\n') if p):
os.kill(child, signal.SIGINT)
time.sleep(3) # let simulators handle the signal
for p in processes:
os.kill(p.pid, signal.SIGINT)
p.join()
def cleanup_old_files(module, params):
for i in range(params['processes']):
try:
os.unlink(make_file_name(CLIENT_FILE_NAME, i))
os.unlink(make_file_name(SERVER_FILE_NAME, i))
except OSError:
pass # ignore
def collect_results(params):
result = dict(records=[], series=[])
@ -167,11 +159,12 @@ def run(module):
params['server_tool'] = server_tool
params['client_tool'] = client_tool
server_processes = start_server_pool(module, params)
if not params['topic']:
params['topic'] = 'performa_%s' % random.random()
run_client_pool(module, params)
cleanup_old_files(module, params)
stop_server_pool(module, server_processes)
run_pool(module, params)
try:
result = collect_results(params)
@ -194,6 +187,9 @@ def main():
duration=dict(type='int', default=10),
timeout=dict(type='int', default=5),
sending_delay=dict(type='float', default=-1.0),
topic=dict(),
unique_topic_per_pair=dict(type='bool', default=False),
server_teardown_duration=dict(type='int', default=15),
))
chdir(module)

View File

@ -58,6 +58,7 @@ execution:
url: {{ rabbit_url }}
client_url: {{ client_url }}
server_url: {{ server_url }}
topic: profiler_topic
-
hosts: {{ tester_hosts }}
matrix:
@ -70,6 +71,7 @@ execution:
url: {{ rabbit_url }}
client_url: {{ client_url }}
server_url: {{ server_url }}
topic: profiler_topic
-
hosts: {{ tester_hosts }}
matrix: