Merge "Add support to increase object ring partition power"

This commit is contained in:
Jenkins 2017-07-05 14:40:42 +00:00 committed by Gerrit Code Review
commit e94b383655
23 changed files with 1557 additions and 34 deletions

39
bin/swift-object-relinker Executable file
View File

@ -0,0 +1,39 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import sys
from swift.cli.relinker import main
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Relink and cleanup objects to increase partition power')
parser.add_argument('action', choices=['relink', 'cleanup'])
parser.add_argument('--swift-dir', default='/etc/swift',
dest='swift_dir', help='Path to swift directory')
parser.add_argument('--devices', default='/srv/node',
dest='devices', help='Path to swift device directory')
parser.add_argument('--skip-mount-check', default=False,
action="store_true", dest='skip_mount_check')
parser.add_argument('--logfile', default=None,
dest='logfile')
parser.add_argument('--debug', default=False, action='store_true')
args = parser.parse_args()
sys.exit(main(args))

View File

@ -62,6 +62,7 @@ Overview and Concepts
overview_encryption
overview_backing_store
ring_background
ring_partpower
associated_projects
Developer Documentation

View File

@ -0,0 +1,184 @@
==============================
Modifying Ring Partition Power
==============================
The ring partition power determines the on-disk location of data files and is
selected when creating a new ring. In normal operation, it is a fixed value.
This is because a different partition power results in a different on-disk
location for all data files.
However, increasing the partition power by 1 can be done by choosing locations
that are on the same disk. As a result, we can create hard-links for both the
new and old locations, avoiding data movement without impacting availability.
To enable a partition power change without interrupting user access, object
servers need to be aware of it in advance. Therefore a partition power change
needs to be done in multiple steps.
.. note::
Do not increase the partition power on account and container rings.
Increasing the partition power is *only* supported for object rings.
Trying to increase the part_power for account and container rings *will*
result in unavailability, maybe even data loss.
-------
Caveats
-------
Before increasing the partition power, consider the possible drawbacks.
There are a few caveats when increasing the partition power:
* All hashes.pkl files will become invalid once hard links are created, and the
replicators will need significantly more time on the first run after finishing
the partition power increase.
* Object replicators will skip partitions during the partition power increase.
Replicators are not aware of hard-links, and would simply copy the content;
this would result in heavy data movement and the worst case would be that all
data is stored twice.
* Due to the fact that each object will now be hard linked from two locations,
many more inodes will be used - expect around twice the amount. You need to
check the free inode count *before* increasing the partition power.
* Also, object auditors might read each object twice before cleanup removes the
second hard link.
* Due to the new inodes more memory is needed to cache them, and your
object servers should have plenty of available memory to avoid running out of
inode cache. Setting ``vfs_cache_pressure`` to 1 might help with that.
* All nodes in the cluster *must* run at least Swift version 2.13.0 or later.
Due to these caveats you should only increase the partition power if really
needed, i.e. if the number of partitions per disk is extremely low and the data
is distributed unevenly across disks.
-----------------------------------
1. Prepare partition power increase
-----------------------------------
The swift-ring-builder is used to prepare the ring for an upcoming partition
power increase. It will store a new variable ``next_part_power`` with the current
partition power + 1. Object servers recognize this, and hard links to the new
location will be created (or deleted) on every PUT or DELETE. This will make
it possible to access newly written objects using the future partition power::
swift-ring-builder <builder-file> prepare_increase_partition_power
swift-ring-builder <builder-file> write_ring
Now you need to copy the updated .ring.gz to all nodes. Already existing data
needs to be relinked too; therefore an operator has to run a relinker command
on all object servers in this phase::
swift-object-relinker relink
.. note::
Start relinking after *all* the servers re-read the modified ring files,
which normally happens within 15 seconds after writing a modified ring.
Also, make sure the modified rings are pushed to all nodes running object
services (replicators, reconstructors and reconcilers)- they have to skip
partitions during relinking.
Relinking might take some time; while there is no data copied or actually
moved, the tool still needs to walk the whole file system and create new hard
links as required.
---------------------------
2. Increase partition power
---------------------------
Now that all existing data can be found using the new location, it's time to
actually increase the partition power itself::
swift-ring-builder <builder-file> increase_partition_power
swift-ring-builder <builder-file> write_ring
Now you need to copy the updated .ring.gz again to all nodes. Object servers
are now using the new, increased partition power and no longer create
additional hard links.
.. note::
The object servers will create additional hard links for each modified or
new object, and this requires more inodes.
.. note::
If you decide you don't want to increase the partition power, you should
instead cancel the increase. It is not possible to revert this operation
once started. To abort the partition power increase, execute the following
commands, copy the updated .ring.gz files to all nodes and continue with
`3. Cleanup`_ afterwards::
swift-ring-builder <builder-file> cancel_increase_partition_power
swift-ring-builder <builder-file> write_ring
----------
3. Cleanup
----------
Existing hard links in the old locations need to be removed, and a cleanup tool
is provided to do this. Run the following command on each storage node::
swift-object-relinker cleanup
.. note::
The cleanup must be finished within your object servers reclaim_age period
(which is by default 1 week). Otherwise objects that have been overwritten
between step #1 and step #2 and deleted afterwards can't be cleaned up
anymore.
Afterwards it is required to update the rings one last
time to inform servers that all steps to increase the partition power are done,
and replicators should resume their job::
swift-ring-builder <builder-file> finish_increase_partition_power
swift-ring-builder <builder-file> write_ring
Now you need to copy the updated .ring.gz again to all nodes.
----------
Background
----------
An existing object that is currently located on partition X will be placed
either on partition 2*X or 2*X+1 after the partition power is increased. The
reason for this is the Ring.get_part() method, that does a bitwise shift to the
right.
To avoid actual data movement to different disks or even nodes, the allocation
of partitions to nodes needs to be changed. The allocation is pairwise due to
the above mentioned new partition scheme. Therefore devices are allocated like
this, with the partition being the index and the value being the device id::
old new
part dev part dev
---- --- ---- ---
0 0 0 0
1 0
1 3 2 3
3 3
2 7 4 7
5 7
3 5 6 5
7 5
4 2 8 2
9 2
5 1 10 1
11 1
There is a helper method to compute the new path, and the following example
shows the mapping between old and new location::
>>> from swift.common.utils import replace_partition_in_path
>>> old='objects/16003/a38/fa0fcec07328d068e24ccbf2a62f2a38/1467658208.57179.data'
>>> replace_partition_in_path(old, 14)
'objects/16003/a38/fa0fcec07328d068e24ccbf2a62f2a38/1467658208.57179.data'
>>> replace_partition_in_path(old, 15)
'objects/32007/a38/fa0fcec07328d068e24ccbf2a62f2a38/1467658208.57179.data'
Using the original partition power (14) it returned the same path; however
after an increase to 15 it returns the new path, and the new partition is 2*X+1
in this case.

View File

@ -51,6 +51,7 @@ scripts =
bin/swift-object-info
bin/swift-object-replicator
bin/swift-object-reconstructor
bin/swift-object-relinker
bin/swift-object-server
bin/swift-object-updater
bin/swift-oldies

159
swift/cli/relinker.py Normal file
View File

