mpu-auditor: iterate multiple batches per cycle

Also add some basic exception handling.

Change-Id: Ie33664979dfa853e1eea828533f6b267156e85c0
This commit is contained in:
Alistair Coles 2024-04-22 16:15:28 +01:00
parent ba398e15ed
commit 9eda38ced9
2 changed files with 130 additions and 49 deletions

View File

@ -34,6 +34,19 @@ def safe_split_reserved_name(reserved_name):
return None, reserved_name
def yield_item_batches(broker, start_row, max_batches, batch_size):
remaining = max_batches * batch_size
while remaining > 0:
batch_limit = min(batch_size, remaining)
items = broker.get_items_since(start_row, batch_limit)
if items:
remaining -= len(items)
start_row = items[-1]['ROWID']
yield items
else:
remaining = 0
class Item(object):
def __init__(self, name, created_at, size=0, content_type='',
etag='', deleted=0, **kwargs):
@ -121,6 +134,7 @@ class MpuAuditorContext(object):
class BaseMpuBrokerAuditor(object):
ROWS_PER_BATCH = 1000
MAX_BATCHES_PER_CYCLE = 10
def __init__(self, client, logger, broker):
self.client = client
@ -137,6 +151,10 @@ class BaseMpuBrokerAuditor(object):
}
log_func('mpu_auditor ' + json.dumps(data))
def warning(self, fmt, *args):
msg = fmt % args
self.log(self.logger.warning, msg)
def debug(self, fmt, *args):
msg = fmt % args
self.log(self.logger.debug, msg)
@ -219,38 +237,37 @@ class BaseMpuBrokerAuditor(object):
self._process_marker(marker_item, upload)
def _audit_item(self, item):
try:
upload = extract_upload_prefix(item.name)
except ValueError as err:
self.log(self.log.warning, 'mpu_audit %s' % err)
return
self.debug('item %s %s', item.name, item.deleted)
upload = extract_upload_prefix(item.name)
if upload in self.uploads_already_checked:
return
self._process_item(item, upload)
self.uploads_already_checked[upload] = True
def audit(self):
self.broker.get_info()
self.debug('mpu_audit visiting %s', self.broker.path)
context = MpuAuditorContext.load(self.broker)
self.debug('auditing from row %s' % context.last_audit_row)
items = [
(it['ROWID'], Item(**it))
for it in self.broker.get_items_since(
context.last_audit_row, self.ROWS_PER_BATCH)
]
audited_rows = 0
for row_id, item in items:
self.debug('item %s %s', item.name, item.deleted)
context.last_audit_row = row_id
if not item.deleted:
audited_rows += 1
self._audit_item(item)
num_audited = num_processed = num_errors = 0
for batch in yield_item_batches(self.broker,
context.last_audit_row,
self.MAX_BATCHES_PER_CYCLE,
self.ROWS_PER_BATCH):
for item_dict in batch:
try:
item = Item(**item_dict)
if not item.deleted:
self._audit_item(item)
num_audited += 1
except Exception as err: # noqa
self.warning('%s: %s', item_dict['name'], str(err))
num_errors += 1
num_processed += 1
context.last_audit_row = item_dict['ROWID']
context.store(self.broker)
self.debug('processed_rows %d (%d audited)', len(items), audited_rows)
self.debug('processed: %d, audited: %d, errors: %d)',
num_processed, num_audited, num_errors)
return None

View File

