From 6e684cd8414250271a2360059594a5002c33a258 Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Thu, 31 Mar 2016 19:36:37 +0300 Subject: [PATCH] Filter atop series inside the module Introduce ability to filter series inside atop module. The feature is configured via parameter 'filter', which is a dictinary where keys as param names and values play role of filter. Change-Id: I553558a63879a3e6f66522a040b74b91e4884800 --- performa/engine/player.py | 2 ++ performa/modules/atop.py | 52 ++++++++++++++++++++++++----- performa/scenarios/db/sysbench.yaml | 6 ++-- performa/scenarios/mq/rabbitmq.yaml | 4 ++- performa/tests/test_atop.py | 41 +++++++++++++++++++---- 5 files changed, 88 insertions(+), 17 deletions(-) diff --git a/performa/engine/player.py b/performa/engine/player.py index e935a64..3155d87 100644 --- a/performa/engine/player.py +++ b/performa/engine/player.py @@ -87,6 +87,8 @@ def play_execution(runner, execution_playbook): rec.update(common) series.append(rec) LOG.debug('New time series: %s', rec) + else: + LOG.warning('Play failed: %s', command_result) return records, series diff --git a/performa/modules/atop.py b/performa/modules/atop.py index a605ffb..19f7535 100644 --- a/performa/modules/atop.py +++ b/performa/modules/atop.py @@ -1,5 +1,6 @@ #!/usr/bin/python +import functools import os import re import tempfile @@ -145,10 +146,7 @@ def normalize_point(point): return point -def parse_output(raw, filter_labels): - filter_labels = set(filter_labels) - series = [] - +def parse_output(raw): active = False for line in raw.split('\n'): if line == 'SEP': @@ -162,11 +160,44 @@ def parse_output(raw, filter_labels): m = re.match(pattern, line) if m: point = m.groupdict() - if point['label'] in filter_labels: - series.append(normalize_point(point)) + yield normalize_point(point) break - return series + +def make_filter_funcs(filters): + def _in_list(point, name, lst): + return point.get(name) in lst + + def _match(point, name, patt): + return re.match(patt, point.get(name, '')) + + funcs = [] + for name, values in filters.items(): + fn = None + if isinstance(values, list): + fn = functools.partial(_in_list, name=name, lst=set(values)) + elif isinstance(values, str): + fn = functools.partial(_match, name=name, patt=re.compile(values)) + if fn: + funcs.append(fn) + return funcs + + +def run_filter_funcs(points, funcs): + for point in points: + accepted = True + for fn in funcs: + if not fn(point): + accepted = False + break + + if accepted: + yield point + + +def parse(raw, filters): + funcs = make_filter_funcs(filters) + return list(run_filter_funcs(parse_output(raw), funcs)) def start(module): @@ -206,13 +237,17 @@ def stop(module): # grab data labels = module.params['labels'] or ALL_LABELS + ft = module.params.get('filter') + if ft and 'label' in ft: + labels = ft['label'] + cmd = ('atop -r %(file)s -P %(labels)s' % dict(file=ATOP_FILE_NAME, labels=','.join(labels))) rc, stdout, stderr = module.run_command(cmd) try: - series = parse_output(stdout, labels) + series = parse(stdout, module.params.get('filter', {})) module.exit_json(series=series) except Exception as e: module.fail_json(msg=str(e), stderr=stderr, rc=rc) @@ -224,6 +259,7 @@ def main(): command=dict(required=True, choices=['start', 'stop']), interval=dict(type='int', default=1), labels=dict(type='list'), + filter=dict(type='dict', default={}), )) command = module.params['command'] diff --git a/performa/scenarios/db/sysbench.yaml b/performa/scenarios/db/sysbench.yaml index 34b26cd..f35edd7 100644 --- a/performa/scenarios/db/sysbench.yaml +++ b/performa/scenarios/db/sysbench.yaml @@ -35,7 +35,7 @@ execution: threads: [ 5, 10, 15, 20, 30, 40, 50 ] tasks: - sysbench_oltp: - duration: 60 + duration: 10 mysql_host: {{ mysql_endpoint }} mysql_port: {{ mysql_port }} mysql_db: sbtest @@ -44,7 +44,9 @@ execution: tasks: - atop: command: stop - labels: [ PRC ] + filter: + name: mysqld + label: [ PRC ] aggregation: - diff --git a/performa/scenarios/mq/rabbitmq.yaml b/performa/scenarios/mq/rabbitmq.yaml index a274b77..a0c565c 100644 --- a/performa/scenarios/mq/rabbitmq.yaml +++ b/performa/scenarios/mq/rabbitmq.yaml @@ -87,7 +87,9 @@ execution: tasks: - atop: command: stop - labels: [ PRC ] + filter: + name: beam.* + label: [ PRC ] aggregation: - diff --git a/performa/tests/test_atop.py b/performa/tests/test_atop.py index 82a9764..2b55f0f 100644 --- a/performa/tests/test_atop.py +++ b/performa/tests/test_atop.py @@ -36,7 +36,8 @@ class TestAtop(testtools.TestCase): 'sys': 0.04, 'ticks_per_second': 100, 'time': '10:01:05', 'timestamp': 1456480865, 'user': 0.04, 'wait': 0.0}] - self.assertEqual(expected, atop.parse_output(_read_sample(), ['CPU'])) + self.assertEqual(expected, + atop.parse(_read_sample(), dict(label=['CPU']))) def test_parse_cpu(self): needle = {'cpu_id': 2, 'date': '2016/02/26', 'guest': 0.0, @@ -45,7 +46,8 @@ class TestAtop(testtools.TestCase): 'sys': 0.03, 'ticks_per_second': 100, 'time': '10:01:05', 'timestamp': 1456480865, 'user': 0.03, 'wait': 0.0} - self.assertIn(needle, atop.parse_output(_read_sample(), ['cpu'])) + self.assertIn(needle, + atop.parse(_read_sample(), dict(label=['cpu']))) def test_parse_mem(self): expected = [ @@ -58,7 +60,8 @@ class TestAtop(testtools.TestCase): 'label': 'MEM', 'page_size': 4096, 'phys': 8373075968, 'slab': 298115072, 'time': '10:01:05', 'timestamp': 1456480865}] - self.assertEqual(expected, atop.parse_output(_read_sample(), ['MEM'])) + self.assertEqual(expected, + atop.parse(_read_sample(), dict(label=['MEM']))) def test_parse_net(self): needle = {'date': '2016/02/26', 'host': 'host', 'interval': 1, @@ -66,7 +69,8 @@ class TestAtop(testtools.TestCase): 'label': 'NET', 'tcp_rx': 0, 'tcp_tx': 0, 'time': '10:01:04', 'timestamp': 1456480864, 'udp_rx': 0, 'udp_tx': 0} - self.assertIn(needle, atop.parse_output(_read_sample(), ['NET'])) + self.assertIn(needle, + atop.parse(_read_sample(), dict(label=['NET']))) def test_parse_prc(self): needle = {'current_cpu': 2, 'date': '2016/02/26', 'host': 'host', @@ -76,7 +80,8 @@ class TestAtop(testtools.TestCase): 'sys': 0.02, 'ticks_per_second': 100, 'time': '10:01:04', 'timestamp': 1456480864, 'user': 0.01} - self.assertIn(needle, atop.parse_output(_read_sample(), ['PRC'])) + self.assertIn(needle, + atop.parse(_read_sample(), dict(label=['PRC']))) def test_parse_prm(self): needle = {'date': '2016/02/26', 'host': 'host', 'interval': 1, @@ -87,4 +92,28 @@ class TestAtop(testtools.TestCase): 'timestamp': 1456480865, 'virtual': 17412096, 'virtual_growth': 0} - self.assertIn(needle, atop.parse_output(_read_sample(), ['PRM'])) + self.assertIn(needle, + atop.parse(_read_sample(), dict(label=['PRM']))) + + def test_parse_match_name_regex(self): + expected = [{'current_cpu': 2, 'date': '2016/02/26', 'host': 'host', + 'interval': 1, 'label': 'PRC', 'name': 'dstat', 'nice': 0, + 'pid': 11014, 'priority': 120, 'realtime_priority': 0, + 'scheduling_policy': 0, 'sleep_avg': 0, 'state': 'S', + 'sys': 0.02, 'ticks_per_second': 100, 'time': '10:01:04', + 'timestamp': 1456480864, 'user': 0.01}, + {'current_cpu': 2, 'date': '2016/02/26', 'host': 'host', + 'interval': 1, 'label': 'PRC', 'name': 'dstat', 'nice': 0, + 'pid': 11014, 'priority': 120, 'realtime_priority': 0, + 'scheduling_policy': 0, 'sleep_avg': 0, 'state': 'S', + 'sys': 0.0, 'ticks_per_second': 100, 'time': '10:01:05', + 'timestamp': 1456480865, 'user': 0.02}] + + filter = { + 'name': 'dstat', + 'label': ['PRC'], + } + self.assertEqual(expected, atop.parse(_read_sample(), filter)) + + def test_parse_no_filter(self): + self.assertEqual(43, len(atop.parse(_read_sample(), {})))