@ -0,0 +1,159 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
DiskFileQuarantined
from swift.common.utils import replace_partition_in_path, \
audit_location_generator, get_logger
from swift.obj import diskfile
def relink(swift_dir='/etc/swift',
devices='/srv/node',
skip_mount_check=False,
logger=logging.getLogger()):
mount_check = not skip_mount_check
run = False
relinked = errors = 0
for policy in POLICIES:
policy.object_ring = None # Ensure it will be reloaded
policy.load_ring(swift_dir)
part_power = policy.object_ring.part_power
next_part_power = policy.object_ring.next_part_power
if not next_part_power or next_part_power == part_power:
continue
logging.info('Relinking files for policy %s under %s',
policy.name, devices)
run = True
locations = audit_location_generator(
devices,
diskfile.get_data_dir(policy),
mount_check=mount_check)
for fname, _, _ in locations:
newfname = replace_partition_in_path(fname, next_part_power)
try:
diskfile.relink_paths(fname, newfname, check_existing=True)
relinked += 1
except OSError as exc:
errors += 1
logger.warning("Relinking %s to %s failed: %s",
fname, newfname, exc)
if not run:
logger.warning("No policy found to increase the partition power.")
return 2
logging.info('Relinked %d diskfiles (%d errors)', relinked, errors)
if errors > 0:
return 1
return 0
def cleanup(swift_dir='/etc/swift',
devices='/srv/node',
skip_mount_check=False,
logger=logging.getLogger()):
mount_check = not skip_mount_check
conf = {'devices': devices, 'mount_check': mount_check}
diskfile_router = diskfile.DiskFileRouter(conf, get_logger(conf))
errors = cleaned_up = 0
run = False
for policy in POLICIES:
policy.object_ring = None # Ensure it will be reloaded
policy.load_ring(swift_dir)
part_power = policy.object_ring.part_power
next_part_power = policy.object_ring.next_part_power
if not next_part_power or next_part_power != part_power:
continue
logging.info('Cleaning up files for policy %s under %s',
policy.name, devices)
run = True
locations = audit_location_generator(
devices,
diskfile.get_data_dir(policy),
mount_check=mount_check)
for fname, device, partition in locations:
expected_fname = replace_partition_in_path(fname, part_power)
if fname == expected_fname:
continue
# Make sure there is a valid object file in the expected new
# location. Note that this could be newer than the original one
# (which happens if there is another PUT after partition power
# has been increased, but cleanup did not yet run)
loc = diskfile.AuditLocation(
os.path.dirname(expected_fname), device, partition, policy)
diskfile_mgr = diskfile_router[policy]
df = diskfile_mgr.get_diskfile_from_audit_location(loc)
try:
with df.open():
pass
except DiskFileQuarantined as exc:
logger.warning('ERROR Object %(obj)s failed audit and was'
' quarantined: %(err)r',
{'obj': loc, 'err': exc})
errors += 1
continue
except DiskFileDeleted:
pass
except DiskFileNotExist as exc:
err = False
if policy.policy_type == 'erasure_coding':
# Might be a non-durable fragment - check that there is
# a fragment in the new path. Will be fixed by the
# reconstructor then
if not os.path.isfile(expected_fname):
err = True
else:
err = True
if err:
logger.warning(
'Error cleaning up %s: %r', fname, exc)
errors += 1
continue
try:
os.remove(fname)
cleaned_up += 1
logging.debug("Removed %s", fname)
except OSError as exc:
logger.warning('Error cleaning up %s: %r', fname, exc)
errors += 1
if not run:
logger.warning("No policy found to increase the partition power.")
return 2
logging.info('Cleaned up %d diskfiles (%d errors)', cleaned_up, errors)
if errors > 0:
return 1
return 0
def main(args):
logging.basicConfig(
format='%(message)s',
level=logging.DEBUG if args.debug else logging.INFO,
filename=args.logfile)
logger = logging.getLogger()
if args.action == 'relink':
return relink(
args.swift_dir, args.devices, args.skip_mount_check, logger)
if args.action == 'cleanup':
return cleanup(
args.swift_dir, args.devices, args.skip_mount_check, logger)

View File

@ -497,12 +497,14 @@ swift-ring-builder <builder_file>
print('The overload factor is %0.2f%% (%.6f)' % (
builder.overload * 100, builder.overload))
ring_dict = None
builder_dict = builder.get_ring().to_dict()
# compare ring file against builder file
if not exists(ring_file):
print('Ring file %s not found, '
'probably it hasn\'t been written yet' % ring_file)
else:
builder_dict = builder.get_ring().to_dict()
try:
ring_dict = RingData.load(ring_file).to_dict()
except Exception as exc:
@ -523,6 +525,24 @@ swift-ring-builder <builder_file>
):
flags = 'DEL' if dev in builder._remove_devs else ''
print_dev_f(dev, balance_per_dev[dev['id']], flags)
# Print some helpful info if partition power increase in progress
if (builder.next_part_power and
builder.next_part_power == (builder.part_power + 1)):
print('\nPreparing increase of partition power (%d -> %d)' % (
builder.part_power, builder.next_part_power))
print('Run "swift-object-relinker relink" on all nodes before '
'moving on to increase_partition_power.')
if (builder.next_part_power and
builder.part_power == builder.next_part_power):
print('\nIncreased partition power (%d -> %d)' % (
builder.part_power, builder.next_part_power))
if builder_dict != ring_dict:
print('First run "swift-ring-builder <builderfile> write_ring"'
' now and copy the updated .ring.gz file to all nodes.')
print('Run "swift-object-relinker cleanup" on all nodes before '
'moving on to finish_increase_partition_power.')
exit(EXIT_SUCCESS)
@staticmethod
@ -653,6 +673,11 @@ swift-ring-builder <builder_file> add
print(Commands.add.__doc__.strip())
exit(EXIT_ERROR)
if builder.next_part_power:
print('Partition power increase in progress. You need ')
print('to finish the increase first before adding devices.')
exit(EXIT_WARNING)
try:
for new_dev in _parse_add_values(argv[3:]):
for dev in builder.devs:
@ -798,6 +823,11 @@ swift-ring-builder <builder_file> remove
print(parse_search_value.__doc__.strip())
exit(EXIT_ERROR)
if builder.next_part_power:
print('Partition power increase in progress. You need ')
print('to finish the increase first before removing devices.')
exit(EXIT_WARNING)
devs, opts = _parse_remove_values(argv[3:])
input_question = 'Are you sure you want to remove these ' \
@ -860,6 +890,11 @@ swift-ring-builder <builder_file> rebalance [options]
handler.setFormatter(formatter)
logger.addHandler(handler)
if builder.next_part_power:
print('Partition power increase in progress.')
print('You need to finish the increase first before rebalancing.')
exit(EXIT_WARNING)
devs_changed = builder.devs_changed
min_part_seconds_left = builder.min_part_seconds_left
try:
@ -1224,6 +1259,159 @@ swift-ring-builder <builder_file> set_overload <overload>[%]
builder.save(builder_file)
exit(status)
@staticmethod
def prepare_increase_partition_power():
"""
swift-ring-builder <builder_file> prepare_increase_partition_power
Prepare the ring to increase the partition power by one.
A write_ring command is needed to make the change take effect.
Once the updated rings have been deployed to all servers you need to run
the swift-object-relinker tool to relink existing data.
*****************************
USE THIS WITH EXTREME CAUTION
*****************************
If you increase the partition power and deploy changed rings, you may
introduce unavailability in your cluster. This has an end-user impact. Make
sure you execute required operations to increase the partition power
accurately.
"""
if len(argv) < 3:
print(Commands.prepare_increase_partition_power.__doc__.strip())
exit(EXIT_ERROR)
if "object" not in basename(builder_file):
print(
'Partition power increase is only supported for object rings.')
exit(EXIT_ERROR)
if not builder.prepare_increase_partition_power():
print('Ring is already prepared for partition power increase.')
exit(EXIT_ERROR)
builder.save(builder_file)
print('The next partition power is now %d.' % builder.next_part_power)
print('The change will take effect after the next write_ring.')
print('Ensure your proxy-servers, object-replicators and ')
print('reconstructors are using the changed rings and relink ')
print('(using swift-object-relinker) your existing data')
print('before the partition power increase')
exit(EXIT_SUCCESS)
@staticmethod
def increase_partition_power():
"""
swift-ring-builder <builder_file> increase_partition_power
Increases the partition power by one. Needs to be run after
prepare_increase_partition_power has been run and all existing data has
been relinked using the swift-object-relinker tool.
A write_ring command is needed to make the change take effect.
Once the updated rings have been deployed to all servers you need to run
the swift-object-relinker tool to cleanup old data.
*****************************
USE THIS WITH EXTREME CAUTION
*****************************
If you increase the partition power and deploy changed rings, you may
introduce unavailability in your cluster. This has an end-user impact. Make
sure you execute required operations to increase the partition power
accurately.
"""
if len(argv) < 3:
print(Commands.increase_partition_power.__doc__.strip())
exit(EXIT_ERROR)
if builder.increase_partition_power():
print('The partition power is now %d.' % builder.part_power)
print('The change will take effect after the next write_ring.')
builder._update_last_part_moves()
builder.save(builder_file)
exit(EXIT_SUCCESS)
else:
print('Ring partition power cannot be increased. Either the ring')
print('was not prepared yet, or this operation has already run.')
exit(EXIT_ERROR)
@staticmethod
def cancel_increase_partition_power():
"""
swift-ring-builder <builder_file> cancel_increase_partition_power
Cancel the increase of the partition power.
A write_ring command is needed to make the change take effect.
Once the updated rings have been deployed to all servers you need to run
the swift-object-relinker tool to cleanup unneeded links.
*****************************
USE THIS WITH EXTREME CAUTION
*****************************
If you increase the partition power and deploy changed rings, you may
introduce unavailability in your cluster. This has an end-user impact. Make
sure you execute required operations to increase the partition power
accurately.
"""
if len(argv) < 3:
print(Commands.cancel_increase_partition_power.__doc__.strip())
exit(EXIT_ERROR)
if not builder.cancel_increase_partition_power():
print('Ring partition power increase cannot be canceled.')
exit(EXIT_ERROR)
builder.save(builder_file)
print('The next partition power is now %d.' % builder.next_part_power)
print('The change will take effect after the next write_ring.')
print('Ensure your object-servers are using the changed rings and')
print('cleanup (using swift-object-relinker) the hard links')
exit(EXIT_SUCCESS)
@staticmethod
def finish_increase_partition_power():
"""
swift-ring-builder <builder_file> finish_increase_partition_power
Finally removes the next_part_power flag. Has to be run after the
swift-object-relinker tool has been used to cleanup old existing data.
A write_ring command is needed to make the change take effect.
*****************************
USE THIS WITH EXTREME CAUTION
*****************************
If you increase the partition power and deploy changed rings, you may
introduce unavailability in your cluster. This has an end-user impact. Make
sure you execute required operations to increase the partition power
accurately.
"""
if len(argv) < 3:
print(Commands.finish_increase_partition_power.__doc__.strip())
exit(EXIT_ERROR)
if not builder.finish_increase_partition_power():
print('Ring partition power increase cannot be finished.')
exit(EXIT_ERROR)
print('The change will take effect after the next write_ring.')
builder.save(builder_file)
exit(EXIT_SUCCESS)
def main(arguments=None):
global argv, backup_dir, builder, builder_file, ring_file

