Remove sharding_lock()
This lock was taken two places: 1. when cleaving into a shard broker 2. when removing misplaced objects from a shard broker These two operations would not execute concurrently if there is just one sharder process running. If there were more than one sharder process, each processing different nodes, then they might visit the same db when one is visiting the parent container and cleaving to the shard and one is visiting the shard container and handling misplaced objects. However, any objects merged by the cleaving process will not be removed by the misplaced object process because the removal is limited to a max row count that is sampled at the start of the misplaced objects handling. It is therefore not necessary to protect these operations with a lock. Change-Id: Icb3f9d8843b0fe601a32006adb4dcb29779c8a06
This commit is contained in:
parent
55ebf422f8
commit
d9be8caae2
|
@ -20,7 +20,6 @@ import errno
|
|||
|
||||
import os
|
||||
from uuid import uuid4
|
||||
from contextlib import contextmanager
|
||||
|
||||
import six
|
||||
import six.moves.cPickle as pickle
|
||||
|
@ -1801,20 +1800,6 @@ class ContainerBroker(DatabaseBroker):
|
|||
return [dict(sr, record_type=RECORD_TYPE_SHARD_NODE)
|
||||
for sr in shard_ranges]
|
||||
|
||||
@contextmanager
|
||||
def sharding_lock(self):
|
||||
lockpath = '%s/.sharding' % self.db_dir
|
||||
try:
|
||||
fd = os.open(lockpath, os.O_WRONLY | os.O_CREAT)
|
||||
yield fd
|
||||
finally:
|
||||
os.close(fd)
|
||||
os.unlink(lockpath)
|
||||
|
||||
def has_sharding_lock(self):
|
||||
lockpath = '%s/.sharding' % self.db_dir
|
||||
return os.path.exists(lockpath)
|
||||
|
||||
def set_sharding_state(self, epoch=None):
|
||||
epoch = epoch or self.get_own_shard_range().epoch
|
||||
if not epoch:
|
||||
|
|
|
@ -339,9 +339,6 @@ class ContainerReplicator(db_replicator.Replicator):
|
|||
return super(ContainerReplicator, self)._in_sync(
|
||||
rinfo, info, broker, local_sync)
|
||||
|
||||
def _is_locked(self, broker):
|
||||
return broker.has_sharding_lock()
|
||||
|
||||
|
||||
class ContainerReplicatorRpc(db_replicator.ReplicatorRpc):
|
||||
|
||||
|
|
|
@ -803,22 +803,20 @@ class ContainerSharder(ContainerReplicator):
|
|||
'(not removing)', dest_shard_range, broker.path)
|
||||
return False
|
||||
|
||||
with broker.sharding_lock():
|
||||
# TODO: check we're actually contending for this lock when
|
||||
# modifying the broker dbs
|
||||
if broker.get_info()['id'] != info['id']:
|
||||
# the db changed - don't remove any objects
|
||||
success = False
|
||||
else:
|
||||
# remove objects up to the max row of the db sampled prior to
|
||||
# the first object yielded for this destination; objects added
|
||||
# after that point may not have been yielded and replicated so
|
||||
# it is not safe to remove them yet
|
||||
broker.remove_objects(
|
||||
dest_shard_range.lower_str,
|
||||
dest_shard_range.upper_str,
|
||||
max_row=info['max_row'])
|
||||
success = True
|
||||
if broker.get_info()['id'] != info['id']:
|
||||
# the db changed - don't remove any objects
|
||||
success = False
|
||||
else:
|
||||
# remove objects up to the max row of the db sampled prior to
|
||||
# the first object yielded for this destination; objects added
|
||||
# after that point may not have been yielded and replicated so
|
||||
# it is not safe to remove them yet
|
||||
broker.remove_objects(
|
||||
dest_shard_range.lower_str,
|
||||
dest_shard_range.upper_str,
|
||||
max_row=info['max_row'])
|
||||
success = True
|
||||
|
||||
if not success:
|
||||
self.logger.warning(
|
||||
'Refused to remove misplaced objects: %s in %s',
|
||||
|
@ -1090,28 +1088,27 @@ class ContainerSharder(ContainerReplicator):
|
|||
# only cleave from the retiring db - misplaced objects handler will
|
||||
# deal with any objects in the fresh db
|
||||
source_broker = broker.get_brokers()[0]
|
||||
with shard_broker.sharding_lock():
|
||||
# if this range has been cleaved before but replication
|
||||
# failed then the shard db may still exist and it may not be
|
||||
# necessary to merge all the rows again
|
||||
source_db_id = source_broker.get_info()['id']
|
||||
source_max_row = source_broker.get_max_row()
|
||||
sync_point = shard_broker.get_sync(source_db_id)
|
||||
if sync_point < source_max_row:
|
||||
sync_from_row = max(cleaving_context.last_cleave_to_row,
|
||||
sync_point)
|
||||
for objects, info in self.yield_objects(
|
||||
source_broker, shard_range,
|
||||
since_row=sync_from_row):
|
||||
shard_broker.merge_items(objects)
|
||||
# note: the max row stored as a sync point is sampled *before*
|
||||
# objects are yielded to ensure that is less than or equal to
|
||||
# the last yielded row
|
||||
shard_broker.merge_syncs([{'sync_point': source_max_row,
|
||||
'remote_id': source_db_id}])
|
||||
else:
|
||||
self.logger.debug("Cleaving '%s': %r - already in sync",
|
||||
broker.path, shard_range)
|
||||
# if this range has been cleaved before but replication
|
||||
# failed then the shard db may still exist and it may not be
|
||||
# necessary to merge all the rows again
|
||||
source_db_id = source_broker.get_info()['id']
|
||||
source_max_row = source_broker.get_max_row()
|
||||
sync_point = shard_broker.get_sync(source_db_id)
|
||||
if sync_point < source_max_row:
|
||||
sync_from_row = max(cleaving_context.last_cleave_to_row,
|
||||
sync_point)
|
||||
for objects, info in self.yield_objects(
|
||||
source_broker, shard_range,
|
||||
since_row=sync_from_row):
|
||||
shard_broker.merge_items(objects)
|
||||
# note: the max row stored as a sync point is sampled *before*
|
||||
# objects are yielded to ensure that is less than or equal to
|
||||
# the last yielded row
|
||||
shard_broker.merge_syncs([{'sync_point': source_max_row,
|
||||
'remote_id': source_db_id}])
|
||||
else:
|
||||
self.logger.debug("Cleaving '%s': %r - already in sync",
|
||||
broker.path, shard_range)
|
||||
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
if shard_range.includes(own_shard_range):
|
||||
|
|
Loading…
Reference in New Issue