Merge "Optimize obj replicator/reconstructor healthchecks"

commit ede407d05f
Zuul, 2020-04-03 20:12:27 +00:00, committed by Gerrit Code Review
5 changed files with 59 additions and 23 deletions

swift/common/daemon.py

@@ -45,6 +45,7 @@ class Daemon(object):
multiple daemonized workers, they simply provide the behavior of the daemon
and context specific knowledge about how workers should be started.
"""
+ WORKERS_HEALTHCHECK_INTERVAL = 5.0
def __init__(self, conf):
self.conf = conf
@@ -239,7 +240,7 @@ class DaemonStrategy(object):
if not self.spawned_pids():
self.logger.notice('Finished %s', os.getpid())
break
- time.sleep(0.1)
+ time.sleep(self.daemon.WORKERS_HEALTHCHECK_INTERVAL)
self.daemon.post_multiprocess_run()
return 0
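The two daemon.py hunks work together: the supervising parent previously woke every 0.1 seconds just to poll whether any workers were still alive; the interval is now a class attribute on Daemon, defaulting to 5 seconds, so subclasses and tests can tune it. A minimal runnable sketch of the pattern, where supervise() and its spawned_pids callable are stand-ins for DaemonStrategy's real machinery, not Swift APIs:

import os
import time


class Daemon(object):
    # Class attribute, as in the hunk above: how long the parent sleeps
    # between checks on its forked workers.
    WORKERS_HEALTHCHECK_INTERVAL = 5.0


def supervise(daemon, spawned_pids):
    # spawned_pids is assumed to be a callable returning the set of
    # still-running child pids; reaping happens elsewhere.
    while True:
        if not spawned_pids():
            print('Finished %s' % os.getpid())
            break
        # 5.0s instead of 0.1s means roughly 50x fewer wakeups over a
        # worker's lifetime.
        time.sleep(daemon.WORKERS_HEALTHCHECK_INTERVAL)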

swift/obj/reconstructor.py

@@ -210,6 +210,7 @@ class ObjectReconstructor(Daemon):
'rebuild_handoff_node_count', 2))
self._df_router = DiskFileRouter(conf, self.logger)
self.all_local_devices = self.get_local_devices()
+ self.rings_mtime = None
def get_worker_args(self, once=False, **kwargs):
"""
@@ -263,6 +264,11 @@ class ObjectReconstructor(Daemon):
if now > self._next_rcache_update:
self._next_rcache_update = now + self.stats_interval
self.aggregate_recon_update()
+ rings_mtime = [os.path.getmtime(self.load_object_ring(
+     policy).serialized_path) for policy in self.policies]
+ if self.rings_mtime == rings_mtime:
+     return True
+ self.rings_mtime = rings_mtime
return self.get_local_devices() == self.all_local_devices
def aggregate_recon_update(self):
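The early return above is the core optimization: a getmtime() stat per ring file is far cheaper than re-walking the rings for local devices, so is_healthy() caches the mtimes and short-circuits when none have moved. A self-contained sketch of the same idea; RingChangeDetector and its ring_paths argument are illustrative stand-ins, not Swift APIs:

import os


class RingChangeDetector(object):
    def __init__(self, ring_paths):
        # One serialized ring file per policy,
        # e.g. ['/etc/swift/object.ring.gz'].
        self._ring_paths = ring_paths
        self._mtimes = None  # mirrors self.rings_mtime = None in __init__

    def rings_unchanged(self):
        mtimes = [os.path.getmtime(path) for path in self._ring_paths]
        if self._mtimes == mtimes:
            # No ring file was rewritten on disk, so device assignments
            # cannot have changed; skip the expensive rescan.
            return True
        self._mtimes = mtimes
        return False

Because None never equals a list, the first call always reports a change, so the initial health check still falls through to the get_local_devices() comparison, just as in the hunk.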

swift/obj/replicator.py

@@ -139,6 +139,8 @@ class ObjectReplicator(Daemon):
int(conf.get('bind_port', 6200))
self.concurrency = int(conf.get('concurrency', 1))
self.replicator_workers = int(conf.get('replicator_workers', 0))
+ self.policies = [policy for policy in POLICIES
+                  if policy.policy_type == REPL_POLICY]
self.stats_interval = int(conf.get('stats_interval', '300'))
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
self.next_check = time.time() + self.ring_check_interval
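Rather than filtering POLICIES on every pass, the replicator now snapshots the replication-type policies once in __init__; the loop changes further down all read from that list. A toy illustration with a stand-in Policy class (the real objects come from swift's storage_policy module):

REPL_POLICY = 'replication'
EC_POLICY = 'erasure_coding'


class Policy(object):
    def __init__(self, idx, policy_type):
        self.idx = idx
        self.policy_type = policy_type


POLICIES = [Policy(0, REPL_POLICY), Policy(1, EC_POLICY),
            Policy(2, REPL_POLICY)]

# Filter once at construction time ...
policies = [policy for policy in POLICIES
            if policy.policy_type == REPL_POLICY]

# ... so every later loop can drop its per-iteration type check.
assert [p.idx for p in policies] == [0, 2]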
@@ -187,6 +189,7 @@ class ObjectReplicator(Daemon):
self.is_multiprocess_worker = None
self._df_router = DiskFileRouter(conf, self.logger)
self._child_process_reaper_queue = queue.LightQueue()
+ self.rings_mtime = None
def _zero_stats(self):
self.stats_for_dev = defaultdict(Stats)
@@ -204,7 +207,7 @@ class ObjectReplicator(Daemon):
def _get_my_replication_ips(self):
my_replication_ips = set()
ips = whataremyips()
- for policy in POLICIES:
+ for policy in self.policies:
self.load_object_ring(policy)
for local_dev in [dev for dev in policy.object_ring.devs
if dev and dev['replication_ip'] in ips and
@@ -291,6 +294,11 @@ class ObjectReplicator(Daemon):
if time.time() >= self._next_rcache_update:
update = self.aggregate_recon_update()
dump_recon_cache(update, self.rcache, self.logger)
+ rings_mtime = [os.path.getmtime(self.load_object_ring(
+     policy).serialized_path) for policy in self.policies]
+ if self.rings_mtime == rings_mtime:
+     return True
+ self.rings_mtime = rings_mtime
return self.get_local_devices() == self.all_local_devices
def get_local_devices(self):
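This is the same mtime guard the reconstructor gained above, duplicated for the replicator. A hypothetical helper makes the shared contract explicit; this sketches how the two could be factored, not how the commit actually does it:

import os


def rings_unchanged(daemon):
    # Assumes daemon exposes .policies, .load_object_ring() and a
    # .rings_mtime attribute, as both daemons do after this commit.
    rings_mtime = [os.path.getmtime(daemon.load_object_ring(
        policy).serialized_path) for policy in daemon.policies]
    if daemon.rings_mtime == rings_mtime:
        return True
    daemon.rings_mtime = rings_mtime
    return False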
@@ -303,9 +311,7 @@ class ObjectReplicator(Daemon):
"""
ips = whataremyips(self.bind_ip)
local_devices = set()
- for policy in POLICIES:
-     if policy.policy_type != REPL_POLICY:
-         continue
+ for policy in self.policies:
self.load_object_ring(policy)
for device in policy.object_ring.devs:
if device and is_local_device(
@@ -877,7 +883,7 @@ class ObjectReplicator(Daemon):
"""
jobs = []
ips = whataremyips(self.bind_ip)
- for policy in POLICIES:
+ for policy in self.policies:
# Skip replication if next_part_power is set. In this case
# every object is hard-linked twice, but the replicator can't
# detect them and would create a second copy of the file if not
@@ -891,15 +897,14 @@ class ObjectReplicator(Daemon):
policy.name)
continue
- if policy.policy_type == REPL_POLICY:
-     if (override_policies is not None and
-             policy.idx not in override_policies):
-         continue
-     # ensure rings are loaded for policy
-     self.load_object_ring(policy)
-     jobs += self.build_replication_jobs(
-         policy, ips, override_devices=override_devices,
-         override_partitions=override_partitions)
+ if (override_policies is not None and
+         policy.idx not in override_policies):
+     continue
+ # ensure rings are loaded for policy
+ self.load_object_ring(policy)
+ jobs += self.build_replication_jobs(
+     policy, ips, override_devices=override_devices,
+     override_partitions=override_partitions)
random.shuffle(jobs)
if self.handoffs_first:
# Move the handoff parts to the front of the list
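With the policy_type check hoisted into __init__, the block above is simply dedented one level; the override_policies filter it keeps behaves exactly as before. A compact model of that filter, with made-up policy indexes:

def want_policy(policy_idx, override_policies):
    # None means no override was given: replicate every policy.
    return override_policies is None or policy_idx in override_policies


assert want_policy(0, None)
assert want_policy(2, {2})
assert not want_policy(1, {0, 2})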

test/unit/__init__.py

@@ -215,6 +215,7 @@ class FakeRing(Ring):
def __init__(self, replicas=3, max_more_nodes=0, part_power=0,
base_port=1000):
+ self.serialized_path = '/foo/bar/object.ring.gz'
self._base_port = base_port
self.max_more_nodes = max_more_nodes
self._part_shift = 32 - part_power
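FakeRing grows a serialized_path because the new health checks dereference ring.serialized_path before calling os.path.getmtime(); no file exists at the fake path, so the tests mock getmtime rather than stat the disk. A tiny demonstration of that arrangement:

import os
from unittest import mock


class FakeRing(object):
    # The path never exists; it only has to be a string getmtime accepts.
    serialized_path = '/foo/bar/object.ring.gz'


with mock.patch('os.path.getmtime', return_value=10):
    assert os.path.getmtime(FakeRing.serialized_path) == 10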

test/unit/obj/test_reconstructor.py

@@ -1811,14 +1811,18 @@ class TestWorkerReconstructor(unittest.TestCase):
logger=self.logger)
# file does not exist to start
self.assertFalse(os.path.exists(self.rcache))
- self.assertTrue(reconstructor.is_healthy())
+ with mock.patch('swift.obj.reconstructor.os.path.getmtime',
+                 return_value=10):
+     self.assertTrue(reconstructor.is_healthy())
# ... and isn't created until _next_rcache_update
self.assertFalse(os.path.exists(self.rcache))
# ... but if we wait 5 mins (by default)
orig_next_update = reconstructor._next_rcache_update
with mock.patch('swift.obj.reconstructor.time.time',
return_value=now + 301):
- self.assertTrue(reconstructor.is_healthy())
+ with mock.patch('swift.obj.reconstructor.os.path.getmtime',
+                 return_value=11):
+     self.assertTrue(reconstructor.is_healthy())
self.assertGreater(reconstructor._next_rcache_update, orig_next_update)
# ... it will be created
self.assertTrue(os.path.exists(self.rcache))
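The recon-cache half of this test is unchanged in spirit: is_healthy() only flushes stats once per stats_interval (300 seconds by default), whatever the health verdict. A reduced model of that gate, with aggregate_recon_update() elided to a comment:

import time


class ReconGate(object):
    stats_interval = 300

    def __init__(self):
        self._next_rcache_update = time.time() + self.stats_interval

    def maybe_dump(self, now):
        if now > self._next_rcache_update:
            self._next_rcache_update = now + self.stats_interval
            return True   # the real code calls aggregate_recon_update()
        return False


gate = ReconGate()
assert not gate.maybe_dump(time.time())     # still inside the window
assert gate.maybe_dump(time.time() + 301)   # window elapsed, dump now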
@@ -1831,13 +1835,19 @@ class TestWorkerReconstructor(unittest.TestCase):
reconstructor = object_reconstructor.ObjectReconstructor(
{'recon_cache_path': self.recon_cache_path},
logger=self.logger)
- self.assertTrue(reconstructor.is_healthy())
+ with mock.patch('swift.obj.reconstructor.os.path.getmtime',
+                 return_value=10):
+     self.assertTrue(reconstructor.is_healthy())
reconstructor.get_local_devices = lambda: {
'sdb%d' % p for p in reconstructor.policies}
- self.assertFalse(reconstructor.is_healthy())
+ with mock.patch('swift.obj.reconstructor.os.path.getmtime',
+                 return_value=11):
+     self.assertFalse(reconstructor.is_healthy())
reconstructor.all_local_devices = {
'sdb%d' % p for p in reconstructor.policies}
- self.assertTrue(reconstructor.is_healthy())
+ with mock.patch('swift.obj.reconstructor.os.path.getmtime',
+                 return_value=12):
+     self.assertTrue(reconstructor.is_healthy())
def test_is_healthy_detects_ring_change(self):
reconstructor = object_reconstructor.ObjectReconstructor(
@@ -1850,13 +1860,26 @@ class TestWorkerReconstructor(unittest.TestCase):
self.assertEqual(14, len(p.object_ring.devs)) # sanity check
worker_args = list(reconstructor.get_worker_args())
self.assertFalse(worker_args[0]['override_devices']) # no local devs
- self.assertTrue(reconstructor.is_healthy())
+ with mock.patch('swift.obj.reconstructor.os.path.getmtime',
+                 return_value=10):
+     self.assertTrue(reconstructor.is_healthy())
# expand ring - now there are local devices
p.object_ring.set_replicas(28)
self.assertEqual(28, len(p.object_ring.devs)) # sanity check
- self.assertFalse(reconstructor.is_healthy())
+ # If ring.gz mtime did not change, there is no change to detect
+ with mock.patch('swift.obj.reconstructor.os.path.getmtime',
+                 return_value=10):
+     self.assertTrue(reconstructor.is_healthy())
+ # Now, ring.gz mtime changed, so the change will be detected
+ with mock.patch('swift.obj.reconstructor.os.path.getmtime',
+                 return_value=11):
+     self.assertFalse(reconstructor.is_healthy())
self.assertNotEqual(worker_args, list(reconstructor.get_worker_args()))
- self.assertTrue(reconstructor.is_healthy())
+ with mock.patch('swift.obj.reconstructor.os.path.getmtime',
+                 return_value=12):
+     self.assertTrue(reconstructor.is_healthy())
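The mocked mtimes (10, 10, 11, 12) drive the whole test: a ring expanded only in memory stays invisible until its on-disk file's mtime moves. A condensed model of the four is_healthy() calls, where devices_ok stands in for get_local_devices() == self.all_local_devices:

def is_healthy(state, ring_mtime, devices_ok):
    if state.get('mtime') == ring_mtime:
        return True               # ring file untouched: skip the rescan
    state['mtime'] = ring_mtime
    return devices_ok


state = {}
assert is_healthy(state, 10, True) is True    # baseline
assert is_healthy(state, 10, False) is True   # ring grew in memory only;
                                              # mtime unchanged, not seen
assert is_healthy(state, 11, False) is False  # mtime moved: detected
assert is_healthy(state, 12, True) is True    # healthy again once worker
                                              # args have been refreshed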
def test_final_recon_dump(self):
reconstructor = object_reconstructor.ObjectReconstructor(