View File

@ -86,6 +86,7 @@ class RingBuilder(object):
% (min_part_hours,))
self.part_power = part_power
self.next_part_power = None
self.replicas = replicas
self.min_part_hours = min_part_hours
self.parts = 2 ** self.part_power
@ -210,6 +211,7 @@ class RingBuilder(object):
"""
if hasattr(builder, 'devs'):
self.part_power = builder.part_power
self.next_part_power = builder.next_part_power
self.replicas = builder.replicas
self.min_part_hours = builder.min_part_hours
self.parts = builder.parts
@ -225,6 +227,7 @@ class RingBuilder(object):
self._id = getattr(builder, '_id', None)
else:
self.part_power = builder['part_power']
self.next_part_power = builder.get('next_part_power')
self.replicas = builder['replicas']
self.min_part_hours = builder['min_part_hours']
self.parts = builder['parts']
@ -261,6 +264,7 @@ class RingBuilder(object):
copy_from.
"""
return {'part_power': self.part_power,
'next_part_power': self.next_part_power,
'replicas': self.replicas,
'min_part_hours': self.min_part_hours,
'parts': self.parts,
@ -341,7 +345,8 @@ class RingBuilder(object):
self._ring = \
RingData([array('H', p2d) for p2d in
self._replica2part2dev],
devs, self.part_shift)
devs, self.part_shift,
self.next_part_power)
return self._ring
def add_dev(self, dev):
@ -1751,6 +1756,26 @@ class RingBuilder(object):
matched_devs.append(dev)
return matched_devs
def prepare_increase_partition_power(self):
"""
Prepares a ring for partition power increase.
This makes it possible to compute the future location of any object
based on the next partition power.
In this phase object servers should create hard links when finalizing a
write to the new location as well. A relinker will be run after
restarting object-servers, creating hard links to all existing objects
in their future location.
:returns: False if next_part_power was not set, otherwise True.
"""
if self.next_part_power:
return False
self.next_part_power = self.part_power + 1
self.version += 1
return True
def increase_partition_power(self):
"""
Increases ring partition power by one.
@ -1759,8 +1784,17 @@ class RingBuilder(object):
OLD: 0, 3, 7, 5, 2, 1, ...
NEW: 0, 0, 3, 3, 7, 7, 5, 5, 2, 2, 1, 1, ...
:returns: False if next_part_power was not set or is equal to current
part_power, None if something went wrong, otherwise True.
"""
if not self.next_part_power:
return False
if self.next_part_power != (self.part_power + 1):
return False
new_replica2part2dev = []
for replica in self._replica2part2dev:
new_replica = array('H')
@ -1775,13 +1809,47 @@ class RingBuilder(object):
# We need to update the time when a partition has been moved the last
# time. Since this is an array of all partitions, we need to double it
# two
# too
new_last_part_moves = []
for partition in self._last_part_moves:
new_last_part_moves.append(partition)
new_last_part_moves.append(partition)
self._last_part_moves = new_last_part_moves
self.part_power += 1
self.part_power = self.next_part_power
self.parts *= 2
self.version += 1
return True
def cancel_increase_partition_power(self):
"""
Cancels a ring partition power increasement.
This sets the next_part_power to the current part_power. Object
replicators will still skip replication, and a cleanup is still
required. Finally, a finish_increase_partition_power needs to be run.
:returns: False if next_part_power was not set or is equal to current
part_power, otherwise True.
"""
if not self.next_part_power:
return False
if self.next_part_power != (self.part_power + 1):
return False
self.next_part_power = self.part_power
self.version += 1
return True
def finish_increase_partition_power(self):
"""Finish the partition power increase.
The hard links from the old object locations should be removed by now.
"""
if self.next_part_power and self.next_part_power == self.part_power:
self.next_part_power = None
self.version += 1
return True
return False

View File

@ -38,10 +38,12 @@ from swift.common.ring.utils import tiers_for_dev
class RingData(object):
"""Partitioned consistent hashing ring data (used for serialization)."""
def __init__(self, replica2part2dev_id, devs, part_shift):
def __init__(self, replica2part2dev_id, devs, part_shift,
next_part_power=None):
self.devs = devs
self._replica2part2dev_id = replica2part2dev_id
self._part_shift = part_shift
self.next_part_power = next_part_power
for dev in self.devs:
if dev is not None:
@ -113,18 +115,27 @@ class RingData(object):
if not hasattr(ring_data, 'devs'):
ring_data = RingData(ring_data['replica2part2dev_id'],
ring_data['devs'], ring_data['part_shift'])
ring_data['devs'], ring_data['part_shift'],
ring_data.get('next_part_power'))
return ring_data
def serialize_v1(self, file_obj):
# Write out new-style serialization magic and version:
file_obj.write(struct.pack('!4sH', 'R1NG', 1))
ring = self.to_dict()
# Only include next_part_power if it is set in the
# builder, otherwise just ignore it
_text = {'devs': ring['devs'], 'part_shift': ring['part_shift'],
'replica_count': len(ring['replica2part2dev_id']),
'byteorder': sys.byteorder}
next_part_power = ring.get('next_part_power')
if next_part_power is not None:
_text['next_part_power'] = next_part_power
json_encoder = json.JSONEncoder(sort_keys=True)
json_text = json_encoder.encode(
{'devs': ring['devs'], 'part_shift': ring['part_shift'],
'replica_count': len(ring['replica2part2dev_id']),
'byteorder': sys.byteorder})
json_text = json_encoder.encode(_text)
json_len = len(json_text)
file_obj.write(struct.pack('!I', json_len))
file_obj.write(json_text)
@ -155,7 +166,8 @@ class RingData(object):
def to_dict(self):
return {'devs': self.devs,
'replica2part2dev_id': self._replica2part2dev_id,
'part_shift': self._part_shift}
'part_shift': self._part_shift,
'next_part_power': self.next_part_power}
class Ring(object):
@ -244,6 +256,15 @@ class Ring(object):
self._num_regions = len(regions)
self._num_zones = len(zones)
self._num_ips = len(ips)
self._next_part_power = ring_data.next_part_power
@property
def next_part_power(self):
return self._next_part_power
@property
def part_power(self):
return 32 - self._part_shift
def _rebuild_tier_data(self):
self.tier2devs = defaultdict(list)

View File

