Merge "Fix SSYNC concurrency on partition"

This commit is contained in:
Zuul 2018-12-11 23:34:54 +00:00 committed by Gerrit Code Review
commit b9d2c08e8d
6 changed files with 139 additions and 20 deletions

View File

@ -2585,7 +2585,7 @@ def _get_any_lock(fds):
@contextmanager
def lock_path(directory, timeout=10, timeout_class=None, limit=1):
def lock_path(directory, timeout=10, timeout_class=None, limit=1, name=None):
"""
Context manager that acquires a lock on a directory. This will block until
the lock can be acquired, or the timeout time has expired (whichever occurs
@ -2605,6 +2605,8 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1):
the same directory at the time this method is called. Note that this
limit is only applied during the current call to this method and does
not prevent subsequent calls giving a larger limit. Defaults to 1.
:param name: A string used to distinguish different types of locks in a
directory
:raises TypeError: if limit is not an int.
:raises ValueError: if limit is less than 1.
"""
@ -2614,6 +2616,8 @@ def lock_path(directory, timeout=10, timeout_class=None, limit=1):
timeout_class = swift.common.exceptions.LockTimeout
mkdirs(directory)
lockpath = '%s/.lock' % directory
if name:
lockpath += '-%s' % str(name)
fds = [os.open(get_zero_indexed_base_string(lockpath, i),
os.O_WRONLY | os.O_CREAT)
for i in range(limit)]

View File

@ -1267,23 +1267,34 @@ class BaseDiskFileManager(object):
return None
@contextmanager
def replication_lock(self, device):
def replication_lock(self, device, policy, partition):
"""
A context manager that will lock on the device given, if
configured to do so.
:param device: name of target device
:param policy: policy targeted by the replication request
:param partition: partition targeted by the replication request
:raises ReplicationLockTimeout: If the lock on the device
cannot be granted within the configured timeout.
"""
if self.replication_concurrency_per_device:
dev_path = self.get_dev_path(device)
part_path = os.path.join(dev_path, get_data_dir(policy),
str(partition))
limit_time = time.time() + self.replication_lock_timeout
with lock_path(
dev_path,
timeout=self.replication_lock_timeout,
timeout_class=ReplicationLockTimeout,
limit=self.replication_concurrency_per_device):
yield True
with lock_path(
part_path,
timeout=limit_time - time.time(),
timeout_class=ReplicationLockTimeout,
limit=1,
name='replication'):
yield True
else:
yield True
@ -1574,6 +1585,8 @@ class BaseDiskFileManager(object):
# This lock is only held by people dealing with the hashes
# or the hash invalidations, and we've just removed those.
_unlink_if_present(os.path.join(partition_path, ".lock"))
_unlink_if_present(os.path.join(partition_path,
".lock-replication"))
os.rmdir(partition_path)
except OSError as err:
self.logger.debug("Error cleaning up empty partition: %s", err)

View File

@ -158,7 +158,9 @@ class Receiver(object):
if not self.app.replication_semaphore.acquire(False):
raise swob.HTTPServiceUnavailable()
try:
with self.diskfile_mgr.replication_lock(self.device):
with self.diskfile_mgr.replication_lock(self.device,
self.policy,
self.partition):
for data in self.missing_check():
yield data
for data in self.updates():

View File

