Merge "Optimize replication of targeted devices/partitions."

This commit is contained in:
Jenkins 2015-01-26 14:04:17 +00:00 committed by Gerrit Code Review
commit 42ee551c1a
1 changed files with 31 additions and 16 deletions

View File

@ -406,16 +406,21 @@ class ObjectReplicator(Daemon):
self.kill_coros()
self.last_replication_count = self.replication_count
def process_repl(self, policy, jobs, ips):
def process_repl(self, policy, ips, override_devices=None,
override_partitions=None):
"""
Helper function for collect_jobs to build jobs for replication
using replication style storage policy
"""
jobs = []
obj_ring = self.get_object_ring(policy.idx)
data_dir = get_data_dir(policy.idx)
for local_dev in [dev for dev in obj_ring.devs
if dev and dev['replication_ip'] in ips and
dev['replication_port'] == self.port]:
if (dev
and dev['replication_ip'] in ips
and dev['replication_port'] == self.port
and (override_devices is None
or dev['device'] in override_devices))]:
dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, data_dir)
tmp_path = join(dev_path, get_tmp_dir(int(policy)))
@ -430,6 +435,10 @@ class ObjectReplicator(Daemon):
self.logger.exception('ERROR creating %s' % obj_path)
continue
for partition in os.listdir(obj_path):
if (override_partitions is not None
and partition not in override_partitions):
continue
try:
job_path = join(obj_path, partition)
part_nodes = obj_ring.get_part_nodes(int(partition))
@ -445,17 +454,26 @@ class ObjectReplicator(Daemon):
object_ring=obj_ring))
except ValueError:
continue
return jobs
def collect_jobs(self):
def collect_jobs(self, override_devices=None, override_partitions=None):
"""
Returns a sorted list of jobs (dictionaries) that specify the
partitions, nodes, etc to be rsynced.
:param override_devices: if set, only jobs on these devices
will be returned
:param override_partitions: if set, only jobs on these partitions
will be returned
"""
jobs = []
ips = whataremyips()
for policy in POLICIES:
# may need to branch here for future policy types
self.process_repl(policy, jobs, ips)
jobs += self.process_repl(policy, ips,
override_devices=override_devices,
override_partitions=override_partitions)
random.shuffle(jobs)
if self.handoffs_first:
# Move the handoff parts to the front of the list
@ -473,24 +491,15 @@ class ObjectReplicator(Daemon):
self.last_replication_count = -1
self.partition_times = []
if override_devices is None:
override_devices = []
if override_partitions is None:
override_partitions = []
stats = eventlet.spawn(self.heartbeat)
lockup_detector = eventlet.spawn(self.detect_lockups)
eventlet.sleep() # Give spawns a cycle
try:
self.run_pool = GreenPool(size=self.concurrency)
jobs = self.collect_jobs()
jobs = self.collect_jobs(override_devices=override_devices,
override_partitions=override_partitions)
for job in jobs:
if override_devices and job['device'] not in override_devices:
continue
if override_partitions and \
job['partition'] not in override_partitions:
continue
dev_path = join(self.devices_dir, job['device'])
if self.mount_check and not ismount(dev_path):
self.logger.warn(_('%s is not mounted'), job['device'])
@ -527,8 +536,14 @@ class ObjectReplicator(Daemon):
def run_once(self, *args, **kwargs):
start = time.time()
self.logger.info(_("Running object replicator in script mode."))
override_devices = list_from_csv(kwargs.get('devices'))
override_partitions = list_from_csv(kwargs.get('partitions'))
if not override_devices:
override_devices = None
if not override_partitions:
override_partitions = None
self.replicate(
override_devices=override_devices,
override_partitions=override_partitions)