@ -17,6 +17,7 @@
from __future__ import print_function
import binascii
import errno
import fcntl
import grp
@ -27,6 +28,7 @@ import operator
import os
import pwd
import re
import struct
import sys
import time
import uuid
@ -4244,3 +4246,25 @@ def md5_hash_for_file(fname):
for block in iter(lambda: f.read(MD5_BLOCK_READ_BYTES), ''):
md5sum.update(block)
return md5sum.hexdigest()
def replace_partition_in_path(path, part_power):
"""
Takes a full path to a file and a partition power and returns
the same path, but with the correct partition number. Most useful when
increasing the partition power.
:param path: full path to a file, for example object .data file
:param part_power: partition power to compute correct partition number
:returns: Path with re-computed partition power
"""
path_components = path.split(os.sep)
digest = binascii.unhexlify(path_components[-2])
part_shift = 32 - int(part_power)
part = struct.unpack_from('>I', digest)[0] >> part_shift
path_components[-4] = "%d" % part
return os.sep.join(path_components)

View File

@ -65,7 +65,7 @@ from swift.common.utils import mkdirs, Timestamp, \
config_true_value, listdir, split_path, ismount, remove_file, \
get_md5_socket, F_SETPIPE_SZ, decode_timestamps, encode_timestamps, \
tpool_reraise, MD5_OF_EMPTY_STRING, link_fd_to_path, o_tmpfile_supported, \
O_TMPFILE, makedirs_count
O_TMPFILE, makedirs_count, replace_partition_in_path
from swift.common.splice import splice, tee
from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist, \
DiskFileCollision, DiskFileNoSpace, DiskFileDeviceUnavailable, \
@ -345,6 +345,37 @@ def invalidate_hash(suffix_dir):
inv_fh.write(suffix + "\n")
def relink_paths(target_path, new_target_path, check_existing=False):
"""
Hard-links a file located in target_path using the second path
new_target_path. Creates intermediate directories if required.
:param target_path: current absolute filename
:param new_target_path: new absolute filename for the hardlink
:param check_existing: if True, check whether the link is already present
before attempting to create a new one
"""
if target_path != new_target_path:
logging.debug('Relinking %s to %s due to next_part_power set',
target_path, new_target_path)
new_target_dir = os.path.dirname(new_target_path)
if not os.path.isdir(new_target_dir):
os.makedirs(new_target_dir)
link_exists = False
if check_existing:
try:
new_stat = os.stat(new_target_path)
orig_stat = os.stat(target_path)
link_exists = (new_stat.st_ino == orig_stat.st_ino)
except OSError:
pass # if anything goes wrong, try anyway
if not link_exists:
os.link(target_path, new_target_path)
def get_part_path(dev_path, policy, partition):
"""
Given the device path, policy, and partition, returns the full
@ -1455,9 +1486,11 @@ class BaseDiskFileWriter(object):
:param tmppath: full path name of the opened file descriptor
:param bytes_per_sync: number bytes written between sync calls
:param diskfile: the diskfile creating this DiskFileWriter instance
:param next_part_power: the next partition power to be used
"""
def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, diskfile):
def __init__(self, name, datadir, fd, tmppath, bytes_per_sync, diskfile,
next_part_power):
# Parameter tracking
self._name = name
self._datadir = datadir
@ -1465,6 +1498,7 @@ class BaseDiskFileWriter(object):
self._tmppath = tmppath
self._bytes_per_sync = bytes_per_sync
self._diskfile = diskfile
self.next_part_power = next_part_power
# Internal attributes
self._upload_size = 0
@ -1528,6 +1562,21 @@ class BaseDiskFileWriter(object):
# It was an unnamed temp file created by open() with O_TMPFILE
link_fd_to_path(self._fd, target_path,
self._diskfile._dirs_created)
# Check if the partition power will/has been increased
new_target_path = None
if self.next_part_power:
new_target_path = replace_partition_in_path(
target_path, self.next_part_power)
if target_path != new_target_path:
try:
fsync_dir(os.path.dirname(target_path))
relink_paths(target_path, new_target_path)
except OSError as exc:
self.manager.logger.exception(
'Relinking %s to %s failed: %s',
target_path, new_target_path, exc)
# If rename is successful, flag put as succeeded. This is done to avoid
# unnecessary os.unlink() of tempfile later. As renamer() has
# succeeded, the tempfile would no longer exist at its original path.
@ -1538,6 +1587,8 @@ class BaseDiskFileWriter(object):
except OSError:
logging.exception(_('Problem cleaning up %s'), self._datadir)
self._part_power_cleanup(target_path, new_target_path)
def _put(self, metadata, cleanup=True, *a, **kw):
"""
Helper method for subclasses.
@ -1584,6 +1635,43 @@ class BaseDiskFileWriter(object):
"""
pass
def _part_power_cleanup(self, cur_path, new_path):
"""
Cleanup relative DiskFile directories.
If the partition power is increased soon or has just been increased but
the relinker didn't yet cleanup the old files, an additional cleanup of
the relative dirs has to be done. Otherwise there might be some unused
files left if a PUT or DELETE is done in the meantime
:param cur_path: current full path to an object file
:param new_path: recomputed path to an object file, based on the
next_part_power set in the ring
"""
if new_path is None:
return
# Partition power will be increased soon
if new_path != cur_path:
new_target_dir = os.path.dirname(new_path)
try:
self.manager.cleanup_ondisk_files(new_target_dir)
except OSError:
logging.exception(
_('Problem cleaning up %s'), new_target_dir)
# Partition power has been increased, cleanup not yet finished
else:
prev_part_power = int(self.next_part_power) - 1
old_target_path = replace_partition_in_path(
cur_path, prev_part_power)
old_target_dir = os.path.dirname(old_target_path)
try:
self.manager.cleanup_ondisk_files(old_target_dir)
except OSError:
logging.exception(
_('Problem cleaning up %s'), old_target_dir)
class BaseDiskFileReader(object):
"""
@ -1922,6 +2010,7 @@ class BaseDiskFile(object):
:param use_linkat: if True, use open() with linkat() to create obj file
:param open_expired: if True, open() will not raise a DiskFileExpired if
object is expired
:param next_part_power: the next partition power to be used
"""
reader_cls = None # must be set by subclasses
writer_cls = None # must be set by subclasses
@ -1929,7 +2018,8 @@ class BaseDiskFile(object):
def __init__(self, mgr, device_path, partition,
account=None, container=None, obj=None, _datadir=None,
policy=None, use_splice=False, pipe_size=None,
use_linkat=False, open_expired=False, **kwargs):
use_linkat=False, open_expired=False, next_part_power=None,
**kwargs):
self._manager = mgr
self._device_path = device_path
self._logger = mgr.logger
@ -1947,6 +2037,7 @@ class BaseDiskFile(object):
# in all entry fops being carried out synchronously.
self._dirs_created = 0
self.policy = policy
self.next_part_power = next_part_power
if account and container and obj:
self._name = '/' + '/'.join((account, container, obj))
self._account = account
@ -2491,7 +2582,8 @@ class BaseDiskFile(object):
raise
dfw = self.writer_cls(self._name, self._datadir, fd, tmppath,
bytes_per_sync=self._bytes_per_sync,
diskfile=self)
diskfile=self,
next_part_power=self.next_part_power)
yield dfw
finally:
try:
@ -2712,10 +2804,27 @@ class ECDiskFileWriter(BaseDiskFileWriter):
def _finalize_durable(self, data_file_path, durable_data_file_path):
exc = None
new_data_file_path = new_durable_data_file_path = None
if self.next_part_power:
new_data_file_path = replace_partition_in_path(
data_file_path, self.next_part_power)
new_durable_data_file_path = replace_partition_in_path(
durable_data_file_path, self.next_part_power)
try:
try:
os.rename(data_file_path, durable_data_file_path)
fsync_dir(self._datadir)
if self.next_part_power and \
data_file_path != new_data_file_path:
try:
os.rename(new_data_file_path,
new_durable_data_file_path)
except OSError as exc:
self.manager.logger.exception(
'Renaming new path %s to %s failed: %s',
new_data_file_path, new_durable_data_file_path,
exc)
except (OSError, IOError) as err:
if err.errno not in (errno.ENOSPC, errno.EDQUOT):
# re-raise to catch all handler
@ -2733,6 +2842,9 @@ class ECDiskFileWriter(BaseDiskFileWriter):
self.manager.logger.exception(
_('Problem cleaning up %(datadir)s (%(err)s)'),
{'datadir': self._datadir, 'err': os_err})
self._part_power_cleanup(
durable_data_file_path, new_durable_data_file_path)
except Exception as err:
params = {'file': durable_data_file_path, 'err': err}
self.manager.logger.exception(

View File

@ -920,6 +920,19 @@ class ObjectReconstructor(Daemon):
all_parts = []
for policy, local_devices in policy2devices.items():
# Skip replication if next_part_power is set. In this case
# every object is hard-linked twice, but the replicator
# can't detect them and would create a second copy of the
# file if not yet existing - and this might double the
# actual transferred and stored data
next_part_power = getattr(
policy.object_ring, 'next_part_power', None)
if next_part_power is not None:
self.logger.warning(
_("next_part_power set in policy '%s'. Skipping"),
policy.name)
continue
df_mgr = self._df_router[policy]
for local_dev in local_devices:
dev_path = df_mgr.get_dev_path(local_dev['device'])
@ -1022,6 +1035,7 @@ class ObjectReconstructor(Daemon):
self.logger.info(_("Ring change detected. Aborting "
"current reconstruction pass."))
return
self.reconstruction_part_count += 1
jobs = self.build_reconstruction_jobs(part_info)
if not jobs:

View File

@ -669,6 +669,19 @@ class ObjectReplicator(Daemon):
jobs = []
ips = whataremyips(self.bind_ip)
for policy in POLICIES:
# Skip replication if next_part_power is set. In this case
# every object is hard-linked twice, but the replicator can't
# detect them and would create a second copy of the file if not
# yet existing - and this might double the actual transferred
# and stored data
next_part_power = getattr(
policy.object_ring, 'next_part_power', None)
if next_part_power is not None:
self.logger.warning(
_("next_part_power set in policy '%s'. Skipping"),
policy.name)
continue
if policy.policy_type == REPL_POLICY:
if (override_policies is not None and
str(policy.idx) not in override_policies):
@ -737,6 +750,7 @@ class ObjectReplicator(Daemon):
self.logger.info(_("Ring change detected. Aborting "
"current replication pass."))
return
try:
if isfile(job['path']):
# Clean up any (probably zero-byte) files where a

View File

@ -542,11 +542,13 @@ class ObjectController(BaseStorageServer):
if new_delete_at and new_delete_at < time.time():
return HTTPBadRequest(body='X-Delete-At in past', request=request,
content_type='text/plain')
next_part_power = request.headers.get('X-Backend-Next-Part-Power')
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy=policy, open_expired=config_true_value(
request.headers.get('x-backend-replication', 'false')))
request.headers.get('x-backend-replication', 'false')),
next_part_power=next_part_power)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@ -704,10 +706,12 @@ class ObjectController(BaseStorageServer):
# nodes; handoff nodes should 409 subrequests to over-write an
# existing data fragment until they offloaded the existing fragment
frag_index = request.headers.get('X-Backend-Ssync-Frag-Index')
next_part_power = request.headers.get('X-Backend-Next-Part-Power')
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy=policy, frag_index=frag_index)
policy=policy, frag_index=frag_index,
next_part_power=next_part_power)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@ -1016,10 +1020,11 @@ class ObjectController(BaseStorageServer):
device, partition, account, container, obj, policy = \
get_name_and_placement(request, 5, 5, True)
req_timestamp = valid_timestamp(request)
next_part_power = request.headers.get('X-Backend-Next-Part-Power')
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy=policy)
policy=policy, next_part_power=next_part_power)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:

