diff --git a/system_test/systemtest.cfg b/system_test/systemtest.cfg
new file mode 100644
index 0000000..b6168a3
--- /dev/null
+++ b/system_test/systemtest.cfg
@@ -0,0 +1,22 @@
+[config]
+
+# The Keystone URL to talk to
+#authurl=http://localhost:5000/v2.0
+
+# The username to login as
+#username=admin
+
+# The tenant name to scope into
+#tenantname=admin
+
+# The password for that user
+#password=password
+
+# How many workers to spin up
+#num_workers=2
+
+# How many write/read cycles to execute
+#num_cycles=8
+
+# The name of the SwiftOnHPSS storage policy configured on your Swift cluster
+#hpss_storage_policy=hpss
diff --git a/system_test/systemtest.py b/system_test/systemtest.py
new file mode 100755
index 0000000..d2bfa04
--- /dev/null
+++ b/system_test/systemtest.py
@@ -0,0 +1,402 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2015-2016 IBM Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import swiftclient
+import multiprocessing
+import os
+import time
+import ConfigParser
+import collections
+import itertools
+from pprint import pprint as pretty_print
+
+
+# This is here because you can't normally do multiprocessing on
+# instance methods. We get around that restriction by using this
+# top-level function in the worker process as the execution target
+# to call it for us, with the class state and method name pickled and
+# piped in through a queue. Who needs the Law of Demeter anyway?!
+def _call_instance_method(instance, method_name, *args, **kwargs):
+    return getattr(instance.__class__, method_name)(instance, *args, **kwargs)
+
+
+def _parse_config():
+    parser = ConfigParser.SafeConfigParser()
+    parser.read('systemtest.cfg')
+    options = {}
+    for opt in ['authurl', 'username', 'tenantname',
+                'password', 'num_workers', 'num_cycles',
+                'hpss_storage_policy']:
+        options[opt] = parser.get('config', opt)
+
+    # These are specifically numbers.
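+    # (ConfigParser.get() returns every value as a string, so the two counts
+    # are coerced below; a non-numeric setting will surface as a ValueError.)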
+    options['num_workers'] = int(options['num_workers'])
+    options['num_cycles'] = int(options['num_cycles'])
+    return options
+
+
+def apply_ansi(string_to_format, ansi_codes):
+    input_codes = ansi_codes
+    if type(input_codes) is int:
+        input_codes = [ansi_codes]
+    if len(input_codes) == 0:
+        return string_to_format
+    code_str = ";".join(map(str, input_codes))
+    return '\033[%sm%s\033[0m' % (code_str, string_to_format)
+
+
+def average(value_list):
+    if len(value_list) == 0:
+        return 0
+    return sum(value_list) / len(value_list)
+
+
+def chain(value, *fns):
+    result = value
+    for fn in fns:
+        result = fn(result)
+    return result
+
+
+def human_readable_size(size, over_time=True):
+    if over_time:
+        prefixes = ['B/s', 'KiB/s', 'MiB/s', 'GiB/s', 'TiB/s', 'PiB/s', 'EiB/s']
+    else:
+        prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB']
+    tmp = size
+    magnitude = 0
+    while tmp >= 1024 and magnitude < len(prefixes)-1:
+        tmp /= 1024.0
+        magnitude += 1
+    return '%f %s' % (tmp, prefixes[magnitude])
+
+
+ansi_modes = {'clear': 0,
+              'bold': 1,
+              'blink': 5,
+              'fg_black': 30,
+              'fg_red': 31,
+              'fg_green': 32,
+              'fg_yellow': 33,
+              'fg_blue': 34,
+              'fg_magenta': 35,
+              'fg_cyan': 36,
+              'fg_white': 37,
+              'bg_black': 40,
+              'bg_red': 41,
+              'bg_green': 42,
+              'bg_yellow': 43,
+              'bg_blue': 44,
+              'bg_magenta': 45,
+              'bg_cyan': 46,
+              'bg_white': 47}
+
+# These three things are all classes in their own right.
+IORecord = collections.namedtuple('IORecord', ['size', 'elapsed_time'])
+
+TestWorkerRecord = collections.namedtuple('TestWorkerRecord',
+                                          ['handle', 'queue'])
+
+TestResultRecord = collections.namedtuple('TestResultRecord',
+                                          ['worker_pid', 'write_result',
+                                           'read_result', 'test_size', 'cycle'])
+
+
+class TimedReader:
+    def __init__(self, source, size=None):
+        self.source = source
+        self.read_samples = []
+        self.done = False
+        self.last_read_time = time.time()
+        self.size = size
+        self._total_read = 0
+
+    def read(self, size):
+        current = time.time()
+        result = self.source.read(size)
+        bytes_read = len(result)
+        self.read_samples.append(
+            IORecord(size=bytes_read,
+                     elapsed_time=current - self.last_read_time))
+        self._total_read += bytes_read
+        self.last_read_time = current
+        if result == '' or (self._total_read >= self.size):
+            self.done = True
+        return result
+
+    def reset(self):
+        self.source.seek(0)
+        self.read_samples = []
+        self.done = False
+        self.last_read_time = time.time()
+        self._total_read = 0
+
+    def time_so_far(self):
+        return sum([rec.elapsed_time for rec in self.read_samples])
+
+    def read_so_far(self):
+        return sum([rec.size for rec in self.read_samples])
+
+    def io_rate(self):
+        return self.read_so_far() / self.time_so_far()
+
+
+class PerformanceTestWorker():
+
+    def __init__(self, args, queue):
+        self.pid = None
+        self.args = args
+        self.queue = queue
+        self.files_written = []
+        self.swift_connection = None
+        self.results = []
+
+    # This method is what the controller process calls to start execution.
+    def start(self, sem):
+        worker = multiprocessing.Process(target=_call_instance_method,
+                                         args=(self, '_do_it', sem))
+        worker.start()
+
+    # This function gets bootstrapped on the remote process side.
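+    # Because the instance is handed to the child process as a copy, any
+    # state it sets over there (self.pid, self.results) never makes it back
+    # to the parent; results only return through self.queue.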
+    def _do_it(self, semaphore):
+        semaphore.acquire()
+        try:
+            self.pid = os.getpid()
+            num_cycles = int(self.args['num_cycles'])
+            MEGABYTE = 1024*1024
+            sizes = [1*MEGABYTE, 4*MEGABYTE, 16*MEGABYTE, 64*MEGABYTE]
+            self.swift_connection = swiftclient.client.Connection(
+                authurl=self.args['authurl'],
+                user=self.args['username'],
+                key=self.args['password'],
+                tenant_name=self.args['tenantname'],
+                auth_version='2')
+            for cycle in xrange(1, num_cycles+1):
+                for size in sizes:
+                    result = self._test_cycle(size, cycle)
+                    self.results.append(result)
+        except:
+            self.queue.put([])
+            raise
+        else:
+            self.queue.put(self.results)
+        finally:
+            semaphore.release()
+
+    def _test_cycle(self, size, cycle):
+        zero_name = '%d-%d-zero-%d' % (size, self.pid, cycle)
+        random_name = '%d-%d-random-%d' % (size, self.pid, cycle)
+
+        zero_file = open('/tmp/%s' % zero_name, 'w')
+        random_file = open('/tmp/%s' % random_name, 'w')
+
+        # Create files to read from, to make test more accurate
+        zero_file.write(open('/dev/zero').read(size))
+        random_file.write(open('/dev/urandom').read(size))
+
+        zero_file.close()
+        random_file.close()
+
+        zero_file = open('/tmp/%s' % zero_name, 'r')
+        random_file = open('/tmp/%s' % random_name, 'r')
+
+        # Write the file
+        zero_timer = TimedReader(zero_file, size)
+        self.swift_connection.put_object('.performance_test',
+                                         zero_name,
+                                         zero_timer, size)
+        random_timer = TimedReader(random_file, size)
+        self.swift_connection.put_object('.performance_test',
+                                         random_name,
+                                         random_timer, size)
+        write_result = {'zero_avg': zero_timer.io_rate(),
+                        'random_avg': random_timer.io_rate(),
+                        }
+
+        # Read the file back
+        zero_stream = \
+            self.swift_connection.get_object('.performance_test', zero_name,
+                                             resp_chunk_size=65536)[1]
+        random_stream = \
+            self.swift_connection.get_object('.performance_test', random_name,
+                                             resp_chunk_size=65536)[1]
+        zero_timer = TimedReader(zero_stream, size)
+        random_timer = TimedReader(random_stream, size)
+        read_size = 65536
+        while not (zero_timer.done or random_timer.done):
+            zero_timer.read(read_size)
+            random_timer.read(read_size)
+        read_result = {'zero_avg': zero_timer.io_rate(),
+                       'random_avg': random_timer.io_rate(),
+                       }
+
+        # Cleanup
+        self.swift_connection.delete_object('.performance_test', zero_name)
+        self.swift_connection.delete_object('.performance_test', random_name)
+        os.unlink('/tmp/%s' % zero_name)
+        os.unlink('/tmp/%s' % random_name)
+
+        return TestResultRecord(write_result=write_result,
+                                read_result=read_result,
+                                worker_pid=self.pid,
+                                test_size=size,
+                                cycle=cycle)
+
+
+class PerformanceTestController():
+
+    def __init__(self, args):
+        self.args = args
+        self.semaphore = None
+        self.workers = []
+
+    def create_worker(self):
+        queue = multiprocessing.Queue()
+        worker = PerformanceTestWorker(self.args, queue)
+        self.workers.append(TestWorkerRecord(worker, queue))
+
+    def start_test(self):
+        self.semaphore = multiprocessing.Semaphore(len(self.workers))
+        for worker_record in self.workers:
+            worker_record.handle.start(self.semaphore)
+
+    def is_done(self):
+        if not self.semaphore:
+            return False
+        return self.semaphore.get_value() == len(self.workers)
+
+    def get_results(self):
+        worker_results = [worker.queue.get() for worker in self.workers]
+        results = list(itertools.chain(*worker_results))
+
+        # This next bit only works because Python's sort algorithm is stable.
+        # What this is supposed to do is sort this list so that we have
+        # the test size as the primary sort key, then worker_pid, then cycle.
+        # The order matters.
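+        # (A single sorted() call with a tuple key would do the same:
+        # sorted(results, key=lambda r: (r.test_size, r.worker_pid, r.cycle)))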
+        results = sorted(results, key=lambda x: x.cycle)
+        results = sorted(results, key=lambda x: x.worker_pid)
+        results = sorted(results, key=lambda x: x.test_size)
+        return results
+
+
+# TODO: is there any nicer way to handle the view for this test?
+# maybe in the future some fancy graphical view? something to copy-paste
+# into slide decks
+class PerformanceTestViewer():
+
+    def __init__(self, results):
+        self.results = results
+
+    def title(self, content):
+        return apply_ansi(content, ansi_modes['bold'])
+
+    def header(self, content):
+        return apply_ansi(content, ansi_modes['fg_green'])
+
+    def error(self, content):
+        return apply_ansi(content, ansi_modes['fg_red'])
+
+    def _group_by(self, iterable, keyfunc):
+        return {k: list(v) for k, v
+                in itertools.groupby(iterable, key=keyfunc)}
+
+    # TODO: make this much more concise
+    def show_results(self):
+        size_to_results = self._group_by(self.results,
+                                         keyfunc=lambda x: x.test_size)
+
+        pretty_print(size_to_results)
+        for size in size_to_results:
+            pid_to_results = self._group_by(size_to_results[size],
+                                            keyfunc=lambda x: x.worker_pid)
+            print self.title('Testing with files of size %s:' %
+                             human_readable_size(size, over_time=False))
+            print '---'
+
+            print self.header('  Ingesting random bytes:')
+            for pid in pid_to_results:
+                print self.header('    Worker %s:' % pid), \
+                    map(lambda res: human_readable_size(
+                        res.write_result['random_avg']),
+                        pid_to_results[pid])
+            print self.header('    Overall average per worker:'), \
+                chain(map(lambda res: res.write_result['random_avg'],
+                          size_to_results[size]), average, human_readable_size)
+            print ''
+
+            print self.header('  Ingesting zero bytes:')
+            for pid in pid_to_results:
+                print self.header('    Worker %s:' % pid), \
+                    map(lambda res: human_readable_size(
+                        res.write_result['zero_avg']),
+                        pid_to_results[pid])
+            print self.header('    Overall average per worker:'), \
+                chain(map(lambda res: res.write_result['zero_avg'],
+                          size_to_results[size]), average, human_readable_size)
+            print ''
+
+            print self.header('  Recalling random bytes:')
+            for pid in pid_to_results:
+                print self.header('    Worker %s:' % pid), \
+                    map(lambda res: human_readable_size(
+                        res.read_result['random_avg']),
+                        pid_to_results[pid])
+            print self.header('    Overall average per worker:'), \
+                chain(map(lambda res: res.read_result['random_avg'],
+                          size_to_results[size]), average, human_readable_size)
+            print ''
+
+            print self.header('  Recalling zero bytes:')
+            for pid in pid_to_results:
+                print self.header('    Worker %s:' % pid), \
+                    map(lambda res: human_readable_size(
+                        res.read_result['zero_avg']),
+                        pid_to_results[pid])
+            print self.header('    Overall average per worker:'), \
+                chain(map(lambda res: res.read_result['zero_avg'],
+                          size_to_results[size]), average, human_readable_size)
+            print '---'
+            print ''
+
+
+def main():
+    args = _parse_config()
+    worker_count = int(args['num_workers'])
+    swift = swiftclient.client.Connection(authurl=args['authurl'],
+                                          user=args['username'],
+                                          tenant_name=args['tenantname'],
+                                          key=args['password'],
+                                          auth_version='2')
+    swift.put_container('.performance_test',
+                        {'X-Storage-Policy':
+                         args['hpss_storage_policy']})
+    controller = PerformanceTestController(args)
+    for i in xrange(0, worker_count):
+        controller.create_worker()
+    print "Starting %d worker(s)..." % worker_count
+    controller.start_test()
+    print "Waiting for test results..."
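+    # The semaphore starts out with every slot free, so give the workers a
+    # moment to acquire their slots before polling is_done(); otherwise it
+    # could report completion before any worker has even started.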
+    # Wait for all "threads" to finish
+    time.sleep(.5)
+    while not controller.is_done():
+        time.sleep(.1)
+    all_results = controller.get_results()
+    view = PerformanceTestViewer(all_results)
+    view.show_results()
+
+if __name__ == "__main__":
+    main()