Merge "Fix reclaim on deleted containers" into stable/juno

This commit is contained in:
Jenkins 2015-07-08 05:05:16 +00:00 committed by Gerrit Code Review
commit 512b6a781e
5 changed files with 161 additions and 103 deletions

View File

@ -466,7 +466,7 @@ class Replicator(Daemon):
delete_timestamp > put_timestamp and \
info['count'] in (None, '', 0, '0'):
if self.report_up_to_date(info):
self.delete_db(object_file)
self.delete_db(broker)
self.logger.timing_since('timing', start_time)
return
responses = []

View File

@ -39,9 +39,14 @@ class ContainerReplicator(db_replicator.Replicator):
default_port = 6001
def report_up_to_date(self, full_info):
for key in ('put_timestamp', 'delete_timestamp', 'object_count',
'bytes_used'):
if full_info['reported_' + key] != full_info[key]:
reported_key_map = {
'reported_put_timestamp': 'put_timestamp',
'reported_delete_timestamp': 'delete_timestamp',
'reported_bytes_used': 'bytes_used',
'reported_object_count': 'count',
}
for reported, value_key in reported_key_map.items():
if full_info[reported] != full_info[value_key]:
return False
return True

View File

@ -187,17 +187,23 @@ class ExampleBroker(DatabaseBroker):
db_type = 'test'
db_contains_type = 'test'
db_reclaim_timestamp = 'created_at'
def _initialize(self, conn, put_timestamp, **kwargs):
if not self.account:
raise ValueError(
'Attempting to create a new database with no account set')
conn.executescript('''
CREATE TABLE test_stat (
account TEXT,
test_count INTEGER DEFAULT 0,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT ''
);
CREATE TABLE test (
ROWID INTEGER PRIMARY KEY AUTOINCREMENT,
@ -218,10 +224,10 @@ class ExampleBroker(DatabaseBroker):
''')
conn.execute("""
INSERT INTO test_stat (
created_at, put_timestamp, status_changed_at)
VALUES (?, ?, ?);
""", (Timestamp(time.time()).internal, put_timestamp,
put_timestamp))
account, created_at, id, put_timestamp, status_changed_at)
VALUES (?, ?, ?, ?, ?);
""", (self.account, Timestamp(time.time()).internal, str(uuid4()),
put_timestamp, put_timestamp))
def merge_items(self, item_list):
with self.get() as conn:
@ -268,6 +274,13 @@ class ExampleBroker(DatabaseBroker):
def delete_test(self, name, timestamp):
self._load_item(name, timestamp, 1)
def _delete_db(self, conn, timestamp):
conn.execute("""
UPDATE test_stat
SET delete_timestamp = ?,
status_changed_at = ?
WHERE delete_timestamp < ? """, (timestamp, timestamp, timestamp))
def _is_deleted(self, conn):
info = conn.execute('SELECT * FROM test_stat').fetchone()
return (info['test_count'] in (None, '', 0, '0')) and \
@ -284,10 +297,18 @@ class TestExampleBroker(unittest.TestCase):
broker_class = ExampleBroker
policy = 0
def setUp(self):
self.ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
def test_delete_db(self):
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(next(self.ts))
broker.delete_db(next(self.ts))
self.assertTrue(broker.is_deleted())
def test_merge_timestamps_simple_delete(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp)
created_at = broker.get_info()['created_at']
@ -298,7 +319,7 @@ class TestExampleBroker(unittest.TestCase):
self.assertEqual(info['delete_timestamp'], '0')
self.assertEqual(info['status_changed_at'], put_timestamp)
# delete
delete_timestamp = ts.next()
delete_timestamp = next(self.ts)
broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
self.assert_(broker.is_deleted())
info = broker.get_info()
@ -314,9 +335,7 @@ class TestExampleBroker(unittest.TestCase):
broker.delete_test('test', timestamp)
def test_merge_timestamps_delete_with_objects(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
created_at = broker.get_info()['created_at']
@ -327,11 +346,11 @@ class TestExampleBroker(unittest.TestCase):
self.assertEqual(info['delete_timestamp'], '0')
self.assertEqual(info['status_changed_at'], put_timestamp)
# add object
self.put_item(broker, ts.next())
self.put_item(broker, next(self.ts))
self.assertEqual(broker.get_info()[
'%s_count' % broker.db_contains_type], 1)
# delete
delete_timestamp = ts.next()
delete_timestamp = next(self.ts)
broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
self.assertFalse(broker.is_deleted())
info = broker.get_info()
@ -341,20 +360,18 @@ class TestExampleBroker(unittest.TestCase):
# status is unchanged
self.assertEqual(info['status_changed_at'], put_timestamp)
# count is causing status to hold on
self.delete_item(broker, ts.next())
self.delete_item(broker, next(self.ts))
self.assertEqual(broker.get_info()[
'%s_count' % broker.db_contains_type], 0)
self.assert_(broker.is_deleted())
def test_merge_timestamps_simple_recreate(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
virgin_status_changed_at = broker.get_info()['status_changed_at']
created_at = broker.get_info()['created_at']
delete_timestamp = ts.next()
delete_timestamp = next(self.ts)
broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
self.assert_(broker.is_deleted())
info = broker.get_info()
@ -365,7 +382,7 @@ class TestExampleBroker(unittest.TestCase):
self.assert_(orig_status_changed_at >
Timestamp(virgin_status_changed_at))
# recreate
recreate_timestamp = ts.next()
recreate_timestamp = next(self.ts)
status_changed_at = time.time()
with patch('swift.common.db.time.time', new=lambda: status_changed_at):
broker.merge_timestamps(created_at, recreate_timestamp, '0')
@ -377,14 +394,12 @@ class TestExampleBroker(unittest.TestCase):
self.assert_(info['status_changed_at'], status_changed_at)
def test_merge_timestamps_recreate_with_objects(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
created_at = broker.get_info()['created_at']
# delete
delete_timestamp = ts.next()
delete_timestamp = next(self.ts)
broker.merge_timestamps(created_at, put_timestamp, delete_timestamp)
self.assert_(broker.is_deleted())
info = broker.get_info()
@ -395,12 +410,12 @@ class TestExampleBroker(unittest.TestCase):
self.assert_(Timestamp(orig_status_changed_at) >=
Timestamp(put_timestamp))
# add object
self.put_item(broker, ts.next())
self.put_item(broker, next(self.ts))
count_key = '%s_count' % broker.db_contains_type
self.assertEqual(broker.get_info()[count_key], 1)
self.assertFalse(broker.is_deleted())
# recreate
recreate_timestamp = ts.next()
recreate_timestamp = next(self.ts)
broker.merge_timestamps(created_at, recreate_timestamp, '0')
self.assertFalse(broker.is_deleted())
info = broker.get_info()
@ -409,34 +424,30 @@ class TestExampleBroker(unittest.TestCase):
self.assertEqual(info['delete_timestamp'], delete_timestamp)
self.assertEqual(info['status_changed_at'], orig_status_changed_at)
# count is not causing status to hold on
self.delete_item(broker, ts.next())
self.delete_item(broker, next(self.ts))
self.assertFalse(broker.is_deleted())
def test_merge_timestamps_update_put_no_status_change(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
info = broker.get_info()
orig_status_changed_at = info['status_changed_at']
created_at = info['created_at']
new_put_timestamp = ts.next()
new_put_timestamp = next(self.ts)
broker.merge_timestamps(created_at, new_put_timestamp, '0')
info = broker.get_info()
self.assertEqual(new_put_timestamp, info['put_timestamp'])
self.assertEqual(orig_status_changed_at, info['status_changed_at'])
def test_merge_timestamps_update_delete_no_status_change(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
put_timestamp = ts.next()
put_timestamp = next(self.ts)
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(put_timestamp, storage_policy_index=int(self.policy))
created_at = broker.get_info()['created_at']
broker.merge_timestamps(created_at, put_timestamp, ts.next())
broker.merge_timestamps(created_at, put_timestamp, next(self.ts))
orig_status_changed_at = broker.get_info()['status_changed_at']
new_delete_timestamp = ts.next()
new_delete_timestamp = next(self.ts)
broker.merge_timestamps(created_at, put_timestamp,
new_delete_timestamp)
info = broker.get_info()
@ -444,16 +455,14 @@ class TestExampleBroker(unittest.TestCase):
self.assertEqual(orig_status_changed_at, info['status_changed_at'])
def test_get_max_row(self):
ts = (normalize_timestamp(t) for t in
itertools.count(int(time.time())))
broker = self.broker_class(':memory:', account='a', container='c')
broker.initialize(ts.next(), storage_policy_index=int(self.policy))
broker.initialize(next(self.ts), storage_policy_index=int(self.policy))
self.assertEquals(-1, broker.get_max_row())
self.put_item(broker, ts.next())
self.put_item(broker, next(self.ts))
self.assertEquals(1, broker.get_max_row())
self.delete_item(broker, ts.next())
self.delete_item(broker, next(self.ts))
self.assertEquals(2, broker.get_max_row())
self.put_item(broker, ts.next())
self.put_item(broker, next(self.ts))
self.assertEquals(3, broker.get_max_row())
def test_get_info(self):
@ -493,10 +502,8 @@ class TestExampleBroker(unittest.TestCase):
json.dumps(metadata))
def test_put_timestamp(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
broker = self.broker_class(':memory:', account='a', container='c')
orig_put_timestamp = ts.next()
orig_put_timestamp = next(self.ts)
broker.initialize(orig_put_timestamp,
storage_policy_index=int(self.policy))
self.assertEqual(broker.get_info()['put_timestamp'],
@ -506,7 +513,7 @@ class TestExampleBroker(unittest.TestCase):
self.assertEqual(broker.get_info()['put_timestamp'],
orig_put_timestamp)
# put_timestamp newer - gets newer
newer_put_timestamp = ts.next()
newer_put_timestamp = next(self.ts)
broker.update_put_timestamp(newer_put_timestamp)
self.assertEqual(broker.get_info()['put_timestamp'],
newer_put_timestamp)
@ -516,10 +523,8 @@ class TestExampleBroker(unittest.TestCase):
newer_put_timestamp)
def test_status_changed_at(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
broker = self.broker_class(':memory:', account='test', container='c')
put_timestamp = ts.next()
put_timestamp = next(self.ts)
created_at = time.time()
with patch('swift.common.db.time.time', new=lambda: created_at):
broker.initialize(put_timestamp,
@ -528,13 +533,13 @@ class TestExampleBroker(unittest.TestCase):
put_timestamp)
self.assertEquals(broker.get_info()['created_at'],
Timestamp(created_at).internal)
status_changed_at = ts.next()
status_changed_at = next(self.ts)
broker.update_status_changed_at(status_changed_at)
self.assertEqual(broker.get_info()['status_changed_at'],
status_changed_at)
# save the old and get a new status_changed_at
old_status_changed_at, status_changed_at = \
status_changed_at, ts.next()
status_changed_at, next(self.ts)
broker.update_status_changed_at(status_changed_at)
self.assertEqual(broker.get_info()['status_changed_at'],
status_changed_at)
@ -559,12 +564,11 @@ class TestExampleBroker(unittest.TestCase):
@with_tempdir
def test_commit_pending(self, tempdir):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))
broker = self.broker_class(os.path.join(tempdir, 'test.db'),
account='a', container='c')
broker.initialize(ts.next(), storage_policy_index=int(self.policy))
self.put_item(broker, ts.next())
broker.initialize(next(self.ts),
storage_policy_index=int(self.policy))
self.put_item(broker, next(self.ts))
qry = 'select * from %s_stat' % broker.db_type
with broker.get() as conn:
rows = [dict(x) for x in conn.execute(qry)]

View File

@ -34,6 +34,7 @@ from swift.common.exceptions import DriveNotMounted
from swift.common.swob import HTTPException
from test import unit
from test.unit.common.test_db import ExampleBroker
TEST_ACCOUNT_NAME = 'a c t'
@ -1302,11 +1303,19 @@ def attach_fake_replication_rpc(rpc, replicate_hook=None):
return FakeReplConnection
class ExampleReplicator(db_replicator.Replicator):
server_type = 'fake'
brokerclass = ExampleBroker
datadir = 'fake'
default_port = 1000
class TestReplicatorSync(unittest.TestCase):
backend = None # override in subclass
datadir = None
replicator_daemon = db_replicator.Replicator
# override in subclass
backend = ExampleReplicator.brokerclass
datadir = ExampleReplicator.datadir
replicator_daemon = ExampleReplicator
replicator_rpc = db_replicator.ReplicatorRpc
def setUp(self):
@ -1370,9 +1379,6 @@ class TestReplicatorSync(unittest.TestCase):
return daemon
def test_local_ids(self):
if self.datadir is None:
# base test case
return
for drive in ('sda', 'sdb', 'sdd'):
os.makedirs(os.path.join(self.root, drive, self.datadir))
for node in self._ring.devs:
@ -1383,5 +1389,46 @@ class TestReplicatorSync(unittest.TestCase):
self.assertEqual(daemon._local_device_ids,
set([node['id']]))
def test_clean_up_after_deleted_brokers(self):
broker = self._get_broker('a', 'c', node_index=0)
part, node = self._get_broker_part_node(broker)
part = str(part)
daemon = self._run_once(node)
# create a super old broker and delete it!
forever_ago = time.time() - daemon.reclaim_age
put_timestamp = normalize_timestamp(forever_ago - 2)
delete_timestamp = normalize_timestamp(forever_ago - 1)
broker.initialize(put_timestamp)
broker.delete_db(delete_timestamp)
# if we have a container broker make sure it's reported
if hasattr(broker, 'reported'):
info = broker.get_info()
broker.reported(info['put_timestamp'],
info['delete_timestamp'],
info['object_count'],
info['bytes_used'])
info = broker.get_replication_info()
self.assertTrue(daemon.report_up_to_date(info))
# we have a part dir
part_root = os.path.join(self.root, node['device'], self.datadir)
parts = os.listdir(part_root)
self.assertEqual([part], parts)
# with a single suffix
suff = os.listdir(os.path.join(part_root, part))
self.assertEqual(1, len(suff))
# running replicator will remove the deleted db
daemon = self._run_once(node, daemon=daemon)
self.assertEqual(1, daemon.stats['remove'])
# we still have a part dir (but it's empty)
suff = os.listdir(os.path.join(part_root, part))
self.assertEqual(0, len(suff))
# run it again and there's nothing to do...
daemon = self._run_once(node, daemon=daemon)
self.assertEqual(0, daemon.stats['attempted'])
# but empty part dir is cleaned up!
parts = os.listdir(part_root)
self.assertEqual(0, len(parts))
if __name__ == '__main__':
unittest.main()

View File

@ -34,44 +34,6 @@ from test.unit import patch_policies
from contextlib import contextmanager
class TestReplicator(unittest.TestCase):
def setUp(self):
self.orig_ring = replicator.db_replicator.ring.Ring
replicator.db_replicator.ring.Ring = lambda *args, **kwargs: None
def tearDown(self):
replicator.db_replicator.ring.Ring = self.orig_ring
def test_report_up_to_date(self):
repl = replicator.ContainerReplicator({})
info = {'put_timestamp': Timestamp(1).internal,
'delete_timestamp': Timestamp(0).internal,
'object_count': 0,
'bytes_used': 0,
'reported_put_timestamp': Timestamp(1).internal,
'reported_delete_timestamp': Timestamp(0).internal,
'reported_object_count': 0,
'reported_bytes_used': 0}
self.assertTrue(repl.report_up_to_date(info))
info['delete_timestamp'] = Timestamp(2).internal
self.assertFalse(repl.report_up_to_date(info))
info['reported_delete_timestamp'] = Timestamp(2).internal
self.assertTrue(repl.report_up_to_date(info))
info['object_count'] = 1
self.assertFalse(repl.report_up_to_date(info))
info['reported_object_count'] = 1
self.assertTrue(repl.report_up_to_date(info))
info['bytes_used'] = 1
self.assertFalse(repl.report_up_to_date(info))
info['reported_bytes_used'] = 1
self.assertTrue(repl.report_up_to_date(info))
info['put_timestamp'] = Timestamp(3).internal
self.assertFalse(repl.report_up_to_date(info))
info['reported_put_timestamp'] = Timestamp(3).internal
self.assertTrue(repl.report_up_to_date(info))
@patch_policies
class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
@ -80,6 +42,46 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
replicator_daemon = replicator.ContainerReplicator
replicator_rpc = replicator.ContainerReplicatorRpc
def test_report_up_to_date(self):
broker = self._get_broker('a', 'c', node_index=0)
broker.initialize(Timestamp(1).internal, int(POLICIES.default))
info = broker.get_info()
broker.reported(info['put_timestamp'],
info['delete_timestamp'],
info['object_count'],
info['bytes_used'])
full_info = broker.get_replication_info()
expected_info = {'put_timestamp': Timestamp(1).internal,
'delete_timestamp': '0',
'count': 0,
'bytes_used': 0,
'reported_put_timestamp': Timestamp(1).internal,
'reported_delete_timestamp': '0',
'reported_object_count': 0,
'reported_bytes_used': 0}
for key, value in expected_info.items():
msg = 'expected value for %r, %r != %r' % (
key, full_info[key], value)
self.assertEqual(full_info[key], value, msg)
repl = replicator.ContainerReplicator({})
self.assertTrue(repl.report_up_to_date(full_info))
full_info['delete_timestamp'] = Timestamp(2).internal
self.assertFalse(repl.report_up_to_date(full_info))
full_info['reported_delete_timestamp'] = Timestamp(2).internal
self.assertTrue(repl.report_up_to_date(full_info))
full_info['count'] = 1
self.assertFalse(repl.report_up_to_date(full_info))
full_info['reported_object_count'] = 1
self.assertTrue(repl.report_up_to_date(full_info))
full_info['bytes_used'] = 1
self.assertFalse(repl.report_up_to_date(full_info))
full_info['reported_bytes_used'] = 1
self.assertTrue(repl.report_up_to_date(full_info))
full_info['put_timestamp'] = Timestamp(3).internal
self.assertFalse(repl.report_up_to_date(full_info))
full_info['reported_put_timestamp'] = Timestamp(3).internal
self.assertTrue(repl.report_up_to_date(full_info))
def test_sync_remote_in_sync(self):
# setup a local container
broker = self._get_broker('a', 'c', node_index=0)