View File

@ -234,6 +234,9 @@ class BaseObjectController(Controller):
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
next_part_power = getattr(obj_ring, 'next_part_power', None)
if next_part_power:
req.headers['X-Backend-Next-Part-Power'] = next_part_power
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
@ -642,6 +645,9 @@ class BaseObjectController(Controller):
# pass the policy index to storage nodes via req header
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
next_part_power = getattr(obj_ring, 'next_part_power', None)
if next_part_power:
req.headers['X-Backend-Next-Part-Power'] = next_part_power
req.acl = container_info['write_acl']
req.environ['swift_sync_key'] = container_info['sync_key']
@ -700,6 +706,9 @@ class BaseObjectController(Controller):
obj_ring = self.app.get_object_ring(policy_index)
# pass the policy index to storage nodes via req header
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
next_part_power = getattr(obj_ring, 'next_part_power', None)
if next_part_power:
req.headers['X-Backend-Next-Part-Power'] = next_part_power
container_partition = container_info['partition']
container_nodes = container_info['nodes']
req.acl = container_info['write_acl']

View File

@ -0,0 +1,199 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from errno import EEXIST
from shutil import copyfile
from tempfile import mkstemp
from time import time
from unittest import main
from uuid import uuid4
from swiftclient import client
from swift.cli.relinker import relink, cleanup
from swift.common.manager import Manager
from swift.common.ring import RingBuilder
from swift.common.utils import replace_partition_in_path
from swift.obj.diskfile import get_data_dir
from test.probe.common import ECProbeTest, ProbeTest, ReplProbeTest
class TestPartPowerIncrease(ProbeTest):
def setUp(self):
super(TestPartPowerIncrease, self).setUp()
_, self.ring_file_backup = mkstemp()
_, self.builder_file_backup = mkstemp()
self.ring_file = self.object_ring.serialized_path
self.builder_file = self.ring_file.replace('ring.gz', 'builder')
copyfile(self.ring_file, self.ring_file_backup)
copyfile(self.builder_file, self.builder_file_backup)
# In case the test user is not allowed to write rings
self.assertTrue(os.access('/etc/swift', os.W_OK))
self.assertTrue(os.access('/etc/swift/backups', os.W_OK))
self.assertTrue(os.access('/etc/swift/object.builder', os.W_OK))
self.assertTrue(os.access('/etc/swift/object.ring.gz', os.W_OK))
# Ensure the test object will be erasure coded
self.data = ' ' * getattr(self.policy, 'ec_segment_size', 1)
self.devices = [
self.device_dir('object', {'ip': ip, 'port': port, 'device': ''})
for ip, port in set((dev['ip'], dev['port'])
for dev in self.object_ring.devs)]
def tearDown(self):
# Keep a backup copy of the modified .builder file
backup_dir = os.path.join(
os.path.dirname(self.builder_file), 'backups')
try:
os.mkdir(backup_dir)
except OSError as err:
if err.errno != EEXIST:
raise
backup_name = (os.path.join(
backup_dir,
'%d.probe.' % time() + os.path.basename(self.builder_file)))
copyfile(self.builder_file, backup_name)
# Restore original ring
os.system('sudo mv %s %s' % (
self.ring_file_backup, self.ring_file))
os.system('sudo mv %s %s' % (
self.builder_file_backup, self.builder_file))
def _find_objs_ondisk(self, container, obj):
locations = []
opart, onodes = self.object_ring.get_nodes(
self.account, container, obj)
for node in onodes:
start_dir = os.path.join(
self.device_dir('object', node),
get_data_dir(self.policy),
str(opart))
for root, dirs, files in os.walk(start_dir):
for filename in files:
if filename.endswith('.data'):
locations.append(os.path.join(root, filename))
return locations
def _test_main(self, cancel=False):
container = 'container-%s' % uuid4()
obj = 'object-%s' % uuid4()
obj2 = 'object-%s' % uuid4()
# Create container
headers = {'X-Storage-Policy': self.policy.name}
client.put_container(self.url, self.token, container, headers=headers)
# Create a new object
client.put_object(self.url, self.token, container, obj, self.data)
client.head_object(self.url, self.token, container, obj)
# Prepare partition power increase
builder = RingBuilder.load(self.builder_file)
builder.prepare_increase_partition_power()
builder.save(self.builder_file)
ring_data = builder.get_ring()
ring_data.save(self.ring_file)
# Ensure the proxy uses the changed ring
Manager(['proxy']).restart()
# Ensure object is still accessible
client.head_object(self.url, self.token, container, obj)
# Relink existing objects
for device in self.devices:
self.assertEqual(0, relink(skip_mount_check=True, devices=device))
# Create second object after relinking and ensure it is accessible
client.put_object(self.url, self.token, container, obj2, self.data)
client.head_object(self.url, self.token, container, obj2)
# Remember the original object locations
org_locations = self._find_objs_ondisk(container, obj)
org_locations += self._find_objs_ondisk(container, obj2)
# Remember the new object locations
new_locations = []
for loc in org_locations:
new_locations.append(replace_partition_in_path(
str(loc), self.object_ring.part_power + 1))
# Overwrite existing object - to ensure that older timestamp files
# will be cleaned up properly later
client.put_object(self.url, self.token, container, obj, self.data)
# Ensure objects are still accessible
client.head_object(self.url, self.token, container, obj)
client.head_object(self.url, self.token, container, obj2)
# Increase partition power
builder = RingBuilder.load(self.builder_file)
if not cancel:
builder.increase_partition_power()
else:
builder.cancel_increase_partition_power()
builder.save(self.builder_file)
ring_data = builder.get_ring()
ring_data.save(self.ring_file)
# Ensure the proxy uses the changed ring
Manager(['proxy']).restart()
# Ensure objects are still accessible
client.head_object(self.url, self.token, container, obj)
client.head_object(self.url, self.token, container, obj2)
# Overwrite existing object - to ensure that older timestamp files
# will be cleaned up properly later
client.put_object(self.url, self.token, container, obj, self.data)
# Cleanup old objects in the wrong location
for device in self.devices:
self.assertEqual(0, cleanup(skip_mount_check=True, devices=device))
# Ensure objects are still accessible
client.head_object(self.url, self.token, container, obj)
client.head_object(self.url, self.token, container, obj2)
# Ensure data in old or relinked object locations is removed
if not cancel:
for fn in org_locations:
self.assertFalse(os.path.exists(fn))
else:
for fn in new_locations:
self.assertFalse(os.path.exists(fn))
class TestReplPartPowerIncrease(TestPartPowerIncrease, ReplProbeTest):
def test_main(self):
self._test_main()
def test_canceled(self):
self._test_main(cancel=True)
class TestECPartPowerIncrease(TestPartPowerIncrease, ECProbeTest):
def test_main(self):
self._test_main()
def test_canceled(self):
self._test_main(cancel=True)
if __name__ == '__main__':
main()