@ -1072,6 +1072,28 @@ class TestUtils(unittest.TestCase):
self.assertTrue(exc2 is not None)
self.assertTrue(not success)
@with_tempdir
def test_lock_path_name(self, tmpdir):
# With default limit (1), can't take the same named lock twice
success = False
with utils.lock_path(tmpdir, 0.1, name='foo'):
with self.assertRaises(LockTimeout):
with utils.lock_path(tmpdir, 0.1, name='foo'):
success = True
self.assertFalse(success)
# With default limit (1), can take two differently named locks
success = False
with utils.lock_path(tmpdir, 0.1, name='foo'):
with utils.lock_path(tmpdir, 0.1, name='bar'):
success = True
self.assertTrue(success)
# With default limit (1), can take a named lock and the default lock
success = False
with utils.lock_path(tmpdir, 0.1, name='foo'):
with utils.lock_path(tmpdir, 0.1):
success = True
self.assertTrue(success)
def test_normalize_timestamp(self):
# Test swift.common.utils.normalize_timestamp
self.assertEqual(utils.normalize_timestamp('1253327593.48174'),

View File

@ -1080,9 +1080,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
self.df_mgr.replication_concurrency_per_device = 1
self.df_mgr.replication_lock_timeout = 0.1
success = False
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
with self.assertRaises(ReplicationLockTimeout):
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '2'):
success = True
self.assertFalse(success)
@ -1093,9 +1095,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
# 2 locks must succeed
success = False
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
try:
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '2'):
success = True
except ReplicationLockTimeout as err:
self.fail('Unexpected exception: %s' % err)
@ -1103,10 +1107,13 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
# 3 locks must succeed
success = False
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '2'):
try:
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '3'):
success = True
except ReplicationLockTimeout as err:
self.fail('Unexpected exception: %s' % err)
@ -1119,9 +1126,11 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
# 2 locks with replication_concurrency_per_device=2 must succeed
success = False
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
try:
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '2'):
success = True
except ReplicationLockTimeout as err:
self.fail('Unexpected exception: %s' % err)
@ -1129,10 +1138,13 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
# 3 locks with replication_concurrency_per_device=2 must fail
success = False
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '2'):
with self.assertRaises(ReplicationLockTimeout):
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '3'):
success = True
self.assertFalse(success)
@ -1141,14 +1153,29 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
self.df_mgr.replication_concurrency_per_device = 1
self.df_mgr.replication_lock_timeout = 0.1
success = False
with self.df_mgr.replication_lock(self.existing_device):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
try:
with self.df_mgr.replication_lock(self.existing_device2):
with self.df_mgr.replication_lock(self.existing_device2,
POLICIES.legacy, '2'):
success = True
except ReplicationLockTimeout as err:
self.fail('Unexpected exception: %s' % err)
self.assertTrue(success)
def test_replication_lock_same_partition(self):
# Double check settings
self.df_mgr.replication_concurrency_per_device = 2
self.df_mgr.replication_lock_timeout = 0.1
success = False
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
with self.assertRaises(ReplicationLockTimeout):
with self.df_mgr.replication_lock(self.existing_device,
POLICIES.legacy, '1'):
success = True
self.assertFalse(success)
def test_missing_splice_warning(self):
with mock.patch('swift.common.splice.splice._c_splice', None):
self.conf['splice'] = 'yes'

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import os
import shutil
import tempfile
@ -131,7 +132,9 @@ class TestReceiver(unittest.TestCase):
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(resp.status_int, 200)
mocked_replication_lock.assert_called_once_with('sda1')
mocked_replication_lock.assert_called_once_with('sda1',
POLICIES.legacy,
'1')
def test_Receiver_with_default_storage_policy(self):
req = swob.Request.blank(
@ -290,7 +293,7 @@ class TestReceiver(unittest.TestCase):
self.controller, req)
def test_SSYNC_replication_lock_fail(self):
def _mock(path):
def _mock(path, policy, partition):
with exceptions.ReplicationLockTimeout(0.01, '/somewhere/' + path):
eventlet.sleep(0.05)
with mock.patch.object(
@ -311,6 +314,54 @@ class TestReceiver(unittest.TestCase):
'None/sda1/1 SSYNC LOCK TIMEOUT: 0.01 seconds: '
'/somewhere/sda1')
def test_SSYNC_replication_lock_per_partition(self):
def _concurrent_ssync(path1, path2):
env = {'REQUEST_METHOD': 'SSYNC'}
body = ':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' \
':UPDATES: START\r\n:UPDATES: END\r\n'
req1 = swob.Request.blank(path1, environ=env, body=body)
req2 = swob.Request.blank(path2, environ=env, body=body)
rcvr1 = ssync_receiver.Receiver(self.controller, req1)
rcvr2 = ssync_receiver.Receiver(self.controller, req2)
body_lines1 = []
body_lines2 = []
for chunk1, chunk2 in itertools.izip_longest(rcvr1(), rcvr2()):
if chunk1 and chunk1.strip():
body_lines1.append(chunk1.strip())
if chunk2 and chunk2.strip():
body_lines2.append(chunk2.strip())
return body_lines1, body_lines2
self.controller._diskfile_router[POLICIES[0]]\
.replication_lock_timeout = 0.01
self.controller._diskfile_router[POLICIES[0]]\
.replication_concurrency_per_device = 2
# It should be possible to lock two different partitions
body_lines1, body_lines2 = _concurrent_ssync('/sda1/1', '/sda1/2')
self.assertEqual(
body_lines1,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(
body_lines2,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
# It should not be possible to lock the same partition twice
body_lines1, body_lines2 = _concurrent_ssync('/sda1/1', '/sda1/1')
self.assertEqual(
body_lines1,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertRegexpMatches(
''.join(body_lines2),
"^:ERROR: 0 '0\.0[0-9]+ seconds: "
"/.+/sda1/objects/1/.lock-replication'$")
def test_SSYNC_initial_path(self):
with mock.patch.object(
self.controller, 'replication_semaphore') as \