Merge "Breakup reclaim into batches" into stable/ussuri
This commit is contained in:
commit
94f975e931
|
@ -53,6 +53,9 @@ PICKLE_PROTOCOL = 2
|
|||
# records will be merged.
|
||||
PENDING_CAP = 131072
|
||||
|
||||
SQLITE_ARG_LIMIT = 999
|
||||
RECLAIM_PAGE_SIZE = 10000
|
||||
|
||||
|
||||
def utf8encode(*args):
|
||||
return [(s.encode('utf8') if isinstance(s, six.text_type) else s)
|
||||
|
@ -981,16 +984,48 @@ class DatabaseBroker(object):
|
|||
with lock_parent_directory(self.pending_file,
|
||||
self.pending_timeout):
|
||||
self._commit_puts()
|
||||
with self.get() as conn:
|
||||
self._reclaim(conn, age_timestamp, sync_timestamp)
|
||||
self._reclaim_metadata(conn, age_timestamp)
|
||||
conn.commit()
|
||||
marker = ''
|
||||
finished = False
|
||||
while not finished:
|
||||
with self.get() as conn:
|
||||
marker = self._reclaim(conn, age_timestamp, marker)
|
||||
if not marker:
|
||||
finished = True
|
||||
self._reclaim_other_stuff(
|
||||
conn, age_timestamp, sync_timestamp)
|
||||
conn.commit()
|
||||
|
||||
def _reclaim(self, conn, age_timestamp, sync_timestamp):
|
||||
conn.execute('''
|
||||
DELETE FROM %s WHERE deleted = 1 AND %s < ?
|
||||
''' % (self.db_contains_type, self.db_reclaim_timestamp),
|
||||
(age_timestamp,))
|
||||
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
|
||||
"""
|
||||
This is only called once at the end of reclaim after _reclaim has been
|
||||
called for each page.
|
||||
"""
|
||||
self._reclaim_sync(conn, sync_timestamp)
|
||||
self._reclaim_metadata(conn, age_timestamp)
|
||||
|
||||
def _reclaim(self, conn, age_timestamp, marker):
|
||||
clean_batch_qry = '''
|
||||
DELETE FROM %s WHERE deleted = 1
|
||||
AND name > ? AND %s < ?
|
||||
''' % (self.db_contains_type, self.db_reclaim_timestamp)
|
||||
curs = conn.execute('''
|
||||
SELECT name FROM %s WHERE deleted = 1
|
||||
AND name > ?
|
||||
ORDER BY NAME LIMIT 1 OFFSET ?
|
||||
''' % (self.db_contains_type,), (marker, RECLAIM_PAGE_SIZE))
|
||||
row = curs.fetchone()
|
||||
if row:
|
||||
# do a single book-ended DELETE and bounce out
|
||||
end_marker = row[0]
|
||||
conn.execute(clean_batch_qry + ' AND name <= ?', (
|
||||
marker, age_timestamp, end_marker))
|
||||
else:
|
||||
# delete off the end and reset marker to indicate we're done
|
||||
end_marker = ''
|
||||
conn.execute(clean_batch_qry, (marker, age_timestamp))
|
||||
return end_marker
|
||||
|
||||
def _reclaim_sync(self, conn, sync_timestamp):
|
||||
try:
|
||||
conn.execute('''
|
||||
DELETE FROM outgoing_sync WHERE updated_at < ?
|
||||
|
|
|
@ -34,9 +34,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \
|
|||
get_db_files, parse_db_filename, make_db_file_path, split_path, \
|
||||
RESERVED_BYTE
|
||||
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
|
||||
zero_like, DatabaseAlreadyExists
|
||||
|
||||
SQLITE_ARG_LIMIT = 999
|
||||
zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
|
||||
|
||||
DATADIR = 'containers'
|
||||
|
||||
|
@ -1581,9 +1579,9 @@ class ContainerBroker(DatabaseBroker):
|
|||
CONTAINER_STAT_VIEW_SCRIPT +
|
||||
'COMMIT;')
|
||||
|
||||
def _reclaim(self, conn, age_timestamp, sync_timestamp):
|
||||
super(ContainerBroker, self)._reclaim(conn, age_timestamp,
|
||||
sync_timestamp)
|
||||
def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp):
|
||||
super(ContainerBroker, self)._reclaim_other_stuff(
|
||||
conn, age_timestamp, sync_timestamp)
|
||||
# populate instance cache, but use existing conn to avoid deadlock
|
||||
# when it has a pending update
|
||||
self._populate_instance_cache(conn=conn)
|
||||
|
|
|
@ -180,6 +180,72 @@ class TestAccountBroker(unittest.TestCase):
|
|||
broker.delete_db(Timestamp.now().internal)
|
||||
broker.reclaim(Timestamp.now().internal, time())
|
||||
|
||||
def test_batched_reclaim(self):
|
||||
num_of_containers = 60
|
||||
container_specs = []
|
||||
now = time()
|
||||
top_of_the_minute = now - (now % 60)
|
||||
c = itertools.cycle([True, False])
|
||||
for m, is_deleted in six.moves.zip(range(num_of_containers), c):
|
||||
offset = top_of_the_minute - (m * 60)
|
||||
container_specs.append((Timestamp(offset), is_deleted))
|
||||
random.seed(now)
|
||||
random.shuffle(container_specs)
|
||||
policy_indexes = list(p.idx for p in POLICIES)
|
||||
broker = AccountBroker(':memory:', account='test_account')
|
||||
broker.initialize(Timestamp('1').internal)
|
||||
for i, container_spec in enumerate(container_specs):
|
||||
# with container12 before container2 and shuffled ts.internal we
|
||||
# shouldn't be able to accidently rely on any implicit ordering
|
||||
name = 'container%s' % i
|
||||
pidx = random.choice(policy_indexes)
|
||||
ts, is_deleted = container_spec
|
||||
if is_deleted:
|
||||
broker.put_container(name, 0, ts.internal, 0, 0, pidx)
|
||||
else:
|
||||
broker.put_container(name, ts.internal, 0, 0, 0, pidx)
|
||||
|
||||
def count_reclaimable(conn, reclaim_age):
|
||||
return conn.execute(
|
||||
"SELECT count(*) FROM container "
|
||||
"WHERE deleted = 1 AND delete_timestamp < ?", (reclaim_age,)
|
||||
).fetchone()[0]
|
||||
|
||||
# This is intended to divide the set of timestamps exactly in half
|
||||
# regardless of the value of now
|
||||
reclaim_age = top_of_the_minute + 1 - (num_of_containers / 2 * 60)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(count_reclaimable(conn, reclaim_age),
|
||||
num_of_containers / 4)
|
||||
|
||||
orig__reclaim = broker._reclaim
|
||||
trace = []
|
||||
|
||||
def tracing_reclaim(conn, age_timestamp, marker):
|
||||
trace.append((age_timestamp, marker,
|
||||
count_reclaimable(conn, age_timestamp)))
|
||||
return orig__reclaim(conn, age_timestamp, marker)
|
||||
|
||||
with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \
|
||||
mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(count_reclaimable(conn, reclaim_age), 0)
|
||||
self.assertEqual(3, len(trace), trace)
|
||||
self.assertEqual([age for age, marker, reclaimable in trace],
|
||||
[reclaim_age] * 3)
|
||||
# markers are in-order
|
||||
self.assertLess(trace[0][1], trace[1][1])
|
||||
self.assertLess(trace[1][1], trace[2][1])
|
||||
# reclaimable count gradually decreases
|
||||
# generally, count1 > count2 > count3, but because of the randomness
|
||||
# we may occassionally have count1 == count2 or count2 == count3
|
||||
self.assertGreaterEqual(trace[0][2], trace[1][2])
|
||||
self.assertGreaterEqual(trace[1][2], trace[2][2])
|
||||
# technically, this might happen occasionally, but *really* rarely
|
||||
self.assertTrue(trace[0][2] > trace[1][2] or
|
||||
trace[1][2] > trace[2][2])
|
||||
|
||||
def test_delete_db_status(self):
|
||||
start = next(self.ts)
|
||||
broker = AccountBroker(':memory:', account='a')
|
||||
|
|
|
@ -1154,7 +1154,7 @@ class TestDatabaseBroker(unittest.TestCase):
|
|||
return broker
|
||||
|
||||
# only testing _reclaim_metadata here
|
||||
@patch.object(DatabaseBroker, '_reclaim')
|
||||
@patch.object(DatabaseBroker, '_reclaim', return_value='')
|
||||
def test_metadata(self, mock_reclaim):
|
||||
# Initializes a good broker for us
|
||||
broker = self.get_replication_info_tester(metadata=True)
|
||||
|
|
|
@ -28,6 +28,7 @@ from contextlib import contextmanager
|
|||
import sqlite3
|
||||
import pickle
|
||||
import json
|
||||
import itertools
|
||||
|
||||
import six
|
||||
|
||||
|
@ -558,6 +559,98 @@ class TestContainerBroker(unittest.TestCase):
|
|||
broker.reclaim(Timestamp.now().internal, time())
|
||||
broker.delete_db(Timestamp.now().internal)
|
||||
|
||||
def test_batch_reclaim(self):
|
||||
num_of_objects = 60
|
||||
obj_specs = []
|
||||
now = time()
|
||||
top_of_the_minute = now - (now % 60)
|
||||
c = itertools.cycle([True, False])
|
||||
for m, is_deleted in six.moves.zip(range(num_of_objects), c):
|
||||
offset = top_of_the_minute - (m * 60)
|
||||
obj_specs.append((Timestamp(offset), is_deleted))
|
||||
random.seed(now)
|
||||
random.shuffle(obj_specs)
|
||||
policy_indexes = list(p.idx for p in POLICIES)
|
||||
broker = ContainerBroker(':memory:', account='test_account',
|
||||
container='test_container')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
for i, obj_spec in enumerate(obj_specs):
|
||||
# with object12 before object2 and shuffled ts.internal we
|
||||
# shouldn't be able to accidently rely on any implicit ordering
|
||||
obj_name = 'object%s' % i
|
||||
pidx = random.choice(policy_indexes)
|
||||
ts, is_deleted = obj_spec
|
||||
if is_deleted:
|
||||
broker.delete_object(obj_name, ts.internal, pidx)
|
||||
else:
|
||||
broker.put_object(obj_name, ts.internal, 0, 'text/plain',
|
||||
'etag', storage_policy_index=pidx)
|
||||
|
||||
def count_reclaimable(conn, reclaim_age):
|
||||
return conn.execute(
|
||||
"SELECT count(*) FROM object "
|
||||
"WHERE deleted = 1 AND created_at < ?", (reclaim_age,)
|
||||
).fetchone()[0]
|
||||
|
||||
# This is intended to divide the set of timestamps exactly in half
|
||||
# regardless of the value of now
|
||||
reclaim_age = top_of_the_minute + 1 - (num_of_objects / 2 * 60)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(count_reclaimable(conn, reclaim_age),
|
||||
num_of_objects / 4)
|
||||
|
||||
orig__reclaim = broker._reclaim
|
||||
trace = []
|
||||
|
||||
def tracing_reclaim(conn, age_timestamp, marker):
|
||||
trace.append((age_timestamp, marker,
|
||||
count_reclaimable(conn, age_timestamp)))
|
||||
return orig__reclaim(conn, age_timestamp, marker)
|
||||
|
||||
with mock.patch.object(broker, '_reclaim', new=tracing_reclaim), \
|
||||
mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(count_reclaimable(conn, reclaim_age), 0)
|
||||
self.assertEqual(3, len(trace), trace)
|
||||
self.assertEqual([age for age, marker, reclaimable in trace],
|
||||
[reclaim_age] * 3)
|
||||
# markers are in-order
|
||||
self.assertLess(trace[0][1], trace[1][1])
|
||||
self.assertLess(trace[1][1], trace[2][1])
|
||||
# reclaimable count gradually decreases
|
||||
# generally, count1 > count2 > count3, but because of the randomness
|
||||
# we may occassionally have count1 == count2 or count2 == count3
|
||||
self.assertGreaterEqual(trace[0][2], trace[1][2])
|
||||
self.assertGreaterEqual(trace[1][2], trace[2][2])
|
||||
# technically, this might happen occasionally, but *really* rarely
|
||||
self.assertTrue(trace[0][2] > trace[1][2] or
|
||||
trace[1][2] > trace[2][2])
|
||||
|
||||
def test_reclaim_with_duplicate_names(self):
|
||||
broker = ContainerBroker(':memory:', account='test_account',
|
||||
container='test_container')
|
||||
broker.initialize(Timestamp('1').internal, 0)
|
||||
now = time()
|
||||
ages_ago = Timestamp(now - (3 * 7 * 24 * 60 * 60))
|
||||
for i in range(10):
|
||||
for spidx in range(10):
|
||||
obj_name = 'object%s' % i
|
||||
broker.delete_object(obj_name, ages_ago.internal, spidx)
|
||||
reclaim_age = now - (2 * 7 * 24 * 60 * 60)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT count(*) FROM object "
|
||||
"WHERE created_at < ?", (reclaim_age,)
|
||||
).fetchone()[0], 100)
|
||||
with mock.patch('swift.common.db.RECLAIM_PAGE_SIZE', 10):
|
||||
broker.reclaim(reclaim_age, reclaim_age)
|
||||
with broker.get() as conn:
|
||||
self.assertEqual(conn.execute(
|
||||
"SELECT count(*) FROM object "
|
||||
).fetchone()[0], 0)
|
||||
|
||||
@with_tempdir
|
||||
def test_reclaim_deadlock(self, tempdir):
|
||||
db_path = os.path.join(
|
||||
|
|
Loading…
Reference in New Issue