From 81d4673674febdbe0bba6e27a6d24739456eb3c4 Mon Sep 17 00:00:00 2001
From: Mahati Chamarthy
Date: Mon, 25 Jul 2016 20:10:44 +0530
Subject: [PATCH] Delete old tombstones

- Call invalidate_hash in auditor for reclaimable tombstones
- assert changed auditor behavior with a unit test
- driveby test: assert get_hashes behavior with a unit test

Co-Authored-By: Pete Zaitcev
Co-Authored-By: Kota Tsuyuzaki
Closes-Bug: #1301728
Change-Id: I3e99dc702d55a7424c6482969e03cb4afac854a4
---
 swift/obj/auditor.py           |  36 ++++++---
 test/unit/obj/test_auditor.py  | 141 ++++++++++++++++++++++++++++++++-
 test/unit/obj/test_diskfile.py |  58 ++++++++++++++
 3 files changed, 222 insertions(+), 13 deletions(-)

diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py
index 88214ccacf..5aa9f96daa 100644
--- a/swift/obj/auditor.py
+++ b/swift/obj/auditor.py
@@ -19,6 +19,7 @@ import sys
 import time
 import signal
 import re
+from os.path import basename, dirname, join
 from random import shuffle
 from swift import gettext_ as _
 from contextlib import closing
@@ -28,7 +29,8 @@
 from swift.obj import diskfile, replicator
 from swift.common.utils import (
     get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
     unlink_paths_older_than, readconf, config_auto_int_value)
-from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist
+from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\
+    DiskFileDeleted
 from swift.common.daemon import Daemon
 from swift.common.storage_policy import POLICIES
@@ -43,7 +45,6 @@ class AuditorWorker(object):
         self.conf = conf
         self.logger = logger
         self.devices = devices
-        self.diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
         self.max_files_per_second = float(conf.get('files_per_second', 20))
         self.max_bytes_per_second = float(conf.get('bytes_per_second',
                                                     10000000))
@@ -56,17 +57,25 @@ class AuditorWorker(object):
         except (KeyError, SystemExit):
             # if we can't parse the real config (generally a KeyError on
             # __file__, or SystemExit on no object-replicator section) we use
-            # a very conservative default
-            default = 86400
+            # a very conservative default for rsync_timeout
+            default_rsync_timeout = 86400
         else:
             replicator_rsync_timeout = int(replicator_config.get(
                 'rsync_timeout', replicator.DEFAULT_RSYNC_TIMEOUT))
             # Here we can do some light math for ops and use the *replicator's*
             # rsync_timeout (plus 15 mins to avoid deleting local tempfiles
             # before the remote replicator kills its rsync)
-            default = replicator_rsync_timeout + 900
+            default_rsync_timeout = replicator_rsync_timeout + 900
+            # there's not really a good reason to assume the replicator
+            # section's reclaim_age is more appropriate than the reconstructor
+            # reclaim_age - but we're already parsing the config so we can set
+            # the default value in our config if it's not already set
+            if 'reclaim_age' in replicator_config:
+                conf.setdefault('reclaim_age',
+                                replicator_config['reclaim_age'])
         self.rsync_tempfile_timeout = config_auto_int_value(
-            self.conf.get('rsync_tempfile_timeout'), default)
+            self.conf.get('rsync_tempfile_timeout'), default_rsync_timeout)
+        self.diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
         self.auditor_type = 'ALL'
         self.zero_byte_only_at_fps = zero_byte_only_at_fps
@@ -251,19 +260,26 @@ class AuditorWorker(object):
                         incr_by=chunk_len)
                     self.bytes_processed += chunk_len
                     self.total_bytes_processed += chunk_len
-        except DiskFileNotExist:
-            pass
         except DiskFileQuarantined as err:
             self.quarantines += 1
             self.logger.error(_('ERROR Object %(obj)s failed audit and was'
                                 ' quarantined: %(err)s'),
                               {'obj': location, 'err': err})
