Merge "Optimize obj replicator/reconstructor healthchecks"
This commit is contained in:
commit
ede407d05f
|
@ -45,6 +45,7 @@ class Daemon(object):
|
|||
multiple daemonized workers, they simply provide the behavior of the daemon
|
||||
and context specific knowledge about how workers should be started.
|
||||
"""
|
||||
WORKERS_HEALTHCHECK_INTERVAL = 5.0
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
|
@ -239,7 +240,7 @@ class DaemonStrategy(object):
|
|||
if not self.spawned_pids():
|
||||
self.logger.notice('Finished %s', os.getpid())
|
||||
break
|
||||
time.sleep(0.1)
|
||||
time.sleep(self.daemon.WORKERS_HEALTHCHECK_INTERVAL)
|
||||
self.daemon.post_multiprocess_run()
|
||||
return 0
|
||||
|
||||
|
|
|
@ -210,6 +210,7 @@ class ObjectReconstructor(Daemon):
|
|||
'rebuild_handoff_node_count', 2))
|
||||
self._df_router = DiskFileRouter(conf, self.logger)
|
||||
self.all_local_devices = self.get_local_devices()
|
||||
self.rings_mtime = None
|
||||
|
||||
def get_worker_args(self, once=False, **kwargs):
|
||||
"""
|
||||
|
@ -263,6 +264,11 @@ class ObjectReconstructor(Daemon):
|
|||
if now > self._next_rcache_update:
|
||||
self._next_rcache_update = now + self.stats_interval
|
||||
self.aggregate_recon_update()
|
||||
rings_mtime = [os.path.getmtime(self.load_object_ring(
|
||||
policy).serialized_path) for policy in self.policies]
|
||||
if self.rings_mtime == rings_mtime:
|
||||
return True
|
||||
self.rings_mtime = rings_mtime
|
||||
return self.get_local_devices() == self.all_local_devices
|
||||
|
||||
def aggregate_recon_update(self):
|
||||
|
|
|
@ -139,6 +139,8 @@ class ObjectReplicator(Daemon):
|
|||
int(conf.get('bind_port', 6200))
|
||||
self.concurrency = int(conf.get('concurrency', 1))
|
||||
self.replicator_workers = int(conf.get('replicator_workers', 0))
|
||||
self.policies = [policy for policy in POLICIES
|
||||
if policy.policy_type == REPL_POLICY]
|
||||
self.stats_interval = int(conf.get('stats_interval', '300'))
|
||||
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
|
||||
self.next_check = time.time() + self.ring_check_interval
|
||||
|
@ -187,6 +189,7 @@ class ObjectReplicator(Daemon):
|
|||
self.is_multiprocess_worker = None
|
||||
self._df_router = DiskFileRouter(conf, self.logger)
|
||||
self._child_process_reaper_queue = queue.LightQueue()
|
||||
self.rings_mtime = None
|
||||
|
||||
def _zero_stats(self):
|
||||
self.stats_for_dev = defaultdict(Stats)
|
||||
|
@ -204,7 +207,7 @@ class ObjectReplicator(Daemon):
|
|||
def _get_my_replication_ips(self):
|
||||
my_replication_ips = set()
|
||||
ips = whataremyips()
|
||||
for policy in POLICIES:
|
||||
for policy in self.policies:
|
||||
self.load_object_ring(policy)
|
||||
for local_dev in [dev for dev in policy.object_ring.devs
|
||||
if dev and dev['replication_ip'] in ips and
|
||||
|
@ -291,6 +294,11 @@ class ObjectReplicator(Daemon):
|
|||
if time.time() >= self._next_rcache_update:
|
||||
update = self.aggregate_recon_update()
|
||||
dump_recon_cache(update, self.rcache, self.logger)
|
||||
rings_mtime = [os.path.getmtime(self.load_object_ring(
|
||||
policy).serialized_path) for policy in self.policies]
|
||||
if self.rings_mtime == rings_mtime:
|
||||
return True
|
||||
self.rings_mtime = rings_mtime
|
||||
return self.get_local_devices() == self.all_local_devices
|
||||
|
||||
def get_local_devices(self):
|
||||
|
@ -303,9 +311,7 @@ class ObjectReplicator(Daemon):
|
|||
"""
|
||||
ips = whataremyips(self.bind_ip)
|
||||
local_devices = set()
|
||||
for policy in POLICIES:
|
||||
if policy.policy_type != REPL_POLICY:
|
||||
continue
|
||||
for policy in self.policies:
|
||||
self.load_object_ring(policy)
|
||||
for device in policy.object_ring.devs:
|
||||
if device and is_local_device(
|
||||
|
@ -877,7 +883,7 @@ class ObjectReplicator(Daemon):
|
|||
"""
|
||||
jobs = []
|
||||
ips = whataremyips(self.bind_ip)
|
||||
for policy in POLICIES:
|
||||
for policy in self.policies:
|
||||
# Skip replication if next_part_power is set. In this case
|
||||
# every object is hard-linked twice, but the replicator can't
|
||||
# detect them and would create a second copy of the file if not
|
||||
|
@ -891,15 +897,14 @@ class ObjectReplicator(Daemon):
|
|||
policy.name)
|
||||
continue
|
||||
|
||||
if policy.policy_type == REPL_POLICY:
|
||||
if (override_policies is not None and
|
||||
policy.idx not in override_policies):
|
||||
continue
|
||||
# ensure rings are loaded for policy
|
||||
self.load_object_ring(policy)
|
||||
jobs += self.build_replication_jobs(
|
||||
policy, ips, override_devices=override_devices,
|
||||
override_partitions=override_partitions)
|
||||
if (override_policies is not None and
|
||||
policy.idx not in override_policies):
|
||||
continue
|
||||
# ensure rings are loaded for policy
|
||||
self.load_object_ring(policy)
|
||||
jobs += self.build_replication_jobs(
|
||||
policy, ips, override_devices=override_devices,
|
||||
override_partitions=override_partitions)
|
||||
random.shuffle(jobs)
|
||||
if self.handoffs_first:
|
||||
# Move the handoff parts to the front of the list
|
||||
|
|
|
@ -215,6 +215,7 @@ class FakeRing(Ring):
|
|||
|
||||
def __init__(self, replicas=3, max_more_nodes=0, part_power=0,
|
||||
base_port=1000):
|
||||
self.serialized_path = '/foo/bar/object.ring.gz'
|
||||
self._base_port = base_port
|
||||
self.max_more_nodes = max_more_nodes
|
||||
self._part_shift = 32 - part_power
|
||||
|
|
|
@ -1811,14 +1811,18 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||
logger=self.logger)
|
||||
# file does not exist to start
|
||||
self.assertFalse(os.path.exists(self.rcache))
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=10):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
# ... and isn't created until _next_rcache_update
|
||||
self.assertFalse(os.path.exists(self.rcache))
|
||||
# ... but if we wait 5 mins (by default)
|
||||
orig_next_update = reconstructor._next_rcache_update
|
||||
with mock.patch('swift.obj.reconstructor.time.time',
|
||||
return_value=now + 301):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=11):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
self.assertGreater(reconstructor._next_rcache_update, orig_next_update)
|
||||
# ... it will be created
|
||||
self.assertTrue(os.path.exists(self.rcache))
|
||||
|
@ -1831,13 +1835,19 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'recon_cache_path': self.recon_cache_path},
|
||||
logger=self.logger)
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=10):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
reconstructor.get_local_devices = lambda: {
|
||||
'sdb%d' % p for p in reconstructor.policies}
|
||||
self.assertFalse(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=11):
|
||||
self.assertFalse(reconstructor.is_healthy())
|
||||
reconstructor.all_local_devices = {
|
||||
'sdb%d' % p for p in reconstructor.policies}
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=12):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
|
||||
def test_is_healthy_detects_ring_change(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
|
@ -1850,13 +1860,26 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||
self.assertEqual(14, len(p.object_ring.devs)) # sanity check
|
||||
worker_args = list(reconstructor.get_worker_args())
|
||||
self.assertFalse(worker_args[0]['override_devices']) # no local devs
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=10):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
# expand ring - now there are local devices
|
||||
p.object_ring.set_replicas(28)
|
||||
self.assertEqual(28, len(p.object_ring.devs)) # sanity check
|
||||
self.assertFalse(reconstructor.is_healthy())
|
||||
|
||||
# If ring.gz mtime did not change, there is no change to detect
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=10):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
# Now, ring.gz mtime changed, so the change will be detected
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=11):
|
||||
self.assertFalse(reconstructor.is_healthy())
|
||||
|
||||
self.assertNotEqual(worker_args, list(reconstructor.get_worker_args()))
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=12):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
|
||||
def test_final_recon_dump(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
|
|
Loading…
Reference in New Issue