View File

@ -300,8 +300,7 @@ class FabricatedRing(Ring):
self.nodes = nodes
self.port = port
self.replicas = replicas
self.part_power = part_power
self._part_shift = 32 - self.part_power
self._part_shift = 32 - part_power
self._reload()
def _reload(self, *args, **kwargs):

View File

@ -0,0 +1,172 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
import os
import shutil
import struct
import tempfile
import unittest
from swift.cli import relinker
from swift.common import exceptions, ring, utils
from swift.common import storage_policy
from swift.common.storage_policy import (
StoragePolicy, StoragePolicyCollection, POLICIES)
from swift.obj.diskfile import write_metadata
from test.unit import FakeLogger
class TestRelinker(unittest.TestCase):
def setUp(self):
self.logger = FakeLogger()
self.testdir = tempfile.mkdtemp()
self.devices = os.path.join(self.testdir, 'node')
shutil.rmtree(self.testdir, ignore_errors=1)
os.mkdir(self.testdir)
os.mkdir(self.devices)
self.rb = ring.RingBuilder(8, 6.0, 1)
for i in range(6):
ip = "127.0.0.%s" % i
self.rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1,
'ip': ip, 'port': 10000, 'device': 'sda1'})
self.rb.rebalance(seed=1)
self.existing_device = 'sda1'
os.mkdir(os.path.join(self.devices, self.existing_device))
self.objects = os.path.join(self.devices, self.existing_device,
'objects')
os.mkdir(self.objects)
self._hash = utils.hash_path('a/c/o')
digest = binascii.unhexlify(self._hash)
part = struct.unpack_from('>I', digest)[0] >> 24
self.next_part = struct.unpack_from('>I', digest)[0] >> 23
self.objdir = os.path.join(
self.objects, str(part), self._hash[-3:], self._hash)
os.makedirs(self.objdir)
self.object_fname = "1278553064.00000.data"
self.objname = os.path.join(self.objdir, self.object_fname)
with open(self.objname, "wb") as dummy:
dummy.write("Hello World!")
write_metadata(dummy, {'name': '/a/c/o', 'Content-Length': '12'})
test_policies = [StoragePolicy(0, 'platin', True)]
storage_policy._POLICIES = StoragePolicyCollection(test_policies)
self.expected_dir = os.path.join(
self.objects, str(self.next_part), self._hash[-3:], self._hash)
self.expected_file = os.path.join(self.expected_dir, self.object_fname)
def _save_ring(self):
rd = self.rb.get_ring()
for policy in POLICIES:
rd.save(os.path.join(
self.testdir, '%s.ring.gz' % policy.ring_name))
# Enforce ring reloading in relinker
policy.object_ring = None
def tearDown(self):
shutil.rmtree(self.testdir, ignore_errors=1)
storage_policy.reload_storage_policies()
def test_relink(self):
self.rb.prepare_increase_partition_power()
self._save_ring()
relinker.relink(self.testdir, self.devices, True)
self.assertTrue(os.path.isdir(self.expected_dir))
self.assertTrue(os.path.isfile(self.expected_file))
stat_old = os.stat(os.path.join(self.objdir, self.object_fname))
stat_new = os.stat(self.expected_file)
self.assertEqual(stat_old.st_ino, stat_new.st_ino)
def _common_test_cleanup(self, relink=True):
# Create a ring that has prev_part_power set
self.rb.prepare_increase_partition_power()
self.rb.increase_partition_power()
self._save_ring()
os.makedirs(self.expected_dir)
if relink:
# Create a hardlink to the original object name. This is expected
# after a normal relinker run
os.link(os.path.join(self.objdir, self.object_fname),
self.expected_file)
def test_cleanup(self):
self._common_test_cleanup()
self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True))
# Old objectname should be removed, new should still exist
self.assertTrue(os.path.isdir(self.expected_dir))
self.assertTrue(os.path.isfile(self.expected_file))
self.assertFalse(os.path.isfile(
os.path.join(self.objdir, self.object_fname)))
def test_cleanup_not_yet_relinked(self):
self._common_test_cleanup(relink=False)
self.assertEqual(1, relinker.cleanup(self.testdir, self.devices, True))
self.assertTrue(os.path.isfile(
os.path.join(self.objdir, self.object_fname)))
def test_cleanup_deleted(self):
self._common_test_cleanup()
# Pretend the object got deleted inbetween and there is a tombstone
fname_ts = self.expected_file[:-4] + "ts"
os.rename(self.expected_file, fname_ts)
self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True))
def test_cleanup_doesnotexist(self):
self._common_test_cleanup()
# Pretend the file in the new place got deleted inbetween
os.remove(self.expected_file)
self.assertEqual(
1, relinker.cleanup(self.testdir, self.devices, True, self.logger))
self.assertEqual(self.logger.get_lines_for_level('warning'),
['Error cleaning up %s: %s' % (self.objname,
repr(exceptions.DiskFileNotExist()))])
def test_cleanup_non_durable_fragment(self):
self._common_test_cleanup()
# Actually all fragments are non-durable and raise and DiskFileNotExist
# in EC in this test. However, if the counterpart exists in the new
# location, this is ok - it will be fixed by the reconstructor later on
storage_policy._POLICIES[0].policy_type = 'erasure_coding'
self.assertEqual(
0, relinker.cleanup(self.testdir, self.devices, True, self.logger))
self.assertEqual(self.logger.get_lines_for_level('warning'), [])
def test_cleanup_quarantined(self):
self._common_test_cleanup()
# Pretend the object in the new place got corrupted
with open(self.expected_file, "wb") as obj:
obj.write('trash')
self.assertEqual(
1, relinker.cleanup(self.testdir, self.devices, True, self.logger))
self.assertIn('failed audit and was quarantined',
self.logger.get_lines_for_level('warning')[0])

View File