+        except DiskFileDeleted:
+            # If there is a reclaimable tombstone, we'll invalidate the hash
+            # to trigger the replicator to rehash/cleanup this suffix
+            ts = df._ondisk_info['ts_info']['timestamp']
+            if (time.time() - float(ts)) > df.manager.reclaim_age:
+                df.manager.invalidate_hash(dirname(df._datadir))
+        except DiskFileNotExist:
+            pass
+
         self.passes += 1
         # _ondisk_info attr is initialized to None and filled in by open
         ondisk_info_dict = df._ondisk_info or {}
         if 'unexpected' in ondisk_info_dict:
             is_rsync_tempfile = lambda fpath: RE_RSYNC_TEMPFILE.match(
-                os.path.basename(fpath))
+                basename(fpath))
             rsync_tempfile_paths = filter(is_rsync_tempfile,
                                           ondisk_info_dict['unexpected'])
             mtime = time.time() - self.rsync_tempfile_timeout
@@ -282,7 +298,7 @@ class ObjectAuditor(Daemon):
             conf.get('zero_byte_files_per_second', 50))
         self.recon_cache_path = conf.get('recon_cache_path',
                                          '/var/cache/swift')
-        self.rcache = os.path.join(self.recon_cache_path, "object.recon")
+        self.rcache = join(self.recon_cache_path, "object.recon")
         self.interval = int(conf.get('interval', 30))
 
     def _sleep(self):
diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py
index 78aa08a246..d61ff5f8da 100644
--- a/test/unit/obj/test_auditor.py
+++ b/test/unit/obj/test_auditor.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 from test import unit
+import six.moves.cPickle as pickle
 import unittest
 import mock
 import os
@@ -23,13 +24,14 @@ from shutil import rmtree
 from hashlib import md5
 from tempfile import mkdtemp
 import textwrap
