Cleanup lint errors in test suite

Add some more linting to test modules and clean 'em up

Change-Id: I8bece8e1e2971ef508c058220dd2646ae880fe35
This commit is contained in:
Clay Gerrard 2016-11-29 14:48:21 -08:00
parent cc6f4bba26
commit 685bc06180
3 changed files with 161 additions and 206 deletions

View File

@ -154,11 +154,11 @@ class TestPyECLibDriver(unittest.TestCase):
_m = 4
else:
_m = 5
driver = ECDriver(k=10, m=_m, ec_type=_type, validate=True)
ECDriver(k=10, m=_m, ec_type=_type, validate=True)
available_ec_types.append(_type)
except Exception as e:
except Exception:
# ignore any errors, assume backend not available
continue
pass
self.assertEqual(available_ec_types, VALID_EC_TYPES)
def test_valid_algo(self):
@ -184,43 +184,43 @@ class TestPyECLibDriver(unittest.TestCase):
_type1 = 'jerasure_rs_vand'
if _type1 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=12, m=2, ec_type=_type1,
chksum_type=csum))
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=11, m=2, ec_type=_type1,
chksum_type=csum))
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=10, m=2, ec_type=_type1,
chksum_type=csum))
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=8, m=4, ec_type=_type1,
chksum_type=csum))
chksum_type=csum))
_type2 = 'liberasurecode_rs_vand'
if _type2 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=12, m=2, ec_type=_type2,
chksum_type=csum))
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=11, m=2, ec_type=_type2,
chksum_type=csum))
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=10, m=2, ec_type=_type2,
chksum_type=csum))
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=8, m=4, ec_type=_type2,
chksum_type=csum))
chksum_type=csum))
_type3_1 = 'flat_xor_hd'
if _type3_1 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=12, m=6, ec_type=_type3_1,
chksum_type=csum))
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=10, m=5, ec_type=_type3_1,
chksum_type=csum))
chksum_type=csum))
_type3_2 = 'flat_xor_hd_4'
if _type3_2 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=12, m=6, ec_type=_type3_2,
chksum_type=csum))
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=10, m=5, ec_type=_type3_2,
chksum_type=csum))
chksum_type=csum))
_type4 = 'shss'
if _type4 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=10, m=4, ec_type=_type4,
chksum_type=csum))
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=20, m=4, ec_type=_type4,
chksum_type=csum))
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=11, m=7, ec_type=_type4,
chksum_type=csum))
chksum_type=csum))
return pyeclib_drivers
def test_small_encode(self):
@ -274,68 +274,6 @@ class TestPyECLibDriver(unittest.TestCase):
'Invalid fragment payload in ECPyECLibDriver.reconstruct',
str(exc_mgr.exception))
# def disabled_test_verify_fragment_algsig_chksum_fail(self):
# pyeclib_drivers = []
# pyeclib_drivers.append(
# ECDriver(k=12, m=2, ec_type="jerasure_rs_vand", chksum_type="algsig"))
# pyeclib_drivers.append(
# ECDriver(k=12, m=3, ec_type="jerasure_rs_vand", chksum_type="algsig"))
# pyeclib_drivers.append(
# ECDriver(k=12, m=6, ec_type="flat_xor_hd", chksum_type="algsig"))
# pyeclib_drivers.append(
# ECDriver(k=10, m=5, ec_type="flat_xor_hd", chksum_type="algsig"))
#
# filesize = 1024 * 1024 * 3
# file_str = ''.join(random.choice(ascii_letters) for i in range(filesize))
# file_bytes = file_str.encode('utf-8')
#
# fragment_to_corrupt = random.randint(0, 12)
#
# for pyeclib_driver in pyeclib_drivers:
# fragments = pyeclib_driver.encode(file_bytes)
# fragment_metadata_list = []
#
# i = 0
# for fragment in fragments:
# if i == fragment_to_corrupt:
# corrupted_fragment = fragment[:100] +\
# (str(chr((b2i(fragment[100]) + 0x1)
# % 0xff))).encode('utf-8') + fragment[101:]
# fragment_metadata_list.append(pyeclib_driver.get_metadata(corrupted_fragment))
# else:
# fragment_metadata_list.append(pyeclib_driver.get_metadata(fragment))
# i += 1
#
# self.assertTrue(pyeclib_driver.verify_stripe_metadata(fragment_metadata_list) != -1)
#
# def disabled_test_verify_fragment_inline_succeed(self):
# pyeclib_drivers = []
# pyeclib_drivers.append(
# ECDriver(k=12, m=2, ec_type="jerasure_rs_vand", chksum_type="algsig"))
# pyeclib_drivers.append(
# ECDriver(k=12, m=3, ec_type="jerasure_rs_vand", chksum_type="algsig"))
# pyeclib_drivers.append(
# ECDriver(k=12, m=6, ec_type="flat_xor_hd", chksum_type="algsig"))
# pyeclib_drivers.append(
# ECDriver(k=10, m=5, ec_type="flat_xor_hd", chksum_type="algsig"))
#
# filesize = 1024 * 1024 * 3
# file_str = ''.join(random.choice(ascii_letters) for i in range(filesize))
# file_bytes = file_str.encode('utf-8')
#
# for pyeclib_driver in pyeclib_drivers:
# fragments = pyeclib_driver.encode(file_bytes)
#
# fragment_metadata_list = []
#
# for fragment in fragments:
# fragment_metadata_list.append(
# pyeclib_driver.get_metadata(fragment))
#
# self.assertTrue(
# pyeclib_driver.verify_stripe_metadata(fragment_metadata_list) == -1)
#
def check_metadata_formatted(self, k, m, ec_type, chksum_type):
if ec_type not in VALID_EC_TYPES:
@ -401,7 +339,8 @@ class TestPyECLibDriver(unittest.TestCase):
def test_verify_fragment_inline_chksum_fail(self):
pyeclib_drivers = self.get_pyeclib_testspec("inline_crc32")
filesize = 1024 * 1024 * 3
file_str = ''.join(random.choice(ascii_letters) for i in range(filesize))
file_str = ''.join(random.choice(ascii_letters)
for i in range(filesize))
file_bytes = file_str.encode('utf-8')
for pyeclib_driver in pyeclib_drivers:
@ -412,8 +351,8 @@ class TestPyECLibDriver(unittest.TestCase):
first_fragment_to_corrupt = random.randint(0, len(fragments))
num_to_corrupt = 2
fragments_to_corrupt = [
(first_fragment_to_corrupt + i) % len(fragments) for i in range(num_to_corrupt + 1)
]
(first_fragment_to_corrupt + i) % len(fragments)
for i in range(num_to_corrupt + 1)]
fragments_to_corrupt.sort()
i = 0
@ -429,8 +368,8 @@ class TestPyECLibDriver(unittest.TestCase):
pyeclib_driver.get_metadata(fragment))
i += 1
expected_ret_value = {"status": -206,
"reason": "Bad checksum",
expected_ret_value = {"status": -206,
"reason": "Bad checksum",
"bad_fragments": fragments_to_corrupt}
self.assertEqual(
pyeclib_driver.verify_stripe_metadata(fragment_metadata_list),
@ -440,7 +379,8 @@ class TestPyECLibDriver(unittest.TestCase):
pyeclib_drivers = self.get_pyeclib_testspec("inline_crc32")
filesize = 1024 * 1024 * 3
file_str = ''.join(random.choice(ascii_letters) for i in range(filesize))
file_str = ''.join(random.choice(ascii_letters)
for i in range(filesize))
file_bytes = file_str.encode('utf-8')
for pyeclib_driver in pyeclib_drivers:
@ -452,10 +392,10 @@ class TestPyECLibDriver(unittest.TestCase):
fragment_metadata_list.append(
pyeclib_driver.get_metadata(fragment))
expected_ret_value = {"status": 0 }
expected_ret_value = {"status": 0}
self.assertTrue(
pyeclib_driver.verify_stripe_metadata(fragment_metadata_list) == expected_ret_value)
self.assertTrue(pyeclib_driver.verify_stripe_metadata(
fragment_metadata_list) == expected_ret_value)
def test_get_segment_byterange_info(self):
pyeclib_drivers = self.get_pyeclib_testspec()
@ -465,26 +405,42 @@ class TestPyECLibDriver(unittest.TestCase):
file_size = 1024 * 1024
segment_size = 3 * 1024
ranges = [(0, 1), (1, 12), (10, 1000), (0, segment_size-1), (1, segment_size+1), (segment_size-1, 2*segment_size)]
ranges = [
(0, 1),
(1, 12),
(10, 1000),
(0, segment_size - 1),
(1, segment_size + 1),
(segment_size - 1, 2 * segment_size)]
expected_results = {}
expected_results[(0, 1)] = {0: (0, 1)}
expected_results[(1, 12)] = {0: (1, 12)}
expected_results[(10, 1000)] = {0: (10, 1000)}
expected_results[(0, segment_size-1)] = {0: (0, segment_size-1)}
expected_results[(1, segment_size+1)] = {0: (1, segment_size-1), 1: (0, 1)}
expected_results[(segment_size-1, 2*segment_size)] = {0: (segment_size-1, segment_size-1), 1: (0, segment_size-1), 2: (0, 0)}
expected_results[(0, segment_size - 1)] = {0: (0, segment_size - 1)}
expected_results[(1, segment_size + 1)
] = {0: (1, segment_size - 1), 1: (0, 1)}
expected_results[
(segment_size - 1, 2 * segment_size)] = {
0: (segment_size - 1, segment_size - 1),
1: (0, segment_size - 1),
2: (0, 0)}
results = pyeclib_drivers[0].get_segment_info_byterange(ranges, file_size, segment_size)
results = pyeclib_drivers[0].get_segment_info_byterange(
ranges, file_size, segment_size)
for exp_result_key in expected_results:
self.assertIn(exp_result_key, results)
self.assertTrue(len(results[exp_result_key]) == len(expected_results[exp_result_key]))
self.assertTrue(
len(results[exp_result_key]) ==
len(expected_results[exp_result_key]))
exp_result_map = expected_results[exp_result_key]
for segment_key in exp_result_map:
self.assertIn(segment_key, results[exp_result_key])
self.assertTrue(results[exp_result_key][segment_key] == expected_results[exp_result_key][segment_key])
self.assertTrue(
results[exp_result_key][segment_key] ==
expected_results[exp_result_key][segment_key])
def test_get_segment_info(self):
pyeclib_drivers = self.get_pyeclib_testspec()
@ -504,8 +460,8 @@ class TestPyECLibDriver(unittest.TestCase):
#
char_set = ascii_uppercase + digits
for segment_size in segment_sizes:
segment_strings[segment_size] = \
''.join(random.choice(char_set) for i in range(segment_size * 2))
segment_strings[segment_size] = ''.join(
random.choice(char_set) for i in range(segment_size * 2))
for pyeclib_driver in pyeclib_drivers:
for file_size in file_sizes:
@ -632,7 +588,7 @@ class TestPyECLibDriver(unittest.TestCase):
self.assertEqual(len(reconstructed_fragments),
len(idxs_to_remove))
for idx, frag_data in zip(idxs_to_remove,
reconstructed_fragments):
reconstructed_fragments):
self.assertEqual(
frag_data, orig_fragments[idx],
'Failed to reconstruct fragment %d!' % idx)
@ -640,11 +596,12 @@ class TestPyECLibDriver(unittest.TestCase):
#
# Test decode with integrity checks
#
first_fragment_to_corrupt = random.randint(0, len(fragments))
first_fragment_to_corrupt = random.randint(
0, len(fragments))
num_to_corrupt = min(len(fragments), pyeclib_driver.m + 1)
fragments_to_corrupt = [
(first_fragment_to_corrupt + i) % len(fragments) for i in range(num_to_corrupt)
]
(first_fragment_to_corrupt + i) % len(fragments)
for i in range(num_to_corrupt)]
if StrictVersion(LIBERASURECODE_VERSION) < \
StrictVersion('1.2.0'):
@ -653,10 +610,11 @@ class TestPyECLibDriver(unittest.TestCase):
continue
i = 0
for fragment in fragments:
if i in fragments_to_corrupt:
corrupted_fragment = ("0" * len(fragment)).encode('utf-8')
fragments[i] = corrupted_fragment
i += 1
if i in fragments_to_corrupt:
corrupted_fragment = (
"0" * len(fragment)).encode('utf-8')
fragments[i] = corrupted_fragment
i += 1
self.assertRaises(ECInvalidFragmentMetadata,
pyeclib_driver.decode,
@ -693,28 +651,6 @@ class TestPyECLibDriver(unittest.TestCase):
self.assertTrue(
pyeclib_drivers[0].min_parity_fragments_needed() == 1)
def test_get_segment_info_memory_usage(self):
for ec_driver in self.get_pyeclib_testspec():
self._test_get_segment_info_memory_usage(ec_driver)
def _test_get_segment_info_memory_usage(self, ec_driver):
# 1. Preapre the expected memory allocation
info = ec_driver.get_segment_info(1024*1024, 1024*1024)
info = None
loop_range = range(1000)
# 2. Get current memory usage
usage = resource.getrusage(resource.RUSAGE_SELF)[2]
# 3. Loop to call get_segment_info
for x in loop_range:
ec_driver.get_segment_info(1024*1024, 1024*1024)
# 4. memory usage shoudln't be increased
self.assertEqual(usage, resource.getrusage(resource.RUSAGE_SELF)[2],
'Memory usage is increased unexpectedly %s - %s' %
(usage, resource.getrusage(resource.RUSAGE_SELF)[2]))
def test_pyeclib_driver_repr_expression(self):
pyeclib_drivers = self.get_pyeclib_testspec()
for driver in pyeclib_drivers:
@ -727,6 +663,27 @@ class TestPyECLibDriver(unittest.TestCase):
"ECDriver(ec_type='%s', k=%s, m=%s)" %
(name, driver.k, driver.m), repr(driver))
def test_get_segment_info_memory_usage(self):
for ec_driver in self.get_pyeclib_testspec():
self._test_get_segment_info_memory_usage(ec_driver)
def _test_get_segment_info_memory_usage(self, ec_driver):
# 1. Preapre the expected memory allocation
ec_driver.get_segment_info(1024 * 1024, 1024 * 1024)
loop_range = range(1000)
# 2. Get current memory usage
usage = resource.getrusage(resource.RUSAGE_SELF)[2]
# 3. Loop to call get_segment_info
for x in loop_range:
ec_driver.get_segment_info(1024 * 1024, 1024 * 1024)
# 4. memory usage shoudln't be increased
self.assertEqual(usage, resource.getrusage(resource.RUSAGE_SELF)[2],
'Memory usage is increased unexpectedly %s - %s' %
(usage, resource.getrusage(resource.RUSAGE_SELF)[2]))
def test_get_metadata_memory_usage(self):
for ec_driver in self.get_pyeclib_testspec():
self._test_get_metadata_memory_usage(ec_driver)
@ -734,8 +691,7 @@ class TestPyECLibDriver(unittest.TestCase):
def _test_get_metadata_memory_usage(self, ec_driver):
# 1. Prepare the expected memory allocation
encoded = ec_driver.encode(b'aaa')
info = ec_driver.get_metadata(encoded[0], formatted=True)
info = None
ec_driver.get_metadata(encoded[0], formatted=True)
loop_range = range(1000)
# 2. Get current memory usage