@ -489,6 +489,16 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
dev = ring.devs[-1]
self.assertGreater(dev['region'], 0)
def test_add_device_part_power_increase(self):
self.create_sample_ring()
ring = RingBuilder.load(self.tmpfile)
ring.next_part_power = 1
ring.save(self.tmpfile)
argv = ["", self.tmpfile, "add",
"r0z0-127.0.1.1:6200/sda1_some meta data", "100"]
self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
def test_remove_device(self):
for search_value in self.search_values:
self.create_sample_ring()
@ -762,6 +772,15 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
"--ip", "unknown"]
self.assertSystemExit(EXIT_ERROR, ringbuilder.main, argv)
def test_remove_device_part_power_increase(self):
self.create_sample_ring()
ring = RingBuilder.load(self.tmpfile)
ring.next_part_power = 1
ring.save(self.tmpfile)
argv = ["", self.tmpfile, "remove", "d0"]
self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
def test_set_weight(self):
for search_value in self.search_values:
self.create_sample_ring()
@ -2025,6 +2044,15 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
ring = RingBuilder.load(self.tmpfile)
self.assertEqual(last_replica2part2dev, ring._replica2part2dev)
def test_rebalance_part_power_increase(self):
self.create_sample_ring()
ring = RingBuilder.load(self.tmpfile)
ring.next_part_power = 1
ring.save(self.tmpfile)
argv = ["", self.tmpfile, "rebalance", "3"]
self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
def test_write_ring(self):
self.create_sample_ring()
argv = ["", self.tmpfile, "rebalance"]

View File

@ -2604,6 +2604,40 @@ class TestRingBuilder(unittest.TestCase):
except exceptions.DuplicateDeviceError:
self.fail("device hole not reused")
def test_prepare_increase_partition_power(self):
ring_file = os.path.join(self.testdir, 'test_partpower.ring.gz')
rb = ring.RingBuilder(8, 3.0, 1)
self.assertEqual(rb.part_power, 8)
# add more devices than replicas to the ring
for i in range(10):
dev = "sdx%s" % i
rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': dev})
rb.rebalance(seed=1)
self.assertFalse(rb.cancel_increase_partition_power())
self.assertEqual(rb.part_power, 8)
self.assertIsNone(rb.next_part_power)
self.assertFalse(rb.finish_increase_partition_power())
self.assertEqual(rb.part_power, 8)
self.assertIsNone(rb.next_part_power)
self.assertTrue(rb.prepare_increase_partition_power())
self.assertEqual(rb.part_power, 8)
self.assertEqual(rb.next_part_power, 9)
# Save .ring.gz, and load ring from it to ensure prev/next is set
rd = rb.get_ring()
rd.save(ring_file)
r = ring.Ring(ring_file)
expected_part_shift = 32 - 8
self.assertEqual(expected_part_shift, r._part_shift)
self.assertEqual(9, r.next_part_power)
def test_increase_partition_power(self):
rb = ring.RingBuilder(8, 3.0, 1)
self.assertEqual(rb.part_power, 8)
@ -2623,13 +2657,18 @@ class TestRingBuilder(unittest.TestCase):
old_part, old_nodes = r.get_nodes("acc", "cont", "obj")
old_version = rb.version
rb.increase_partition_power()
self.assertTrue(rb.prepare_increase_partition_power())
self.assertTrue(rb.increase_partition_power())
rb.validate()
changed_parts, _balance, removed_devs = rb.rebalance()
self.assertEqual(changed_parts, 0)
self.assertEqual(removed_devs, 0)
# Make sure cancellation is not possible
# after increasing the partition power
self.assertFalse(rb.cancel_increase_partition_power())
old_ring = r
rd = rb.get_ring()
rd.save(ring_file)
@ -2637,8 +2676,9 @@ class TestRingBuilder(unittest.TestCase):
new_part, new_nodes = r.get_nodes("acc", "cont", "obj")
# sanity checks
self.assertEqual(rb.part_power, 9)
self.assertEqual(rb.version, old_version + 2)
self.assertEqual(9, rb.part_power)
self.assertEqual(9, rb.next_part_power)
self.assertEqual(rb.version, old_version + 3)
# make sure there is always the same device assigned to every pair of
# partitions
@ -2670,6 +2710,107 @@ class TestRingBuilder(unittest.TestCase):
# nodes after increasing the partition power
self.assertEqual(old_nodes, new_nodes)
def test_finalize_increase_partition_power(self):
ring_file = os.path.join(self.testdir, 'test_partpower.ring.gz')
rb = ring.RingBuilder(8, 3.0, 1)
self.assertEqual(rb.part_power, 8)
# add more devices than replicas to the ring
for i in range(10):
dev = "sdx%s" % i
rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': dev})
rb.rebalance(seed=1)
self.assertTrue(rb.prepare_increase_partition_power())
# Make sure this doesn't do any harm before actually increasing the
# partition power
self.assertFalse(rb.finish_increase_partition_power())
self.assertEqual(rb.next_part_power, 9)
self.assertTrue(rb.increase_partition_power())
self.assertFalse(rb.prepare_increase_partition_power())
self.assertEqual(rb.part_power, 9)
self.assertEqual(rb.next_part_power, 9)
self.assertTrue(rb.finish_increase_partition_power())
self.assertEqual(rb.part_power, 9)
self.assertIsNone(rb.next_part_power)
# Save .ring.gz, and load ring from it to ensure prev/next is set
rd = rb.get_ring()
rd.save(ring_file)
r = ring.Ring(ring_file)
expected_part_shift = 32 - 9
self.assertEqual(expected_part_shift, r._part_shift)
self.assertIsNone(r.next_part_power)
def test_prepare_increase_partition_power_failed(self):
rb = ring.RingBuilder(8, 3.0, 1)
self.assertEqual(rb.part_power, 8)
self.assertTrue(rb.prepare_increase_partition_power())
self.assertEqual(rb.next_part_power, 9)
# next_part_power is still set, do not increase again
self.assertFalse(rb.prepare_increase_partition_power())
self.assertEqual(rb.next_part_power, 9)
def test_increase_partition_power_failed(self):
rb = ring.RingBuilder(8, 3.0, 1)
self.assertEqual(rb.part_power, 8)
# add more devices than replicas to the ring
for i in range(10):
dev = "sdx%s" % i
rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': dev})
rb.rebalance(seed=1)
# next_part_power not set, can't increase the part power
self.assertFalse(rb.increase_partition_power())
self.assertEqual(rb.part_power, 8)
self.assertTrue(rb.prepare_increase_partition_power())
self.assertTrue(rb.increase_partition_power())
self.assertEqual(rb.part_power, 9)
# part_power already increased
self.assertFalse(rb.increase_partition_power())
self.assertEqual(rb.part_power, 9)
def test_cancel_increase_partition_power(self):
rb = ring.RingBuilder(8, 3.0, 1)
self.assertEqual(rb.part_power, 8)
# add more devices than replicas to the ring
for i in range(10):
dev = "sdx%s" % i
rb.add_dev({'id': i, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': dev})
rb.rebalance(seed=1)
old_version = rb.version
self.assertTrue(rb.prepare_increase_partition_power())
# sanity checks
self.assertEqual(8, rb.part_power)
self.assertEqual(9, rb.next_part_power)
self.assertEqual(rb.version, old_version + 1)
self.assertTrue(rb.cancel_increase_partition_power())
rb.validate()
self.assertEqual(8, rb.part_power)
self.assertEqual(8, rb.next_part_power)
self.assertEqual(rb.version, old_version + 2)
class TestGetRequiredOverload(unittest.TestCase):

View File

@ -3819,6 +3819,28 @@ cluster_dfw1 = http://dfw1.host/v1/
self.fail('Invalid results from pure function:\n%s' %
'\n'.join(failures))
def test_replace_partition_in_path(self):
# Check for new part = part * 2
old = '/s/n/d/o/700/c77/af088baea4806dcaba30bf07d9e64c77/f'
new = '/s/n/d/o/1400/c77/af088baea4806dcaba30bf07d9e64c77/f'
# Expected outcome
self.assertEqual(utils.replace_partition_in_path(old, 11), new)
# Make sure there is no change if the part power didn't change
self.assertEqual(utils.replace_partition_in_path(old, 10), old)
self.assertEqual(utils.replace_partition_in_path(new, 11), new)
# Check for new part = part * 2 + 1
old = '/s/n/d/o/693/c77/ad708baea4806dcaba30bf07d9e64c77/f'
new = '/s/n/d/o/1387/c77/ad708baea4806dcaba30bf07d9e64c77/f'
# Expected outcome
self.assertEqual(utils.replace_partition_in_path(old, 11), new)
# Make sure there is no change if the part power didn't change
self.assertEqual(utils.replace_partition_in_path(old, 10), old)
self.assertEqual(utils.replace_partition_in_path(new, 11), new)
class ResellerConfReader(unittest.TestCase):

View File

@ -28,6 +28,7 @@ import uuid
import xattr
import re
import six
import struct
from collections import defaultdict
from random import shuffle, randint
from shutil import rmtree
@ -3839,16 +3840,25 @@ class DiskFileMixin(BaseDiskFileTestMixin):
DiskFileNoSpace,
diskfile.write_metadata, 'n/a', metadata)
def _create_diskfile_dir(self, timestamp, policy, legacy_durable=False):
def _create_diskfile_dir(self, timestamp, policy, legacy_durable=False,
partition=0, next_part_power=None,
expect_error=False):
timestamp = Timestamp(timestamp)
df = self._simple_get_diskfile(account='a', container='c',
obj='o_%s' % policy,
policy=policy)
policy=policy,
partition=partition,
next_part_power=next_part_power)
frag_index = None
if policy.policy_type == EC_POLICY:
frag_index = df._frag_index or 7
write_diskfile(df, timestamp, frag_index=frag_index,
legacy_durable=legacy_durable)
if expect_error:
with self.assertRaises(Exception):
write_diskfile(df, timestamp, frag_index=frag_index,
legacy_durable=legacy_durable)
else:
write_diskfile(df, timestamp, frag_index=frag_index,
legacy_durable=legacy_durable)
return df._datadir
def test_commit(self):
@ -3892,7 +3902,77 @@ class DiskFileMixin(BaseDiskFileTestMixin):
for policy in POLICIES:
self._do_test_write_cleanup(policy, legacy_durable=True)
def test_commit_no_extra_fsync(self):
@mock.patch("swift.obj.diskfile.BaseDiskFileManager.cleanup_ondisk_files")
def test_write_cleanup_part_power_increase(self, mock_cleanup):
# Without next_part_power set we expect only one cleanup per DiskFile
# and no linking
for policy in POLICIES:
timestamp = Timestamp(time()).internal
df_dir = self._create_diskfile_dir(timestamp, policy)
self.assertEqual(1, mock_cleanup.call_count)
mock_cleanup.assert_called_once_with(df_dir)
mock_cleanup.reset_mock()
# With next_part_power set to part_power + 1 we expect two cleanups per
# DiskFile: first cleanup the current directory, but also cleanup the
# future directory where hardlinks are created
for policy in POLICIES:
timestamp = Timestamp(time()).internal
df_dir = self._create_diskfile_dir(
timestamp, policy, next_part_power=11)
self.assertEqual(2, mock_cleanup.call_count)
mock_cleanup.assert_any_call(df_dir)
# Make sure the translated path is also cleaned up
expected_fname = utils.replace_partition_in_path(
os.path.join(df_dir, "dummy"), 11)
expected_dir = os.path.dirname(expected_fname)
mock_cleanup.assert_any_call(expected_dir)
mock_cleanup.reset_mock()
# With next_part_power set to part_power we expect two cleanups per
# DiskFile: first cleanup the current directory, but also cleanup the
# previous old directory
for policy in POLICIES:
digest = utils.hash_path(
'a', 'c', 'o_%s' % policy, raw_digest=True)
partition = struct.unpack_from('>I', digest)[0] >> (32 - 10)
timestamp = Timestamp(time()).internal
df_dir = self._create_diskfile_dir(
timestamp, policy, partition=partition, next_part_power=10)
self.assertEqual(2, mock_cleanup.call_count)
mock_cleanup.assert_any_call(df_dir)
# Make sure the path using the old part power is also cleaned up
expected_fname = utils.replace_partition_in_path(
os.path.join(df_dir, "dummy"), 9)
expected_dir = os.path.dirname(expected_fname)
mock_cleanup.assert_any_call(expected_dir)
mock_cleanup.reset_mock()
@mock.patch.object(diskfile.BaseDiskFileManager, 'cleanup_ondisk_files',
side_effect=Exception)
def test_killed_before_cleanup(self, mock_cleanup):
for policy in POLICIES:
timestamp = Timestamp(time()).internal
digest = utils.hash_path(
'a', 'c', 'o_%s' % policy, raw_digest=True)
partition = struct.unpack_from('>I', digest)[0] >> (32 - 10)
df_dir = self._create_diskfile_dir(timestamp, policy,
partition=partition,
next_part_power=11,
expect_error=True)
expected_fname = utils.replace_partition_in_path(
os.path.join(df_dir, "dummy"), 11)
expected_dir = os.path.dirname(expected_fname)
self.assertEqual(os.listdir(df_dir), os.listdir(expected_dir))
def test_commit_fsync(self):
for policy in POLICIES:
df = self._simple_get_diskfile(account='a', container='c',
obj='o', policy=policy)