+from os.path import dirname, basename, join
 from test.unit import (FakeLogger, patch_policies, make_timestamp_iter,
                        DEFAULT_TEST_EC_TYPE)
 from swift.obj import auditor, replicator
 from swift.obj.diskfile import (
     DiskFile, write_metadata, invalidate_hash, get_data_dir,
     DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
-    get_auditor_status)
+    get_auditor_status, HASH_FILE, HASH_INVALIDATIONS_FILE)
 from swift.common.utils import (
     mkdirs, normalize_timestamp, Timestamp, readconf)
 from swift.common.storage_policy import (
@@ -328,7 +330,7 @@ class TestAuditor(unittest.TestCase):
         [object-auditor]
         rsync_tempfile_timeout = auto
         """
-        with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
+        with open(config_path, 'w') as f:
             f.write(textwrap.dedent(stub_config))
         conf = readconf(config_path, 'object-auditor')
         auditor_worker = auditor.AuditorWorker(conf, self.logger,
@@ -346,7 +348,7 @@ class TestAuditor(unittest.TestCase):
         [object-auditor]
         rsync_tempfile_timeout = auto
         """
-        with open(os.path.join(self.testdir, 'objserver.conf'), 'w') as f:
+        with open(config_path, 'w') as f:
             f.write(textwrap.dedent(stub_config))
         conf = readconf(config_path, 'object-auditor')
         auditor_worker = auditor.AuditorWorker(conf, self.logger,
@@ -746,6 +748,139 @@ class TestAuditor(unittest.TestCase):
         self.auditor.run_audit(**kwargs)
         self.assertFalse(os.path.exists(self.disk_file._datadir))
 
+    def test_with_tombstone_delete(self):
+        test_md5 = '098f6bcd4621d373cade4e832627b4f6'
+
+        def do_audit(self, timestamp, invalidate=False):
+            dir_path = self.disk_file._datadir
+            ts_file = os.path.join(dir_path, '%d.ts' % timestamp)
+
+            # Create a .ts file
+            if not os.path.exists(dir_path):
+                mkdirs(dir_path)
+            fp = open(ts_file, 'w')
+            write_metadata(fp, {'X-Timestamp': '%d' % timestamp})
+            fp.close()
+            # Create hashes.pkl
+            hash = dirname(dirname(ts_file))  # suffix dir of the ts file
+            suffix = basename(hash)
+            hashes_pkl = join(os.path.dirname(hash), HASH_FILE)
+            with open(hashes_pkl, 'wb') as fp:
+                pickle.dump({suffix: test_md5}, fp, 0)
+            # Run auditor
+            kwargs = {'mode': 'once'}
+            self.auditor.run_audit(**kwargs)
+            # Check if hash invalid file exists
+            hash_invalid = join(dirname(hash), HASH_INVALIDATIONS_FILE)
+            hash_invalid_exists = os.path.exists(hash_invalid)
+            # If invalidate, fetch value from hashes.invalid
+            if invalidate:
+                with open(hash_invalid, 'rb') as fp:
+                    hash_val = fp.read()
+                return hash_invalid_exists, hash_val, suffix
+            return hash_invalid_exists, ts_file
+
+        self.auditor = auditor.ObjectAuditor(self.conf)
+        self.auditor.log_time = 0
+
+        now = time.time()
+
+        # audit with a recent tombstone
+        hash_invalid_exists, ts_file = do_audit(self, now - 55)
+        self.assertFalse(hash_invalid_exists)
+        os.unlink(ts_file)
+
+        # audit with a tombstone that is beyond default reclaim_age
+        hash_invalid_exists, hash_val, suffix = do_audit(self, now - (604800),
+                                                         True)
+        self.assertTrue(hash_invalid_exists)
+        self.assertEqual(hash_val.strip('\n'), suffix)
+
+    def test_auditor_reclaim_age(self):
+        # if we don't have access to the replicator config section we'll use
+        # diskfile's default
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
+        router = auditor_worker.diskfile_router
+        for policy in POLICIES:
+            self.assertEqual(router[policy].reclaim_age, 86400 * 7)
+
+        # if the reclaim_age option is set explicitly we use that
+        self.conf['reclaim_age'] = '1800'
+        auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
+                                               self.rcache, self.devices)
+        router = auditor_worker.diskfile_router
+        for policy in POLICIES:
+            self.assertEqual(router[policy].reclaim_age, 1800)
+
+        # if we have a real config we can be a little smarter
+        config_path = os.path.join(self.testdir, 'objserver.conf')
+
+        # if there is no object-replicator section we still have to fall back
+        # to default because we can't parse the config for that section!
+        stub_config = """
+        [object-auditor]
+        """
+        with open(config_path, 'w') as f:
+            f.write(textwrap.dedent(stub_config))
+        conf = readconf(config_path, 'object-auditor')
+        auditor_worker = auditor.AuditorWorker(conf, self.logger,
+                                               self.rcache, self.devices)
+        router = auditor_worker.diskfile_router
+        for policy in POLICIES:
+            self.assertEqual(router[policy].reclaim_age, 86400 * 7)
+
+        # verify reclaim_age is taken from the auditor config value
+        stub_config = """
+        [object-replicator]
+        [object-auditor]
+        reclaim_age = 60
+        """
+        with open(config_path, 'w') as f:
+            f.write(textwrap.dedent(stub_config))
+        conf = readconf(config_path, 'object-auditor')
+        auditor_worker = auditor.AuditorWorker(conf, self.logger,
+                                               self.rcache, self.devices)
+        router = auditor_worker.diskfile_router
+        for policy in POLICIES:
+            self.assertEqual(router[policy].reclaim_age, 60)
+
+        # verify reclaim_age falls back to replicator config value
+        # if there is no auditor config value
+        config_path = os.path.join(self.testdir, 'objserver.conf')
+        stub_config = """
+        [object-replicator]
+        reclaim_age = 60
+        [object-auditor]
+        """
+        with open(config_path, 'w') as f:
+            f.write(textwrap.dedent(stub_config))
+        conf = readconf(config_path, 'object-auditor')
+        auditor_worker = auditor.AuditorWorker(conf, self.logger,
+                                               self.rcache, self.devices)
+        router = auditor_worker.diskfile_router
+        for policy in POLICIES:
+            self.assertEqual(router[policy].reclaim_age, 60)
+
+        # we'll prefer our own DEFAULT section to the replicator though
+        self.assertEqual(auditor_worker.rsync_tempfile_timeout,
+                         replicator.DEFAULT_RSYNC_TIMEOUT + 900)
+        stub_config = """
+        [DEFAULT]
+        reclaim_age = 1209600
+        [object-replicator]
+        reclaim_age = 1800
+        [object-auditor]
+        """
+        with open(config_path, 'w') as f:
+            f.write(textwrap.dedent(stub_config))
+        conf = readconf(config_path, 'object-auditor')
+        auditor_worker = auditor.AuditorWorker(conf, self.logger,
+                                               self.rcache, self.devices)
+        router = auditor_worker.diskfile_router
+        for policy in POLICIES:
+            self.assertEqual(router[policy].reclaim_age, 1209600)
+
     def test_sleeper(self):
         with mock.patch(
                 'time.sleep', mock.MagicMock()) as mock_sleep:
diff --git a/test/unit/obj/test_diskfile.py b/test/unit/obj/test_diskfile.py
index cc7d97d6aa..cae095046b 100644
--- a/test/unit/obj/test_diskfile.py
+++ b/test/unit/obj/test_diskfile.py
@@ -5052,6 +5052,64 @@ class TestSuffixHashes(unittest.TestCase):
             hashes = df_mgr.get_hashes('sda1', '0', [], policy)
             self.assertEqual(hashes, {})
 
+    def test_hash_suffix_one_reclaim_tombstone_with_hash_pkl(self):
+        for policy in self.iter_policies():
+            df_mgr = self.df_router[policy]
+            df = df_mgr.get_diskfile(
+                'sda1', '0', 'a', 'c', 'o', policy=policy)
+            suffix_dir = os.path.dirname(df._datadir)
+            part_dir = os.path.dirname(suffix_dir)
+            hash_file = os.path.join(part_dir, diskfile.HASH_FILE)
+
+            # scale back reclaim age a bit
+            df_mgr.reclaim_age = 1000
+            # write a tombstone that's just a *little* older
+            old_time = time() - 1001
+            timestamp = Timestamp(old_time)
+            df.delete(timestamp.internal)
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            # sanity
+            self.assertEqual(hashes, {})
+            self.assertFalse(os.path.exists(df._datadir))
+
+            hash_timestamp = os.stat(hash_file).st_mtime
+
+            # if hashes.pkl exists, that .ts file is not reclaimed
+            df = df_mgr.get_diskfile(
+                'sda1', '0', 'a', 'c', 'o', policy=policy)
+            df.delete(timestamp.internal)
+
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            # This was a cached value so the value looks empty
+            self.assertEqual(hashes, {})
+            # and the hashes.pkl is not touched
+            self.assertEqual(hash_timestamp, os.stat(hash_file).st_mtime)
+            # and we still have tombstone entry
+            tombstone = '%s.ts' % timestamp.internal
+            self.assertTrue(os.path.exists(df._datadir))
+            self.assertIn(tombstone, os.listdir(df._datadir))
+
+            # However if we call invalidate_hash for the suffix dir,
+            # get_hashes can reclaim the tombstone
+            with mock.patch('swift.obj.diskfile.lock_path'):
+                df_mgr.invalidate_hash(suffix_dir)
+
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+
+            self.assertEqual(hashes, {})
+            # If we have no other objects in the suffix, get_hashes
+            # doesn't reclaim anything
+            self.assertTrue(os.path.exists(df._datadir))
+            self.assertIn(tombstone, os.listdir(df._datadir))
+            self.assertEqual(hash_timestamp, os.stat(hash_file).st_mtime)
+
+            # *BUT* if a suffix value is given to recalc, it can force a reclaim!
+            suffix = os.path.basename(suffix_dir)
+            hashes = df_mgr.get_hashes('sda1', '0', [suffix], policy)
+            self.assertFalse(os.path.exists(df._datadir))
+            # hashes.pkl was updated
+            self.assertGreater(os.stat(hash_file).st_mtime, hash_timestamp)
+
     def test_hash_suffix_one_reclaim_and_one_valid_tombstone(self):
         for policy in self.iter_policies():
             paths, suffix = find_paths_with_matching_suffixes(2, 1)