@ -30,23 +30,14 @@ from swift.common.swob import Request, HTTPOk, HTTPNoContent, HTTPCreated, \
HTTPNotFound
from swift.common.utils import md5
from swift.container.backend import ContainerBroker
from swift.container.mpu_auditor import MpuAuditor, extract_upload_prefix
from swift.container.mpu_auditor import MpuAuditor, extract_upload_prefix, \
yield_item_batches
from swift.container.server import ContainerController
from test.debug_logger import debug_logger
from test.unit import make_timestamp_iter, EMPTY_ETAG
from test.unit.common.middleware.helpers import FakeSwift
class TestModuleFunctions(unittest.TestCase):
def test_extract_upload_prefix(self):
self.assertEqual('obj/upload-id',
extract_upload_prefix('obj/upload-id'))
self.assertEqual('obj/upload-id',
extract_upload_prefix('obj/upload-id/manifest'))
self.assertEqual('obj/upload-id',
extract_upload_prefix('obj/upload-id/marker-deleted'))
class BaseTestMpuAuditor(unittest.TestCase):
user_container = 'c'
audit_container = get_reserved_name('mpu_manifests', user_container)
@ -77,9 +68,11 @@ class BaseTestMpuAuditor(unittest.TestCase):
return_value=self.broker):
return req.get_response(self.server)
def put_objects(self, objects):
def put_objects(self, objects, shuffle_order=True):
# PUT object updates to container in random order
random.shuffle(objects)
if shuffle_order:
# don't mutate the passed-in list!
objects = random.sample(objects, k=len(objects))
for obj in objects:
headers = {'name': obj['name'],
'x-timestamp': obj['created_at'],
@ -91,6 +84,10 @@ class BaseTestMpuAuditor(unittest.TestCase):
method='PUT', headers=headers)
resp = self.make_request(req)
self.assertEqual(201, resp.status_int, resp.body)
# On py2 the broker has a nasty habit of re-ordering batches of
# pending updates before merging; to make tests more deterministic,
# use get_info() to flush the pending file after every PUT
self.broker.get_info()
def delete_objects(self, objects):
ts = next(self.ts_iter)
@ -103,6 +100,17 @@ class BaseTestMpuAuditor(unittest.TestCase):
resp = self.make_request(req)
self.assertEqual(204, resp.status_int)
def _create_manifest_spec(self):
upload_id = unique_id()
manifest_name = '/'.join([get_reserved_name(self.obj_name), upload_id])
manifest_spec = {'name': manifest_name,
'created_at': next(self.ts_iter).normal,
'content_type': 'text/plain',
'etag': 'etag_1',
'size': 1024,
}
return upload_id, manifest_spec
def _create_marker_spec(self, upload, timestamp,
marker_type=MPU_DELETED_MARKER_SUFFIX):
return {'name': '/'.join([upload, marker_type]),
@ -120,21 +128,6 @@ class BaseTestMpuAuditor(unittest.TestCase):
for k, v in exp.items():
self.assertEqual(v, ctxt_value[k])
class TestMpuAuditorMPU(BaseTestMpuAuditor):
parts_container = get_reserved_name('mpu_parts', 'c')
def _create_manifest_spec(self):
upload_id = unique_id()
manifest_name = '/'.join([get_reserved_name(self.obj_name), upload_id])
manifest_spec = {'name': manifest_name,
'created_at': next(self.ts_iter).normal,
'content_type': 'text/plain',
'etag': 'etag_1',
'size': 1024,
}
return upload_id, manifest_spec
def _check_broker_rows(self, expected_items):
rows = self.broker.get_objects(include_deleted=False)
self.assertEqual(sorted([o['name'] for o in expected_items]),
@ -145,6 +138,53 @@ class TestMpuAuditorMPU(BaseTestMpuAuditor):
self.assertEqual(sorted([o['name'] for o in expected_items]),
sorted([row['name'] for row in rows]))
class TestModuleFunctions(BaseTestMpuAuditor):
def test_extract_upload_prefix(self):
self.assertEqual('obj/upload-id',
extract_upload_prefix('obj/upload-id'))
self.assertEqual('obj/upload-id',
extract_upload_prefix('obj/upload-id/manifest'))
self.assertEqual('obj/upload-id',
extract_upload_prefix('obj/upload-id/marker-deleted'))
def test_yield_item_batches(self):
items = [self._create_manifest_spec()[1] for i in range(123)]
self.put_objects(items, shuffle_order=False)
self._check_broker_rows(items)
# stop at max batches
actual = [b for b in yield_item_batches(self.broker, 0, 2, 50)]
self.assertEqual(2, len(actual))
self.assertEqual([it['name'] for it in items[:50]],
[a['name'] for a in actual[0]])
self.assertEqual([it['name'] for it in items[50:100]],
[a['name'] for a in actual[1]])
self.assertEqual(1, actual[0][0]['ROWID'])
self.assertEqual(100, actual[-1][-1]['ROWID'])
# stop at end
actual = [b for b in yield_item_batches(self.broker, 0, 1000, 100)]
self.assertEqual(2, len(actual))
self.assertEqual([it['name'] for it in items[:100]],
[a['name'] for a in actual[0]])
self.assertEqual([it['name'] for it in items[100:]],
[a['name'] for a in actual[1]])
self.assertEqual(1, actual[0][0]['ROWID'])
self.assertEqual(123, actual[-1][-1]['ROWID'])
# start at start_row
actual = [b for b in yield_item_batches(self.broker, 114, 100, 10)]
self.assertEqual(1, len(actual))
self.assertEqual([it['name'] for it in items[114:]],
[a['name'] for a in actual[0]])
self.assertEqual(115, actual[0][0]['ROWID'])
self.assertEqual(123, actual[-1][-1]['ROWID'])
class TestMpuAuditorMPU(BaseTestMpuAuditor):
parts_container = get_reserved_name('mpu_parts', 'c')
@contextlib.contextmanager
def _mock_internal_client(self, registered_calls):
fake_swift = FakeSwift()
@ -156,6 +196,30 @@ class TestMpuAuditorMPU(BaseTestMpuAuditor):
return_value=fake_ic):
yield fake_swift
def test_audit_cycle_continues_past_error(self):
upload_1, manifest_1 = self._create_manifest_spec()
upload_2, manifest_2 = self._create_manifest_spec()
calls = []
def mock_audit_item(auditor, item):
calls.append(item)
if len(calls) == 1:
raise Exception('boom')
self.put_objects([manifest_1, manifest_2], shuffle_order=False)
with self._mock_internal_client([]):
auditor = MpuAuditor({}, debug_logger('test'))
with mock.patch(
'swift.container.mpu_auditor.BaseMpuBrokerAuditor._audit_item',
mock_audit_item):
auditor.audit(self.broker)
self.assertEqual(2, len(calls))
self.assertEqual([manifest_1['name'], manifest_2['name']],
[item.name for item in calls])
warning_lines = auditor.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_lines))
self.assertIn('boom', warning_lines[0])
def test_audit_delete_marker(self):
upload_1, manifest_1 = self._create_manifest_spec()
upload_2, manifest_2 = self._create_manifest_spec()