View File

@ -52,6 +52,12 @@ class Timer:
self.end_time = time.time()
return self.curr_delta()
def require_backend(backend):
return unittest.skipIf(backend not in VALID_EC_TYPES,
"%s backend is not available" % backend)
class TestPyECLib(unittest.TestCase):
def __init__(self, *args):
@ -122,6 +128,18 @@ class TestPyECLib(unittest.TestCase):
def tearDown(self):
pass
def iter_available_types(self, ec_types):
found_one = False
for ec_type in ec_types:
if ec_type.name not in VALID_EC_TYPES:
continue
found_one = True
yield ec_type
if not found_one:
type_list = ', '.join(t.name for t in ec_types)
raise unittest.SkipTest('No backend available in types: %r' %
type_list)
def time_encode(self, num_data, num_parity, ec_type, hd,
file_size, iterations):
"""
@ -134,7 +152,7 @@ class TestPyECLib(unittest.TestCase):
timer.start()
for l in range(iterations):
fragments = pyeclib_c.encode(handle, whole_file_bytes)
pyeclib_c.encode(handle, whole_file_bytes)
tsum = timer.stop_and_return()
return tsum / iterations
@ -155,7 +173,6 @@ class TestPyECLib(unittest.TestCase):
orig_fragments = fragments[:]
for i in range(iterations):
missing_idxs = []
num_missing = hd - 1
for j in range(num_missing):
num_frags_left = len(fragments)
@ -164,20 +181,20 @@ class TestPyECLib(unittest.TestCase):
timer.start()
decoded_file_bytes = pyeclib_c.decode(handle,
fragments,
len(fragments[0]))
fragments,
len(fragments[0]))
tsum += timer.stop_and_return()
fragments = orig_fragments[:]
if whole_file_bytes != decoded_file_bytes:
success = False
success = False
return success, tsum / iterations
def time_range_decode(self,
num_data, num_parity, ec_type, hd,
file_size, iterations):
num_data, num_parity, ec_type, hd,
file_size, iterations):
"""
:return 2-tuple, (success, average decode time)
"""
@ -187,8 +204,10 @@ class TestPyECLib(unittest.TestCase):
whole_file_bytes = self.get_tmp_file(file_size).read()
success = True
begins = [int(random.randint(0, len(whole_file_bytes) - 1)) for i in range(3)]
ends = [int(random.randint(begins[i], len(whole_file_bytes))) for i in range(3)]
begins = [int(random.randint(0, len(whole_file_bytes) - 1))
for i in range(3)]
ends = [int(random.randint(begins[i], len(whole_file_bytes)))
for i in range(3)]
ranges = list(zip(begins, ends))
@ -196,7 +215,6 @@ class TestPyECLib(unittest.TestCase):
orig_fragments = fragments[:]
for i in range(iterations):
missing_idxs = []
num_missing = hd - 1
for j in range(num_missing):
num_frags_left = len(fragments)
@ -214,13 +232,13 @@ class TestPyECLib(unittest.TestCase):
range_offset = 0
for r in ranges:
if whole_file_bytes[r[0]:r[1]+1] != decoded_file_ranges[range_offset]:
success = False
range_offset += 1
if whole_file_bytes[
r[0]: r[1] + 1] != decoded_file_ranges[range_offset]:
success = False
range_offset += 1
return success, tsum / iterations
def time_reconstruct(self,
num_data, num_parity, ec_type, hd,
file_size, iterations):
@ -243,7 +261,7 @@ class TestPyECLib(unittest.TestCase):
num_frags_left = len(fragments)
idx = random.randint(0, num_frags_left - 1)
while idx in missing_idxs:
idx = random.randint(0, num_frags_left - 1)
idx = random.randint(0, num_frags_left - 1)
missing_idxs.append(idx)
fragments.pop(idx)
@ -261,7 +279,7 @@ class TestPyECLib(unittest.TestCase):
fd_orig.write(orig_fragments[missing_idxs[0]])
with open("decoded_fragments", "wb") as fd_decoded:
fd_decoded.write(reconstructed_fragment)
print(("Fragment %d was not reconstructed!!!" % missing_idxs[0]))
print("Fragment %d was not reconstructed!!!" % missing_idxs[0])
sys.exit(2)
return success, tsum / iterations
@ -277,13 +295,11 @@ class TestPyECLib(unittest.TestCase):
return format(throughput, '.10g')
@require_backend("flat_xor_hd_3")
def test_xor_code(self):
if "flat_xor_hd_3" not in VALID_EC_TYPES:
print("xor backend is not available in your enviromnet, skipping test")
return
for (ec_type, k, m, hd) in self.xor_types:
print(("\nRunning tests for flat_xor_hd k=%d, m=%d, hd=%d" % (k, m, hd)))
print("\nRunning tests for flat_xor_hd k=%d, m=%d, hd=%d" %
(k, m, hd))
for size_str in self.sizes:
avg_time = self.time_encode(k, m, ec_type.value, hd,
@ -301,18 +317,14 @@ class TestPyECLib(unittest.TestCase):
(size_str, self.get_throughput(avg_time, size_str)))
for size_str in self.sizes:
success, avg_time = self.time_reconstruct(k, m, ec_type.value, hd,
size_str,
self.iterations)
success, avg_time = self.time_reconstruct(
k, m, ec_type.value, hd, size_str, self.iterations)
self.assertTrue(success)
print("Reconstruct (%s): %s" %
(size_str, self.get_throughput(avg_time, size_str)))
@require_backend("shss")
def test_shss(self):
if "shss" not in VALID_EC_TYPES:
print("shss backend is not available in your enviromnet, skipping test")
return
for (ec_type, k, m) in self.shss:
print(("\nRunning tests for %s k=%d, m=%d" % (ec_type, k, m)))
@ -335,9 +347,8 @@ class TestPyECLib(unittest.TestCase):
(size_str, self.get_throughput(avg_time, size_str)))
for size_str in self.sizes:
success, avg_time = self.time_reconstruct(k, m, ec_type.value, 0,
size_str,
self.iterations)
success, avg_time = self.time_reconstruct(
k, m, ec_type.value, 0, size_str, self.iterations)
self.assertTrue(success)
print("Reconstruct (%s): %s" %
(size_str, self.get_throughput(avg_time, size_str)))
@ -372,68 +383,56 @@ class TestPyECLib(unittest.TestCase):
if expected_fragments != required_fragments:
success = False
print(("Unexpected required fragments list "
"(exp != req): %s != %s" %
(expected_fragments, required_fragments)))
print("Unexpected required fragments list "
"(exp != req): %s != %s" % (
expected_fragments, required_fragments))
return success
def test_codes(self):
for ec_type in self.rs_types:
if ec_type.name not in VALID_EC_TYPES:
print("%s backend is not available in your enviromnet, skipping test" % ec_type.name)
continue
print(("\nRunning tests for %s" % (ec_type)))
for ec_type in self.iter_available_types(self.rs_types):
for i in range(len(self.num_datas)):
success = self._test_get_required_fragments(self.num_datas[i],
self.num_parities[i],
ec_type)
success = self._test_get_required_fragments(
self.num_datas[i], self.num_parities[i], ec_type)
self.assertTrue(success)
for i in range(len(self.num_datas)):
for size_str in self.sizes:
avg_time = self.time_encode(self.num_datas[i],
self.num_parities[i],
ec_type.value, self.num_parities[i] + 1,
size_str, self.iterations)
avg_time = self.time_encode(
self.num_datas[i], self.num_parities[i], ec_type.value,
self.num_parities[i] + 1, size_str, self.iterations)
print(("Encode (%s): %s" %
(size_str, self.get_throughput(avg_time, size_str))))
print("Encode (%s): %s" %
(size_str, self.get_throughput(avg_time, size_str)))
for i in range(len(self.num_datas)):
for size_str in self.sizes:
success, avg_time = self.time_decode(self.num_datas[i],
self.num_parities[i],
ec_type.value, self.num_parities[i] + 1,
size_str, self.iterations)
success, avg_time = self.time_decode(
self.num_datas[i], self.num_parities[i], ec_type.value,
self.num_parities[i] + 1, size_str, self.iterations)
self.assertTrue(success)
print(("Decode (%s): %s" %
(size_str, self.get_throughput(avg_time, size_str))))
for i in range(len(self.num_datas)):
for size_str in self.sizes:
success, avg_time = self.time_range_decode(self.num_datas[i],
self.num_parities[i],
ec_type.value, self.num_parities[i] + 1,
size_str, self.iterations)
self.assertTrue(success)
print(("Range Decode (%s): %s" %
(size_str, self.get_throughput(avg_time, size_str))))
print("Decode (%s): %s" %
(size_str, self.get_throughput(avg_time, size_str)))
for i in range(len(self.num_datas)):
for size_str in self.sizes:
success, avg_time = self.time_reconstruct(self.num_datas[i],
self.num_parities[i],
ec_type.value, self.num_parities[i] + 1,
size_str,
self.iterations)
success, avg_time = self.time_range_decode(
self.num_datas[i], self.num_parities[i], ec_type.value,
self.num_parities[i] + 1, size_str, self.iterations)
self.assertTrue(success)
print(("Reconstruct (%s): %s" %
(size_str, self.get_throughput(avg_time, size_str))))
print("Range Decode (%s): %s" %
(size_str, self.get_throughput(avg_time, size_str)))
for i in range(len(self.num_datas)):
for size_str in self.sizes:
success, avg_time = self.time_reconstruct(
self.num_datas[i], self.num_parities[i], ec_type.value,
self.num_parities[i] + 1, size_str, self.iterations)
self.assertTrue(success)
print("Reconstruct (%s): %s" %
(size_str, self.get_throughput(avg_time, size_str)))
if __name__ == "__main__":

View File

@ -11,7 +11,7 @@ commands=
deps=
pep8
commands=
pep8 pyeclib/ setup.py
pep8 pyeclib/ setup.py test/
[testenv:venv]
commands = {posargs}