View File

@ -65,7 +65,26 @@ def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs):
yield fake_ssync
def _create_test_rings(path):
def make_ec_archive_bodies(policy, test_body):
segment_size = policy.ec_segment_size
# split up the body into buffers
chunks = [test_body[x:x + segment_size]
for x in range(0, len(test_body), segment_size)]
# encode the buffers into fragment payloads
fragment_payloads = []
for chunk in chunks:
fragments = \
policy.pyeclib_driver.encode(chunk) * policy.ec_duplication_factor
if not fragments:
break
fragment_payloads.append(fragments)
# join up the fragment payloads per node
ec_archive_bodies = [''.join(frags) for frags in zip(*fragment_payloads)]
return ec_archive_bodies
def _create_test_rings(path, next_part_power=None):
testgz = os.path.join(path, 'object.ring.gz')
intended_replica2part2dev_id = [
[0, 1, 2],
@ -87,14 +106,16 @@ def _create_test_rings(path):
with closing(GzipFile(testgz, 'wb')) as f:
pickle.dump(
ring.RingData(intended_replica2part2dev_id,
intended_devs, intended_part_shift),
intended_devs, intended_part_shift,
next_part_power),
f)
testgz = os.path.join(path, 'object-1.ring.gz')
with closing(GzipFile(testgz, 'wb')) as f:
pickle.dump(
ring.RingData(intended_replica2part2dev_id,
intended_devs, intended_part_shift),
intended_devs, intended_part_shift,
next_part_power),
f)
@ -1218,6 +1239,19 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.assertEqual(self.reconstructor.suffix_count, 0)
self.assertEqual(len(found_jobs), 6)
def test_reconstructor_skipped_partpower_increase(self):
self.reconstructor._reset_stats()
_create_test_rings(self.testdir, 10)
# Enforce re-reading the EC ring
POLICIES[1].object_ring = ring.Ring(self.testdir, ring_name='object-1')
self.reconstructor.reconstruct()
self.assertEqual(0, self.reconstructor.reconstruction_count)
warnings = self.reconstructor.logger.get_lines_for_level('warning')
self.assertIn(
"next_part_power set in policy 'one'. Skipping", warnings)
class TestGlobalSetupObjectReconstructorLegacyDurable(
TestGlobalSetupObjectReconstructor):

View File

@ -131,7 +131,7 @@ def _mock_process(ret):
object_replicator.subprocess.Popen = orig_process
def _create_test_rings(path, devs=None):
def _create_test_rings(path, devs=None, next_part_power=None):
testgz = os.path.join(path, 'object.ring.gz')
intended_replica2part2dev_id = [
[0, 1, 2, 3, 4, 5, 6],
@ -159,14 +159,14 @@ def _create_test_rings(path, devs=None):
with closing(GzipFile(testgz, 'wb')) as f:
pickle.dump(
ring.RingData(intended_replica2part2dev_id,
intended_devs, intended_part_shift),
intended_devs, intended_part_shift, next_part_power),
f)
testgz = os.path.join(path, 'object-1.ring.gz')
with closing(GzipFile(testgz, 'wb')) as f:
pickle.dump(
ring.RingData(intended_replica2part2dev_id,
intended_devs, intended_part_shift),
intended_devs, intended_part_shift, next_part_power),
f)
for policy in POLICIES:
policy.object_ring = None # force reload
@ -1959,6 +1959,15 @@ class TestObjectReplicator(unittest.TestCase):
# After 10 cycles every partition is seen exactly once
self.assertEqual(sorted(range(partitions)), sorted(seen))
def test_replicate_skipped_partpower_increase(self):
_create_test_rings(self.testdir, next_part_power=4)
self.replicator.replicate()
self.assertEqual(0, self.replicator.job_count)
self.assertEqual(0, self.replicator.replication_count)
warnings = self.logger.get_lines_for_level('warning')
self.assertIn(
"next_part_power set in policy 'one'. Skipping", warnings)
if __name__ == '__main__':
unittest.main()