Merge branch 'master' into feature/hummingbird

Change-Id: Ifc5982eb3b07d7512d7c0657005ab81f4d1dfca6
Michael Barton 2015-07-15 18:28:05 +00:00
commit f7cb1777e1
159 changed files with 9859 additions and 3795 deletions


@ -51,7 +51,8 @@ Tom Fifield <tom@openstack.org> Tom Fifield <fifieldt@unimelb.edu.au>
Sascha Peilicke <saschpe@gmx.de> Sascha Peilicke <saschpe@suse.de>
Zhenguo Niu <zhenguo@unitedstack.com> <Niu.ZGlinux@gmail.com>
Peter Portante <peter.portante@redhat.com> <peter.a.portante@gmail.com>
Christian Schwede <christian.schwede@enovance.com> <info@cschwede.de>
Christian Schwede <cschwede@redhat.com> <info@cschwede.de>
Christian Schwede <cschwede@redhat.com> <christian.schwede@enovance.com>
Constantine Peresypkin <constantine.peresypk@rackspace.com> <constantine@litestack.com>
Madhuri Kumari <madhuri.rai07@gmail.com> madhuri <madhuri@madhuri-VirtualBox.(none)>
Morgan Fainberg <morgan.fainberg@gmail.com> <m@metacloud.com>
@ -72,3 +73,8 @@ Eohyung Lee <liquidnuker@gmail.com> <liquid@kt.com>
Harshit Chitalia <harshit@acelio.com> <harshit@acelio.com>
Richard Hawkins <richard.hawkins@rackspace.com>
Sarvesh Ranjan <saranjan@cisco.com>
Minwoo Bae <minwoob@us.ibm.com> Minwoo B
Jaivish Kothari <jaivish.kothari@nectechnologies.in> <janonymous.codevulture@gmail.com>
Michael Matur <michael.matur@gmail.com>
Kazuhiro Miyahara <miyahara.kazuhiro@lab.ntt.co.jp>
Alexandra Settle <alexandra.settle@rackspace.com>

AUTHORS

@ -29,6 +29,7 @@ Mehdi Abaakouk (mehdi.abaakouk@enovance.com)
Jesse Andrews (anotherjesse@gmail.com)
Joe Arnold (joe@swiftstack.com)
Ionuț Arțăriși (iartarisi@suse.cz)
Minwoo Bae (minwoob@us.ibm.com)
Bob Ball (bob.ball@citrix.com)
Christian Berendt (berendt@b1-systems.de)
Luis de Bethencourt (luis@debethencourt.com)
@ -42,9 +43,11 @@ Pádraig Brady (pbrady@redhat.com)
Lorcan Browne (lorcan.browne@hp.com)
Russell Bryant (rbryant@redhat.com)
Jay S. Bryant (jsbryant@us.ibm.com)
Tim Burke (tim.burke@gmail.com)
Brian D. Burns (iosctr@gmail.com)
Devin Carlen (devin.carlen@gmail.com)
Thierry Carrez (thierry@openstack.org)
Emmanuel Cazenave (contact@emcaz.fr)
Mahati Chamarthy (mahati.chamarthy@gmail.com)
Zap Chang (zapchang@gmail.com)
François Charlier (francois.charlier@enovance.com)
@ -88,6 +91,7 @@ Dan Hersam (dan.hersam@hp.com)
Derek Higgins (derekh@redhat.com)
Alex Holden (alex@alexjonasholden.com)
Edward Hope-Morley (opentastic@gmail.com)
Joanna H. Huang (joanna.huitzu.huang@gmail.com)
Kun Huang (gareth@unitedstack.com)
Matthieu Huin (mhu@enovance.com)
Hodong Hwang (hodong.hwang@kt.com)
@ -111,6 +115,7 @@ Nathan Kinder (nkinder@redhat.com)
Eugene Kirpichov (ekirpichov@gmail.com)
Leah Klearman (lklrmn@gmail.com)
Martin Kletzander (mkletzan@redhat.com)
Jaivish Kothari (jaivish.kothari@nectechnologies.in)
Steve Kowalik (steven@wedontsleep.org)
Sergey Kraynev (skraynev@mirantis.com)
Sushil Kumar (sushil.kumar2@globallogic.com)
@ -135,6 +140,7 @@ Steve Martinelli (stevemar@ca.ibm.com)
Juan J. Martinez (juan@memset.com)
Marcelo Martins (btorch@gmail.com)
Dolph Mathews (dolph.mathews@gmail.com)
Michael Matur (michael.matur@gmail.com)
Donagh McCabe (donagh.mccabe@hp.com)
Andy McCrae (andy.mccrae@gmail.com)
Paul McMillan (paul.mcmillan@nebula.com)
@ -142,6 +148,7 @@ Ewan Mellor (ewan.mellor@citrix.com)
Samuel Merritt (sam@swiftstack.com)
Stephen Milton (milton@isomedia.com)
Jola Mirecka (jola.mirecka@hp.com)
Kazuhiro Miyahara (miyahara.kazuhiro@lab.ntt.co.jp)
Daisuke Morita (morita.daisuke@lab.ntt.co.jp)
Dirk Mueller (dirk@dmllr.de)
Russ Nelson (russ@crynwr.com)
@ -161,6 +168,7 @@ Sascha Peilicke (saschpe@gmx.de)
Constantine Peresypkin (constantine.peresypk@rackspace.com)
Dieter Plaetinck (dieter@vimeo.com)
Dan Prince (dprince@redhat.com)
Sarvesh Ranjan (saranjan@cisco.com)
Felipe Reyes (freyes@tty.cl)
Janie Richling (jrichli@us.ibm.com)
Matt Riedemann (mriedem@us.ibm.com)
@ -171,9 +179,9 @@ Aaron Rosen (arosen@nicira.com)
Brent Roskos (broskos@internap.com)
Shilla Saebi (shilla.saebi@gmail.com)
Cristian A Sanchez (cristian.a.sanchez@intel.com)
Sarvesh Ranjan (saranjan@cisco.com)
Christian Schwede (christian.schwede@enovance.com)
Christian Schwede (cschwede@redhat.com)
Mark Seger (Mark.Seger@hp.com)
Alexandra Settle (alexandra.settle@rackspace.com)
Andrew Clay Shafer (acs@parvuscaptus.com)
Mitsuhiro SHIGEMATSU (shigematsu.mitsuhiro@lab.ntt.co.jp)
Dhriti Shikhar (dhrish20@gmail.com)
@ -181,6 +189,7 @@ Chuck Short (chuck.short@canonical.com)
Michael Shuler (mshuler@gmail.com)
David Moreau Simard (dmsimard@iweb.com)
Scott Simpson (sasimpson@gmail.com)
Pradeep Kumar Singh (pradeep.singh@nectechnologies.in)
Liu Siqi (meizu647@gmail.com)
Adrian Smith (adrian_f_smith@dell.com)
Jon Snitow (otherjon@swiftstack.com)
@ -188,6 +197,7 @@ TheSriram (sriram@klusterkloud.com)
Jeremy Stanley (fungi@yuggoth.org)
Mauro Stettler (mauro.stettler@gmail.com)
Tobias Stevenson (tstevenson@vbridges.com)
Victor Stinner (vstinner@redhat.com)
Pearl Yajing Tan (pearl.y.tan@seagate.com)
Yuriy Taraday (yorik.sar@gmail.com)
Monty Taylor (mordred@inaugust.com)


@ -20,7 +20,7 @@ from cStringIO import StringIO
from optparse import OptionParser
from sys import exit, stdout
from time import time
from six.moves import range
from eventlet import GreenPool, patcher, sleep
from eventlet.pools import Pool
@ -31,16 +31,17 @@ except ImportError:
from swift.common.internal_client import SimpleClient
from swift.common.ring import Ring
from swift.common.utils import compute_eta, get_time_units, config_true_value
from swift.common.storage_policy import POLICIES
insecure = False
def put_container(connpool, container, report):
def put_container(connpool, container, report, headers):
global retries_done
try:
with connpool.item() as conn:
conn.put_container(container)
conn.put_container(container, headers=headers)
retries_done += conn.attempts - 1
if report:
report(True)
@ -105,6 +106,9 @@ Usage: %%prog [options] [conf_file]
help='No overlap of partitions if running populate \
more than once. Will increase coverage by amount shown \
in dispersion.conf file')
parser.add_option('-P', '--policy-name', dest='policy_name',
help="Specify storage policy name")
options, args = parser.parse_args()
if args:
@ -114,6 +118,15 @@ Usage: %%prog [options] [conf_file]
if not c.read(conffile):
exit('Unable to read config file: %s' % conffile)
conf = dict(c.items('dispersion'))
if options.policy_name is None:
policy = POLICIES.default
else:
policy = POLICIES.get_by_name(options.policy_name)
if policy is None:
exit('Unable to find policy: %s' % options.policy_name)
print 'Using storage policy: %s ' % policy.name
swift_dir = conf.get('swift_dir', '/etc/swift')
dispersion_coverage = float(conf.get('dispersion_coverage', 1))
retries = int(conf.get('retries', 5))
@ -141,18 +154,20 @@ Usage: %%prog [options] [conf_file]
insecure=insecure)
account = url.rsplit('/', 1)[1]
connpool = Pool(max_size=concurrency)
headers = {}
headers['X-Storage-Policy'] = policy.name
connpool.create = lambda: SimpleClient(
url=url, token=token, retries=retries)
if container_populate:
container_ring = Ring(swift_dir, ring_name='container')
parts_left = dict((x, x)
for x in xrange(container_ring.partition_count))
for x in range(container_ring.partition_count))
if options.no_overlap:
with connpool.item() as conn:
containers = [cont['name'] for cont in conn.get_account(
prefix='dispersion_', full_listing=True)[1]]
prefix='dispersion_%d' % policy.idx, full_listing=True)[1]]
containers_listed = len(containers)
if containers_listed > 0:
for container in containers:
@ -170,11 +185,12 @@ Usage: %%prog [options] [conf_file]
next_report += 2
suffix = 0
while need_to_queue >= 1 and parts_left:
container = 'dispersion_%d' % suffix
container = 'dispersion_%d_%d' % (policy.idx, suffix)
part = container_ring.get_part(account, container)
if part in parts_left:
if suffix >= options.container_suffix_start:
coropool.spawn(put_container, connpool, container, report)
coropool.spawn(put_container, connpool, container, report,
headers)
sleep()
else:
report(True)
@ -195,10 +211,10 @@ Usage: %%prog [options] [conf_file]
stdout.flush()
if object_populate:
container = 'dispersion_objects'
put_container(connpool, container, None)
object_ring = Ring(swift_dir, ring_name='object')
parts_left = dict((x, x) for x in xrange(object_ring.partition_count))
container = 'dispersion_objects_%d' % policy.idx
put_container(connpool, container, None, headers)
object_ring = Ring(swift_dir, ring_name=policy.ring_name)
parts_left = dict((x, x) for x in range(object_ring.partition_count))
if options.no_overlap:
with connpool.item() as conn:


@ -36,6 +36,7 @@ from swift.common.internal_client import SimpleClient
from swift.common.ring import Ring
from swift.common.exceptions import ClientException
from swift.common.utils import compute_eta, get_time_units, config_true_value
from swift.common.storage_policy import POLICIES
unmounted = []
@ -73,10 +74,10 @@ def get_error_log(prefix):
def container_dispersion_report(coropool, connpool, account, container_ring,
retries, output_missing_partitions):
retries, output_missing_partitions, policy):
with connpool.item() as conn:
containers = [c['name'] for c in conn.get_account(
prefix='dispersion_', full_listing=True)[1]]
prefix='dispersion_%d' % policy.idx, full_listing=True)[1]]
containers_listed = len(containers)
if not containers_listed:
print >>stderr, 'No containers to query. Has ' \
@ -148,7 +149,7 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
if containers_listed - distinct_partitions:
print 'There were %d overlapping partitions' % (
containers_listed - distinct_partitions)
for missing_copies, num_parts in container_copies_missing.iteritems():
for missing_copies, num_parts in container_copies_missing.items():
print missing_string(num_parts, missing_copies,
container_ring.replica_count)
print '%.02f%% of container copies found (%d of %d)' % (
@ -163,14 +164,14 @@ def container_dispersion_report(coropool, connpool, account, container_ring,
'pct_found': value,
'copies_found': copies_found,
'copies_expected': copies_expected}
for missing_copies, num_parts in container_copies_missing.iteritems():
for missing_copies, num_parts in container_copies_missing.items():
results['missing_%d' % (missing_copies)] = num_parts
return results
def object_dispersion_report(coropool, connpool, account, object_ring,
retries, output_missing_partitions):
container = 'dispersion_objects'
retries, output_missing_partitions, policy):
container = 'dispersion_objects_%d' % policy.idx
with connpool.item() as conn:
try:
objects = [o['name'] for o in conn.get_container(
@ -196,6 +197,11 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
begun = time()
next_report = [time() + 2]
headers = None
if policy is not None:
headers = {}
headers['X-Backend-Storage-Policy-Index'] = int(policy)
def direct(obj, part, nodes):
found_count = 0
for node in nodes:
@ -203,7 +209,8 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
try:
attempts, _junk = direct_client.retry(
direct_client.direct_head_object, node, part, account,
container, obj, error_log=error_log, retries=retries)
container, obj, error_log=error_log, retries=retries,
headers=headers)
retries_done[0] += attempts - 1
found_count += 1
except ClientException as err:
@ -253,7 +260,7 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
print 'There were %d overlapping partitions' % (
objects_listed - distinct_partitions)
for missing_copies, num_parts in object_copies_missing.iteritems():
for missing_copies, num_parts in object_copies_missing.items():
print missing_string(num_parts, missing_copies,
object_ring.replica_count)
@ -270,7 +277,7 @@ def object_dispersion_report(coropool, connpool, account, object_ring,
'copies_found': copies_found,
'copies_expected': copies_expected}
for missing_copies, num_parts in object_copies_missing.iteritems():
for missing_copies, num_parts in object_copies_missing.items():
results['missing_%d' % (missing_copies,)] = num_parts
return results
@ -290,9 +297,9 @@ def missing_string(partition_count, missing_copies, copy_count):
verb_string = 'were'
partition_string = 'partitions'
copy_string = 'copy'
if missing_copies > 1:
copy_string = 'copies'
copy_string = 'copies'
if missing_copies == 1:
copy_string = 'copy'
return '%sThere %s %d %s missing %s %s.' % (
exclamations, verb_string, partition_count, partition_string,
@ -323,6 +330,9 @@ Usage: %%prog [options] [conf_file]
parser.add_option('--insecure', action='store_true', default=False,
help='Allow accessing insecure keystone server. '
'The keystone\'s certificate will not be verified.')
parser.add_option('-P', '--policy-name', dest='policy_name',
help="Specify storage policy name")
options, args = parser.parse_args()
if args:
@ -332,6 +342,15 @@ Usage: %%prog [options] [conf_file]
if not c.read(conffile):
exit('Unable to read config file: %s' % conffile)
conf = dict(c.items('dispersion'))
if options.policy_name is None:
policy = POLICIES.default
else:
policy = POLICIES.get_by_name(options.policy_name)
if policy is None:
exit('Unable to find policy: %s' % options.policy_name)
print 'Using storage policy: %s ' % policy.name
swift_dir = conf.get('swift_dir', '/etc/swift')
retries = int(conf.get('retries', 5))
concurrency = int(conf.get('concurrency', 25))
@ -364,16 +383,16 @@ Usage: %%prog [options] [conf_file]
url=url, token=token, retries=retries)
container_ring = Ring(swift_dir, ring_name='container')
object_ring = Ring(swift_dir, ring_name='object')
object_ring = Ring(swift_dir, ring_name=policy.ring_name)
output = {}
if container_report:
output['container'] = container_dispersion_report(
coropool, connpool, account, container_ring, retries,
options.partitions)
options.partitions, policy)
if object_report:
output['object'] = object_dispersion_report(
coropool, connpool, account, object_ring, retries,
options.partitions)
options.partitions, policy)
if json_output:
print json.dumps(output)


@ -26,7 +26,7 @@ if __name__ == '__main__':
usage = '''
Shows the nodes responsible for the item specified.
Usage: %prog [-a] <ring.gz> <account> [<container>] [<object>]
Usage: %prog [-a] <ring.gz> <account> [<container> [<object>]]
Or: %prog [-a] <ring.gz> -p partition
Or: %prog [-a] -P policy_name <account> <container> <object>
Note: account, container, object can also be a single arg separated by /

bin/swift-ring-builder-analyzer Executable file

@ -0,0 +1,22 @@
#!/usr/bin/python
# Copyright (c) 2015 Samuel Merritt <sam@swiftstack.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from swift.cli.ring_builder_analyzer import main
if __name__ == "__main__":
sys.exit(main())


@ -185,7 +185,7 @@ This caps how long the replicator will spend trying to sync a given database per
.IP \fBconcurrency\fR
Number of replication workers to spawn. The default is 8.
.IP "\fBrun_pause [deprecated]\fR"
Time in seconds to wait between replication passes. The default is 10.
Time in seconds to wait between replication passes. The default is 30.
.IP \fBinterval\fR
Replaces run_pause with the more standard "interval", which means the replicator won't pause unless it takes less than the interval set. The default is 30.
.IP \fBerror_suppression_interval\fR


@ -191,7 +191,7 @@ This caps how long the replicator will spend trying to sync a given database per
.IP \fBconcurrency\fR
Number of replication workers to spawn. The default is 8.
.IP "\fBrun_pause [deprecated]\fR"
Time in seconds to wait between replication passes. The default is 10.
Time in seconds to wait between replication passes. The default is 30.
.IP \fBinterval\fR
Replaces run_pause with the more standard "interval", which means the replicator won't pause unless it takes less than the interval set. The default is 30.
.IP \fBnode_timeout\fR


@ -187,7 +187,9 @@ Logging address. The default is /dev/log.
Indicates that you are using a VM environment. The default is no.
.IP \fBdaemonize\fR
Whether or not to run replication as a daemon. The default is yes.
.IP \fBrun_pause\fR
.IP "\fBrun_pause [deprecated]\fR"
Time in seconds to wait between replication passes. The default is 30.
.IP \fBinterval\fR
Time in seconds to wait between replication passes. The default is 30.
.IP \fBconcurrency\fR
Number of replication workers to spawn. The default is 1.


@ -296,9 +296,13 @@ Browsers can convert a host header to lowercase, so check that reseller
prefix on the account is the correct case. This is done by comparing the
items in the reseller_prefixes config option to the found prefix. If they
match except for case, the item from reseller_prefixes will be used
instead of the found reseller prefix. The reseller_prefixes list is exclusive.
If defined, any request with an account prefix not in that list will be ignored
by this middleware. Defaults to 'AUTH'.
instead of the found reseller prefix. When none match, the default reseller
prefix is used. When no default reseller prefix is configured, any request with
an account prefix not in that list will be ignored by this middleware.
Defaults to 'AUTH'.
.IP \fBdefault_reseller_prefix\fR
The default reseller prefix. This is used when none of the configured
reseller_prefixes match. When not set, no reseller prefix is added.
.RE
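
As a rough illustration (this is not the middleware's actual code, and the prefix
values are hypothetical), the matching rule described above amounts to::

    def pick_reseller_prefix(found, reseller_prefixes, default_prefix=None):
        # Browsers may lowercase the host header, so compare case-insensitively
        # and prefer the canonical spelling from reseller_prefixes.
        for candidate in reseller_prefixes:
            if candidate.lower() == found.lower():
                return candidate
        # Nothing matched: fall back to the configured default, if any;
        # with no default configured the request is ignored by the middleware.
        return default_prefix

    pick_reseller_prefix('auth', ['AUTH'])          # -> 'AUTH'
    pick_reseller_prefix('foo', ['AUTH'], 'AUTH')   # -> 'AUTH'
    pick_reseller_prefix('foo', ['AUTH'])           # -> None (request ignored)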


@ -25,7 +25,7 @@
.SH SYNOPSIS
.LP
.B swift-get-nodes
\ <ring.gz> <account> [<container>] [<object>]
\ <ring.gz> <account> [<container> [<object>]]
.SH DESCRIPTION
.PP


@ -142,7 +142,7 @@ could take a while to run.
.RE
.IP "\fBrebalence\fR"
.IP "\fBrebalance\fR"
.RS 5
Attempts to rebalance the ring by reassigning partitions that haven't been recently reassigned.
.RE


@ -339,6 +339,12 @@ allows it to be more easily consumed by third party utilities::
$ swift-dispersion-report -j
{"object": {"retries:": 0, "missing_two": 0, "copies_found": 7863, "missing_one": 0, "copies_expected": 7863, "pct_found": 100.0, "overlapping": 0, "missing_all": 0}, "container": {"retries:": 0, "missing_two": 0, "copies_found": 12534, "missing_one": 0, "copies_expected": 12534, "pct_found": 100.0, "overlapping": 15, "missing_all": 0}}
Note that you may select which storage policy to use by setting the option
'--policy-name silver' or '-P silver' (silver is the example policy name here).
If no policy is specified, the default will be used per the swift.conf file.
When you specify a policy, the containers created also include the policy index,
so even when running a container-only report you will need to specify the policy
if it is not the default.
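
As a quick illustration of that naming, here is a minimal sketch; the policy index
is a made-up example, while the format strings match the populate and report
changes above::

    # Container names used by the dispersion tools once a policy is selected.
    policy_idx = 1                                             # e.g. the "silver" policy
    suffix = 0

    container = 'dispersion_%d_%d' % (policy_idx, suffix)      # 'dispersion_1_0'
    objects_container = 'dispersion_objects_%d' % policy_idx   # 'dispersion_objects_1'

    # The report tool then only lists containers matching this prefix:
    listing_prefix = 'dispersion_%d' % policy_idx              # 'dispersion_1'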
-----------------------------------
Geographically Distributed Clusters
@ -896,6 +902,31 @@ Metric Name Description
including ones resulting in an error.
======================== ====================================================
Metrics for `object-reconstructor`:
====================================================== ======================================================
Metric Name Description
------------------------------------------------------ ------------------------------------------------------
`object-reconstructor.partition.delete.count.<device>` A count of partitions on <device> which were
reconstructed and synced to another node because they
didn't belong on this node. This metric is tracked
per-device to allow for "quiescence detection" for
object reconstruction activity on each device.
`object-reconstructor.partition.delete.timing` Timing data for partitions reconstructed and synced to
another node because they didn't belong on this node.
This metric is not tracked per device.
`object-reconstructor.partition.update.count.<device>` A count of partitions on <device> which were
reconstructed and synced to another node, but also
belong on this node. As with delete.count, this metric
is tracked per-device.
`object-reconstructor.partition.update.timing` Timing data for partitions reconstructed which also
belong on this node. This metric is not tracked
per-device.
`object-reconstructor.suffix.hashes` Count of suffix directories whose hash (of filenames)
was recalculated.
`object-reconstructor.suffix.syncs` Count of suffix directories reconstructed with ssync.
====================================================== ======================================================
Metrics for `object-replicator`:
=================================================== ====================================================


@ -5,7 +5,7 @@ Form POST middleware
====================
To discover whether your Object Storage system supports this feature,
check with your service provider or send a **GET** request using the ``/info``
check with your service provider or send a **GET** request using the :file:`/info`
path.
You can upload objects directly to the Object Storage system from a
@ -35,7 +35,7 @@ The format of the form **POST** request is:
.. code::
&lt;![CDATA[
<![CDATA[
<form action="SWIFT_URL"
method="POST"
enctype="multipart/form-data">
@ -48,13 +48,13 @@ The format of the form **POST** request is:
<br/>
<input type="submit"/>
</form>
]]&gt;
]]>
**``action="SWIFT_URL``"**
**action="SWIFT_URL"**
Set to full URL where the objects are to be uploaded. The names of
uploaded files are appended to the specified *``SWIFT_URL``*. So, you
uploaded files are appended to the specified *SWIFT_URL*. So, you
can upload directly to the root of a container with a URL like:
.. code::
@ -79,39 +79,39 @@ Must be ``POST``.
Must be ``multipart/form-data``.
**name="redirect" value="*``REDIRECT_URL``*\ "**
**name="redirect" value="REDIRECT_URL"**
Redirects the browser to the *``REDIRECT_URL``* after the upload
Redirects the browser to the *REDIRECT_URL* after the upload
completes. The URL has status and message query parameters added to it,
which specify the HTTP status code for the upload and an optional error
message. The 2\ *``nn``* status code indicates success.
message. The 2\ *nn* status code indicates success.
The *``REDIRECT_URL``* can be an empty string. If so, the ``Location``
The *REDIRECT_URL* can be an empty string. If so, the ``Location``
response header is not set.
**name="max\_file\_size" value="*``BYTES``*\ "**
**name="max\_file\_size" value="BYTES"**
Required. Indicates the size, in bytes, of the maximum single file
upload.
**name="max\_file\_count" value= "*``COUNT``*\ "**
**name="max\_file\_count" value= "COUNT"**
Required. Indicates the maximum number of files that can be uploaded
with the form.
**name="expires" value="*``UNIX_TIMESTAMP``*\ "**
**name="expires" value="UNIX_TIMESTAMP"**
The UNIX timestamp that specifies the time before which the form must be
submitted before it becomes no longer valid.
**name="signature" value="*``HMAC``*\ "**
**name="signature" value="HMAC"**
The HMAC-SHA1 signature of the form.
**type="file" name="*``FILE_NAME``*\ "**
**type="file" name="FILE_NAME"**
File name of the file to be uploaded. You can include from one to the
``max_file_count`` value of files.
@ -127,7 +127,7 @@ follow the file attributes are ignored.
Optionally, if you want the uploaded files to be temporary you can set x-delete-at or x-delete-after attributes by adding one of these as a form input:
..code::
.. code::
<input type="hidden" name="x_delete_at" value="<unix-timestamp>" />
<input type="hidden" name="x_delete_after" value="<seconds>" />
@ -144,7 +144,7 @@ Form **POST** middleware uses an HMAC-SHA1 cryptographic signature. This
signature includes these elements from the form:
- The path. Starting with ``/v1/`` onwards and including a container
name and, optionally, an object prefix. In `Example 1.15, “HMAC-SHA1
name and, optionally, an object prefix. In `Example 1.15`, “HMAC-SHA1
signature for form
POST” the path is
``/v1/my_account/container/object_prefix``. Do not URL-encode the
@ -152,11 +152,11 @@ signature includes these elements from the form:
- A redirect URL. If there is no redirect URL, use the empty string.
- Maximum file size. In `Example 1.15, “HMAC-SHA1 signature for form
- Maximum file size. In `Example 1.15`, “HMAC-SHA1 signature for form
POST” the
``max_file_size`` is ``104857600`` bytes.
- The maximum number of objects to upload. In `Example 1.15, “HMAC-SHA1
- The maximum number of objects to upload. In `Example 1.15`, “HMAC-SHA1
signature for form
POST” ``max_file_count`` is ``10``.
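
For reference, a minimal sketch of computing such a signature with Python's
standard library, assuming the usual formpost recipe of signing the
newline-joined path, redirect, max_file_size, max_file_count and expires values
with the shared secret key (all values below are illustrative)::

    import hmac
    from hashlib import sha1
    from time import time

    path = '/v1/my_account/container/object_prefix'   # example path
    redirect = ''                                      # empty string when not redirecting
    max_file_size = 104857600
    max_file_count = 10
    expires = int(time() + 600)
    key = 'mykey'                                      # example shared secret

    hmac_body = '%s\n%s\n%s\n%s\n%s' % (path, redirect, max_file_size,
                                        max_file_count, expires)
    signature = hmac.new(key, hmac_body, sha1).hexdigest()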


@ -133,21 +133,16 @@ or ends.
Object Storage HTTP requests have the following default constraints.
Your service provider might use different default values.
==== ============= =====
============================ ============= =====
Item Maximum value Notes
==== ============= =====
============================ ============= =====
Number of HTTP headers 90
Length of HTTP headers 4096 bytes
Length per HTTP request line 8192 bytes
Length of HTTP request 5 GB
Length of container names 256 bytes Cannot contain the ``/`` character.
Length of object names 1024 bytes By default, there are no character restrictions.
============================ ============= =====
You must UTF-8-encode and then URL-encode container and object names
before you call the API binding. If you use an API binding that performs


@ -48,6 +48,7 @@ Monitoring & Statistics
-----------------------
* `Swift Informant <https://github.com/pandemicsyn/swift-informant>`_ - Swift Proxy Middleware to send events to a statsd instance.
* `Swift Inspector <https://github.com/hurricanerix/swift-inspector>`_ - Swift middleware to relay information about a request back to the client.
Content Distribution Network Integration
@ -108,3 +109,4 @@ Other
* `liberasurecode <http://www.bytebucket.org/tsg-/liberasurecode>`_ - Low Level Erasure Code library used by PyECLib
* `Swift Browser <https://github.com/zerovm/swift-browser>`_ - JavaScript interface for Swift
* `swift-ui <https://github.com/fanatic/swift-ui>`_ - OpenStack Swift web browser
* `Swift Durability Calculator <https://github.com/enovance/swift-durability-calculator>`_ - Data Durability Calculation Tool for Swift


@ -139,6 +139,72 @@ swift-ring-builder with no options will display help text with available
commands and options. More information on how the ring works internally
can be found in the :doc:`Ring Overview <overview_ring>`.
.. _server-per-port-configuration:
-------------------------------
Running object-servers Per Disk
-------------------------------
The lack of true asynchronous file I/O on Linux leaves the object-server
workers vulnerable to misbehaving disks. Because any object-server worker can
service a request for any disk, and a slow I/O request blocks the eventlet hub,
a single slow disk can impair an entire storage node. This also prevents
object servers from fully utilizing all their disks during heavy load.
The :ref:`threads_per_disk <object-server-options>` option was one way to
address this, but came with severe performance overhead which was worse
than the benefit of I/O isolation. Any clusters using threads_per_disk should
switch to using `servers_per_port`.
Another way to get full I/O isolation is to give each disk on a storage node a
different port in the storage policy rings. Then set the
:ref:`servers_per_port <object-server-default-options>`
option in the object-server config. NOTE: while the purpose of this config
setting is to run one or more object-server worker processes per *disk*, the
implementation just runs object-servers per unique port of local devices in the
rings. The deployer must combine this option with appropriately-configured
rings to benefit from this feature.
Here's an example (abbreviated) old-style ring (2 node cluster with 2 disks
each)::
Devices: id region zone ip address port replication ip replication port name
0 1 1 1.1.0.1 6000 1.1.0.1 6000 d1
1 1 1 1.1.0.1 6000 1.1.0.1 6000 d2
2 1 2 1.1.0.2 6000 1.1.0.2 6000 d3
3 1 2 1.1.0.2 6000 1.1.0.2 6000 d4
And here's the same ring set up for `servers_per_port`::
Devices: id region zone ip address port replication ip replication port name
0 1 1 1.1.0.1 6000 1.1.0.1 6000 d1
1 1 1 1.1.0.1 6001 1.1.0.1 6001 d2
2 1 2 1.1.0.2 6000 1.1.0.2 6000 d3
3 1 2 1.1.0.2 6001 1.1.0.2 6001 d4
When migrating from normal to `servers_per_port`, perform these steps in order:
#. Upgrade Swift code to a version capable of doing `servers_per_port`.
#. Enable `servers_per_port` with a > 0 value
#. Restart `swift-object-server` processes with a SIGHUP. At this point, you
will have the `servers_per_port` number of `swift-object-server` processes
serving all requests for all disks on each node. This preserves
availability, but you should perform the next step as quickly as possible.
#. Push out new rings that actually have different ports per disk on each
server. One of the ports in the new ring should be the same as the port
used in the old ring ("6000" in the example above). This will cover
existing proxy-server processes who haven't loaded the new ring yet. They
can still talk to any storage node regardless of whether or not that
storage node has loaded the ring and started object-server processes on the
new ports.
If you do not run a separate object-server for replication, then this setting
must be available to the object-replicator and object-reconstructor (i.e.
appear in the [DEFAULT] config section).
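
As a rough sketch of the arithmetic (illustrative only; the real process
management lives inside the object-server code), the number of object-server
processes on a node under `servers_per_port` is one parent process plus
`servers_per_port` workers for every unique local port in the rings::

    # Example: 4 disks, each with its own port in the ring, servers_per_port = 4
    local_device_ports = {6000, 6001, 6002, 6003}
    servers_per_port = 4

    process_count = 1 + len(local_device_ports) * servers_per_port
    print(process_count)   # 17 here; with 24 disks it would be 1 + (24 * 4) = 97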
.. _general-service-configuration:
-----------------------------
@ -149,14 +215,14 @@ Most Swift services fall into two categories. Swift's wsgi servers and
background daemons.
For more information specific to the configuration of Swift's wsgi servers
with paste deploy see :ref:`general-server-configuration`
with paste deploy see :ref:`general-server-configuration`.
Configuration for servers and daemons can be expressed together in the same
file for each type of server, or separately. If a required section for the
service trying to start is missing there will be an error. The sections not
used by the service are ignored.
Consider the example of an object storage node. By convention configuration
Consider the example of an object storage node. By convention, configuration
for the object-server, object-updater, object-replicator, and object-auditor
exist in a single file ``/etc/swift/object-server.conf``::
@ -323,7 +389,7 @@ max_header_size 8192 max_header_size is the max number of bytes in
tokens including more than 7 catalog entries.
See also include_service_catalog in
proxy-server.conf-sample (documented in
overview_auth.rst)
overview_auth.rst).
=================== ========== =============================================
---------------------------
@ -335,6 +401,8 @@ etc/object-server.conf-sample in the source code repository.
The following configuration options are available:
.. _object-server-default-options:
[DEFAULT]
=================== ========== =============================================
@ -353,12 +421,30 @@ workers auto Override the number of pre-forked workers
should be an integer, zero means no fork. If
unset, it will try to default to the number
of effective cpu cores and fallback to one.
Increasing the number of workers may reduce
the possibility of slow file system
operations in one request from negatively
impacting other requests, but may not be as
efficient as tuning :ref:`threads_per_disk
<object-server-options>`
Increasing the number of workers helps slow
filesystem operations in one request from
negatively impacting other requests, but only
the :ref:`servers_per_port
<server-per-port-configuration>`
option provides complete I/O isolation with
no measurable overhead.
servers_per_port 0 If each disk in each storage policy ring has
unique port numbers for its "ip" value, you
can use this setting to have each
object-server worker only service requests
for the single disk matching the port in the
ring. The value of this setting determines
how many worker processes run for each port
(disk) in the ring. If you have 24 disks
per server, and this setting is 4, then
each storage node will have 1 + (24 * 4) =
97 total object-server processes running.
This gives complete I/O isolation, drastically
reducing the impact of slow disks on storage
node performance. The object-replicator and
object-reconstructor need to see this setting
too, so it must be in the [DEFAULT] section.
See :ref:`server-per-port-configuration`.
max_clients 1024 Maximum number of clients one worker can
process simultaneously (it will actually
accept(2) N + 1). Setting this to one (1)
@ -421,13 +507,12 @@ keep_cache_private false Allow non-public objects to stay
threads_per_disk 0 Size of the per-disk thread pool
used for performing disk I/O. The
default of 0 means to not use a
per-disk thread pool. It is
recommended to keep this value
small, as large values can result
in high read latencies due to
large queue depths. A good
starting point is 4 threads per
disk.
per-disk thread pool.
This option is no longer
recommended and the
:ref:`servers_per_port
<server-per-port-configuration>`
should be used instead.
replication_concurrency 4 Set to restrict the number of
concurrent incoming REPLICATION
requests; set to 0 for unlimited
@ -465,7 +550,7 @@ log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
daemonize yes Whether or not to run replication as a
daemon
run_pause 30 Time in seconds to wait between
interval 30 Time in seconds to wait between
replication passes
concurrency 1 Number of replication workers to spawn
timeout 5 Timeout value sent to rsync --timeout
@ -562,7 +647,7 @@ workers auto Override the number of pre-forked workers
the possibility of slow file system
operations in one request from negatively
impacting other requests. See
:ref:`general-service-tuning`
:ref:`general-service-tuning`.
max_clients 1024 Maximum number of clients one worker can
process simultaneously (it will actually
accept(2) N + 1). Setting this to one (1)
@ -614,7 +699,7 @@ log_level INFO Logging level
per_diff 1000
concurrency 8 Number of replication workers to
spawn
run_pause 30 Time in seconds to wait between
interval 30 Time in seconds to wait between
replication passes
node_timeout 10 Request timeout to external services
conn_timeout 0.5 Connection timeout to external
@ -690,7 +775,7 @@ workers auto Override the number of pre-forked workers
the possibility of slow file system
operations in one request from negatively
impacting other requests. See
:ref:`general-service-tuning`
:ref:`general-service-tuning`.
max_clients 1024 Maximum number of clients one worker can
process simultaneously (it will actually
accept(2) N + 1). Setting this to one (1)
@ -742,7 +827,7 @@ log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
per_diff 1000
concurrency 8 Number of replication workers to spawn
run_pause 30 Time in seconds to wait between
interval 30 Time in seconds to wait between
replication passes
node_timeout 10 Request timeout to external services
conn_timeout 0.5 Connection timeout to external services
@ -813,7 +898,7 @@ workers auto Override the number of
will try to default to the
number of effective cpu cores
and fallback to one. See
:ref:`general-service-tuning`
:ref:`general-service-tuning`.
max_clients 1024 Maximum number of clients one
worker can process
simultaneously (it will
@ -1149,6 +1234,16 @@ the system. If your filesystem does not support `fallocate()` or
`posix_fallocate()`, be sure to set the `disable_fallocate = true` config
parameter in account, container, and object server configs.
Most current Linux distributions ship with a default installation of updatedb.
This tool runs periodically and updates the file name database that is used by
the GNU locate tool. However, including Swift object and container database
files is most likely not required and the periodic update affects the
performance quite a bit. To disable the inclusion of these files add the path
where Swift stores its data to the setting PRUNEPATHS in `/etc/updatedb.conf`::
PRUNEPATHS="... /tmp ... /var/spool ... /srv/node"
---------------------
General System Tuning
---------------------


@ -44,12 +44,23 @@ To execute the unit tests:
If you installed using: `cd ~/swift; sudo python setup.py develop`,
you may need to do: `cd ~/swift; sudo chown -R swift:swift swift.egg-info`
prior to running tox.
If you ever encounter DistributionNotFound, try to use `tox --recreate`
or removing .tox directory to force tox to recreate the dependency list
* Optionally, run only specific tox builds:
- `tox -e pep8,py26`
- `tox -e pep8,py27`
.. note::
As of tox version 2.0.0, most environment variables are not automatically
passed to the test environment. Swift's tox.ini overrides this default
behavior so that variable names matching SWIFT_* and *_proxy will be passed,
but you may need to run tox --recreate for this to take effect after
upgrading from tox<2.0.0.
Conversely, if you do not want those environment variables to be passed to
the test environment then you will need to unset them before calling tox.
Also, if you ever encounter DistributionNotFound, try to use `tox --recreate`
or remove the .tox directory to force tox to recreate the dependency list.
The functional tests may be executed against a :doc:`development_saio` or
other running Swift cluster using the command:


@ -8,7 +8,7 @@ Instructions for setting up a development VM
This section documents setting up a virtual machine for doing Swift
development. The virtual machine will emulate running a four node Swift
cluster.
cluster. To begin:
* Get an Ubuntu 14.04 LTS server image or try something
Fedora/CentOS.
@ -55,10 +55,9 @@ Installing dependencies
python-netifaces python-pip python-dns \
python-mock
This installs necessary system dependencies; and *most* of the python
dependencies. Later in the process setuptools/distribute or pip will
install and/or upgrade some other stuff - it's getting harder to avoid.
You can also install anything else you want, like screen, ssh, vim, etc.
Note: This installs necessary system dependencies and *most* of the python
dependencies. Later in the process setuptools/distribute or pip will install
and/or upgrade packages.
Next, choose either :ref:`partition-section` or :ref:`loopback-section`.
@ -176,7 +175,7 @@ Getting the code
#. Build a development installation of swift::
cd $HOME/swift; sudo python setup.py develop; cd -
cd $HOME/swift; sudo pip install -r requirements.txt; sudo python setup.py develop; cd -
Fedora 19 or later users might have to perform the following if development
installation of swift fails::
@ -409,6 +408,7 @@ Setting up scripts for running Swift
#. Copy the SAIO scripts for resetting the environment::
mkdir -p $HOME/bin
cd $HOME/swift/doc; cp saio/bin/* $HOME/bin; cd -
chmod +x $HOME/bin/*


@ -16,8 +16,7 @@ Swift is written in Python and has these dependencies:
* The Python packages listed in `the requirements file <https://github.com/openstack/swift/blob/master/requirements.txt>`_
* Testing additionally requires `the test dependencies <https://github.com/openstack/swift/blob/master/test-requirements.txt>`_
Python 2.6 should work, but it's not actively tested. There is no current
support for Python 3.
There is no current support for Python 3.
-------------
Getting Swift


@ -35,14 +35,14 @@ and their information::
[realm1]
key = realm1key
key2 = realm1key2
cluster_name1 = https://host1/v1/
cluster_name2 = https://host2/v1/
cluster_clustername1 = https://host1/v1/
cluster_clustername2 = https://host2/v1/
[realm2]
key = realm2key
key2 = realm2key2
cluster_name3 = https://host3/v1/
cluster_name4 = https://host4/v1/
cluster_clustername3 = https://host3/v1/
cluster_clustername4 = https://host4/v1/
Each section name is the name of a sync realm. A sync realm is a set of
@ -165,12 +165,12 @@ Now, let's make our first container and tell it to synchronize to a second
we'll make next::
$ swift -A http://cluster1/auth/v1.0 -U test:tester -K testing post \
-t '//realm_name/cluster2_name/AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c/container2' \
-t '//realm_name/clustername2/AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c/container2' \
-k 'secret' container1
The ``-t`` indicates the cluster to sync to, which is the realm name of the
section from container-sync-realms.conf, followed by the cluster name from
that section, followed by the account and container names we want to sync to.
that section (without the cluster\_ prefix), followed by the account and container names we want to sync to.
The ``-k`` specifies the secret key the two containers will share for
synchronization; this is the user key, the cluster key in
container-sync-realms.conf will also be used behind the scenes.
@ -178,7 +178,7 @@ container-sync-realms.conf will also be used behind the scenes.
Now, we'll do something similar for the second cluster's container::
$ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 post \
-t '//realm_name/cluster1_name/AUTH_208d1854-e475-4500-b315-81de645d060e/container1' \
-t '//realm_name/clustername1/AUTH_208d1854-e475-4500-b315-81de645d060e/container1' \
-k 'secret' container2
That's it. Now we can upload a bunch of stuff to the first container and watch
@ -224,7 +224,7 @@ For instance, when we created the first container above and told it to
synchronize to the second, we could have used this curl command::
$ curl -i -X POST -H 'X-Auth-Token: AUTH_tkd5359e46ff9e419fa193dbd367f3cd19' \
-H 'X-Container-Sync-To: //realm_name/cluster2_name/AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c/container2' \
-H 'X-Container-Sync-To: //realm_name/clustername2/AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c/container2' \
-H 'X-Container-Sync-Key: secret' \
'http://cluster1/v1/AUTH_208d1854-e475-4500-b315-81de645d060e/container1'
HTTP/1.1 204 No Content


@ -70,7 +70,8 @@ is just a zero-byte (not enforced) file with an extra
``X-Object-Manifest`` header.
All the object segments need to be in the same container, have a common object
name prefix, and their names sort in the order they should be concatenated.
name prefix, and sort in the order in which they should be concatenated.
Object names are sorted lexicographically as UTF-8 byte strings.
They don't have to be in the same container as the manifest file will be, which
is useful to keep container listings clean as explained above with ``swift``.
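
A quick illustration, in plain Python, of why the segment names in the curl
example below are zero-padded::

    # Container listings sort names lexicographically as strings, so unpadded
    # numbers concatenate in the wrong order.
    print(sorted(['myobject/1', 'myobject/2', 'myobject/10']))
    # ['myobject/1', 'myobject/10', 'myobject/2']

    print(sorted('myobject/%08d' % i for i in (1, 2, 10)))
    # ['myobject/00000001', 'myobject/00000002', 'myobject/00000010']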
@ -101,11 +102,11 @@ Here's an example using ``curl`` with tiny 1-byte segments::
# First, upload the segments
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/1 --data-binary '1'
http://<storage_url>/container/myobject/00000001 --data-binary '1'
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/2 --data-binary '2'
http://<storage_url>/container/myobject/00000002 --data-binary '2'
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/3 --data-binary '3'
http://<storage_url>/container/myobject/00000003 --data-binary '3'
# Next, create the manifest file
curl -X PUT -H 'X-Auth-Token: <token>' \


@ -237,6 +237,12 @@ when the balance doesn't improve by at least 1% (indicating we probably can't
get perfect balance due to wildly imbalanced zones or too many partitions
recently moved).
---------------------
Ring Builder Analyzer
---------------------
.. automodule:: swift.cli.ring_builder_analyzer
-------
History
-------


@ -94,7 +94,11 @@ use = egg:swift#recon
# per_diff = 1000
# max_diffs = 100
# concurrency = 8
#
# Time in seconds to wait between replication passes
# interval = 30
# run_pause is deprecated, use interval instead
# run_pause = 30
#
# How long without an error before a node's error count is reset. This will
# also be how long before a node is reenabled after suppression is triggered.
@ -109,10 +113,10 @@ use = egg:swift#recon
# The replicator also performs reclamation
# reclaim_age = 604800
#
# Time in seconds to wait between replication passes
# Note: if the parameter 'interval' is defined then it will be used in place
# of run_pause.
# run_pause = 30
# Allow rsync to compress data which is transmitted to destination node
# during sync. However, this is applicable only when destination node is in
# a different region than the local one.
# rsync_compress = no
#
# recon_cache_path = /var/cache/swift


@ -103,17 +103,22 @@ use = egg:swift#recon
# per_diff = 1000
# max_diffs = 100
# concurrency = 8
#
# Time in seconds to wait between replication passes
# interval = 30
# run_pause is deprecated, use interval instead
# run_pause = 30
#
# node_timeout = 10
# conn_timeout = 0.5
#
# The replicator also performs reclamation
# reclaim_age = 604800
#
# Time in seconds to wait between replication passes
# Note: if the parameter 'interval' is defined then it will be used in place
# of run_pause.
# run_pause = 30
# Allow rsync to compress data which is transmitted to destination node
# during sync. However, this is applicable only when destination node is in
# a different region than the local one.
# rsync_compress = no
#
# recon_cache_path = /var/cache/swift


@ -7,14 +7,14 @@
# [realm1]
# key = realm1key
# key2 = realm1key2
# cluster_name1 = https://host1/v1/
# cluster_name2 = https://host2/v1/
# cluster_clustername1 = https://host1/v1/
# cluster_clustername2 = https://host2/v1/
#
# [realm2]
# key = realm2key
# key2 = realm2key2
# cluster_name3 = https://host3/v1/
# cluster_name4 = https://host4/v1/
# cluster_clustername3 = https://host3/v1/
# cluster_clustername4 = https://host4/v1/
# Each section name is the name of a sync realm. A sync realm is a set of


@ -12,9 +12,16 @@ bind_port = 6000
# expiring_objects_account_name = expiring_objects
#
# Use an integer to override the number of pre-forked processes that will
# accept connections.
# accept connections. NOTE: if servers_per_port is set, this setting is
# ignored.
# workers = auto
#
# Make object-server run this many worker processes per unique port of
# "local" ring devices across all storage policies. This can help provide
# the isolation of threads_per_disk without the severe overhead. The default
# value of 0 disables this feature.
# servers_per_port = 0
#
# Maximum concurrent requests per worker
# max_clients = 1024
#
@ -155,7 +162,12 @@ use = egg:swift#recon
#
# vm_test_mode = no
# daemonize = on
#
# Time in seconds to wait between replication passes
# interval = 30
# run_pause is deprecated, use interval instead
# run_pause = 30
#
# concurrency = 1
# stats_interval = 300
#
@ -174,6 +186,13 @@ use = egg:swift#recon
# passed to rsync for io op timeout
# rsync_io_timeout = 30
#
# Allow rsync to compress data which is transmitted to destination node
# during sync. However, this is applicable only when destination node is in
# a different region than the local one.
# NOTE: Objects that are already compressed (for example: .tar.gz, .mp3) might
# slow down the syncing process.
# rsync_compress = no
#
# node_timeout = <whatever's in the DEFAULT section or 10>
# max duration of an http request; this is for REPLICATE finalization calls and
# so should be longer than node_timeout
@ -223,7 +242,12 @@ use = egg:swift#recon
# log_address = /dev/log
#
# daemonize = on
#
# Time in seconds to wait between reconstruction passes
# interval = 30
# run_pause is deprecated, use interval instead
# run_pause = 30
#
# concurrency = 1
# stats_interval = 300
# node_timeout = 10


@ -460,7 +460,16 @@ use = egg:swift#domain_remap
#
# storage_domain = example.com
# path_root = v1
# Browsers can convert a host header to lowercase, so check that reseller
# prefix on the account is the correct case. This is done by comparing the
# items in the reseller_prefixes config option to the found prefix. If they
# match except for case, the item from reseller_prefixes will be used
# instead of the found reseller prefix. When none match, the default reseller
# prefix is used. When no default reseller prefix is configured, any request
# with an account prefix not in that list will be ignored by this middleware.
# reseller_prefixes = AUTH
# default_reseller_prefix =
[filter:catch_errors]
use = egg:swift#catch_errors


@ -129,6 +129,14 @@ default = yes
#max_header_size = 8192
# By default the maximum number of allowed headers depends on the number of max
# allowed metadata settings plus a default value of 32 for regular http
# headers. If for some reason this is not enough (custom middleware for
# example) it can be increased with the extra_header_count constraint.
#extra_header_count = 32
# max_object_name_length is the max number of bytes in the utf8 encoding
# of an object name


@ -8,5 +8,6 @@ greenlet>=0.3.1
netifaces>=0.5,!=0.10.0,!=0.10.1
pastedeploy>=1.3.3
simplejson>=2.0.9
six>=1.9.0
xattr>=0.4
PyECLib>=1.0.3
PyECLib>=1.0.7


@ -15,7 +15,6 @@ classifier =
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.6
Programming Language :: Python :: 2.7
[pbr]
@ -60,6 +59,7 @@ scripts =
bin/swift-recon
bin/swift-recon-cron
bin/swift-ring-builder
bin/swift-ring-builder-analyzer
bin/swift-temp-url
[entry_points]


@ -460,6 +460,7 @@ class AccountBroker(DatabaseBroker):
max_rowid = -1
curs = conn.cursor()
for rec in item_list:
rec.setdefault('storage_policy_index', 0) # legacy
record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
rec['bytes_used'], rec['deleted'],
@ -477,7 +478,7 @@ class AccountBroker(DatabaseBroker):
row = curs_row.fetchone()
if row:
row = list(row)
for i in xrange(5):
for i in range(5):
if record[i] is None and row[i] is not None:
record[i] = row[i]
if row[1] > record[1]: # Keep newest put_timestamp


@ -69,7 +69,7 @@ class AccountReaper(Daemon):
self.object_ring = None
self.node_timeout = int(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.myips = whataremyips()
self.myips = whataremyips(conf.get('bind_ip', '0.0.0.0'))
self.concurrency = int(conf.get('concurrency', 25))
self.container_concurrency = self.object_concurrency = \
sqrt(self.concurrency)
@ -376,6 +376,7 @@ class AccountReaper(Daemon):
break
successes = 0
failures = 0
timestamp = Timestamp(time())
for node in nodes:
anode = account_nodes.pop()
try:
@ -386,7 +387,8 @@ class AccountReaper(Daemon):
headers={'X-Account-Host': '%(ip)s:%(port)s' % anode,
'X-Account-Partition': str(account_partition),
'X-Account-Device': anode['device'],
'X-Account-Override-Deleted': 'yes'})
'X-Account-Override-Deleted': 'yes',
'X-Timestamp': timestamp.internal})
successes += 1
self.stats_return_codes[2] = \
self.stats_return_codes.get(2, 0) + 1
@ -443,6 +445,8 @@ class AccountReaper(Daemon):
part, nodes = ring.get_nodes(account, container, obj)
successes = 0
failures = 0
timestamp = Timestamp(time())
for node in nodes:
cnode = next(cnodes)
try:
@ -453,7 +457,8 @@ class AccountReaper(Daemon):
headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,
'X-Container-Partition': str(container_partition),
'X-Container-Device': cnode['device'],
'X-Backend-Storage-Policy-Index': policy_index})
'X-Backend-Storage-Policy-Index': policy_index,
'X-Timestamp': timestamp.internal})
successes += 1
self.stats_return_codes[2] = \
self.stats_return_codes.get(2, 0) + 1


@ -153,7 +153,7 @@ class AccountController(BaseStorageServer):
return HTTPConflict(request=req)
metadata = {}
metadata.update((key, (value, timestamp.internal))
for key, value in req.headers.iteritems()
for key, value in req.headers.items()
if is_sys_or_user_meta('account', key))
if metadata:
broker.update_metadata(metadata, validate_metadata=True)
@ -246,7 +246,7 @@ class AccountController(BaseStorageServer):
return self._deleted_response(broker, req, HTTPNotFound)
metadata = {}
metadata.update((key, (value, req_timestamp.internal))
for key, value in req.headers.iteritems()
for key, value in req.headers.items()
if is_sys_or_user_meta('account', key))
if metadata:
broker.update_metadata(metadata, validate_metadata=True)


@ -64,7 +64,7 @@ def get_response_headers(broker):
resp_headers[header_name] = value
resp_headers.update((key, value)
for key, (value, timestamp) in
broker.metadata.iteritems() if value != '')
broker.metadata.items() if value != '')
return resp_headers


@ -212,13 +212,13 @@ def print_db_info_metadata(db_type, info, metadata):
raise ValueError('Info is incomplete: %s' % e)
meta_prefix = 'x_' + db_type + '_'
for key, value in info.iteritems():
for key, value in info.items():
if key.lower().startswith(meta_prefix):
title = key.replace('_', '-').title()
print ' %s: %s' % (title, value)
user_metadata = {}
sys_metadata = {}
for key, (value, timestamp) in metadata.iteritems():
for key, (value, timestamp) in metadata.items():
if is_user_meta(db_type, key):
user_metadata[strip_user_meta_prefix(db_type, key)] = value
elif is_sys_meta(db_type, key):
@ -284,7 +284,7 @@ def print_obj_metadata(metadata):
else:
print 'Timestamp: Not found in metadata'
for key, value in metadata.iteritems():
for key, value in metadata.items():
if is_user_meta('Object', key):
user_metadata[key] = value
elif is_sys_meta('Object', key):
@ -382,7 +382,7 @@ def print_obj(datafile, check_etag=True, swift_dir='/etc/swift',
if (policy_index is not None and
policy_index_for_name is not None and
policy_index != policy_index_for_name):
print 'Attention: Ring does not match policy!'
print 'Warning: Ring does not match policy!'
print 'Double check your policy name!'
if not ring and policy_index_for_name:
ring = POLICIES.get_object_ring(policy_index_for_name,
@ -472,9 +472,9 @@ def print_item_locations(ring, ring_name=None, account=None, container=None,
policy = POLICIES.get_by_name(policy_name)
if policy:
if ring_name != policy.ring_name:
print 'Attention! mismatch between ring and policy detected!'
print 'Warning: mismatch between ring and policy name!'
else:
print 'Attention! Policy %s is not valid' % policy_name
print 'Warning: Policy %s is not valid' % policy_name
policy_index = None
if ring is None and (obj or part):
@ -518,14 +518,16 @@ def print_item_locations(ring, ring_name=None, account=None, container=None,
ring = Ring(swift_dir, ring_name='container')
else:
if ring_name != 'container':
print 'Attention! mismatch between ring and item detected!'
print 'Warning: account/container specified ' + \
'but ring not named "container"'
if account and not container and not obj:
loc = 'accounts'
if not any([ring, ring_name]):
ring = Ring(swift_dir, ring_name='account')
else:
if ring_name != 'account':
print 'Attention! mismatch between ring and item detected!'
print 'Warning: account specified ' + \
'but ring not named "account"'
print '\nAccount \t%s' % account
print 'Container\t%s' % container


@ -22,12 +22,9 @@ from eventlet.green import urllib2
from swift.common.utils import SWIFT_CONF_FILE
from swift.common.ring import Ring
from urlparse import urlparse
try:
import simplejson as json
except ImportError:
import json
from hashlib import md5
import eventlet
import json
import optparse
import time
import sys
@ -773,11 +770,10 @@ class SwiftRecon(object):
objq[url] = response['objects']
conq[url] = response['containers']
acctq[url] = response['accounts']
if response['policies']:
for key in response['policies']:
pkey = "objects_%s" % key
stats.setdefault(pkey, {})
stats[pkey][url] = response['policies'][key]['objects']
for key in response.get('policies', {}):
pkey = "objects_%s" % key
stats.setdefault(pkey, {})
stats[pkey][url] = response['policies'][key]['objects']
stats.update({"objects": objq, "containers": conq, "accounts": acctq})
for item in stats:
if len(stats[item]) > 0:


@ -0,0 +1,325 @@
#! /usr/bin/env python
# Copyright (c) 2015 Samuel Merritt <sam@swiftstack.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This is a tool for analyzing how well the ring builder performs its job
in a particular scenario. It is intended to help developers quantify any
improvements or regressions in the ring builder; it is probably not useful
to others.
The ring builder analyzer takes a scenario file containing some initial
parameters for a ring builder plus a certain number of rounds. In each
round, some modifications are made to the builder, e.g. add a device, remove
a device, change a device's weight. Then, the builder is repeatedly
rebalanced until it settles down. Data about that round is printed, and the
next round begins.
Scenarios are specified in JSON. Example scenario for a gradual device
addition::
{
"part_power": 12,
"replicas": 3,
"overload": 0.1,
"random_seed": 203488,
"rounds": [
[
["add", "r1z2-10.20.30.40:6000/sda", 8000],
["add", "r1z2-10.20.30.40:6000/sdb", 8000],
["add", "r1z2-10.20.30.40:6000/sdc", 8000],
["add", "r1z2-10.20.30.40:6000/sdd", 8000],
["add", "r1z2-10.20.30.41:6000/sda", 8000],
["add", "r1z2-10.20.30.41:6000/sdb", 8000],
["add", "r1z2-10.20.30.41:6000/sdc", 8000],
["add", "r1z2-10.20.30.41:6000/sdd", 8000],
["add", "r1z2-10.20.30.43:6000/sda", 8000],
["add", "r1z2-10.20.30.43:6000/sdb", 8000],
["add", "r1z2-10.20.30.43:6000/sdc", 8000],
["add", "r1z2-10.20.30.43:6000/sdd", 8000],
["add", "r1z2-10.20.30.44:6000/sda", 8000],
["add", "r1z2-10.20.30.44:6000/sdb", 8000],
["add", "r1z2-10.20.30.44:6000/sdc", 8000]
], [
["add", "r1z2-10.20.30.44:6000/sdd", 1000]
], [
["set_weight", 15, 2000]
], [
["remove", 3],
["set_weight", 15, 3000]
], [
["set_weight", 15, 4000]
], [
["set_weight", 15, 5000]
], [
["set_weight", 15, 6000]
], [
["set_weight", 15, 7000]
], [
["set_weight", 15, 8000]
]]
}
"""
import argparse
import json
import sys
from swift.common.ring import builder
from swift.common.ring.utils import parse_add_value
ARG_PARSER = argparse.ArgumentParser(
description='Put the ring builder through its paces')
ARG_PARSER.add_argument(
'--check', '-c', action='store_true',
help="Just check the scenario, don't execute it.")
ARG_PARSER.add_argument(
'scenario_path',
help="Path to the scenario file")
def _parse_weight(round_index, command_index, weight_str):
try:
weight = float(weight_str)
except ValueError as err:
raise ValueError(
"Invalid weight %r (round %d, command %d): %s"
% (weight_str, round_index, command_index, err))
if weight < 0:
raise ValueError(
"Negative weight (round %d, command %d)"
% (round_index, command_index))
return weight
def _parse_add_command(round_index, command_index, command):
if len(command) != 3:
raise ValueError(
"Invalid add command (round %d, command %d): expected array of "
"length 3, but got %d"
% (round_index, command_index, len(command)))
dev_str = command[1]
weight_str = command[2]
try:
dev = parse_add_value(dev_str)
except ValueError as err:
raise ValueError(
"Invalid device specifier '%s' in add (round %d, command %d): %s"
% (dev_str, round_index, command_index, err))
dev['weight'] = _parse_weight(round_index, command_index, weight_str)
if dev['region'] is None:
dev['region'] = 1
return ['add', dev]
def _parse_remove_command(round_index, command_index, command):
if len(command) != 2:
raise ValueError(
"Invalid remove command (round %d, command %d): expected array of "
"length 2, but got %d"
% (round_index, command_index, len(command)))
dev_str = command[1]
try:
dev_id = int(dev_str)
except ValueError as err:
raise ValueError(
"Invalid device ID '%s' in remove (round %d, command %d): %s"
% (dev_str, round_index, command_index, err))
return ['remove', dev_id]
def _parse_set_weight_command(round_index, command_index, command):
if len(command) != 3:
raise ValueError(
"Invalid remove command (round %d, command %d): expected array of "
"length 3, but got %d"
% (round_index, command_index, len(command)))
dev_str = command[1]
weight_str = command[2]
try:
dev_id = int(dev_str)
except ValueError as err:
raise ValueError(
"Invalid device ID '%s' in set_weight (round %d, command %d): %s"
% (dev_str, round_index, command_index, err))
weight = _parse_weight(round_index, command_index, weight_str)
return ['set_weight', dev_id, weight]
def parse_scenario(scenario_data):
"""
Takes a serialized scenario and turns it into a data structure suitable
for feeding to run_scenario().
:returns: scenario
:raises: ValueError on invalid scenario
"""
parsed_scenario = {}
try:
raw_scenario = json.loads(scenario_data)
except ValueError as err:
raise ValueError("Invalid JSON in scenario file: %s" % err)
if not isinstance(raw_scenario, dict):
raise ValueError("Scenario must be a JSON object, not array or string")
if 'part_power' not in raw_scenario:
raise ValueError("part_power missing")
try:
parsed_scenario['part_power'] = int(raw_scenario['part_power'])
except ValueError as err:
raise ValueError("part_power not an integer: %s" % err)
if not 1 <= parsed_scenario['part_power'] <= 32:
raise ValueError("part_power must be between 1 and 32, but was %d"
% raw_scenario['part_power'])
if 'replicas' not in raw_scenario:
raise ValueError("replicas missing")
try:
parsed_scenario['replicas'] = float(raw_scenario['replicas'])
except ValueError as err:
raise ValueError("replicas not a float: %s" % err)
if parsed_scenario['replicas'] < 1:
raise ValueError("replicas must be at least 1, but is %f"
% parsed_scenario['replicas'])
if 'overload' not in raw_scenario:
raise ValueError("overload missing")
try:
parsed_scenario['overload'] = float(raw_scenario['overload'])
except ValueError as err:
raise ValueError("overload not a float: %s" % err)
if parsed_scenario['overload'] < 0:
raise ValueError("overload must be non-negative, but is %f"
% parsed_scenario['overload'])
if 'random_seed' not in raw_scenario:
raise ValueError("random_seed missing")
try:
parsed_scenario['random_seed'] = int(raw_scenario['random_seed'])
except ValueError as err:
raise ValueError("replicas not an integer: %s" % err)
if 'rounds' not in raw_scenario:
raise ValueError("rounds missing")
if not isinstance(raw_scenario['rounds'], list):
raise ValueError("rounds must be an array")
parser_for_command = {'add': _parse_add_command,
'remove': _parse_remove_command,
'set_weight': _parse_set_weight_command}
parsed_scenario['rounds'] = []
for round_index, raw_round in enumerate(raw_scenario['rounds']):
if not isinstance(raw_round, list):
raise ValueError("round %d not an array" % round_index)
parsed_round = []
for command_index, command in enumerate(raw_round):
if command[0] not in parser_for_command:
raise ValueError(
"Unknown command (round %d, command %d): "
"'%s' should be one of %s" %
(round_index, command_index, command[0],
parser_for_command.keys()))
parsed_round.append(
parser_for_command[command[0]](
round_index, command_index, command))
parsed_scenario['rounds'].append(parsed_round)
return parsed_scenario
def run_scenario(scenario):
"""
Takes a parsed scenario (like from parse_scenario()) and runs it.
"""
seed = scenario['random_seed']
rb = builder.RingBuilder(scenario['part_power'], scenario['replicas'], 1)
rb.set_overload(scenario['overload'])
for round_index, commands in enumerate(scenario['rounds']):
print "Round %d" % (round_index + 1)
for command in commands:
if command[0] == 'add':
rb.add_dev(command[1])
elif command[0] == 'remove':
rb.remove_dev(command[1])
elif command[0] == 'set_weight':
rb.set_dev_weight(command[1], command[2])
else:
raise ValueError("unknown command %r" % (command[0],))
rebalance_number = 1
parts_moved, old_balance = rb.rebalance(seed=seed)
rb.pretend_min_part_hours_passed()
print "\tRebalance 1: moved %d parts, balance is %.6f" % (
parts_moved, old_balance)
while True:
rebalance_number += 1
parts_moved, new_balance = rb.rebalance(seed=seed)
rb.pretend_min_part_hours_passed()
print "\tRebalance %d: moved %d parts, balance is %.6f" % (
rebalance_number, parts_moved, new_balance)
if parts_moved == 0:
break
if abs(new_balance - old_balance) < 1 and not (
old_balance == builder.MAX_BALANCE and
new_balance == builder.MAX_BALANCE):
break
old_balance = new_balance
def main(argv=None):
args = ARG_PARSER.parse_args(argv)
try:
with open(args.scenario_path) as sfh:
scenario_data = sfh.read()
except (OSError, IOError) as err:
sys.stderr.write("Error opening scenario %s: %s\n" %
(args.scenario_path, err))
return 1
try:
scenario = parse_scenario(scenario_data)
except ValueError as err:
sys.stderr.write("Invalid scenario %s: %s\n" %
(args.scenario_path, err))
return 1
if not args.check:
run_scenario(scenario)
return 0
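
A minimal usage sketch of the parser above, assuming parse_scenario() and run_scenario() are importable (for example by copying this script onto the Python path); the device strings follow the r<region>z<zone>-<ip>:<port>/<device> form accepted by parse_add_value::

    import json

    scenario_json = json.dumps({
        "part_power": 8, "replicas": 3, "overload": 0.1, "random_seed": 42,
        "rounds": [
            [["add", "r1z1-10.0.0.1:6000/sda", 100],
             ["add", "r1z1-10.0.0.1:6000/sdb", 100],
             ["add", "r1z2-10.0.0.2:6000/sda", 100],
             ["add", "r1z2-10.0.0.2:6000/sdb", 100]],
            [["set_weight", 0, 200]],
        ],
    })

    scenario = parse_scenario(scenario_json)  # raises ValueError if malformed
    run_scenario(scenario)                    # prints stats for each rebalance
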

View File

@ -34,7 +34,7 @@ from swift.common.ring.utils import validate_args, \
validate_and_normalize_ip, build_dev_from_opts, \
parse_builder_ring_filename_args, parse_search_value, \
parse_search_values_from_opts, parse_change_values_from_opts, \
dispersion_report, validate_device_name
dispersion_report, parse_add_value
from swift.common.utils import lock_parent_directory
MAJOR_VERSION = 1
@ -97,7 +97,7 @@ def _find_parts(devs):
# Sort by number of found replicas to keep the output format
sorted_partition_count = sorted(
partition_count.iteritems(), key=itemgetter(1), reverse=True)
partition_count.items(), key=itemgetter(1), reverse=True)
return sorted_partition_count
@ -129,37 +129,6 @@ def _parse_list_parts_values(argvish):
exit(EXIT_ERROR)
def _parse_address(rest):
if rest.startswith('['):
# remove first [] for ip
rest = rest.replace('[', '', 1).replace(']', '', 1)
pos = 0
while (pos < len(rest) and
not (rest[pos] == 'R' or rest[pos] == '/')):
pos += 1
address = rest[:pos]
rest = rest[pos:]
port_start = address.rfind(':')
if port_start == -1:
raise ValueError('Invalid port in add value')
ip = address[:port_start]
try:
port = int(address[(port_start + 1):])
except (TypeError, ValueError):
raise ValueError(
'Invalid port %s in add value' % address[port_start:])
# if this is an ipv6 address then we want to convert it
# to all lowercase and use its fully expanded representation
# to make searches easier
ip = validate_and_normalize_ip(ip)
return (ip, port, rest)
def _parse_add_values(argvish):
"""
Parse devices to add as specified on the command line.
@ -183,62 +152,25 @@ def _parse_add_values(argvish):
islice(args, 1, len(args), 2))
for devstr, weightstr in devs_and_weights:
region = 1
rest = devstr
if devstr.startswith('r'):
i = 1
while i < len(devstr) and devstr[i].isdigit():
i += 1
region = int(devstr[1:i])
rest = devstr[i:]
else:
dev_dict = parse_add_value(devstr)
if dev_dict['region'] is None:
stderr.write('WARNING: No region specified for %s. '
'Defaulting to region 1.\n' % devstr)
dev_dict['region'] = 1
if not rest.startswith('z'):
raise ValueError('Invalid add value: %s' % devstr)
i = 1
while i < len(rest) and rest[i].isdigit():
i += 1
zone = int(rest[1:i])
rest = rest[i:]
if dev_dict['replication_ip'] is None:
dev_dict['replication_ip'] = dev_dict['ip']
if not rest.startswith('-'):
raise ValueError('Invalid add value: %s' % devstr)
ip, port, rest = _parse_address(rest[1:])
replication_ip = ip
replication_port = port
if rest.startswith('R'):
replication_ip, replication_port, rest = \
_parse_address(rest[1:])
if not rest.startswith('/'):
raise ValueError(
'Invalid add value: %s' % devstr)
i = 1
while i < len(rest) and rest[i] != '_':
i += 1
device_name = rest[1:i]
if not validate_device_name(device_name):
raise ValueError('Invalid device name')
rest = rest[i:]
meta = ''
if rest.startswith('_'):
meta = rest[1:]
if dev_dict['replication_port'] is None:
dev_dict['replication_port'] = dev_dict['port']
weight = float(weightstr)
if weight < 0:
raise ValueError('Invalid weight value: %s' % devstr)
dev_dict['weight'] = weight
parsed_devs.append({'region': region, 'zone': zone, 'ip': ip,
'port': port, 'device': device_name,
'replication_ip': replication_ip,
'replication_port': replication_port,
'weight': weight, 'meta': meta})
parsed_devs.append(dev_dict)
else:
parsed_devs.append(build_dev_from_opts(opts))
@ -1073,8 +1005,7 @@ swift-ring-builder <ring_file> write_builder [min_part_hours]
'_last_part_gather_start': 0,
'_remove_devs': [],
}
builder = RingBuilder(1, 1, 1)
builder.copy_from(builder_dict)
builder = RingBuilder.from_dict(builder_dict)
for parts in builder._replica2part2dev:
for dev_id in parts:
builder.devs[dev_id]['parts'] += 1
@ -1190,7 +1121,7 @@ def main(arguments=None):
globals()
print Commands.default.__doc__.strip()
print
cmds = [c for c, f in Commands.__dict__.iteritems()
cmds = [c for c, f in Commands.__dict__.items()
if f.__doc__ and c[0] != '_' and c != 'default']
cmds.sort()
for cmd in cmds:

View File

@ -27,14 +27,19 @@ BufferedHTTPResponse.
"""
from swift import gettext_ as _
from swift.common import constraints
from urllib import quote
import logging
import time
import socket
import eventlet
from eventlet.green.httplib import CONTINUE, HTTPConnection, HTTPMessage, \
HTTPResponse, HTTPSConnection, _UNKNOWN
httplib = eventlet.import_patched('httplib')
httplib._MAXHEADERS = constraints.MAX_HEADER_COUNT
class BufferedHTTPResponse(HTTPResponse):
"""HTTPResponse class that buffers reading of headers"""
@ -62,6 +67,7 @@ class BufferedHTTPResponse(HTTPResponse):
self.chunk_left = _UNKNOWN # bytes left to read in current chunk
self.length = _UNKNOWN # number of bytes left in response
self.will_close = _UNKNOWN # conn will close at end of response
self._readline_buffer = ''
def expect_response(self):
if self.fp:
@ -79,6 +85,48 @@ class BufferedHTTPResponse(HTTPResponse):
self.msg = HTTPMessage(self.fp, 0)
self.msg.fp = None
def read(self, amt=None):
if not self._readline_buffer:
return HTTPResponse.read(self, amt)
if amt is None:
# Unbounded read: send anything we have buffered plus whatever
# is left.
buffered = self._readline_buffer
self._readline_buffer = ''
return buffered + HTTPResponse.read(self, amt)
elif amt <= len(self._readline_buffer):
# Bounded read that we can satisfy entirely from our buffer
res = self._readline_buffer[:amt]
self._readline_buffer = self._readline_buffer[amt:]
return res
else:
# Bounded read that wants more bytes than we have
smaller_amt = amt - len(self._readline_buffer)
buf = self._readline_buffer
self._readline_buffer = ''
return buf + HTTPResponse.read(self, smaller_amt)
def readline(self, size=1024):
# You'd think Python's httplib would provide this, but it doesn't.
# It does, however, provide a comment in the HTTPResponse class:
#
# # XXX It would be nice to have readline and __iter__ for this,
# # too.
#
# Yes, it certainly would.
while ('\n' not in self._readline_buffer
and len(self._readline_buffer) < size):
read_size = size - len(self._readline_buffer)
chunk = HTTPResponse.read(self, read_size)
if not chunk:
break
self._readline_buffer += chunk
line, newline, rest = self._readline_buffer.partition('\n')
self._readline_buffer = rest
return line + newline
def nuke_from_orbit(self):
"""
Terminate the socket with extreme prejudice.
@ -155,6 +203,11 @@ def http_connect(ipaddr, port, device, partition, method, path,
path = path.encode("utf-8")
except UnicodeError as e:
logging.exception(_('Error encoding to UTF-8: %s'), str(e))
if isinstance(device, unicode):
try:
device = device.encode("utf-8")
except UnicodeError as e:
logging.exception(_('Error encoding to UTF-8: %s'), str(e))
path = quote('/' + device + '/' + str(partition) + path)
return http_connect_raw(
ipaddr, port, method, path, headers, query_string, ssl)
@ -187,7 +240,7 @@ def http_connect_raw(ipaddr, port, method, path, headers=None,
conn.path = path
conn.putrequest(method, path, skip_host=(headers and 'Host' in headers))
if headers:
for header, value in headers.iteritems():
for header, value in headers.items():
conn.putheader(header, str(value))
conn.endheaders()
return conn

View File

@ -36,6 +36,7 @@ ACCOUNT_LISTING_LIMIT = 10000
MAX_ACCOUNT_NAME_LENGTH = 256
MAX_CONTAINER_NAME_LENGTH = 256
VALID_API_VERSIONS = ["v1", "v1.0"]
EXTRA_HEADER_COUNT = 0
# If adding an entry to DEFAULT_CONSTRAINTS, note that
# these constraints are automatically published by the
@ -54,6 +55,7 @@ DEFAULT_CONSTRAINTS = {
'max_account_name_length': MAX_ACCOUNT_NAME_LENGTH,
'max_container_name_length': MAX_CONTAINER_NAME_LENGTH,
'valid_api_versions': VALID_API_VERSIONS,
'extra_header_count': EXTRA_HEADER_COUNT,
}
SWIFT_CONSTRAINTS_LOADED = False
@ -105,6 +107,13 @@ FORMAT2CONTENT_TYPE = {'plain': 'text/plain', 'json': 'application/json',
'xml': 'application/xml'}
# By default the maximum number of allowed headers is the maximum number of
# allowed metadata items plus a default value of 32 for regular http
# headers. If for some reason this is not enough (custom middleware, for
# example) it can be increased with the extra_header_count constraint.
MAX_HEADER_COUNT = MAX_META_COUNT + 32 + max(EXTRA_HEADER_COUNT, 0)
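
A quick arithmetic sketch of the constraint above; the MAX_META_COUNT value of 90 is Swift's usual default and is assumed here::

    MAX_META_COUNT = 90            # assumed default
    EXTRA_HEADER_COUNT = 0
    # default: 90 + 32 + max(0, 0) == 122 allowed headers
    assert MAX_META_COUNT + 32 + max(EXTRA_HEADER_COUNT, 0) == 122

    EXTRA_HEADER_COUNT = 10        # e.g. extra_header_count = 10 in swift.conf
    assert MAX_META_COUNT + 32 + max(EXTRA_HEADER_COUNT, 0) == 132
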
def check_metadata(req, target_type):
"""
Check metadata sent in the request headers. This should only check
@ -120,7 +129,7 @@ def check_metadata(req, target_type):
prefix = 'x-%s-meta-' % target_type.lower()
meta_count = 0
meta_size = 0
for key, value in req.headers.iteritems():
for key, value in req.headers.items():
if isinstance(value, basestring) and len(value) > MAX_HEADER_SIZE:
return HTTPBadRequest(body='Header value too long: %s' %
key[:MAX_META_NAME_LENGTH],

View File

@ -328,6 +328,8 @@ class DatabaseBroker(object):
exc_hint = 'malformed'
elif 'file is encrypted or is not a database' in str(exc_value):
exc_hint = 'corrupted'
elif 'disk I/O error' in str(exc_value):
exc_hint = 'disk error while accessing'
else:
raise exc_type, exc_value, exc_traceback
prefix_path = os.path.dirname(self.db_dir)
@ -732,7 +734,7 @@ class DatabaseBroker(object):
"""
meta_count = 0
meta_size = 0
for key, (value, timestamp) in metadata.iteritems():
for key, (value, timestamp) in metadata.items():
key = key.lower()
if value != '' and (key.startswith('x-account-meta') or
key.startswith('x-container-meta')):
@ -760,7 +762,7 @@ class DatabaseBroker(object):
"""
old_metadata = self.metadata
if set(metadata_updates).issubset(set(old_metadata)):
for key, (value, timestamp) in metadata_updates.iteritems():
for key, (value, timestamp) in metadata_updates.items():
if timestamp > old_metadata[key][1]:
break
else:
@ -778,7 +780,7 @@ class DatabaseBroker(object):
ALTER TABLE %s_stat
ADD COLUMN metadata TEXT DEFAULT '' """ % self.db_type)
md = {}
for key, value_timestamp in metadata_updates.iteritems():
for key, value_timestamp in metadata_updates.items():
value, timestamp = value_timestamp
if key not in md or timestamp > md[key][1]:
md[key] = value_timestamp
@ -842,7 +844,7 @@ class DatabaseBroker(object):
if md:
md = json.loads(md)
keys_to_delete = []
for key, (value, value_timestamp) in md.iteritems():
for key, (value, value_timestamp) in md.items():
if value == '' and value_timestamp < timestamp:
keys_to_delete.append(key)
if keys_to_delete:

View File

@ -105,7 +105,7 @@ def roundrobin_datadirs(datadirs):
while its:
for it in its:
try:
yield it.next()
yield next(it)
except StopIteration:
its.remove(it)
@ -154,6 +154,7 @@ class Replicator(Daemon):
self.logger = logger or get_logger(conf, log_route='replicator')
self.root = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
self.port = int(conf.get('bind_port', self.default_port))
concurrency = int(conf.get('concurrency', 8))
self.cpool = GreenPool(size=concurrency)
@ -167,6 +168,8 @@ class Replicator(Daemon):
self.vm_test_mode = config_true_value(conf.get('vm_test_mode', 'no'))
self.node_timeout = int(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.rsync_compress = config_true_value(
conf.get('rsync_compress', 'no'))
self.reclaim_age = float(conf.get('reclaim_age', 86400 * 7))
swift.common.db.DB_PREALLOCATION = \
config_true_value(conf.get('db_preallocation', 'f'))
@ -209,13 +212,16 @@ class Replicator(Daemon):
('no_change', 'hashmatch', 'rsync', 'diff', 'ts_repl',
'empty', 'diff_capped')]))
def _rsync_file(self, db_file, remote_file, whole_file=True):
def _rsync_file(self, db_file, remote_file, whole_file=True,
different_region=False):
"""
Sync a single file using rsync. Used by _rsync_db to handle syncing.
:param db_file: file to be synced
:param remote_file: remote location to sync the DB file to
:param whole-file: if True, uses rsync's --whole-file flag
:param different_region: if True, the destination node is in a
different region
:returns: True if the sync was successful, False otherwise
"""
@ -224,6 +230,12 @@ class Replicator(Daemon):
'--contimeout=%s' % int(math.ceil(self.conn_timeout))]
if whole_file:
popen_args.append('--whole-file')
if self.rsync_compress and different_region:
# Allow for compression, but only if the remote node is in
# a different region than the local one.
popen_args.append('--compress')
popen_args.extend([db_file, remote_file])
proc = subprocess.Popen(popen_args)
proc.communicate()
@ -233,7 +245,8 @@ class Replicator(Daemon):
return proc.returncode == 0
def _rsync_db(self, broker, device, http, local_id,
replicate_method='complete_rsync', replicate_timeout=None):
replicate_method='complete_rsync', replicate_timeout=None,
different_region=False):
"""
Sync a whole db using rsync.
@ -243,6 +256,8 @@ class Replicator(Daemon):
:param local_id: unique ID of the local database replica
:param replicate_method: remote operation to perform after rsync
:param replicate_timeout: timeout to wait in seconds
:param different_region: if True, the destination node is in a
different region
"""
device_ip = rsync_ip(device['replication_ip'])
if self.vm_test_mode:
@ -253,14 +268,17 @@ class Replicator(Daemon):
remote_file = '%s::%s/%s/tmp/%s' % (
device_ip, self.server_type, device['device'], local_id)
mtime = os.path.getmtime(broker.db_file)
if not self._rsync_file(broker.db_file, remote_file):
if not self._rsync_file(broker.db_file, remote_file,
different_region=different_region):
return False
# perform block-level sync if the db was modified during the first sync
if os.path.exists(broker.db_file + '-journal') or \
os.path.getmtime(broker.db_file) > mtime:
# grab a lock so nobody else can modify it
with broker.lock():
if not self._rsync_file(broker.db_file, remote_file, False):
if not self._rsync_file(broker.db_file, remote_file,
whole_file=False,
different_region=different_region):
return False
with Timeout(replicate_timeout or self.node_timeout):
response = http.replicate(replicate_method, local_id)
@ -363,7 +381,8 @@ class Replicator(Daemon):
'put_timestamp', 'delete_timestamp', 'metadata')
return tuple(info[key] for key in sync_args_order)
def _repl_to_node(self, node, broker, partition, info):
def _repl_to_node(self, node, broker, partition, info,
different_region=False):
"""
Replicate a database to a node.
@ -373,6 +392,8 @@ class Replicator(Daemon):
:param info: DB info as a dictionary of {'max_row', 'hash', 'id',
'created_at', 'put_timestamp', 'delete_timestamp',
'metadata'}
:param different_region: if True, the destination node is in a
different region
:returns: True if successful, False otherwise
"""
@ -382,13 +403,16 @@ class Replicator(Daemon):
response = http.replicate('sync', *sync_args)
if not response:
return False
return self._handle_sync_response(node, response, info, broker, http)
return self._handle_sync_response(node, response, info, broker, http,
different_region=different_region)
def _handle_sync_response(self, node, response, info, broker, http):
def _handle_sync_response(self, node, response, info, broker, http,
different_region=False):
if response.status == HTTP_NOT_FOUND: # completely missing, rsync
self.stats['rsync'] += 1
self.logger.increment('rsyncs')
return self._rsync_db(broker, node, http, info['id'])
return self._rsync_db(broker, node, http, info['id'],
different_region=different_region)
elif response.status == HTTP_INSUFFICIENT_STORAGE:
raise DriveNotMounted()
elif response.status >= 200 and response.status < 300:
@ -403,7 +427,8 @@ class Replicator(Daemon):
self.logger.increment('remote_merges')
return self._rsync_db(broker, node, http, info['id'],
replicate_method='rsync_then_merge',
replicate_timeout=(info['count'] / 2000))
replicate_timeout=(info['count'] / 2000),
different_region=different_region)
# else send diffs over to the remote server
return self._usync_db(max(rinfo['point'], local_sync),
broker, http, rinfo['id'], info['id'])
@ -470,6 +495,11 @@ class Replicator(Daemon):
return
responses = []
nodes = self.ring.get_part_nodes(int(partition))
local_dev = None
for node in nodes:
if node['id'] == node_id:
local_dev = node
break
if shouldbehere:
shouldbehere = bool([n for n in nodes if n['id'] == node_id])
# See Footnote [1] for an explanation of the repl_nodes assignment.
@ -478,12 +508,25 @@ class Replicator(Daemon):
i += 1
repl_nodes = nodes[i + 1:] + nodes[:i]
more_nodes = self.ring.get_more_nodes(int(partition))
if not local_dev:
# Check further if local device is a handoff node
for node in more_nodes:
if node['id'] == node_id:
local_dev = node
break
for node in repl_nodes:
different_region = False
if local_dev and local_dev['region'] != node['region']:
# This additional information will help later if we
# want to handle syncing to a node in a different
# region with some optimizations.
different_region = True
success = False
try:
success = self._repl_to_node(node, broker, partition, info)
success = self._repl_to_node(node, broker, partition, info,
different_region)
except DriveNotMounted:
repl_nodes.append(more_nodes.next())
repl_nodes.append(next(more_nodes))
self.logger.error(_('ERROR Remote drive not mounted %s'), node)
except (Exception, Timeout):
self.logger.exception(_('ERROR syncing %(file)s with node'
@ -538,7 +581,7 @@ class Replicator(Daemon):
"""Run a replication pass once."""
self._zero_stats()
dirs = []
ips = whataremyips()
ips = whataremyips(self.bind_ip)
if not ips:
self.logger.error(_('ERROR Failed to get my own IPs?'))
return

View File

@ -153,6 +153,7 @@ def direct_head_container(node, part, account, container, conn_timeout=5,
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:returns: a dict containing the response's headers in a HeaderKeyDict
:raises ClientException: HTTP HEAD request failed
"""
path = '/%s/%s' % (account, container)
with Timeout(conn_timeout):
@ -200,14 +201,27 @@ def direct_get_container(node, part, account, container, marker=None,
def direct_delete_container(node, part, account, container, conn_timeout=5,
response_timeout=15, headers=None):
"""
Delete container directly from the container server.
:param node: node dictionary from the ring
:param part: partition the container is on
:param account: account name
:param container: container name
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:param headers: dict to be passed into HTTPConnection headers
:raises ClientException: HTTP DELETE request failed
"""
if headers is None:
headers = {}
path = '/%s/%s' % (account, container)
add_timestamp = 'x-timestamp' not in (k.lower() for k in headers)
with Timeout(conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'], part,
'DELETE', path,
headers=gen_headers(headers, True))
headers=gen_headers(headers, add_timestamp))
with Timeout(response_timeout):
resp = conn.getresponse()
resp.read()
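
A hedged usage sketch of the timestamp handling above: with this change a caller-supplied X-Timestamp is forwarded as-is instead of a freshly generated one (the node, partition and names below are placeholders)::

    from swift.common.direct_client import direct_delete_container

    node = {'ip': '10.0.0.1', 'port': 6001, 'device': 'sdb1'}
    direct_delete_container(node, 17, 'AUTH_test', 'junk',
                            headers={'X-Timestamp': '1436986085.12345'})
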
@ -274,6 +288,7 @@ def direct_head_object(node, part, account, container, obj, conn_timeout=5,
:param response_timeout: timeout in seconds for getting the response
:param headers: dict to be passed into HTTPConnection headers
:returns: a dict containing the response's headers in a HeaderKeyDict
:raises ClientException: HTTP HEAD request failed
"""
if headers is None:
headers = {}
@ -312,6 +327,7 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5,
:param headers: dict to be passed into HTTPConnection headers
:returns: a tuple of (response headers, the object's contents) The response
headers will be a HeaderKeyDict.
:raises ClientException: HTTP GET request failed
"""
if headers is None:
headers = {}
@ -363,6 +379,7 @@ def direct_put_object(node, part, account, container, name, contents,
:param response_timeout: timeout in seconds for getting the response
:param chunk_size: if defined, chunk size of data to send.
:returns: etag from the server response
:raises ClientException: HTTP PUT request failed
"""
path = '/%s/%s/%s' % (account, container, name)
@ -373,7 +390,7 @@ def direct_put_object(node, part, account, container, name, contents,
if content_length is not None:
headers['Content-Length'] = str(content_length)
else:
for n, v in headers.iteritems():
for n, v in headers.items():
if n.lower() == 'content-length':
content_length = int(v)
if content_type is not None:
@ -462,7 +479,7 @@ def direct_delete_object(node, part, account, container, obj,
:param obj: object name
:param conn_timeout: timeout in seconds for establishing the connection
:param response_timeout: timeout in seconds for getting the response
:returns: response from server
:raises ClientException: HTTP DELETE request failed
"""
if headers is None:
headers = {}
@ -493,7 +510,8 @@ def retry(func, *args, **kwargs):
:param kwargs: keyword arguments to send to func (if retries or
error_log are sent, they will be deleted from kwargs
before sending on to func)
:returns: restult of func
:returns: result of func
:raises ClientException: all retries failed
"""
retries = 5
if 'retries' in kwargs:

View File

@ -57,6 +57,10 @@ class SuffixSyncError(SwiftException):
pass
class RangeAlreadyComplete(SwiftException):
pass
class DiskFileError(SwiftException):
pass
@ -256,3 +260,7 @@ class ClientException(Exception):
b += ' [first 60 chars of response] %s' \
% self.http_response_content[:60]
return b and '%s: %s' % (a, b) or a
class InvalidPidFileException(Exception):
pass

View File

@ -16,6 +16,7 @@
from eventlet import sleep, Timeout
from eventlet.green import httplib, socket, urllib2
import json
from six.moves import range
import struct
from sys import exc_info
import zlib
@ -171,7 +172,7 @@ class InternalClient(object):
headers = dict(headers)
headers['user-agent'] = self.user_agent
resp = exc_type = exc_value = exc_traceback = None
for attempt in xrange(self.request_tries):
for attempt in range(self.request_tries):
req = Request.blank(
path, environ={'REQUEST_METHOD': method}, headers=headers)
if body_file is not None:
@ -223,7 +224,7 @@ class InternalClient(object):
resp = self.make_request('HEAD', path, headers, acceptable_statuses)
metadata_prefix = metadata_prefix.lower()
metadata = {}
for k, v in resp.headers.iteritems():
for k, v in resp.headers.items():
if k.lower().startswith(metadata_prefix):
metadata[k[len(metadata_prefix):].lower()] = v
return metadata
@ -307,7 +308,7 @@ class InternalClient(object):
"""
headers = {}
for k, v in metadata.iteritems():
for k, v in metadata.items():
if k.lower().startswith(metadata_prefix):
headers[k] = v
else:

View File

@ -24,9 +24,11 @@ import re
from swift import gettext_ as _
from swift.common.utils import search_tree, remove_file, write_file
from swift.common.exceptions import InvalidPidFileException
SWIFT_DIR = '/etc/swift'
RUN_DIR = '/var/run/swift'
PROC_DIR = '/proc'
# auth-server has been removed from ALL_SERVERS, start it explicitly
ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
@ -134,6 +136,29 @@ def watch_server_pids(server_pids, interval=1, **kwargs):
time.sleep(0.1)
def safe_kill(pid, sig, name):
"""Send signal to process and check process name
:param pid: process id
:param sig: signal to send
:param name: expected name of the target process
"""
# check process name for SIG_DFL
if sig == signal.SIG_DFL:
try:
proc_file = '%s/%d/cmdline' % (PROC_DIR, pid)
if os.path.exists(proc_file):
with open(proc_file, 'r') as fd:
if name not in fd.read():
# unknown process is using the pid
raise InvalidPidFileException()
except IOError:
pass
os.kill(pid, sig)
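
A hedged usage sketch of safe_kill(): the SIG_DFL "status check" signal is only delivered when /proc/<pid>/cmdline still names the expected swift server (the pid below is a placeholder)::

    import signal

    try:
        safe_kill(1234, signal.SIG_DFL, 'swift-object-server')
    except InvalidPidFileException:
        # the pid file points at some unrelated process
        pass
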
class UnknownCommandError(Exception):
pass
@ -472,7 +497,11 @@ class Server(object):
"""Generator, yields (pid_file, pids)
"""
for pid_file in self.pid_files(**kwargs):
yield pid_file, int(open(pid_file).read().strip())
try:
pid = int(open(pid_file).read().strip())
except ValueError:
pid = None
yield pid_file, pid
def signal_pids(self, sig, **kwargs):
"""Send a signal to pids for this server
@ -484,11 +513,20 @@ class Server(object):
"""
pids = {}
for pid_file, pid in self.iter_pid_files(**kwargs):
if not pid: # Catches None and 0
print _('Removing pid file %s with invalid pid') % pid_file
remove_file(pid_file)
continue
try:
if sig != signal.SIG_DFL:
print _('Signal %s pid: %s signal: %s') % (self.server,
pid, sig)
os.kill(pid, sig)
safe_kill(pid, sig, 'swift-%s' % self.server)
except InvalidPidFileException as e:
if kwargs.get('verbose'):
print _('Removing pid file %s with wrong pid %d') \
% (pid_file, pid)
remove_file(pid_file)
except OSError as e:
if e.errno == errno.ESRCH:
# pid does not exist

View File

@ -50,11 +50,11 @@ import time
from bisect import bisect
from swift import gettext_ as _
from hashlib import md5
from distutils.version import StrictVersion
from eventlet.green import socket
from eventlet.pools import Pool
from eventlet import Timeout, __version__ as eventlet_version
from eventlet import Timeout
from six.moves import range
from swift.common.utils import json
@ -107,14 +107,6 @@ class MemcacheConnPool(Pool):
Pool.__init__(self, max_size=size)
self.server = server
self._connect_timeout = connect_timeout
self._parent_class_getter = super(MemcacheConnPool, self).get
try:
# call the patched .get() if eventlet is older than 0.9.17
if StrictVersion(eventlet_version) < StrictVersion('0.9.17'):
self._parent_class_getter = self._upstream_fixed_get
except ValueError:
# "invalid" version number or otherwise error parsing version
pass
def create(self):
if ':' in self.server:
@ -129,34 +121,12 @@ class MemcacheConnPool(Pool):
return (sock.makefile(), sock)
def get(self):
fp, sock = self._parent_class_getter()
fp, sock = super(MemcacheConnPool, self).get()
if fp is None:
# An error happened previously, so we need a new connection
fp, sock = self.create()
return fp, sock
# The following method is from eventlet post 0.9.16. This version
# properly keeps track of pool size accounting, and therefore doesn't
# let the pool grow without bound. This patched version is the result
# of commit f5e5b2bda7b442f0262ee1084deefcc5a1cc0694 in eventlet and is
# documented at https://bitbucket.org/eventlet/eventlet/issue/91
def _upstream_fixed_get(self):
"""Return an item from the pool, when one is available. This may
cause the calling greenthread to block.
"""
if self.free_items:
return self.free_items.popleft()
self.current_size += 1
if self.current_size <= self.max_size:
try:
created = self.create()
except: # noqa
self.current_size -= 1
raise
return created
self.current_size -= 1 # did not create
return self.channel.get()
class MemcacheRing(object):
"""
@ -171,7 +141,7 @@ class MemcacheRing(object):
self._errors = dict(((serv, []) for serv in servers))
self._error_limited = dict(((serv, 0) for serv in servers))
for server in sorted(servers):
for i in xrange(NODE_WEIGHT):
for i in range(NODE_WEIGHT):
self._ring[md5hash('%s-%s' % (server, i))] = server
self._tries = tries if tries <= len(servers) else len(servers)
self._sorted = sorted(self._ring)
@ -457,7 +427,7 @@ class MemcacheRing(object):
server_key = md5hash(server_key)
timeout = sanitize_timeout(time or timeout)
msg = ''
for key, value in mapping.iteritems():
for key, value in mapping.items():
key = md5hash(key)
flags = 0
if serialize and self._allow_pickle:

View File

@ -75,6 +75,23 @@ def get_response_body(data_format, data_dict, error_list):
return output
def pax_key_to_swift_header(pax_key):
if (pax_key == u"SCHILY.xattr.user.mime_type" or
pax_key == u"LIBARCHIVE.xattr.user.mime_type"):
return "Content-Type"
elif pax_key.startswith(u"SCHILY.xattr.user.meta."):
useful_part = pax_key[len(u"SCHILY.xattr.user.meta."):]
return "X-Object-Meta-" + useful_part.encode("utf-8")
elif pax_key.startswith(u"LIBARCHIVE.xattr.user.meta."):
useful_part = pax_key[len(u"LIBARCHIVE.xattr.user.meta."):]
return "X-Object-Meta-" + useful_part.encode("utf-8")
else:
# You can get things like atime/mtime/ctime or filesystem ACLs in
# pax headers; those aren't really user metadata. The same goes for
# other, non-user metadata.
return None
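
For reference, a hedged illustration of the mapping implemented above (the attribute names are examples)::

    pax_key_to_swift_header(u"SCHILY.xattr.user.mime_type")
    # -> 'Content-Type'
    pax_key_to_swift_header(u"SCHILY.xattr.user.meta.color")
    # -> 'X-Object-Meta-color'
    pax_key_to_swift_header(u"LIBARCHIVE.xattr.user.meta.color")
    # -> 'X-Object-Meta-color'
    pax_key_to_swift_header(u"SCHILY.xattr.security.selinux")
    # -> None (not user metadata, so it is dropped)
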
class Bulk(object):
"""
Middleware that will do many operations on a single request.
@ -403,7 +420,7 @@ class Bulk(object):
separator = '\r\n\r\n'
last_yield = time()
yield ' '
tar_info = tar.next()
tar_info = next(tar)
if tar_info is None or \
len(failed_files) >= self.max_failed_extractions:
break
@ -464,6 +481,16 @@ class Bulk(object):
new_env['HTTP_USER_AGENT'] = \
'%s BulkExpand' % req.environ.get('HTTP_USER_AGENT')
create_obj_req = Request.blank(destination, new_env)
for pax_key, pax_value in tar_info.pax_headers.items():
header_name = pax_key_to_swift_header(pax_key)
if header_name:
# Both pax_key and pax_value are unicode
# strings; the key is already UTF-8 encoded, but
# we still have to encode the value.
create_obj_req.headers[header_name] = \
pax_value.encode("utf-8")
resp = create_obj_req.get_response(self.app)
containers_accessed.add(container)
if resp.is_success:

View File

@ -27,6 +27,8 @@ maximum lookup depth. If a match is found, the environment's Host header is
rewritten and the request is passed further down the WSGI chain.
"""
from six.moves import range
import socket
from swift import gettext_ as _
@ -122,7 +124,7 @@ class CNAMELookupMiddleware(object):
if self.memcache is None:
self.memcache = cache_from_env(env)
error = True
for tries in xrange(self.lookup_depth):
for tries in range(self.lookup_depth):
found_domain = None
if self.memcache:
memcache_key = ''.join(['cname-', a_domain])

View File

@ -22,7 +22,8 @@ from swift.common.http import is_success
from swift.common.swob import Request, Response, \
HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict
from swift.common.utils import get_logger, json, \
RateLimitedIterator, read_conf_dir, quote
RateLimitedIterator, read_conf_dir, quote, close_if_possible, \
closing_if_possible
from swift.common.request_helpers import SegmentedIterable
from swift.common.wsgi import WSGIContext, make_subrequest
from urllib import unquote
@ -48,7 +49,8 @@ class GetContext(WSGIContext):
con_resp = con_req.get_response(self.dlo.app)
if not is_success(con_resp.status_int):
return con_resp, None
return None, json.loads(''.join(con_resp.app_iter))
with closing_if_possible(con_resp.app_iter):
return None, json.loads(''.join(con_resp.app_iter))
def _segment_listing_iterator(self, req, version, account, container,
prefix, segments, first_byte=None,
@ -107,6 +109,7 @@ class GetContext(WSGIContext):
# we've already started sending the response body to the
# client, so all we can do is raise an exception to make the
# WSGI server close the connection early
close_if_possible(error_response.app_iter)
raise ListingIterError(
"Got status %d listing container /%s/%s" %
(error_response.status_int, account, container))
@ -233,6 +236,7 @@ class GetContext(WSGIContext):
# make sure this response is for a dynamic large object manifest
for header, value in self._response_headers:
if (header.lower() == 'x-object-manifest'):
close_if_possible(resp_iter)
response = self.get_or_head_response(req, value)
return response(req.environ, start_response)
else:

View File

@ -30,9 +30,10 @@ Browsers can convert a host header to lowercase, so check that reseller
prefix on the account is the correct case. This is done by comparing the
items in the reseller_prefixes config option to the found prefix. If they
match except for case, the item from reseller_prefixes will be used
instead of the found reseller prefix. The reseller_prefixes list is
exclusive. If defined, any request with an account prefix not in that list
will be ignored by this middleware. reseller_prefixes defaults to 'AUTH'.
instead of the found reseller prefix. When none match, the default reseller
prefix is used. When no default reseller prefix is configured, any request with
an account prefix not in that list will be ignored by this middleware.
reseller_prefixes defaults to 'AUTH'.
Note that this middleware requires that container names and account names
(except as described above) must be DNS-compatible. This means that the
@ -50,6 +51,7 @@ sync destinations.
"""
from swift.common.swob import Request, HTTPBadRequest
from swift.common.utils import list_from_csv, register_swift_info
class DomainRemapMiddleware(object):
@ -70,10 +72,10 @@ class DomainRemapMiddleware(object):
self.storage_domain = '.' + self.storage_domain
self.path_root = conf.get('path_root', 'v1').strip('/')
prefixes = conf.get('reseller_prefixes', 'AUTH')
self.reseller_prefixes = [x.strip() for x in prefixes.split(',')
if x.strip()]
self.reseller_prefixes = list_from_csv(prefixes)
self.reseller_prefixes_lower = [x.lower()
for x in self.reseller_prefixes]
self.default_reseller_prefix = conf.get('default_reseller_prefix')
def __call__(self, env, start_response):
if not self.storage_domain:
@ -102,15 +104,21 @@ class DomainRemapMiddleware(object):
if '_' not in account and '-' in account:
account = account.replace('-', '_', 1)
account_reseller_prefix = account.split('_', 1)[0].lower()
if account_reseller_prefix not in self.reseller_prefixes_lower:
if account_reseller_prefix in self.reseller_prefixes_lower:
prefix_index = self.reseller_prefixes_lower.index(
account_reseller_prefix)
real_prefix = self.reseller_prefixes[prefix_index]
if not account.startswith(real_prefix):
account_suffix = account[len(real_prefix):]
account = real_prefix + account_suffix
elif self.default_reseller_prefix:
# account prefix is not in config list. Add default one.
account = "%s_%s" % (self.default_reseller_prefix, account)
else:
# account prefix is not in config list. bail.
return self.app(env, start_response)
prefix_index = self.reseller_prefixes_lower.index(
account_reseller_prefix)
real_prefix = self.reseller_prefixes[prefix_index]
if not account.startswith(real_prefix):
account_suffix = account[len(real_prefix):]
account = real_prefix + account_suffix
path = env['PATH_INFO'].strip('/')
new_path_parts = ['', self.path_root, account]
if container:
@ -128,6 +136,10 @@ def filter_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
register_swift_info(
'domain_remap',
default_reseller_prefix=conf.get('default_reseller_prefix'))
def domain_filter(app):
return DomainRemapMiddleware(app, conf)
return domain_filter

View File

@ -394,7 +394,7 @@ class FormPost(object):
i = iter(self.app(subenv, _start_response))
try:
i.next()
next(i)
except StopIteration:
pass
return substatus[0], subheaders[0], ''

View File

@ -71,6 +71,7 @@ if this is a middleware subrequest or not. A log processor calculating
bandwidth usage will want to only sum up logs with no swift.source.
"""
import sys
import time
from urllib import quote, unquote
@ -247,9 +248,9 @@ class ProxyLoggingMiddleware(object):
def iter_response(iterable):
iterator = iter(iterable)
try:
chunk = iterator.next()
chunk = next(iterator)
while not chunk:
chunk = iterator.next()
chunk = next(iterator)
except StopIteration:
chunk = ''
for h, v in start_response_args[0][1]:
@ -280,7 +281,7 @@ class ProxyLoggingMiddleware(object):
while chunk:
bytes_sent += len(chunk)
yield chunk
chunk = iterator.next()
chunk = next(iterator)
except GeneratorExit: # generator was closed before we finished
client_disconnect = True
raise
@ -296,12 +297,13 @@ class ProxyLoggingMiddleware(object):
try:
iterable = self.app(env, my_start_response)
except Exception:
exc_type, exc_value, exc_traceback = sys.exc_info()
req = Request(env)
status_int = status_int_for_logging(start_status=500)
self.log_request(
req, status_int, input_proxy.bytes_received, 0, start_time,
time.time())
raise
raise exc_type, exc_value, exc_traceback
else:
return iter_response(iterable)

View File

@ -36,8 +36,8 @@ json data format. The data to be supplied for each segment is::
path: the path to the segment (not including account)
/container/object_name
etag: the etag given back when the segment was PUT
size_bytes: the size of the segment in bytes
etag: the etag given back when the segment was PUT, or null
size_bytes: the size of the segment in bytes, or null
The format of the list will be::
@ -48,15 +48,25 @@ The format of the list will be::
The number of object segments is limited to a configurable amount, default
1000. Each segment, except for the final one, must be at least 1 megabyte
(configurable). On upload, the middleware will head every segment passed in and
verify the size and etag of each. If any of the objects do not match (not
(configurable). On upload, the middleware will head every segment passed in to
verify:
1. the segment exists (i.e. the HEAD was successful);
2. the segment meets minimum size requirements (if not the last segment);
3. if the user provided a non-null etag, the etag matches; and
4. if the user provided a non-null size_bytes, the size_bytes matches.
Note that the etag and size_bytes keys are still required; this acts as a guard
against user errors such as typos. If any of the objects fail to verify (not
found, size/etag mismatch, below minimum size) then the user will receive a 4xx
error response. If everything does match, the user will receive a 2xx response
and the SLO object is ready for downloading.
Behind the scenes, on success, a json manifest generated from the user input is
sent to object servers with an extra "X-Static-Large-Object: True" header
and a modified Content-Type. The parameter: swift_bytes=$total_size will be
and a modified Content-Type. The items in this manifest will include the etag
and size_bytes for each segment, regardless of whether the client specified
them for verification. The parameter: swift_bytes=$total_size will be
appended to the existing Content-Type, where total_size is the sum of all
the included segments' size_bytes. This extra parameter will be hidden from
the user.
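
A hedged example of a PUT manifest body (?multipart-manifest=put) under the relaxed validation described above; the paths and etag are made up, and None corresponds to JSON null::

    manifest = [
        {"path": "/segments/part-001",
         "etag": "0123456789abcdef0123456789abcdef",  # verified against HEAD
         "size_bytes": 1048576},
        {"path": "/segments/part-002",
         "etag": None,                                 # skip etag check
         "size_bytes": None},                          # skip size check
    ]
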
@ -73,9 +83,11 @@ Retrieving a Large Object
A GET request to the manifest object will return the concatenation of the
objects from the manifest much like DLO. If any of the segments from the
manifest are not found or their Etag/Content Length no longer match the
connection will drop. In this case a 409 Conflict will be logged in the proxy
logs and the user will receive incomplete results.
manifest are not found or their Etag/Content Length have changed since upload,
the connection will drop. In this case a 409 Conflict will be logged in the
proxy logs and the user will receive incomplete results. Note that this will be
enforced regardless of whether the user performed per-segment validation during
upload.
The headers from this GET or HEAD request will return the metadata attached
to the manifest object itself with some exceptions::
@ -134,6 +146,8 @@ the manifest and the segments it's referring to) in the container and account
metadata which can be used for stats purposes.
"""
from six.moves import range
from cStringIO import StringIO
from datetime import datetime
import mimetypes
@ -147,9 +161,9 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
Response
from swift.common.utils import json, get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
register_swift_info, RateLimitedIterator, quote
from swift.common.request_helpers import SegmentedIterable, \
closing_if_possible, close_if_possible
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
closing_if_possible
from swift.common.request_helpers import SegmentedIterable
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
from swift.common.wsgi import WSGIContext, make_subrequest
@ -193,7 +207,7 @@ class SloPutContext(WSGIContext):
def handle_slo_put(self, req, start_response):
app_resp = self._app_call(req.environ)
for i in xrange(len(self._response_headers)):
for i in range(len(self._response_headers)):
if self._response_headers[i][0].lower() == 'etag':
self._response_headers[i] = ('Etag', self.slo_etag)
break
@ -227,6 +241,7 @@ class SloGetContext(WSGIContext):
sub_resp = sub_req.get_response(self.slo.app)
if not is_success(sub_resp.status_int):
close_if_possible(sub_resp.app_iter)
raise ListingIterError(
'ERROR: while fetching %s, GET of submanifest %s '
'failed with status %d' % (req.path, sub_req.path,
@ -400,7 +415,8 @@ class SloGetContext(WSGIContext):
return response(req.environ, start_response)
def get_or_head_response(self, req, resp_headers, resp_iter):
resp_body = ''.join(resp_iter)
with closing_if_possible(resp_iter):
resp_body = ''.join(resp_iter)
try:
segments = json.loads(resp_body)
except ValueError:
@ -537,7 +553,8 @@ class StaticLargeObject(object):
def slo_hook(source_req, source_resp, sink_req):
x_slo = source_resp.headers.get('X-Static-Large-Object')
if (config_true_value(x_slo)
and source_req.params.get('multipart-manifest') != 'get'):
and source_req.params.get('multipart-manifest') != 'get'
and 'swift.post_as_copy' not in source_req.environ):
source_resp = SloGetContext(self).get_or_head_response(
source_req, source_resp.headers.items(),
source_resp.app_iter)
@ -586,11 +603,19 @@ class StaticLargeObject(object):
if isinstance(obj_name, unicode):
obj_name = obj_name.encode('utf-8')
obj_path = '/'.join(['', vrs, account, obj_name.lstrip('/')])
if req.path == quote(obj_path):
raise HTTPConflict(
'Manifest object name "%s" '
'cannot be included in the manifest'
% obj_name)
try:
seg_size = int(seg_dict['size_bytes'])
except (ValueError, TypeError):
raise HTTPBadRequest('Invalid Manifest File')
if seg_size < self.min_segment_size and \
if seg_dict['size_bytes'] is None:
seg_size = None
else:
raise HTTPBadRequest('Invalid Manifest File')
if seg_size is not None and seg_size < self.min_segment_size and \
index < len(parsed_data) - 1:
raise HTTPBadRequest(
'Each segment, except the last, must be at least '
@ -608,11 +633,18 @@ class StaticLargeObject(object):
head_seg_resp = \
Request.blank(obj_path, new_env).get_response(self)
if head_seg_resp.is_success:
total_size += seg_size
if seg_size != head_seg_resp.content_length:
if head_seg_resp.content_length < self.min_segment_size and \
index < len(parsed_data) - 1:
raise HTTPBadRequest(
'Each segment, except the last, must be at least '
'%d bytes.' % self.min_segment_size)
total_size += head_seg_resp.content_length
if seg_size is not None and \
seg_size != head_seg_resp.content_length:
problem_segments.append([quote(obj_name), 'Size Mismatch'])
if seg_dict['etag'] == head_seg_resp.etag:
slo_etag.update(seg_dict['etag'])
if seg_dict['etag'] is None or \
seg_dict['etag'] == head_seg_resp.etag:
slo_etag.update(head_seg_resp.etag)
else:
problem_segments.append([quote(obj_name), 'Etag Mismatch'])
if head_seg_resp.last_modified:
@ -624,8 +656,8 @@ class StaticLargeObject(object):
last_modified_formatted = \
last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f')
seg_data = {'name': '/' + seg_dict['path'].lstrip('/'),
'bytes': seg_size,
'hash': seg_dict['etag'],
'bytes': head_seg_resp.content_length,
'hash': head_seg_resp.etag,
'content_type': head_seg_resp.content_type,
'last_modified': last_modified_formatted}
if config_true_value(

View File

@ -117,10 +117,11 @@ Example usage of this middleware via ``swift``:
import cgi
import json
import time
from swift.common.utils import human_readable, split_path, config_true_value, \
json, quote, get_valid_utf8_str, register_swift_info
quote, register_swift_info
from swift.common.wsgi import make_pre_authed_env, WSGIContext
from swift.common.http import is_success, is_redirection, HTTP_NOT_FOUND
from swift.common.swob import Response, HTTPMovedPermanently, HTTPNotFound
@ -289,7 +290,7 @@ class _StaticWebContext(WSGIContext):
' </tr>\n'
for item in listing:
if 'subdir' in item:
subdir = get_valid_utf8_str(item['subdir'])
subdir = item['subdir'].encode("utf-8")
if prefix:
subdir = subdir[len(prefix):]
body += ' <tr class="item subdir">\n' \
@ -300,13 +301,14 @@ class _StaticWebContext(WSGIContext):
(quote(subdir), cgi.escape(subdir))
for item in listing:
if 'name' in item:
name = get_valid_utf8_str(item['name'])
name = item['name'].encode("utf-8")
if prefix:
name = name[len(prefix):]
content_type = get_valid_utf8_str(item['content_type'])
bytes = get_valid_utf8_str(human_readable(item['bytes']))
last_modified = (cgi.escape(item['last_modified']).
split('.')[0].replace('T', ' '))
content_type = item['content_type'].encode("utf-8")
bytes = human_readable(item['bytes'])
last_modified = (
cgi.escape(item['last_modified'].encode("utf-8")).
split('.')[0].replace('T', ' '))
body += ' <tr class="item %s">\n' \
' <td class="colname"><a href="%s">%s</a></td>\n' \
' <td class="colsize">%s</td>\n' \
@ -315,7 +317,7 @@ class _StaticWebContext(WSGIContext):
(' '.join('type-' + cgi.escape(t.lower(), quote=True)
for t in content_type.split('/')),
quote(name), cgi.escape(name),
bytes, get_valid_utf8_str(last_modified))
bytes, last_modified)
body += ' </table>\n' \
' </body>\n' \
'</html>\n'

View File

@ -447,16 +447,16 @@ class TempAuth(object):
# on ACLs, TempAuth is not such an auth system. At this point,
# it thinks it is authoritative.
if key not in tempauth_acl_keys:
return 'Key %r not recognized' % key
return "Key '%s' not recognized" % key
for key in tempauth_acl_keys:
if key not in result:
continue
if not isinstance(result[key], list):
return 'Value for key %r must be a list' % key
return "Value for key '%s' must be a list" % key
for grantee in result[key]:
if not isinstance(grantee, str):
return 'Elements of %r list must be strings' % key
if not isinstance(grantee, basestring):
return "Elements of '%s' list must be strings" % key
# Everything looks fine, no errors found
internal_hdr = get_sys_meta_prefix('account') + 'core-access-control'
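
A hedged example of an account ACL value that satisfies the checks above; the key names 'admin', 'read-write' and 'read-only' are the usual tempauth ACL keys and are assumed here::

    acl = {
        'admin': ['myaccount:admin'],
        'read-write': ['otheraccount'],
        'read-only': [u'someuser'],   # unicode now passes the basestring check
    }
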

View File

@ -161,7 +161,7 @@ def get_tempurl_keys_from_metadata(meta):
meta = get_account_info(...)['meta']
keys = get_tempurl_keys_from_metadata(meta)
"""
return [get_valid_utf8_str(value) for key, value in meta.iteritems()
return [get_valid_utf8_str(value) for key, value in meta.items()
if key.lower() in ('temp-url-key', 'temp-url-key-2')]

View File

@ -23,7 +23,6 @@ from swob in here without creating circular imports.
import hashlib
import itertools
import time
from contextlib import contextmanager
from urllib import unquote
from swift import gettext_ as _
from swift.common.storage_policy import POLICIES
@ -32,7 +31,8 @@ from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success
from swift.common.swob import (HTTPBadRequest, HTTPNotAcceptable,
HTTPServiceUnavailable)
from swift.common.utils import split_path, validate_device_partition
from swift.common.utils import split_path, validate_device_partition, \
close_if_possible
from swift.common.wsgi import make_subrequest
@ -249,26 +249,6 @@ def copy_header_subset(from_r, to_r, condition):
to_r.headers[k] = v
def close_if_possible(maybe_closable):
close_method = getattr(maybe_closable, 'close', None)
if callable(close_method):
return close_method()
@contextmanager
def closing_if_possible(maybe_closable):
"""
Like contextlib.closing(), but doesn't crash if the object lacks a close()
method.
PEP 333 (WSGI) says: "If the iterable returned by the application has a
close() method, the server or gateway must call that method upon
completion of the current request[.]" This function makes that easier.
"""
yield maybe_closable
close_if_possible(maybe_closable)
class SegmentedIterable(object):
"""
Iterable that returns the object contents for a large object.
@ -304,6 +284,7 @@ class SegmentedIterable(object):
self.peeked_chunk = None
self.app_iter = self._internal_iter()
self.validated_first_segment = False
self.current_resp = None
def _internal_iter(self):
start_time = time.time()
@ -360,6 +341,8 @@ class SegmentedIterable(object):
'r_size': seg_resp.content_length,
's_etag': seg_etag,
's_size': seg_size})
else:
self.current_resp = seg_resp
seg_hash = hashlib.md5()
for chunk in seg_resp.app_iter:
@ -420,7 +403,7 @@ class SegmentedIterable(object):
self.validated_first_segment = True
try:
self.peeked_chunk = self.app_iter.next()
self.peeked_chunk = next(self.app_iter)
except StopIteration:
pass
@ -431,3 +414,11 @@ class SegmentedIterable(object):
return itertools.chain([pc], self.app_iter)
else:
return self.app_iter
def close(self):
"""
Called when the client disconnects. Ensure that the connection to the
backend server is closed.
"""
if self.current_resp:
close_if_possible(self.current_resp.app_iter)

View File

@ -21,9 +21,11 @@ import logging
import math
import random
import cPickle as pickle
from copy import deepcopy
from array import array
from collections import defaultdict
from six.moves import range
from time import time
from swift.common import exceptions
@ -79,6 +81,7 @@ class RingBuilder(object):
self.devs_changed = False
self.version = 0
self.overload = 0.0
self._effective_overload = None
# _replica2part2dev maps from replica number to partition number to
# device id. So, for a three replica, 2**23 ring, it's an array of
@ -125,6 +128,12 @@ class RingBuilder(object):
'ring, or all devices have been '
'deleted')
@classmethod
def from_dict(cls, builder_data):
b = cls(1, 1, 1) # Dummy values
b.copy_from(builder_data)
return b
def copy_from(self, builder):
"""
Reinitializes this RingBuilder instance from data obtained from the
@ -173,6 +182,9 @@ class RingBuilder(object):
for dev in self._iter_devs():
dev.setdefault("region", 1)
def __deepcopy__(self, memo):
return type(self).from_dict(deepcopy(self.to_dict(), memo))
def to_dict(self):
"""
Returns a dict that can be used later with copy_from to
@ -369,6 +381,11 @@ class RingBuilder(object):
if seed is not None:
random.seed(seed)
self._effective_overload = min(self.overload,
self.get_required_overload())
self.logger.debug("Using effective overload of %f",
self._effective_overload)
self._ring = None
if self._last_part_moves_epoch is None:
self.logger.debug("New builder; performing initial balance")
@ -390,7 +407,8 @@ class RingBuilder(object):
while True:
reassign_parts = self._gather_reassign_parts()
changed_parts += len(reassign_parts)
self.logger.debug("Gathered %d parts", changed_parts)
self.logger.debug("Gathered %d parts thus far (%d this pass)",
changed_parts, len(reassign_parts))
self._reassign_parts(reassign_parts)
self.logger.debug("Assigned %d parts", changed_parts)
while self._remove_devs:
@ -524,7 +542,7 @@ class RingBuilder(object):
if stats:
# dev_usage[dev_id] will equal the number of partitions assigned to
# that device.
dev_usage = array('I', (0 for _junk in xrange(dev_len)))
dev_usage = array('I', (0 for _junk in range(dev_len)))
for part2dev in self._replica2part2dev:
for dev_id in part2dev:
dev_usage[dev_id] += 1
@ -591,13 +609,158 @@ class RingBuilder(object):
balance = dev_balance
return balance
def get_required_overload(self):
"""
Returns the minimum overload value required to make the ring maximally
dispersed.
"""
self.logger.debug("computing required overload")
tfd, sibling_tiers = self._compute_sibling_tiers()
max_allowed_replicas = self._build_max_replicas_by_tier()
# We're computing a bunch of different things here, but iterating
# over all the devs once is more efficient than doing it a bunch of
# times.
all_tiers = set([()])
tier_weight = defaultdict(float)
total_weight = 0.0
tier2children = defaultdict(set)
for dev in self._iter_devs():
dev_weight = dev['weight']
total_weight += dev_weight
for tier in tfd[dev['id']]:
all_tiers.add(tier)
tier_weight[tier] += dev_weight
tier2children[tier[:-1]].add(tier)
tier_weight[()] = total_weight
max_required_overload = 0.0
for tier in all_tiers:
if tier not in tier2children:
continue
if tier_weight[tier] <= 0:
continue
# Example 1: Consider a 3-replica cluster with 2 regions. If one
# region has more than 2/3 the total weight, then (ignoring
# overload) some partitions will reside entirely in the big
# region.
#
# Example 2: Consider a 3-replica cluster with 3 regions. If any
# region has more than 1/3 the total weight, some partitions will
# not have replicas spread across all regions.
#
# Example 3: Consider a 3-replica cluster with 4 regions. If any
# region has more than 1/3 the total weight, some partitions will
# not have replicas spread across all regions.
#
# Example 4: Consider a 3-replica cluster with 100 regions. If
# any region has more than 1/3 the total weight, some partitions
# will not have replicas spread across all regions. The fact
# that there's 100 regions doesn't matter; if one region is big
# enough, it'll get multiple replicas of some partitions.
#
# Example 5: Consider a 5-replica cluster with 2 regions. If the
# bigger region has more than 3/5 the weight, some partitions
# will have more than 3 replicas in the big region. (Optimal
# dispersion is 3 replicas in some region and 2 in the other; 4
# and 1 is not good enough.)
#
# In general, what we do is split this tier's child tiers
# into two groups: "big" and "small". "Big" child tiers are
# ones whose weight exceeds their fraction of the replicas.
# For example, given 3 replicas and 4 zones of total weight
# 12,000, a zone with weight greater than 1/3 of 12,000 (=
# 4,000) would be considered big. "Small" child tiers are
# those which are not big.
#
# Once we've divided the child tiers into big and small, we
# figure out how many replicas should wind up on the small
# child tiers (all together), and then compute the needed
# overload factor to boost their weights so they can take
# that many replicas.
child_tiers = tier2children[tier]
tier_replicas = max_allowed_replicas[tier]
big_child_count = small_child_count = 0
big_child_weight = small_child_weight = 0.0
max_child_replicas = math.ceil(tier_replicas / len(child_tiers))
bigness_threshold = (
max_child_replicas / tier_replicas * tier_weight[tier])
for child_tier in child_tiers:
child_weight = tier_weight[child_tier]
if child_weight == 0:
# If it's got 0 weight, it's not taking any
# partitions at all, so it doesn't count.
continue
if child_weight >= bigness_threshold:
big_child_count += 1
big_child_weight += child_weight
else:
small_child_count += 1
small_child_weight += child_weight
if big_child_count == 0 or small_child_count == 0:
# We only need overload if we have both big and small
# tiers. Usually, all small tiers means things can
# balance, while all big tiers means that we have
# exactly one child tier (e.g. a cluster with only one
# region).
continue
# We assume each big child tier takes the maximum possible
# number of replicas for optimal dispersion, but no more.
# That leaves the remainder for the small child tiers.
big_child_replicas = max_child_replicas * big_child_count
small_child_replicas = tier_replicas - big_child_replicas
if small_child_replicas == 0:
# If we're not putting any replicas on small child
# tiers, then there's no need for overload. This also
# avoids a division-by-zero below.
continue
# We want the overloaded small tiers to take up their fair
# share of the replicas. We can express this as follows:
#
# Let Ws be the sum of the weights of the small child tiers.
#
# Let Wb be the sum of the weights of the big child tiers.
#
# Let Rt be the number of replicas at the current tier.
#
# Let Rs be the desired number of replicas for the small
# child tiers.
#
# Let L be the overload.
#
# Then, we have the following:
#
# (L * Ws) / (Wb + L * Ws) = Rs / Rt
#
# Solving for L, we get:
#
# L = 1 / (Ws / Wb * (Rt / Rs - 1))
#
# Since the builder expresses overload as a fraction above a
# device's normal share (overload 0.0 means no extra capacity),
# the value computed below is L - 1.
required_overload = 1.0 / (
(small_child_weight / big_child_weight)
* (tier_replicas / small_child_replicas - 1)) - 1
if required_overload > max_required_overload:
self.logger.debug("Required overload for %r is %f [NEW HIGH]",
tier, required_overload)
max_required_overload = required_overload
else:
self.logger.debug("Required overload for %r is %f",
tier, required_overload)
return max_required_overload
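As a quick numeric check of the derivation above (all values hypothetical): take a 3-replica cluster with two regions weighing 9000 and 3000, where optimal dispersion wants the small region to hold one replica:

# Hypothetical 3-replica cluster with two regions of weight 9000 and 3000.
Wb, Ws = 9000.0, 3000.0   # big / small child-tier weight
Rt, Rs = 3, 1             # replicas at this tier / replicas for small tiers
L = 1.0 / ((Ws / Wb) * (float(Rt) / Rs - 1))   # = 1.5
required_overload = L - 1                      # = 0.5
# Check: (L * Ws) / (Wb + L * Ws) == 4500.0 / 13500.0 == 1.0 / 3 == Rs / Rt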
def pretend_min_part_hours_passed(self):
"""
Override min_part_hours by marking all partitions as having been moved
255 hours ago. This can be used to force a full rebalance on the next
call to rebalance.
"""
for part in xrange(self.parts):
for part in range(self.parts):
self._last_part_moves[part] = 0xff
def get_part_devices(self, part):
@ -632,6 +795,8 @@ class RingBuilder(object):
used to sort the devices according to "most wanted" during rebalancing
to best distribute partitions. A negative parts_wanted indicates the
device is "overweight" and wishes to give partitions away if possible.
Note: parts_wanted does *not* consider overload.
"""
weight_of_one_part = self.weight_of_one_part()
@ -703,12 +868,12 @@ class RingBuilder(object):
if len(part2dev) < desired_length:
# Not long enough: needs to be extended and the
# newly-added pieces assigned to devices.
for part in xrange(len(part2dev), desired_length):
for part in range(len(part2dev), desired_length):
to_assign[part].append(replica)
part2dev.append(0)
elif len(part2dev) > desired_length:
# Too long: truncate this mapping.
for part in xrange(desired_length, len(part2dev)):
for part in range(desired_length, len(part2dev)):
dev_losing_part = self.devs[part2dev[part]]
dev_losing_part['parts'] -= 1
removed_replicas += 1
@ -716,10 +881,10 @@ class RingBuilder(object):
else:
# Mapping not present at all: make one up and assign
# all of it.
for part in xrange(desired_length):
for part in range(desired_length):
to_assign[part].append(replica)
self._replica2part2dev.append(
array('H', (0 for _junk in xrange(desired_length))))
array('H', (0 for _junk in range(desired_length))))
return (to_assign.items(), removed_replicas)
@ -728,7 +893,7 @@ class RingBuilder(object):
Initial partition assignment is the same as rebalancing an
existing ring, but with some initial setup beforehand.
"""
self._last_part_moves = array('B', (0 for _junk in xrange(self.parts)))
self._last_part_moves = array('B', (0 for _junk in range(self.parts)))
self._last_part_moves_epoch = int(time())
self._set_parts_wanted()
@ -741,7 +906,7 @@ class RingBuilder(object):
more recently than min_part_hours.
"""
elapsed_hours = int(time() - self._last_part_moves_epoch) / 3600
for part in xrange(self.parts):
for part in range(self.parts):
# The "min(self._last_part_moves[part] + elapsed_hours, 0xff)"
# which was here showed up in profiling, so it got inlined.
last_plus_elapsed = self._last_part_moves[part] + elapsed_hours
@ -756,29 +921,30 @@ class RingBuilder(object):
Returns a dict of (tier: available parts in other tiers) for all tiers
in the ring.
Devices that have too much partitions (negative parts_wanted) are
ignored, otherwise the sum of all parts_wanted is 0 +/- rounding
errors.
Devices that have too many partitions (negative parts_wanted plus
overload) are ignored, otherwise the sum of all returned values is 0
+/- rounding errors.
This takes overload into account.
"""
wanted_parts_for_tier = {}
for dev in self._iter_devs():
pw = (max(0, dev['parts_wanted']) +
max(int(math.ceil(
(dev['parts_wanted'] + dev['parts']) * self.overload)),
0))
extra_overload_parts = self._n_overload_parts(dev)
pw = max(dev['parts_wanted'] + extra_overload_parts, 0)
for tier in tiers_for_dev(dev):
wanted_parts_for_tier.setdefault(tier, 0)
wanted_parts_for_tier[tier] += pw
return wanted_parts_for_tier
def _gather_reassign_parts(self):
def _compute_sibling_tiers(self):
"""
Returns a list of (partition, replicas) pairs to be reassigned by
gathering from removed devices, insufficiently-far-apart replicas, and
overweight drives.
Returns a 2-tuple; the first value is a dictionary mapping each
device's id to its tiers, and the second is a dictionary mapping
a-tier: list-of-sibling-tiers.
"""
# inline memoization of tiers_for_dev() results (profiling reveals it
# as a hot-spot).
# as a hot-spot). We also return it so callers don't have to
# rebuild it.
tfd = {}
tiers_by_len = defaultdict(set)
@ -796,6 +962,15 @@ class RingBuilder(object):
for i, tier in enumerate(tiers):
sibling_tiers[tier] = [t for t in (tiers[:i] + tiers[(i + 1):])
if t[:-1] == tier[:-1]]
return (tfd, sibling_tiers)
def _gather_reassign_parts(self):
"""
Returns a list of (partition, replicas) pairs to be reassigned by
gathering from removed devices, insufficiently-far-apart replicas, and
overweight drives.
"""
tfd, sibling_tiers = self._compute_sibling_tiers()
# First we gather partitions from removed devices. Since removed
# devices usually indicate device failures, we have no choice but to
@ -820,7 +995,7 @@ class RingBuilder(object):
max_allowed_replicas = self._build_max_replicas_by_tier()
wanted_parts_for_tier = self._get_available_parts()
moved_parts = 0
for part in xrange(self.parts):
for part in range(self.parts):
# Only move one replica at a time if possible.
if part in removed_dev_parts:
continue
@ -906,20 +1081,22 @@ class RingBuilder(object):
start += random.randint(0, self.parts / 2) # GRAH PEP8!!!
self._last_part_gather_start = start
for replica, part2dev in enumerate(self._replica2part2dev):
# If we've got a partial replica, start may be out of
# range. Scale it down so that we get a similar movement
# pattern (but scaled down) on sequential runs.
this_start = int(float(start) * len(part2dev) / self.parts)
for part in itertools.chain(xrange(this_start, len(part2dev)),
xrange(0, this_start)):
for part in itertools.chain(range(this_start, len(part2dev)),
range(0, this_start)):
if self._last_part_moves[part] < self.min_part_hours:
continue
if part in removed_dev_parts or part in spread_out_parts:
continue
dev = self.devs[part2dev[part]]
if dev['parts_wanted'] < 0:
fudge = self._n_overload_parts(dev)
if dev['parts_wanted'] + fudge < 0:
self._last_part_moves[part] = 0
dev['parts_wanted'] += 1
dev['parts'] -= 1
@ -931,7 +1108,7 @@ class RingBuilder(object):
reassign_parts.update(spread_out_parts)
reassign_parts.update(removed_dev_parts)
reassign_parts_list = list(reassign_parts.iteritems())
reassign_parts_list = list(reassign_parts.items())
# We shuffle the partitions to reassign so we get a more even
# distribution later. There has been discussion of trying to distribute
# partitions more "regularly" because that would actually reduce risk
@ -942,6 +1119,14 @@ class RingBuilder(object):
random.shuffle(reassign_parts_list)
return reassign_parts_list
def _n_overload_parts(self, dev):
"""
The number of extra partitions a device can take due to overload.
"""
return max(int(math.ceil(
(dev['parts_wanted'] + dev['parts'])
* self._effective_overload)), 0)
def _reassign_parts(self, reassign_parts):
"""
For an existing ring data set, partitions are reassigned similarly to
@ -981,9 +1166,7 @@ class RingBuilder(object):
# with partitions to shed, which is any time a device is being
# removed, which is a pretty frequent operation.
wanted = max(dev['parts_wanted'], 0)
fudge = max(int(math.ceil(
(dev['parts_wanted'] + dev['parts']) * self.overload)),
0)
fudge = self._n_overload_parts(dev)
for tier in tiers:
fudge_available_in_tier[tier] += (wanted + fudge)
parts_available_in_tier[tier] += wanted
@ -1260,7 +1443,7 @@ class RingBuilder(object):
Generator yielding every (partition, replica) pair in the ring.
"""
for replica, part2dev in enumerate(self._replica2part2dev):
for part in xrange(len(part2dev)):
for part in range(len(part2dev)):
yield (part, replica)
@classmethod


@ -27,6 +27,8 @@ from hashlib import md5
from itertools import chain
from tempfile import NamedTemporaryFile
from six.moves import range
from swift.common.utils import hash_path, validate_configuration, json
from swift.common.ring.utils import tiers_for_dev
@ -44,22 +46,42 @@ class RingData(object):
dev.setdefault("region", 1)
@classmethod
def deserialize_v1(cls, gz_file):
def deserialize_v1(cls, gz_file, metadata_only=False):
"""
Deserialize a v1 ring file into a dictionary with `devs`, `part_shift`,
and `replica2part2dev_id` keys.
If the optional kwarg `metadata_only` is True, then the
`replica2part2dev_id` is not loaded and that key in the returned
dictionary just has the value `[]`.
:param file gz_file: An opened file-like object which has already
consumed the 6 bytes of magic and version.
:param bool metadata_only: If True, only load `devs` and `part_shift`
:returns: A dict containing `devs`, `part_shift`, and
`replica2part2dev_id`
"""
json_len, = struct.unpack('!I', gz_file.read(4))
ring_dict = json.loads(gz_file.read(json_len))
ring_dict['replica2part2dev_id'] = []
if metadata_only:
return ring_dict
partition_count = 1 << (32 - ring_dict['part_shift'])
for x in xrange(ring_dict['replica_count']):
for x in range(ring_dict['replica_count']):
ring_dict['replica2part2dev_id'].append(
array.array('H', gz_file.read(2 * partition_count)))
return ring_dict
@classmethod
def load(cls, filename):
def load(cls, filename, metadata_only=False):
"""
Load ring data from a file.
:param filename: Path to a file serialized by the save() method.
:param bool metadata_only: If True, only load `devs` and `part_shift`.
:returns: A RingData instance containing the loaded data.
"""
gz_file = GzipFile(filename, 'rb')
@ -70,15 +92,18 @@ class RingData(object):
# See if the file is in the new format
magic = gz_file.read(4)
if magic == 'R1NG':
version, = struct.unpack('!H', gz_file.read(2))
if version == 1:
ring_data = cls.deserialize_v1(gz_file)
format_version, = struct.unpack('!H', gz_file.read(2))
if format_version == 1:
ring_data = cls.deserialize_v1(
gz_file, metadata_only=metadata_only)
else:
raise Exception('Unknown ring format version %d' % version)
raise Exception('Unknown ring format version %d' %
format_version)
else:
# Assume old-style pickled ring
gz_file.seek(0)
ring_data = pickle.load(gz_file)
if not hasattr(ring_data, 'devs'):
ring_data = RingData(ring_data['replica2part2dev_id'],
ring_data['devs'], ring_data['part_shift'])
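A minimal sketch of using the new metadata_only flag (the ring path is just an example):

from swift.common.ring import RingData

# Loads only 'devs' and 'part_shift'; the large replica2part2dev_id
# arrays are skipped, which is much cheaper for port/IP lookups.
ring_meta = RingData.load('/etc/swift/object.ring.gz', metadata_only=True)
local_ports = set(dev['port'] for dev in ring_meta.devs if dev)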
@ -179,18 +204,17 @@ class Ring(object):
# doing it on every call to get_more_nodes().
regions = set()
zones = set()
ip_ports = set()
ips = set()
self._num_devs = 0
for dev in self._devs:
if dev:
regions.add(dev['region'])
zones.add((dev['region'], dev['zone']))
ip_ports.add((dev['region'], dev['zone'],
dev['ip'], dev['port']))
ips.add((dev['region'], dev['zone'], dev['ip']))
self._num_devs += 1
self._num_regions = len(regions)
self._num_zones = len(zones)
self._num_ip_ports = len(ip_ports)
self._num_ips = len(ips)
def _rebuild_tier_data(self):
self.tier2devs = defaultdict(list)
@ -329,8 +353,8 @@ class Ring(object):
used = set(d['id'] for d in primary_nodes)
same_regions = set(d['region'] for d in primary_nodes)
same_zones = set((d['region'], d['zone']) for d in primary_nodes)
same_ip_ports = set((d['region'], d['zone'], d['ip'], d['port'])
for d in primary_nodes)
same_ips = set(
(d['region'], d['zone'], d['ip']) for d in primary_nodes)
parts = len(self._replica2part2dev_id[0])
start = struct.unpack_from(
@ -339,9 +363,9 @@ class Ring(object):
# Multiple loops for execution speed; the checks and bookkeeping get
# simpler as you go along
hit_all_regions = len(same_regions) == self._num_regions
for handoff_part in chain(xrange(start, parts, inc),
xrange(inc - ((parts - start) % inc),
start, inc)):
for handoff_part in chain(range(start, parts, inc),
range(inc - ((parts - start) % inc),
start, inc)):
if hit_all_regions:
# At this point, there are no regions left untouched, so we
# can stop looking.
@ -356,17 +380,17 @@ class Ring(object):
used.add(dev_id)
same_regions.add(region)
zone = dev['zone']
ip_port = (region, zone, dev['ip'], dev['port'])
ip = (region, zone, dev['ip'])
same_zones.add((region, zone))
same_ip_ports.add(ip_port)
same_ips.add(ip)
if len(same_regions) == self._num_regions:
hit_all_regions = True
break
hit_all_zones = len(same_zones) == self._num_zones
for handoff_part in chain(xrange(start, parts, inc),
xrange(inc - ((parts - start) % inc),
start, inc)):
for handoff_part in chain(range(start, parts, inc),
range(inc - ((parts - start) % inc),
start, inc)):
if hit_all_zones:
# Much like we stopped looking for fresh regions before, we
# can now stop looking for fresh zones; there are no more.
@ -380,17 +404,17 @@ class Ring(object):
yield dev
used.add(dev_id)
same_zones.add(zone)
ip_port = zone + (dev['ip'], dev['port'])
same_ip_ports.add(ip_port)
ip = zone + (dev['ip'],)
same_ips.add(ip)
if len(same_zones) == self._num_zones:
hit_all_zones = True
break
hit_all_ip_ports = len(same_ip_ports) == self._num_ip_ports
for handoff_part in chain(xrange(start, parts, inc),
xrange(inc - ((parts - start) % inc),
start, inc)):
if hit_all_ip_ports:
hit_all_ips = len(same_ips) == self._num_ips
for handoff_part in chain(range(start, parts, inc),
range(inc - ((parts - start) % inc),
start, inc)):
if hit_all_ips:
# We've exhausted the pool of unused backends, so stop
# looking.
break
@ -398,20 +422,19 @@ class Ring(object):
if handoff_part < len(part2dev_id):
dev_id = part2dev_id[handoff_part]
dev = self._devs[dev_id]
ip_port = (dev['region'], dev['zone'],
dev['ip'], dev['port'])
if dev_id not in used and ip_port not in same_ip_ports:
ip = (dev['region'], dev['zone'], dev['ip'])
if dev_id not in used and ip not in same_ips:
yield dev
used.add(dev_id)
same_ip_ports.add(ip_port)
if len(same_ip_ports) == self._num_ip_ports:
hit_all_ip_ports = True
same_ips.add(ip)
if len(same_ips) == self._num_ips:
hit_all_ips = True
break
hit_all_devs = len(used) == self._num_devs
for handoff_part in chain(xrange(start, parts, inc),
xrange(inc - ((parts - start) % inc),
start, inc)):
for handoff_part in chain(range(start, parts, inc),
range(inc - ((parts - start) % inc),
start, inc)):
if hit_all_devs:
# We've used every device we have, so let's stop looking for
# unused devices now.


@ -29,7 +29,7 @@ def tiers_for_dev(dev):
"""
t1 = dev['region']
t2 = dev['zone']
t3 = "{ip}:{port}".format(ip=dev.get('ip'), port=dev.get('port'))
t3 = dev['ip']
t4 = dev['id']
return ((t1,),
@ -48,40 +48,40 @@ def build_tier_tree(devices):
Example:
region 1 -+---- zone 1 -+---- 192.168.101.1:6000 -+---- device id 0
| | |
| | +---- device id 1
| | |
| | +---- device id 2
region 1 -+---- zone 1 -+---- 192.168.101.1 -+---- device id 0
| | |
| | +---- device id 1
| | |
| | +---- device id 2
| |
| +---- 192.168.101.2:6000 -+---- device id 3
| |
| +---- device id 4
| |
| +---- device id 5
| +---- 192.168.101.2 -+---- device id 3
| |
| +---- device id 4
| |
| +---- device id 5
|
+---- zone 2 -+---- 192.168.102.1:6000 -+---- device id 6
| |
| +---- device id 7
| |
| +---- device id 8
+---- zone 2 -+---- 192.168.102.1 -+---- device id 6
| |
| +---- device id 7
| |
| +---- device id 8
|
+---- 192.168.102.2:6000 -+---- device id 9
|
+---- device id 10
+---- 192.168.102.2 -+---- device id 9
|
+---- device id 10
region 2 -+---- zone 1 -+---- 192.168.201.1:6000 -+---- device id 12
| |
| +---- device id 13
| |
| +---- device id 14
region 2 -+---- zone 1 -+---- 192.168.201.1 -+---- device id 12
| |
| +---- device id 13
| |
| +---- device id 14
|
+---- 192.168.201.2:6000 -+---- device id 15
|
+---- device id 16
|
+---- device id 17
+---- 192.168.201.2 -+---- device id 15
|
+---- device id 16
|
+---- device id 17
The tier tree would look like:
{
@ -90,30 +90,30 @@ def build_tier_tree(devices):
(1,): [(1, 1), (1, 2)],
(2,): [(2, 1)],
(1, 1): [(1, 1, 192.168.101.1:6000),
(1, 1, 192.168.101.2:6000)],
(1, 2): [(1, 2, 192.168.102.1:6000),
(1, 2, 192.168.102.2:6000)],
(2, 1): [(2, 1, 192.168.201.1:6000),
(2, 1, 192.168.201.2:6000)],
(1, 1): [(1, 1, 192.168.101.1),
(1, 1, 192.168.101.2)],
(1, 2): [(1, 2, 192.168.102.1),
(1, 2, 192.168.102.2)],
(2, 1): [(2, 1, 192.168.201.1),
(2, 1, 192.168.201.2)],
(1, 1, 192.168.101.1:6000): [(1, 1, 192.168.101.1:6000, 0),
(1, 1, 192.168.101.1:6000, 1),
(1, 1, 192.168.101.1:6000, 2)],
(1, 1, 192.168.101.2:6000): [(1, 1, 192.168.101.2:6000, 3),
(1, 1, 192.168.101.2:6000, 4),
(1, 1, 192.168.101.2:6000, 5)],
(1, 2, 192.168.102.1:6000): [(1, 2, 192.168.102.1:6000, 6),
(1, 2, 192.168.102.1:6000, 7),
(1, 2, 192.168.102.1:6000, 8)],
(1, 2, 192.168.102.2:6000): [(1, 2, 192.168.102.2:6000, 9),
(1, 2, 192.168.102.2:6000, 10)],
(2, 1, 192.168.201.1:6000): [(2, 1, 192.168.201.1:6000, 12),
(2, 1, 192.168.201.1:6000, 13),
(2, 1, 192.168.201.1:6000, 14)],
(2, 1, 192.168.201.2:6000): [(2, 1, 192.168.201.2:6000, 15),
(2, 1, 192.168.201.2:6000, 16),
(2, 1, 192.168.201.2:6000, 17)],
(1, 1, 192.168.101.1): [(1, 1, 192.168.101.1, 0),
(1, 1, 192.168.101.1, 1),
(1, 1, 192.168.101.1, 2)],
(1, 1, 192.168.101.2): [(1, 1, 192.168.101.2, 3),
(1, 1, 192.168.101.2, 4),
(1, 1, 192.168.101.2, 5)],
(1, 2, 192.168.102.1): [(1, 2, 192.168.102.1, 6),
(1, 2, 192.168.102.1, 7),
(1, 2, 192.168.102.1, 8)],
(1, 2, 192.168.102.2): [(1, 2, 192.168.102.2, 9),
(1, 2, 192.168.102.2, 10)],
(2, 1, 192.168.201.1): [(2, 1, 192.168.201.1, 12),
(2, 1, 192.168.201.1, 13),
(2, 1, 192.168.201.1, 14)],
(2, 1, 192.168.201.2): [(2, 1, 192.168.201.2, 15),
(2, 1, 192.168.201.2, 16),
(2, 1, 192.168.201.2, 17)],
}
:devices: device dicts from which to generate the tree
@ -235,9 +235,14 @@ def is_local_device(my_ips, my_port, dev_ip, dev_port):
Return True if the provided dev_ip and dev_port are among the IP
addresses specified in my_ips and my_port respectively.
To support accurate locality determination in the server-per-port
deployment, when my_port is None, only IP addresses are used for
determining locality (dev_port is ignored).
If dev_ip is a hostname then it is first translated to an IP
address before checking it against my_ips.
"""
candidate_ips = []
if not is_valid_ip(dev_ip) and is_valid_hostname(dev_ip):
try:
# get the ip for this host; use getaddrinfo so that
@ -248,12 +253,19 @@ def is_local_device(my_ips, my_port, dev_ip, dev_port):
dev_ip = addr[4][0] # get the ip-address
if family == socket.AF_INET6:
dev_ip = expand_ipv6(dev_ip)
if dev_ip in my_ips and dev_port == my_port:
return True
return False
candidate_ips.append(dev_ip)
except socket.gaierror:
return False
return dev_ip in my_ips and dev_port == my_port
else:
if is_valid_ipv6(dev_ip):
dev_ip = expand_ipv6(dev_ip)
candidate_ips = [dev_ip]
for dev_ip in candidate_ips:
if dev_ip in my_ips and (my_port is None or dev_port == my_port):
return True
return False
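For illustration of the server-per-port behaviour described above (addresses and ports are made up):

from swift.common.ring.utils import is_local_device

# my_port=None: only the IP is compared, so any dev_port matches.
is_local_device(['10.0.0.1'], None, '10.0.0.1', 6010)   # True
# my_port given: both IP and port must match.
is_local_device(['10.0.0.1'], 6000, '10.0.0.1', 6010)   # False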
def parse_search_value(search_value):
@ -391,7 +403,7 @@ def parse_search_values_from_opts(opts):
Convert optparse style options into a dictionary for searching.
:param opts: optparse style options
:returns: a dictonary with search values to filter devices,
:returns: a dictionary with search values to filter devices,
supported parameters are id, region, zone, ip, port,
replication_ip, replication_port, device, weight, meta
"""
@ -428,6 +440,100 @@ def parse_change_values_from_opts(opts):
return change_values
def parse_add_value(add_value):
"""
Convert an add value, like 'r1z2-10.1.2.3:7878/sdf', to a dictionary.
If the string does not start with 'r<N>', then the value of 'region' in
the returned dictionary will be None. Callers should check for this and
set a reasonable default. This is done so callers can emit errors or
warnings if desired.
Similarly, 'replication_ip' and 'replication_port' will be None if not
specified.
:returns: dictionary with keys 'region', 'zone', 'ip', 'port', 'device',
'replication_ip', 'replication_port', 'meta'
:raises: ValueError if add_value is malformed
"""
region = None
rest = add_value
if add_value.startswith('r'):
i = 1
while i < len(add_value) and add_value[i].isdigit():
i += 1
region = int(add_value[1:i])
rest = add_value[i:]
if not rest.startswith('z'):
raise ValueError('Invalid add value: %s' % add_value)
i = 1
while i < len(rest) and rest[i].isdigit():
i += 1
zone = int(rest[1:i])
rest = rest[i:]
if not rest.startswith('-'):
raise ValueError('Invalid add value: %s' % add_value)
ip, port, rest = parse_address(rest[1:])
replication_ip = replication_port = None
if rest.startswith('R'):
replication_ip, replication_port, rest = \
parse_address(rest[1:])
if not rest.startswith('/'):
raise ValueError(
'Invalid add value: %s' % add_value)
i = 1
while i < len(rest) and rest[i] != '_':
i += 1
device_name = rest[1:i]
if not validate_device_name(device_name):
raise ValueError('Invalid device name')
rest = rest[i:]
meta = ''
if rest.startswith('_'):
meta = rest[1:]
return {'region': region, 'zone': zone, 'ip': ip, 'port': port,
'device': device_name, 'replication_ip': replication_ip,
'replication_port': replication_port, 'meta': meta}
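A rough usage sketch of the format described in the docstring (the device string is invented):

from swift.common.ring.utils import parse_add_value

dev = parse_add_value('r1z2-10.1.2.3:7878R10.1.2.4:7888/sdf_some meta')
# dev == {'region': 1, 'zone': 2, 'ip': '10.1.2.3', 'port': 7878,
#         'replication_ip': '10.1.2.4', 'replication_port': 7888,
#         'device': 'sdf', 'meta': 'some meta'}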
def parse_address(rest):
if rest.startswith('['):
# remove first [] for ip
rest = rest.replace('[', '', 1).replace(']', '', 1)
pos = 0
while (pos < len(rest) and
not (rest[pos] == 'R' or rest[pos] == '/')):
pos += 1
address = rest[:pos]
rest = rest[pos:]
port_start = address.rfind(':')
if port_start == -1:
raise ValueError('Invalid port in add value')
ip = address[:port_start]
try:
port = int(address[(port_start + 1):])
except (TypeError, ValueError):
raise ValueError(
'Invalid port %s in add value' % address[port_start:])
# if this is an ipv6 address then we want to convert it
# to all lowercase and use its fully expanded representation
# to make searches easier
ip = validate_and_normalize_ip(ip)
return (ip, port, rest)
def validate_args(argvish):
"""
Build OptionParse and validate it whether the format is new command-line


@ -19,7 +19,7 @@ Bindings to the `tee` and `splice` system calls
import os
import operator
import six
import ctypes
import ctypes.util
@ -85,8 +85,8 @@ class Tee(object):
if not self.available:
raise EnvironmentError('tee not available')
if not isinstance(flags, (int, long)):
c_flags = reduce(operator.or_, flags, 0)
if not isinstance(flags, six.integer_types):
c_flags = six.moves.reduce(operator.or_, flags, 0)
else:
c_flags = flags
@ -176,8 +176,8 @@ class Splice(object):
if not self.available:
raise EnvironmentError('splice not available')
if not isinstance(flags, (int, long)):
c_flags = reduce(operator.or_, flags, 0)
if not isinstance(flags, six.integer_types):
c_flags = six.moves.reduce(operator.or_, flags, 0)
else:
c_flags = flags


@ -12,11 +12,14 @@
# limitations under the License.
from ConfigParser import ConfigParser
import textwrap
import os
import string
import textwrap
import six
from swift.common.utils import config_true_value, SWIFT_CONF_FILE
from swift.common.ring import Ring
from swift.common.utils import (
config_true_value, SWIFT_CONF_FILE, whataremyips)
from swift.common.ring import Ring, RingData
from swift.common.utils import quorum_size
from swift.common.exceptions import RingValidationError
from pyeclib.ec_iface import ECDriver, ECDriverError, VALID_EC_TYPES
@ -30,6 +33,54 @@ EC_POLICY = 'erasure_coding'
DEFAULT_EC_OBJECT_SEGMENT_SIZE = 1048576
class BindPortsCache(object):
def __init__(self, swift_dir, bind_ip):
self.swift_dir = swift_dir
self.mtimes_by_ring_path = {}
self.portsets_by_ring_path = {}
self.my_ips = set(whataremyips(bind_ip))
def all_bind_ports_for_node(self):
"""
Return a set of all bind ports defined in any ring for the storage
backend server whose IP addresses were given to the constructor.
The caller is responsible for not calling this method (which performs
at least a stat on all ring files) too frequently.
"""
# NOTE: we don't worry about disappearing rings here because you can't
# ever delete a storage policy.
for policy in POLICIES:
# NOTE: we must NOT use policy.load_ring to load the ring. Users
# of this utility function will not need the actual ring data, just
# the bind ports.
#
# This is duplicated with Ring.__init__ just a bit...
serialized_path = os.path.join(self.swift_dir,
policy.ring_name + '.ring.gz')
try:
new_mtime = os.path.getmtime(serialized_path)
except OSError:
continue
old_mtime = self.mtimes_by_ring_path.get(serialized_path)
if not old_mtime or old_mtime != new_mtime:
self.portsets_by_ring_path[serialized_path] = set(
dev['port']
for dev in RingData.load(serialized_path,
metadata_only=True).devs
if dev and dev['ip'] in self.my_ips)
self.mtimes_by_ring_path[serialized_path] = new_mtime
# No "break" here so that the above line will update the
# mtimes_by_ring_path entry for any ring that changes, not just
# the first one we notice.
# Return the requested set of ports from our (now-freshened) cache
return six.moves.reduce(set.union,
self.portsets_by_ring_path.values(), set())
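A sketch of the intended use (directory and address are hypothetical); repeated calls are cheap because a ring file is only re-read when its mtime changes:

from swift.common.storage_policy import BindPortsCache

cache = BindPortsCache('/etc/swift', '10.0.0.1')
bind_ports = cache.all_bind_ports_for_node()   # set of ports for this node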
class PolicyError(ValueError):
def __init__(self, msg, index=None):
@ -291,7 +342,7 @@ class ECStoragePolicy(BaseStoragePolicy):
if ec_type not in VALID_EC_TYPES:
raise PolicyError('Wrong ec_type %s for policy %s, should be one'
' of "%s"' % (ec_type, self.name,
', '.join(VALID_EC_TYPES)))
', '.join(VALID_EC_TYPES)))
self._ec_type = ec_type
# Define _ec_ndata as the number of EC data fragments
@ -427,8 +478,9 @@ class ECStoragePolicy(BaseStoragePolicy):
if nodes_configured != (self.ec_ndata + self.ec_nparity):
raise RingValidationError(
'EC ring for policy %s needs to be configured with '
'exactly %d nodes. Got %d.' % (self.name,
self.ec_ndata + self.ec_nparity, nodes_configured))
'exactly %d nodes. Got %d.' % (
self.name, self.ec_ndata + self.ec_nparity,
nodes_configured))
@property
def quorum(self):


@ -49,7 +49,8 @@ import random
import functools
import inspect
from swift.common.utils import reiterate, split_path, Timestamp, pairs
from swift.common.utils import reiterate, split_path, Timestamp, pairs, \
close_if_possible
from swift.common.exceptions import InvalidTimestamp
@ -874,7 +875,7 @@ class Request(object):
elif 'wsgi.input' not in env:
env['wsgi.input'] = WsgiStringIO('')
req = Request(env)
for key, val in headers.iteritems():
for key, val in headers.items():
req.headers[key] = val
for key, val in kwargs.items():
prop = getattr(Request, key, None)
@ -1089,13 +1090,14 @@ def content_range_header(start, stop, size):
def multi_range_iterator(ranges, content_type, boundary, size, sub_iter_gen):
for start, stop in ranges:
yield ''.join(['\r\n--', boundary, '\r\n',
yield ''.join(['--', boundary, '\r\n',
'Content-Type: ', content_type, '\r\n'])
yield content_range_header(start, stop, size) + '\r\n\r\n'
sub_iter = sub_iter_gen(start, stop)
for chunk in sub_iter:
yield chunk
yield '\r\n--' + boundary + '--\r\n'
yield '\r\n'
yield '--' + boundary + '--'
class Response(object):
@ -1139,7 +1141,7 @@ class Response(object):
self.headers.update(headers)
if self.status_int == 401 and 'www-authenticate' not in self.headers:
self.headers.update({'www-authenticate': self.www_authenticate()})
for key, value in kw.iteritems():
for key, value in kw.items():
setattr(self, key, value)
# When specifying both 'content_type' and 'charset' in the kwargs,
# charset needs to be applied *after* content_type, otherwise charset
@ -1177,21 +1179,37 @@ class Response(object):
self.content_type = ''.join(['multipart/byteranges;',
'boundary=', self.boundary])
# This section calculate the total size of the targeted response
# The value 12 is the length of total bytes of hyphen, new line
# form feed for each section header. The value 8 is the length of
# total bytes of hyphen, new line, form feed characters for the
# closing boundary which appears only once
section_header_fixed_len = 12 + (len(self.boundary) +
len('Content-Type: ') +
len(content_type) +
len('Content-Range: bytes '))
# This section calculates the total size of the response.
section_header_fixed_len = (
# --boundary\r\n
len(self.boundary) + 4
# Content-Type: <type>\r\n
+ len('Content-Type: ') + len(content_type) + 2
# Content-Range: <value>\r\n; <value> accounted for later
+ len('Content-Range: ') + 2
# \r\n at end of headers
+ 2)
body_size = 0
for start, end in ranges:
body_size += section_header_fixed_len
body_size += len(str(start) + '-' + str(end - 1) + '/' +
str(content_size)) + (end - start)
body_size += 8 + len(self.boundary)
# length of the value of Content-Range, not including the \r\n
# since that's already accounted for
cr = content_range_header_value(start, end, content_size)
body_size += len(cr)
# the actual bytes (note: this range is half-open, i.e. begins
# with byte <start> and ends with byte <end - 1>, so there's no
# fencepost error here)
body_size += (end - start)
# \r\n prior to --boundary
body_size += 2
# --boundary-- terminates the message
body_size += len(self.boundary) + 4
self.content_length = body_size
self.content_range = None
return content_size, content_type
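As a sanity check of the accounting above, a hypothetical part for bytes 0-4 of a 10-byte text/plain object with boundary 'b' works out as follows (a real multi-range response has two or more parts; one is used here only to keep the arithmetic short):

# --b\r\n                          5 bytes  (len(boundary) + 4)
# Content-Type: text/plain\r\n    26 bytes
# Content-Range: bytes 0-4/10\r\n 29 bytes
# \r\n                             2 bytes  (end of part headers)
# <the 5 body bytes>               5 bytes  (end - start)
# \r\n                             2 bytes  (before the next boundary)
# --b--                            5 bytes  (closing boundary, once)
#                          total: 74 bytes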
@ -1203,12 +1221,14 @@ class Response(object):
etag in self.request.if_none_match:
self.status = 304
self.content_length = 0
close_if_possible(app_iter)
return ['']
if etag and self.request.if_match and \
etag not in self.request.if_match:
self.status = 412
self.content_length = 0
close_if_possible(app_iter)
return ['']
if self.status_int == 404 and self.request.if_match \
@ -1219,18 +1239,21 @@ class Response(object):
# Failed) response. [RFC 2616 section 14.24]
self.status = 412
self.content_length = 0
close_if_possible(app_iter)
return ['']
if self.last_modified and self.request.if_modified_since \
and self.last_modified <= self.request.if_modified_since:
self.status = 304
self.content_length = 0
close_if_possible(app_iter)
return ['']
if self.last_modified and self.request.if_unmodified_since \
and self.last_modified > self.request.if_unmodified_since:
self.status = 412
self.content_length = 0
close_if_possible(app_iter)
return ['']
if self.request and self.request.method == 'HEAD':
@ -1244,6 +1267,7 @@ class Response(object):
if ranges == []:
self.status = 416
self.content_length = 0
close_if_possible(app_iter)
return ['']
elif ranges:
range_size = len(ranges)


@ -25,6 +25,7 @@ import operator
import os
import pwd
import re
import rfc822
import sys
import threading as stdlib_threading
import time
@ -63,6 +64,7 @@ import netifaces
import codecs
utf8_decoder = codecs.getdecoder('utf-8')
utf8_encoder = codecs.getencoder('utf-8')
from six.moves import range
from swift import gettext_ as _
import swift.common.exceptions
@ -224,7 +226,7 @@ def register_swift_info(name='swift', admin=False, **kwargs):
if "." in name:
raise ValueError('Cannot use "." in a swift_info key: %s' % name)
dict_to_use[name] = {}
for key, val in kwargs.iteritems():
for key, val in kwargs.items():
if "." in key:
raise ValueError('Cannot use "." in a swift_info key: %s' % key)
dict_to_use[name][key] = val
@ -458,7 +460,7 @@ class FileLikeIter(object):
def next(self):
"""
x.next() -> the next value, or raise StopIteration
next(x) -> the next value, or raise StopIteration
"""
if self.closed:
raise ValueError('I/O operation on closed file')
@ -467,7 +469,7 @@ class FileLikeIter(object):
self.buf = None
return rv
else:
return self.iterator.next()
return next(self.iterator)
def read(self, size=-1):
"""
@ -488,7 +490,7 @@ class FileLikeIter(object):
self.buf = None
else:
try:
chunk = self.iterator.next()
chunk = next(self.iterator)
except StopIteration:
return ''
if len(chunk) > size:
@ -1026,7 +1028,7 @@ class RateLimitedIterator(object):
else:
self.running_time = ratelimit_sleep(self.running_time,
self.elements_per_second)
return self.iterator.next()
return next(self.iterator)
class GreenthreadSafeIterator(object):
@ -1049,7 +1051,7 @@ class GreenthreadSafeIterator(object):
def next(self):
with self.semaphore:
return self.unsafe_iter.next()
return next(self.unsafe_iter)
class NullLogger(object):
@ -1588,7 +1590,7 @@ def get_hub():
return None
def drop_privileges(user):
def drop_privileges(user, call_setsid=True):
"""
Sets the userid/groupid of the current process, get session leader, etc.
@ -1601,10 +1603,11 @@ def drop_privileges(user):
os.setgid(user[3])
os.setuid(user[2])
os.environ['HOME'] = user[5]
try:
os.setsid()
except OSError:
pass
if call_setsid:
try:
os.setsid()
except OSError:
pass
os.chdir('/') # in case you need to rmdir on where you started the daemon
os.umask(0o22) # ensure files are created with the correct privileges
@ -1705,12 +1708,28 @@ def expand_ipv6(address):
return socket.inet_ntop(socket.AF_INET6, packed_ip)
def whataremyips():
def whataremyips(bind_ip=None):
"""
Get the machine's ip addresses
Get "our" IP addresses ("us" being the set of services configured by
one *.conf file). If our REST listens on a specific address, return it.
Otherwise, if listen on '0.0.0.0' or '::' return all addresses, including
the loopback.
:param str bind_ip: Optional bind_ip from a config file; may be IP address
or hostname.
:returns: list of Strings of ip addresses
"""
if bind_ip:
# See if bind_ip is '0.0.0.0'/'::'
try:
_, _, _, _, sockaddr = socket.getaddrinfo(
bind_ip, None, 0, socket.SOCK_STREAM, 0,
socket.AI_NUMERICHOST)[0]
if sockaddr[0] not in ('0.0.0.0', '::'):
return [bind_ip]
except socket.gaierror:
pass
addresses = []
for interface in netifaces.interfaces():
try:
@ -2273,7 +2292,7 @@ class GreenAsyncPile(object):
try:
with GreenAsyncPileWaitallTimeout(timeout):
while True:
results.append(self.next())
results.append(next(self))
except (GreenAsyncPileWaitallTimeout, StopIteration):
pass
return results
@ -2935,7 +2954,7 @@ class ThreadPool(object):
_raw_rpipe, self.wpipe = os.pipe()
self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb', bufsize=0)
for _junk in xrange(nthreads):
for _junk in range(nthreads):
thr = stdlib_threading.Thread(
target=self._worker,
args=(self._run_queue, self._result_queue))
@ -3143,6 +3162,28 @@ def ismount_raw(path):
return False
def close_if_possible(maybe_closable):
close_method = getattr(maybe_closable, 'close', None)
if callable(close_method):
return close_method()
@contextmanager
def closing_if_possible(maybe_closable):
"""
Like contextlib.closing(), but doesn't crash if the object lacks a close()
method.
PEP 333 (WSGI) says: "If the iterable returned by the application has a
close() method, the server or gateway must call that method upon
completion of the current request[.]" This function makes that easier.
"""
try:
yield maybe_closable
finally:
close_if_possible(maybe_closable)
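A minimal usage sketch; the list here stands in for any WSGI iterable:

from swift.common.utils import closing_if_possible

chunks = ['hello ', 'world']          # a plain list has no close() method
with closing_if_possible(chunks):     # still fine; close() is only called
    body = ''.join(chunks)            # if the object actually has one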
_rfc_token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+'
_rfc_extension_pattern = re.compile(
r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token +
@ -3181,7 +3222,7 @@ def parse_content_type(content_type):
('text/plain', [('charset, 'UTF-8'), ('level', '1')])
:param content_type: content_type to parse
:returns: a typle containing (content type, list of k, v parameter tuples)
:returns: a tuple containing (content type, list of k, v parameter tuples)
"""
parm_list = []
if ';' in content_type:
@ -3313,7 +3354,9 @@ class _MultipartMimeFileLikeObject(object):
def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
"""
Given a multi-part-mime-encoded input file object and boundary,
yield file-like objects for each part.
yield file-like objects for each part. Note that this does not
split each part into headers and body; the caller is responsible
for doing that if necessary.
:param wsgi_input: The file-like object to read from.
:param boundary: The mime boundary to separate new file-like
@ -3324,6 +3367,9 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
boundary = '--' + boundary
blen = len(boundary) + 2 # \r\n
got = wsgi_input.readline(blen)
while got == '\r\n':
got = wsgi_input.readline(blen)
if got.strip() != boundary:
raise swift.common.exceptions.MimeInvalid(
'invalid starting boundary: wanted %r, got %r', (boundary, got))
@ -3338,6 +3384,174 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
input_buffer = it.input_buffer
def mime_to_document_iters(input_file, boundary, read_chunk_size=4096):
"""
Takes a file-like object containing a multipart MIME document and
returns an iterator of (headers, body-file) tuples.
:param input_file: file-like object with the MIME doc in it
:param boundary: MIME boundary, sans dashes
(e.g. "divider", not "--divider")
:param read_chunk_size: size of strings read via input_file.read()
"""
doc_files = iter_multipart_mime_documents(input_file, boundary,
read_chunk_size)
for i, doc_file in enumerate(doc_files):
# this consumes the headers and leaves just the body in doc_file
headers = rfc822.Message(doc_file, 0)
yield (headers, doc_file)
def document_iters_to_multipart_byteranges(ranges_iter, boundary):
"""
Takes an iterator of range iters and yields a multipart/byteranges MIME
document suitable for sending as the body of a multi-range 206 response.
See document_iters_to_http_response_body for parameter descriptions.
"""
divider = "--" + boundary + "\r\n"
terminator = "--" + boundary + "--"
for range_spec in ranges_iter:
start_byte = range_spec["start_byte"]
end_byte = range_spec["end_byte"]
entity_length = range_spec.get("entity_length", "*")
content_type = range_spec["content_type"]
part_iter = range_spec["part_iter"]
part_header = ''.join((
divider,
"Content-Type: ", str(content_type), "\r\n",
"Content-Range: ", "bytes %d-%d/%s\r\n" % (
start_byte, end_byte, entity_length),
"\r\n"
))
yield part_header
for chunk in part_iter:
yield chunk
yield "\r\n"
yield terminator
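A rough sketch of driving this generator with two made-up ranges of a 10-byte entity (assuming the helper lands in swift.common.utils as patched here):

from swift.common.utils import document_iters_to_multipart_byteranges

ranges = iter([
    {'start_byte': 0, 'end_byte': 4, 'entity_length': 10,
     'content_type': 'text/plain', 'part_iter': iter(['hello'])},
    {'start_byte': 5, 'end_byte': 9, 'entity_length': 10,
     'content_type': 'text/plain', 'part_iter': iter(['world'])},
])
body = ''.join(document_iters_to_multipart_byteranges(ranges, 'divider'))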
def document_iters_to_http_response_body(ranges_iter, boundary, multipart,
logger):
"""
Takes an iterator of range iters and turns it into an appropriate
HTTP response body, whether that's multipart/byteranges or not.
This is almost, but not quite, the inverse of
http_response_to_document_iters(). This function only yields chunks of
the body, not any headers.
:param ranges_iter: an iterator of dictionaries, one per range.
Each dictionary must contain at least the following key:
"part_iter": iterator yielding the bytes in the range
Additionally, if multipart is True, then the following other keys
are required:
"start_byte": index of the first byte in the range
"end_byte": index of the last byte in the range
"content_type": value for the range's Content-Type header
Finally, there is one optional key that is used in the
multipart/byteranges case:
"entity_length": length of the requested entity (not necessarily
equal to the response length). If omitted, "*" will be used.
Each part_iter will be exhausted prior to calling next(ranges_iter).
:param boundary: MIME boundary to use, sans dashes (e.g. "boundary", not
"--boundary").
:param multipart: True if the response should be multipart/byteranges,
False otherwise. This should be True if and only if you have 2 or
more ranges.
:param logger: a logger
"""
if multipart:
return document_iters_to_multipart_byteranges(ranges_iter, boundary)
else:
try:
response_body_iter = next(ranges_iter)['part_iter']
except StopIteration:
return ''
# We need to make sure ranges_iter does not get garbage-collected
# before response_body_iter is exhausted. The reason is that
# ranges_iter has a finally block that calls close_swift_conn, and
# so if that finally block fires before we read response_body_iter,
# there's nothing there.
def string_along(useful_iter, useless_iter_iter, logger):
for x in useful_iter:
yield x
try:
next(useless_iter_iter)
except StopIteration:
pass
else:
logger.warn("More than one part in a single-part response?")
return string_along(response_body_iter, ranges_iter, logger)
def multipart_byteranges_to_document_iters(input_file, boundary,
read_chunk_size=4096):
"""
Takes a file-like object containing a multipart/byteranges MIME document
(see RFC 7233, Appendix A) and returns an iterator of (first-byte,
last-byte, length, document-headers, body-file) 5-tuples.
:param input_file: file-like object with the MIME doc in it
:param boundary: MIME boundary, sans dashes
(e.g. "divider", not "--divider")
:param read_chunk_size: size of strings read via input_file.read()
"""
for headers, body in mime_to_document_iters(input_file, boundary,
read_chunk_size):
first_byte, last_byte, length = parse_content_range(
headers.getheader('content-range'))
yield (first_byte, last_byte, length, headers.items(), body)
def http_response_to_document_iters(response, read_chunk_size=4096):
"""
Takes a successful object-GET HTTP response and turns it into an
iterator of (first-byte, last-byte, length, headers, body-file)
5-tuples.
The response must either be a 200 or a 206; if you feed in a 204 or
something similar, this probably won't work.
:param response: HTTP response, like from bufferedhttp.http_connect(),
not a swob.Response.
"""
if response.status == 200:
# Single "range" that's the whole object
content_length = int(response.getheader('Content-Length'))
return iter([(0, content_length - 1, content_length,
response.getheaders(), response)])
content_type, params_list = parse_content_type(
response.getheader('Content-Type'))
if content_type != 'multipart/byteranges':
# Single range; no MIME framing, just the bytes. The start and end
# byte indices are in the Content-Range header.
start, end, length = parse_content_range(
response.getheader('Content-Range'))
return iter([(start, end, length, response.getheaders(), response)])
else:
# Multiple ranges; the response body is a multipart/byteranges MIME
# document, and we have to parse it using the MIME boundary
# extracted from the Content-Type header.
params = dict(params_list)
return multipart_byteranges_to_document_iters(
response, params['boundary'], read_chunk_size)
#: Regular expression to match form attributes.
ATTRIBUTES_RE = re.compile(r'(\w+)=(".*?"|[^";]+)(; ?|$)')
@ -3355,8 +3569,8 @@ def parse_content_disposition(header):
"""
attributes = {}
attrs = ''
if '; ' in header:
header, attrs = header.split('; ', 1)
if ';' in header:
header, attrs = [x.strip() for x in header.split(';', 1)]
m = True
while m:
m = ATTRIBUTES_RE.match(attrs)


@ -29,12 +29,13 @@ from textwrap import dedent
import eventlet
import eventlet.debug
from eventlet import greenio, GreenPool, sleep, wsgi, listen
from eventlet import greenio, GreenPool, sleep, wsgi, listen, Timeout
from paste.deploy import loadwsgi
from eventlet.green import socket, ssl
from eventlet.green import socket, ssl, os as green_os
from urllib import unquote
from swift.common import utils, constraints
from swift.common.storage_policy import BindPortsCache
from swift.common.swob import Request
from swift.common.utils import capture_stdio, disable_fallocate, \
drop_privileges, get_logger, NullLogger, config_true_value, \
@ -437,10 +438,414 @@ def run_server(conf, logger, sock, global_conf=None):
pool.waitall()
#TODO(clayg): pull more pieces of this to test more
class WorkersStrategy(object):
"""
WSGI server management strategy object for a single bind port and listen
socket shared by a configured number of forked-off workers.
Used in :py:func:`run_wsgi`.
:param dict conf: Server configuration dictionary.
:param logger: The server's :py:class:`~swift.common.utils.LogAdapter`
object.
"""
def __init__(self, conf, logger):
self.conf = conf
self.logger = logger
self.sock = None
self.children = []
self.worker_count = config_auto_int_value(conf.get('workers'),
CPU_COUNT)
def loop_timeout(self):
"""
:returns: None; to block in :py:func:`green.os.wait`
"""
return None
def bind_ports(self):
"""
Bind the one listen socket for this strategy and drop privileges
(since the parent process will never need to bind again).
"""
try:
self.sock = get_socket(self.conf)
except ConfigFilePortError:
msg = 'bind_port wasn\'t properly set in the config file. ' \
'It must be explicitly set to a valid port number.'
return msg
drop_privileges(self.conf.get('user', 'swift'))
def no_fork_sock(self):
"""
Return a server listen socket if the server should run in the
foreground (no fork).
"""
# Useful for profiling [no forks].
if self.worker_count == 0:
return self.sock
def new_worker_socks(self):
"""
Yield a sequence of (socket, opaque_data) tuples for each server which
should be forked-off and started.
The opaque_data item for each socket will be passed into the
:py:meth:`log_sock_exit` and :py:meth:`register_worker_start` methods
where it will be ignored.
"""
while len(self.children) < self.worker_count:
yield self.sock, None
def post_fork_hook(self):
"""
Perform any initialization in a forked-off child process prior to
starting the wsgi server.
"""
pass
def log_sock_exit(self, sock, _unused):
"""
Log a server's exit.
:param socket sock: The listen socket for the worker just started.
:param _unused: The socket's opaque_data yielded by
:py:meth:`new_worker_socks`.
"""
self.logger.notice('Child %d exiting normally' % os.getpid())
def register_worker_start(self, sock, _unused, pid):
"""
Called when a new worker is started.
:param socket sock: The listen socket for the worker just started.
:param _unused: The socket's opaque_data yielded by new_worker_socks().
:param int pid: The new worker process' PID
"""
self.logger.notice('Started child %s' % pid)
self.children.append(pid)
def register_worker_exit(self, pid):
"""
Called when a worker has exited.
:param int pid: The PID of the worker that exited.
"""
self.logger.error('Removing dead child %s' % pid)
self.children.remove(pid)
def shutdown_sockets(self):
"""
Shutdown any listen sockets.
"""
greenio.shutdown_safe(self.sock)
self.sock.close()
class PortPidState(object):
"""
A helper class for :py:class:`ServersPerPortStrategy` to track listen
sockets and PIDs for each port.
:param int servers_per_port: The configured number of servers per port.
:param logger: The server's :py:class:`~swift.common.utils.LogAdapter`
"""
def __init__(self, servers_per_port, logger):
self.servers_per_port = servers_per_port
self.logger = logger
self.sock_data_by_port = {}
def sock_for_port(self, port):
"""
:param int port: The port whose socket is desired.
:returns: The bound listen socket for the given port.
"""
return self.sock_data_by_port[port]['sock']
def port_for_sock(self, sock):
"""
:param socket sock: A tracked bound listen socket
:returns: The port the socket is bound to.
"""
for port, sock_data in self.sock_data_by_port.items():
if sock_data['sock'] == sock:
return port
def _pid_to_port_and_index(self, pid):
for port, sock_data in self.sock_data_by_port.items():
for server_idx, a_pid in enumerate(sock_data['pids']):
if pid == a_pid:
return port, server_idx
def port_index_pairs(self):
"""
:returns: A set of (port, server_idx) tuples for currently-tracked
ports, sockets, and PIDs.
"""
current_port_index_pairs = set()
for port, pid_state in self.sock_data_by_port.items():
current_port_index_pairs |= set(
(port, i)
for i, pid in enumerate(pid_state['pids'])
if pid is not None)
return current_port_index_pairs
def track_port(self, port, sock):
"""
Start tracking servers for the given port and listen socket.
:param int port: The port to start tracking
:param socket sock: The bound listen socket for the port.
"""
self.sock_data_by_port[port] = {
'sock': sock,
'pids': [None] * self.servers_per_port,
}
def not_tracking(self, port):
"""
Return True if the specified port is not being tracked.
:param int port: A port to check.
"""
return port not in self.sock_data_by_port
def all_socks(self):
"""
Yield all current listen sockets.
"""
for orphan_data in self.sock_data_by_port.itervalues():
yield orphan_data['sock']
def forget_port(self, port):
"""
Idempotently forget a port, closing the listen socket at most once.
"""
orphan_data = self.sock_data_by_port.pop(port, None)
if orphan_data:
greenio.shutdown_safe(orphan_data['sock'])
orphan_data['sock'].close()
self.logger.notice('Closing unnecessary sock for port %d', port)
def add_pid(self, port, index, pid):
self.sock_data_by_port[port]['pids'][index] = pid
def forget_pid(self, pid):
"""
Idempotently forget a PID. It's okay if the PID is no longer in our
data structure (it could have been removed by the "orphan port" removal
in :py:meth:`new_worker_socks`).
:param int pid: The PID which exited.
"""
port_server_idx = self._pid_to_port_and_index(pid)
if port_server_idx is None:
# This method can lose a race with the "orphan port" removal, when
# a ring reload no longer contains a port. So it's okay if we were
# unable to find a (port, server_idx) pair.
return
dead_port, server_idx = port_server_idx
self.logger.error('Removing dead child %d (PID: %s) for port %s',
server_idx, pid, dead_port)
self.sock_data_by_port[dead_port]['pids'][server_idx] = None
class ServersPerPortStrategy(object):
"""
WSGI server management strategy object for an object-server with one listen
port per unique local port in the storage policy rings. The
`servers_per_port` integer config setting determines how many workers are
run per port.
Used in :py:func:`run_wsgi`.
:param dict conf: Server configuration dictionary.
:param logger: The server's :py:class:`~swift.common.utils.LogAdapter`
object.
:param int servers_per_port: The number of workers to run per port.
"""
def __init__(self, conf, logger, servers_per_port):
self.conf = conf
self.logger = logger
self.servers_per_port = servers_per_port
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
self.port_pid_state = PortPidState(servers_per_port, logger)
bind_ip = conf.get('bind_ip', '0.0.0.0')
self.cache = BindPortsCache(self.swift_dir, bind_ip)
def _reload_bind_ports(self):
self.bind_ports = self.cache.all_bind_ports_for_node()
def _bind_port(self, port):
new_conf = self.conf.copy()
new_conf['bind_port'] = port
sock = get_socket(new_conf)
self.port_pid_state.track_port(port, sock)
def loop_timeout(self):
"""
:returns: The time to wait for a child to exit before checking for
reloaded rings (new ports).
"""
return self.ring_check_interval
def bind_ports(self):
"""
Bind one listen socket per unique local storage policy ring port. Then
do all the work of drop_privileges except the actual dropping of
privileges (each forked-off worker will do that post-fork in
:py:meth:`post_fork_hook`).
"""
self._reload_bind_ports()
for port in self.bind_ports:
self._bind_port(port)
# The workers strategy drops privileges here, which we obviously cannot
# do if we want to support binding to low ports. But we do want some
# of the actions that drop_privileges did.
try:
os.setsid()
except OSError:
pass
# In case you need to rmdir where you started the daemon:
os.chdir('/')
# Ensure files are created with the correct privileges:
os.umask(0o22)
def no_fork_sock(self):
"""
This strategy does not support running in the foreground.
"""
pass
def new_worker_socks(self):
"""
Yield a sequence of (socket, server_idx) tuples for each server which
should be forked-off and started.
Any sockets for "orphaned" ports no longer in any ring will be closed
(causing their associated workers to gracefully exit) after all new
sockets have been yielded.
The server_idx item for each socket will be passed into the
:py:meth:`log_sock_exit` and :py:meth:`register_worker_start` methods.
"""
self._reload_bind_ports()
desired_port_index_pairs = set(
(p, i) for p in self.bind_ports
for i in range(self.servers_per_port))
current_port_index_pairs = self.port_pid_state.port_index_pairs()
if desired_port_index_pairs != current_port_index_pairs:
# Orphan ports are ports which had object-server processes running,
# but which no longer appear in the ring. We'll kill them after we
# start missing workers.
orphan_port_index_pairs = current_port_index_pairs - \
desired_port_index_pairs
# Fork off worker(s) for every port that's supposed to have
# worker(s) but doesn't
missing_port_index_pairs = desired_port_index_pairs - \
current_port_index_pairs
for port, server_idx in sorted(missing_port_index_pairs):
if self.port_pid_state.not_tracking(port):
try:
self._bind_port(port)
except Exception as e:
self.logger.critical('Unable to bind to port %d: %s',
port, e)
continue
yield self.port_pid_state.sock_for_port(port), server_idx
for orphan_pair in orphan_port_index_pairs:
# For any port in orphan_port_index_pairs, it is guaranteed
# that no listen socket is needed for that port any longer, so
# we can close and forget them.
self.port_pid_state.forget_port(orphan_pair[0])
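# Illustrative sketch (not part of the patch): the desired/current set
# arithmetic that new_worker_socks() above relies on, with made-up values.
# Assume the rings list local ports 6200 and 6201, servers_per_port is 2, and
# workers are currently tracked for ports 6200 and 6300.
sketch_desired = set((p, i) for p in (6200, 6201) for i in range(2))
sketch_current = set((p, i) for p in (6200, 6300) for i in range(2))
missing_pairs = sketch_desired - sketch_current  # fork workers for port 6201
orphan_pairs = sketch_current - sketch_desired   # close sockets for port 6300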
def post_fork_hook(self):
"""
Called in each child process, prior to starting the actual wsgi server,
to drop privileges.
"""
drop_privileges(self.conf.get('user', 'swift'), call_setsid=False)
def log_sock_exit(self, sock, server_idx):
"""
Log a server's exit.
"""
port = self.port_pid_state.port_for_sock(sock)
self.logger.notice('Child %d (PID %d, port %d) exiting normally',
server_idx, os.getpid(), port)
def register_worker_start(self, sock, server_idx, pid):
"""
Called when a new worker is started.
:param socket sock: The listen socket for the worker just started.
:param server_idx: The socket's server_idx as yielded by
:py:meth:`new_worker_socks`.
:param int pid: The new worker process' PID
"""
port = self.port_pid_state.port_for_sock(sock)
self.logger.notice('Started child %d (PID %d) for port %d',
server_idx, pid, port)
self.port_pid_state.add_pid(port, server_idx, pid)
def register_worker_exit(self, pid):
"""
Called when a worker has exited.
:param int pid: The PID of the worker that exited.
"""
self.port_pid_state.forget_pid(pid)
def shutdown_sockets(self):
"""
Shutdown any listen sockets.
"""
for sock in self.port_pid_state.all_socks():
greenio.shutdown_safe(sock)
sock.close()
def run_wsgi(conf_path, app_section, *args, **kwargs):
"""
Runs the server using the specified number of workers.
Runs the server according to some strategy. The default strategy runs a
specified number of workers in pre-fork model. The object-server (only)
may use a servers-per-port strategy if its config has a servers_per_port
setting with a value greater than zero.
:param conf_path: Path to paste.deploy style configuration file/directory
:param app_section: App name from conf file to load config from
@ -454,17 +859,22 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
print(e)
return 1
# bind to address and port
try:
sock = get_socket(conf)
except ConfigFilePortError:
msg = 'bind_port wasn\'t properly set in the config file. ' \
'It must be explicitly set to a valid port number.'
logger.error(msg)
print(msg)
servers_per_port = int(conf.get('servers_per_port', '0') or 0)
# NOTE: for now servers_per_port is object-server-only; future work could
# be done to test and allow it to be used for account and container
# servers, but that has not been done yet.
if servers_per_port and app_section == 'object-server':
strategy = ServersPerPortStrategy(
conf, logger, servers_per_port=servers_per_port)
else:
strategy = WorkersStrategy(conf, logger)
error_msg = strategy.bind_ports()
if error_msg:
logger.error(error_msg)
print(error_msg)
return 1
# remaining tasks should not require elevated privileges
drop_privileges(conf.get('user', 'swift'))
# Ensure the configuration and application can be loaded before proceeding.
global_conf = {'log_name': log_name}
@ -479,11 +889,9 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
# redirect errors to logger and close stdio
capture_stdio(logger)
worker_count = config_auto_int_value(conf.get('workers'), CPU_COUNT)
# Useful for profiling [no forks].
if worker_count == 0:
run_server(conf, logger, sock, global_conf=global_conf)
no_fork_sock = strategy.no_fork_sock()
if no_fork_sock:
run_server(conf, logger, no_fork_sock, global_conf=global_conf)
return 0
def kill_children(*args):
@ -502,32 +910,42 @@ def run_wsgi(conf_path, app_section, *args, **kwargs):
running = [True]
signal.signal(signal.SIGTERM, kill_children)
signal.signal(signal.SIGHUP, hup)
children = []
while running[0]:
while len(children) < worker_count:
for sock, sock_info in strategy.new_worker_socks():
pid = os.fork()
if pid == 0:
signal.signal(signal.SIGHUP, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
strategy.post_fork_hook()
run_server(conf, logger, sock)
logger.notice('Child %d exiting normally' % os.getpid())
strategy.log_sock_exit(sock, sock_info)
return 0
else:
logger.notice('Started child %s' % pid)
children.append(pid)
try:
pid, status = os.wait()
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
logger.error('Removing dead child %s' % pid)
children.remove(pid)
except OSError as err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
except KeyboardInterrupt:
logger.notice('User quit')
break
greenio.shutdown_safe(sock)
sock.close()
strategy.register_worker_start(sock, sock_info, pid)
# The strategy may need to pay attention to something in addition to
# child process exits (like new ports showing up in a ring).
#
# NOTE: a timeout value of None will just instantiate the Timeout
# object and not actually schedule it, which is equivalent to no
# timeout for the green_os.wait().
loop_timeout = strategy.loop_timeout()
with Timeout(loop_timeout, exception=False):
try:
pid, status = green_os.wait()
if os.WIFEXITED(status) or os.WIFSIGNALED(status):
strategy.register_worker_exit(pid)
except OSError as err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
except KeyboardInterrupt:
logger.notice('User quit')
running[0] = False
break
strategy.shutdown_sockets()
logger.notice('Exited')
return 0
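# Hedged sketch (not part of the patch): the duck-typed strategy interface the
# run_wsgi() loop above relies on. The method names come from the calls made
# in run_wsgi(); the no-op bodies here are placeholders, not Swift's code.
class SketchStrategy(object):
    def bind_ports(self):        # bind listen socket(s); return error msg or None
        return None
    def no_fork_sock(self):      # a socket to serve without forking, or None
        return None
    def new_worker_socks(self):  # yield (socket, opaque_info) per worker to start
        return iter([])
    def loop_timeout(self):      # seconds to wait in green_os.wait(), or None
        return None
    def post_fork_hook(self):    # per-child setup, e.g. dropping privileges
        pass
    def log_sock_exit(self, sock, info):               # log a child's normal exit
        pass
    def register_worker_start(self, sock, info, pid):  # record a new child PID
        pass
    def register_worker_exit(self, pid):               # forget a dead child PID
        pass
    def shutdown_sockets(self):  # close all listen sockets at shutdown
        pass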
@ -613,7 +1031,7 @@ class WSGIContext(object):
return resp
resp = iter(resp)
try:
first_chunk = resp.next()
first_chunk = next(resp)
except StopIteration:
return iter([])
else: # We got a first_chunk
View File
@ -21,6 +21,7 @@ from uuid import uuid4
import time
import cPickle as pickle
from six.moves import range
import sqlite3
from swift.common.utils import Timestamp
@ -698,7 +699,7 @@ class ContainerBroker(DatabaseBroker):
# Get created_at times for objects in item_list that already exist.
# We must chunk it up to avoid sqlite's limit of 999 args.
created_at = {}
for offset in xrange(0, len(item_list), SQLITE_ARG_LIMIT):
for offset in range(0, len(item_list), SQLITE_ARG_LIMIT):
chunk = [rec['name'] for rec in
item_list[offset:offset + SQLITE_ARG_LIMIT]]
created_at.update(
View File
@ -59,7 +59,8 @@ class ContainerReplicator(db_replicator.Replicator):
'storage_policy_index'))
return sync_args
def _handle_sync_response(self, node, response, info, broker, http):
def _handle_sync_response(self, node, response, info, broker, http,
different_region):
parent = super(ContainerReplicator, self)
if is_success(response.status):
remote_info = json.loads(response.data)
@ -74,7 +75,7 @@ class ContainerReplicator(db_replicator.Replicator):
broker.merge_timestamps(*(remote_info[key] for key in
sync_timestamps))
rv = parent._handle_sync_response(
node, response, info, broker, http)
node, response, info, broker, http, different_region)
return rv
def find_local_handoff_for_part(self, part):
View File
@ -367,7 +367,7 @@ class ContainerController(BaseStorageServer):
metadata = {}
metadata.update(
(key, (value, req_timestamp.internal))
for key, value in req.headers.iteritems()
for key, value in req.headers.items()
if key.lower() in self.save_headers or
is_sys_or_user_meta('container', key))
if 'X-Container-Sync-To' in metadata:
@ -406,7 +406,7 @@ class ContainerController(BaseStorageServer):
return HTTPNotFound(request=req, headers=headers)
headers.update(
(key, value)
for key, (value, timestamp) in broker.metadata.iteritems()
for key, (value, timestamp) in broker.metadata.items()
if value != '' and (key.lower() in self.save_headers or
is_sys_or_user_meta('container', key)))
headers['Content-Type'] = out_content_type
@ -473,7 +473,7 @@ class ContainerController(BaseStorageServer):
def create_listing(self, req, out_content_type, info, resp_headers,
metadata, container_list, container):
for key, (value, timestamp) in metadata.iteritems():
for key, (value, timestamp) in metadata.items():
if value and (key.lower() in self.save_headers or
is_sys_or_user_meta('container', key)):
resp_headers[key] = value
@ -547,7 +547,7 @@ class ContainerController(BaseStorageServer):
metadata = {}
metadata.update(
(key, (value, req_timestamp.internal))
for key, value in req.headers.iteritems()
for key, value in req.headers.items()
if key.lower() in self.save_headers or
is_sys_or_user_meta('container', key))
if metadata:
View File
@ -204,7 +204,8 @@ class ContainerSync(Daemon):
#: swift.common.ring.Ring for locating containers.
self.container_ring = container_ring or Ring(self.swift_dir,
ring_name='container')
self._myips = whataremyips()
bind_ip = conf.get('bind_ip', '0.0.0.0')
self._myips = whataremyips(bind_ip)
self._myport = int(conf.get('bind_port', 6001))
swift.common.db.DB_PREALLOCATION = \
config_true_value(conf.get('db_preallocation', 'f'))
@ -321,7 +322,7 @@ class ContainerSync(Daemon):
user_key = None
sync_point1 = info['x_container_sync_point1']
sync_point2 = info['x_container_sync_point2']
for key, (value, timestamp) in broker.metadata.iteritems():
for key, (value, timestamp) in broker.metadata.items():
if key.lower() == 'x-container-sync-to':
sync_to = value
elif key.lower() == 'x-container-sync-key':
View File
@ -121,7 +121,7 @@ class ContainerUpdater(Daemon):
begin = time.time()
now = time.time()
expired_suppressions = \
[a for a, u in self.account_suppressions.iteritems()
[a for a, u in self.account_suppressions.items()
if u < now]
for account in expired_suppressions:
del self.account_suppressions[account]
File diff suppressed because it is too large
File diff suppressed because it is too large
View File
@ -53,8 +53,8 @@ from swift import gettext_ as _
from swift.common.constraints import check_mount, check_dir
from swift.common.request_helpers import is_sys_meta
from swift.common.utils import mkdirs, Timestamp, \
storage_directory, hash_path, renamer, fallocate, fsync, \
fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
fsync_dir, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
config_true_value, listdir, split_path, ismount, remove_file, \
get_md5_socket, F_SETPIPE_SZ
from swift.common.splice import splice, tee
@ -117,13 +117,17 @@ def read_metadata(fd):
metadata += xattr.getxattr(fd, '%s%s' % (METADATA_KEY,
(key or '')))
key += 1
except IOError as e:
except (IOError, OSError) as e:
for err in 'ENOTSUP', 'EOPNOTSUPP':
if hasattr(errno, err) and e.errno == getattr(errno, err):
msg = "Filesystem at %s does not support xattr" % \
_get_filename(fd)
logging.exception(msg)
raise DiskFileXattrNotSupported(e)
if e.errno == errno.ENOENT:
raise DiskFileNotExist()
# TODO: we might want to re-raise errors that don't denote a missing
# xattr here. Seems to be ENODATA on linux and ENOATTR on BSD/OSX.
return pickle.loads(metadata)
@ -910,7 +914,7 @@ class DiskFileWriter(object):
return self._upload_size
def _finalize_put(self, metadata, target_path):
def _finalize_put(self, metadata, target_path, cleanup):
# Write the metadata before calling fsync() so that both data and
# metadata are flushed to disk.
write_metadata(self._fd, metadata)
@ -930,10 +934,11 @@ class DiskFileWriter(object):
# unnecessary os.unlink() of tempfile later. As renamer() has
# succeeded, the tempfile would no longer exist at its original path.
self._put_succeeded = True
try:
self.manager.hash_cleanup_listdir(self._datadir)
except OSError:
logging.exception(_('Problem cleaning up %s'), self._datadir)
if cleanup:
try:
self.manager.hash_cleanup_listdir(self._datadir)
except OSError:
logging.exception(_('Problem cleaning up %s'), self._datadir)
def put(self, metadata):
"""
@ -950,9 +955,10 @@ class DiskFileWriter(object):
timestamp = Timestamp(metadata['X-Timestamp']).internal
metadata['name'] = self._name
target_path = join(self._datadir, timestamp + self._extension)
cleanup = True
self._threadpool.force_run_in_thread(
self._finalize_put, metadata, target_path)
self._finalize_put, metadata, target_path, cleanup)
def commit(self, timestamp):
"""
@ -1590,6 +1596,8 @@ class DiskFile(object):
# file if we have one
try:
return read_metadata(source)
except (DiskFileXattrNotSupported, DiskFileNotExist):
raise
except Exception as err:
raise self._quarantine(
quarantine_filename,
@ -1612,7 +1620,7 @@ class DiskFile(object):
if meta_file:
self._metadata = self._failsafe_read_metadata(meta_file, meta_file)
sys_metadata = dict(
[(key, val) for key, val in datafile_metadata.iteritems()
[(key, val) for key, val in datafile_metadata.items()
if key.lower() in DATAFILE_SYSTEM_META
or is_sys_meta('object', key)])
self._metadata.update(sys_metadata)
@ -1784,30 +1792,33 @@ class ECDiskFileReader(DiskFileReader):
class ECDiskFileWriter(DiskFileWriter):
def _finalize_durable(self, durable_file_path):
exc = msg = None
exc = None
try:
with open(durable_file_path, 'w') as _fd:
fsync(_fd)
try:
with open(durable_file_path, 'w') as _fp:
fsync(_fp.fileno())
fsync_dir(self._datadir)
except (OSError, IOError) as err:
if err.errno not in (errno.ENOSPC, errno.EDQUOT):
# re-raise to catch all handler
raise
msg = (_('No space left on device for %s (%s)') %
(durable_file_path, err))
self.manager.logger.error(msg)
exc = DiskFileNoSpace(str(err))
else:
try:
self.manager.hash_cleanup_listdir(self._datadir)
except OSError:
except OSError as os_err:
self.manager.logger.exception(
_('Problem cleaning up %s'), self._datadir)
except OSError:
msg = (_('Problem fsyncing durable state file: %s'),
durable_file_path)
exc = DiskFileError(msg)
except IOError as io_err:
if io_err.errno in (errno.ENOSPC, errno.EDQUOT):
msg = (_("No space left on device for %s"),
durable_file_path)
exc = DiskFileNoSpace()
else:
msg = (_('Problem writing durable state file: %s'),
durable_file_path)
exc = DiskFileError(msg)
if exc:
_('Problem cleaning up %s (%s)') %
(self._datadir, os_err))
except Exception as err:
msg = (_('Problem writing durable state file %s (%s)') %
(durable_file_path, err))
self.manager.logger.exception(msg)
exc = DiskFileError(msg)
if exc:
raise exc
def commit(self, timestamp):
@ -1832,6 +1843,7 @@ class ECDiskFileWriter(DiskFileWriter):
"""
timestamp = Timestamp(metadata['X-Timestamp'])
fi = None
cleanup = True
if self._extension == '.data':
# generally we treat the fragment index provided in metadata as
# canon, but if it's unavailable (e.g. tests) it's reasonable to
@ -1839,13 +1851,15 @@ class ECDiskFileWriter(DiskFileWriter):
# sure that the fragment index is included in object sysmeta.
fi = metadata.setdefault('X-Object-Sysmeta-Ec-Frag-Index',
self._diskfile._frag_index)
# defer cleanup until commit() writes .durable
cleanup = False
filename = self.manager.make_on_disk_filename(
timestamp, self._extension, frag_index=fi)
metadata['name'] = self._name
target_path = join(self._datadir, filename)
self._threadpool.force_run_in_thread(
self._finalize_put, metadata, target_path)
self._finalize_put, metadata, target_path, cleanup)
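# Hedged sketch (not part of the patch): the two-phase EC write that the
# cleanup=False deferral above supports; the timestamp and fragment index are
# made up and the filename format is shown only for illustration.
ts, frag_index = '1435354556.22222', 3
data_file = '%s#%d.data' % (ts, frag_index)  # written by put(); cleanup deferred
durable_file = '%s.durable' % ts             # written by commit(); cleanup runs after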
class ECDiskFile(DiskFile):
View File
@ -29,8 +29,8 @@ from eventlet.support.greenlets import GreenletExit
from swift import gettext_ as _
from swift.common.utils import (
whataremyips, unlink_older_than, compute_eta, get_logger,
dump_recon_cache, ismount, mkdirs, config_true_value, list_from_csv,
get_hub, tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
dump_recon_cache, mkdirs, config_true_value, list_from_csv, get_hub,
tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
from swift.common.swob import HeaderKeyDict
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
@ -119,14 +119,18 @@ class ObjectReconstructor(Daemon):
self.devices_dir = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.port = int(conf.get('bind_port', 6000))
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
self.port = None if self.servers_per_port else \
int(conf.get('bind_port', 6000))
self.concurrency = int(conf.get('concurrency', 1))
self.stats_interval = int(conf.get('stats_interval', '300'))
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
self.next_check = time.time() + self.ring_check_interval
self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7))
self.partition_times = []
self.run_pause = int(conf.get('run_pause', 30))
self.interval = int(conf.get('interval') or
conf.get('run_pause') or 30)
self.http_timeout = int(conf.get('http_timeout', 60))
self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
self.recon_cache_path = conf.get('recon_cache_path',
@ -193,7 +197,6 @@ class ObjectReconstructor(Daemon):
:returns: response
"""
resp = None
headers['X-Backend-Node-Index'] = node['index']
try:
with ConnectionTimeout(self.conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'],
@ -249,6 +252,13 @@ class ObjectReconstructor(Daemon):
if not resp:
continue
resp.headers = HeaderKeyDict(resp.getheaders())
if str(fi_to_rebuild) == \
resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index'):
continue
if resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') in set(
r.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
for r in responses):
continue
responses.append(resp)
etag = sorted(responses, reverse=True,
key=lambda r: Timestamp(
@ -276,14 +286,8 @@ class ObjectReconstructor(Daemon):
rebuilt_fragment_iter)
def _reconstruct(self, policy, fragment_payload, frag_index):
# XXX with jerasure this doesn't work if we need to rebuild a
# parity fragment, and not all data fragments are available
# segment = policy.pyeclib_driver.reconstruct(
# fragment_payload, [frag_index])[0]
# for safety until pyeclib 1.0.7 we'll just use decode and encode
segment = policy.pyeclib_driver.decode(fragment_payload)
return policy.pyeclib_driver.encode(segment)[frag_index]
return policy.pyeclib_driver.reconstruct(fragment_payload,
[frag_index])[0]
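# Hedged sketch (not part of the patch) of the pyeclib call used above; the
# ECDriver parameters and ec_type here are illustrative, not Swift's actual
# storage policy configuration.
from pyeclib.ec_iface import ECDriver
driver = ECDriver(k=4, m=2, ec_type='liberasurecode_rs_vand')
frags = driver.encode(b'some object body')
missing_index = 3
available = [f for i, f in enumerate(frags) if i != missing_index]
# reconstruct() returns the missing fragment(s) directly, without the full
# decode/encode round trip used by the code removed above.
rebuilt = driver.reconstruct(available, [missing_index])[0]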
def make_rebuilt_fragment_iter(self, responses, path, policy, frag_index):
"""
@ -422,7 +426,7 @@ class ObjectReconstructor(Daemon):
:returns: a list of strings, the suffix dirs to sync
"""
suffixes = []
for suffix, sub_dict_local in local_suff.iteritems():
for suffix, sub_dict_local in local_suff.items():
sub_dict_remote = remote_suff.get(suffix, {})
if (sub_dict_local.get(None) != sub_dict_remote.get(None) or
sub_dict_local.get(local_index) !=
@ -575,9 +579,12 @@ class ObjectReconstructor(Daemon):
job['sync_to'],
# I think we could order these based on our index to better
# protect against a broken chain
itertools.ifilter(
lambda n: n['id'] not in (n['id'] for n in job['sync_to']),
job['policy'].object_ring.get_part_nodes(job['partition'])),
[
n for n in
job['policy'].object_ring.get_part_nodes(job['partition'])
if n['id'] != job['local_dev']['id'] and
n['id'] not in (m['id'] for m in job['sync_to'])
],
)
syncd_with = 0
for node in dest_nodes:
@ -767,7 +774,7 @@ class ObjectReconstructor(Daemon):
"""
override_devices = override_devices or []
override_partitions = override_partitions or []
ips = whataremyips()
ips = whataremyips(self.bind_ip)
for policy in POLICIES:
if policy.policy_type != EC_POLICY:
continue
@ -779,17 +786,19 @@ class ObjectReconstructor(Daemon):
ips, self.port,
dev['replication_ip'], dev['replication_port']),
policy.object_ring.devs)
for local_dev in local_devices:
if override_devices and (local_dev['device'] not in
override_devices):
continue
dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, data_dir)
tmp_path = join(dev_path, get_tmp_dir(int(policy)))
if self.mount_check and not ismount(dev_path):
dev_path = self._df_router[policy].get_dev_path(
local_dev['device'])
if not dev_path:
self.logger.warn(_('%s is not mounted'),
local_dev['device'])
continue
obj_path = join(dev_path, data_dir)
tmp_path = join(dev_path, get_tmp_dir(int(policy)))
unlink_older_than(tmp_path, time.time() -
self.reclaim_age)
if not os.path.exists(obj_path):
@ -923,5 +932,5 @@ class ObjectReconstructor(Daemon):
'object_reconstruction_last': time.time()},
self.rcache, self.logger)
self.logger.debug('reconstruction sleeping for %s seconds.',
self.run_pause)
sleep(self.run_pause)
self.interval)
sleep(self.interval)
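# Worked example (not part of the patch): the interval fallback introduced
# above keeps old configs that only set run_pause working, and interval wins
# when both options are present.
for conf, expected in (({'run_pause': '20'}, 20),
                       ({'interval': '10', 'run_pause': '20'}, 10),
                       ({}, 30)):
    assert int(conf.get('interval') or conf.get('run_pause') or 30) == expected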
View File
@ -65,17 +65,23 @@ class ObjectReplicator(Daemon):
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.vm_test_mode = config_true_value(conf.get('vm_test_mode', 'no'))
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.port = int(conf.get('bind_port', 6000))
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
self.port = None if self.servers_per_port else \
int(conf.get('bind_port', 6000))
self.concurrency = int(conf.get('concurrency', 1))
self.stats_interval = int(conf.get('stats_interval', '300'))
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
self.next_check = time.time() + self.ring_check_interval
self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7))
self.partition_times = []
self.run_pause = int(conf.get('run_pause', 30))
self.interval = int(conf.get('interval') or
conf.get('run_pause') or 30)
self.rsync_timeout = int(conf.get('rsync_timeout', 900))
self.rsync_io_timeout = conf.get('rsync_io_timeout', '30')
self.rsync_bwlimit = conf.get('rsync_bwlimit', '0')
self.rsync_compress = config_true_value(
conf.get('rsync_compress', 'no'))
self.http_timeout = int(conf.get('http_timeout', 60))
self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
self.recon_cache_path = conf.get('recon_cache_path',
@ -184,6 +190,11 @@ class ObjectReplicator(Daemon):
'--contimeout=%s' % self.rsync_io_timeout,
'--bwlimit=%s' % self.rsync_bwlimit,
]
if self.rsync_compress and \
job['region'] != node['region']:
# Allow for compression, but only if the remote node is in
# a different region than the local one.
args.append('--compress')
node_ip = rsync_ip(node['replication_ip'])
if self.vm_test_mode:
rsync_module = '%s::object%s' % (node_ip, node['replication_port'])
@ -262,7 +273,7 @@ class ObjectReplicator(Daemon):
synced_remote_regions[node['region']] = \
candidates.keys()
responses.append(success)
for region, cand_objs in synced_remote_regions.iteritems():
for region, cand_objs in synced_remote_regions.items():
if delete_objs is None:
delete_objs = cand_objs
else:
@ -531,7 +542,7 @@ class ObjectReplicator(Daemon):
policies will be returned
"""
jobs = []
ips = whataremyips()
ips = whataremyips(self.bind_ip)
for policy in POLICIES:
if policy.policy_type == REPL_POLICY:
if (override_policies is not None and
@ -644,5 +655,5 @@ class ObjectReplicator(Daemon):
'object_replication_last': time.time()},
self.rcache, self.logger)
self.logger.debug('Replication sleeping for %s seconds.',
self.run_pause)
sleep(self.run_pause)
self.interval)
sleep(self.interval)
View File
@ -411,7 +411,7 @@ class ObjectController(BaseStorageServer):
raise HTTPBadRequest("invalid JSON for footer doc")
def _check_container_override(self, update_headers, metadata):
for key, val in metadata.iteritems():
for key, val in metadata.items():
override_prefix = 'x-backend-container-update-override-'
if key.lower().startswith(override_prefix):
override = key.lower().replace(override_prefix, 'x-')
@ -446,7 +446,7 @@ class ObjectController(BaseStorageServer):
request=request,
headers={'X-Backend-Timestamp': orig_timestamp.internal})
metadata = {'X-Timestamp': req_timestamp.internal}
metadata.update(val for val in request.headers.iteritems()
metadata.update(val for val in request.headers.items()
if is_user_meta('object', val[0]))
for header_key in self.allowed_headers:
if header_key in request.headers:
@ -498,10 +498,14 @@ class ObjectController(BaseStorageServer):
except ValueError as e:
return HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
# SSYNC will include Frag-Index header for subrequests to primary
# nodes; handoff nodes should 409 subrequests that would overwrite an
# existing data fragment until they have offloaded the existing fragment
frag_index = request.headers.get('X-Backend-Ssync-Frag-Index')
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy=policy)
policy=policy, frag_index=frag_index)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:
@ -610,9 +614,9 @@ class ObjectController(BaseStorageServer):
'ETag': etag,
'Content-Length': str(upload_size),
}
metadata.update(val for val in request.headers.iteritems()
metadata.update(val for val in request.headers.items()
if is_sys_or_user_meta('object', val[0]))
metadata.update(val for val in footer_meta.iteritems()
metadata.update(val for val in footer_meta.items()
if is_sys_or_user_meta('object', val[0]))
headers_to_copy = (
request.headers.get(
@ -708,7 +712,7 @@ class ObjectController(BaseStorageServer):
conditional_etag=conditional_etag)
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
for key, value in metadata.items():
if is_sys_or_user_meta('object', key) or \
key.lower() in self.allowed_headers:
response.headers[key] = value
@ -763,7 +767,7 @@ class ObjectController(BaseStorageServer):
conditional_etag=conditional_etag)
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
for key, value in metadata.items():
if is_sys_or_user_meta('object', key) or \
key.lower() in self.allowed_headers:
response.headers[key] = value
View File
@ -19,7 +19,6 @@ import eventlet
import eventlet.wsgi
import eventlet.greenio
from swift.common import constraints
from swift.common import exceptions
from swift.common import http
from swift.common import swob
@ -70,6 +69,7 @@ class Receiver(object):
# raised during processing because otherwise the sender could send for
# quite some time before realizing it was all in vain.
self.disconnect = True
self.initialize_request()
def __call__(self):
"""
@ -89,10 +89,9 @@ class Receiver(object):
try:
# Double try blocks in case our main error handlers fail.
try:
# initialize_request is for preamble items that can be done
# outside a replication semaphore lock.
for data in self.initialize_request():
yield data
# Need to send something to trigger wsgi to return response
# headers and kick off the ssync exchange.
yield '\r\n'
# If semaphore is in use, try to acquire it, non-blocking, and
# return a 503 if it fails.
if self.app.replication_semaphore:
@ -144,20 +143,6 @@ class Receiver(object):
except Exception:
pass # We're okay with the above failing.
def _ensure_flush(self):
"""
Sends a blank line sufficient to flush buffers.
This is to ensure Eventlet versions that don't support
eventlet.minimum_write_chunk_size will send any previous data
buffered.
If https://bitbucket.org/eventlet/eventlet/pull-request/37
ever gets released in an Eventlet version, we should make
this yield only for versions older than that.
"""
yield ' ' * eventlet.wsgi.MINIMUM_CHUNK_SIZE + '\r\n'
def initialize_request(self):
"""
Basic validation of request and mount check.
@ -165,23 +150,29 @@ class Receiver(object):
This function will be called before attempting to acquire a
replication semaphore lock, so contains only quick checks.
"""
# The following is the setting we talk about above in _ensure_flush.
# This environ override has been supported since eventlet 0.14:
# https://bitbucket.org/eventlet/eventlet/commits/ \
# 4bd654205a4217970a57a7c4802fed7ff2c8b770
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
self.device, self.partition, self.policy = \
request_helpers.get_name_and_placement(self.request, 2, 2, False)
if 'X-Backend-Ssync-Frag-Index' in self.request.headers:
self.frag_index = self.node_index = None
if self.request.headers.get('X-Backend-Ssync-Frag-Index'):
self.frag_index = int(
self.request.headers['X-Backend-Ssync-Frag-Index'])
else:
self.frag_index = None
if self.request.headers.get('X-Backend-Ssync-Node-Index'):
self.node_index = int(
self.request.headers['X-Backend-Ssync-Node-Index'])
if self.node_index != self.frag_index:
# a primary node should only receive its own fragments
raise swob.HTTPBadRequest(
'Frag-Index (%s) != Node-Index (%s)' % (
self.frag_index, self.node_index))
utils.validate_device_partition(self.device, self.partition)
self.diskfile_mgr = self.app._diskfile_router[self.policy]
if self.diskfile_mgr.mount_check and not constraints.check_mount(
self.diskfile_mgr.devices, self.device):
if not self.diskfile_mgr.get_dev_path(self.device):
raise swob.HTTPInsufficientStorage(drive=self.device)
self.fp = self.request.environ['wsgi.input']
for data in self._ensure_flush():
yield data
def missing_check(self):
"""
@ -251,11 +242,10 @@ class Receiver(object):
if want:
object_hashes.append(object_hash)
yield ':MISSING_CHECK: START\r\n'
yield '\r\n'.join(object_hashes)
if object_hashes:
yield '\r\n'.join(object_hashes)
yield '\r\n'
yield ':MISSING_CHECK: END\r\n'
for data in self._ensure_flush():
yield data
def updates(self):
"""
@ -361,6 +351,9 @@ class Receiver(object):
raise Exception('Invalid subrequest method %s' % method)
subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
subreq.headers['X-Backend-Replication'] = 'True'
if self.node_index is not None:
# primary node should not 409 if it has a non-primary fragment
subreq.headers['X-Backend-Ssync-Frag-Index'] = self.node_index
if replication_headers:
subreq.headers['X-Backend-Replication-Headers'] = \
' '.join(replication_headers)
@ -389,5 +382,3 @@ class Receiver(object):
(failures, successes))
yield ':UPDATES: START\r\n'
yield ':UPDATES: END\r\n'
for data in self._ensure_flush():
yield data
View File
@ -129,13 +129,21 @@ class Sender(object):
self.connection.putheader('Transfer-Encoding', 'chunked')
self.connection.putheader('X-Backend-Storage-Policy-Index',
int(self.job['policy']))
self.connection.putheader('X-Backend-Ssync-Frag-Index',
self.node['index'])
# a sync job must use the node's index for the frag_index of the
# rebuilt fragments instead of the frag_index from the job which
# will be rebuilding them
self.connection.putheader(
'X-Backend-Ssync-Frag-Index', self.node.get(
'index', self.job.get('frag_index', '')))
# a revert job to a handoff will not have a node index
self.connection.putheader('X-Backend-Ssync-Node-Index',
self.node.get('index', ''))
self.connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):
self.response = self.connection.getresponse()
if self.response.status != http.HTTP_OK:
self.response.read()
raise exceptions.ReplicationException(
'Expected status %s; got %s' %
(http.HTTP_OK, self.response.status))
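# Hedged sketch (not part of the patch): the header values the logic above
# sends for the two job shapes; the node and job dicts are illustrative.
sync_node, sync_job = {'index': 2}, {}            # sync job to a primary node
revert_node, revert_job = {}, {'frag_index': 5}   # revert job to a handoff
# sync:   X-Backend-Ssync-Frag-Index: 2,  X-Backend-Ssync-Node-Index: 2
assert sync_node.get('index', sync_job.get('frag_index', '')) == 2
assert sync_node.get('index', '') == 2
# revert: X-Backend-Ssync-Frag-Index: 5,  X-Backend-Ssync-Node-Index: ''
assert revert_node.get('index', revert_job.get('frag_index', '')) == 5
assert revert_node.get('index', '') == ''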
@ -325,7 +333,7 @@ class Sender(object):
"""
msg = ['PUT ' + url_path, 'Content-Length: ' + str(df.content_length)]
# Sorted to make it easier to test.
for key, value in sorted(df.get_metadata().iteritems()):
for key, value in sorted(df.get_metadata().items()):
if key not in ('name', 'Content-Length'):
msg.append('%s: %s' % (key, value))
msg = '\r\n'.join(msg) + '\r\n\r\n'
View File
@ -28,7 +28,6 @@ import os
import time
import functools
import inspect
import logging
import operator
from sys import exc_info
from swift import gettext_ as _
@ -40,10 +39,11 @@ from eventlet.timeout import Timeout
from swift.common.wsgi import make_pre_authed_env
from swift.common.utils import Timestamp, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
GreenAsyncPile, quorum_size, parse_content_range
GreenAsyncPile, quorum_size, parse_content_type, \
http_response_to_document_iters, document_iters_to_http_response_body
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
ConnectionTimeout
ConnectionTimeout, RangeAlreadyComplete
from swift.common.http import is_informational, is_success, is_redirection, \
is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
@ -122,7 +122,7 @@ def _prep_headers_to_info(headers, server_type):
meta = {}
sysmeta = {}
other = {}
for key, val in dict(headers).iteritems():
for key, val in dict(headers).items():
lkey = key.lower()
if is_user_meta(server_type, lkey):
meta[strip_user_meta_prefix(server_type, lkey)] = val
@ -613,10 +613,9 @@ def bytes_to_skip(record_size, range_start):
return (record_size - (range_start % record_size)) % record_size
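# Worked example (not part of the patch): with a 64 KiB client_chunk_size and
# a resumed range starting at byte 100000, bytes_to_skip() skips ahead to the
# next chunk boundary at byte 131072.
record_size, range_start = 65536, 100000
skip = (record_size - (range_start % record_size)) % record_size
assert skip == 31072 and range_start + skip == 2 * record_size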
class GetOrHeadHandler(object):
class ResumingGetter(object):
def __init__(self, app, req, server_type, node_iter, partition, path,
backend_headers, client_chunk_size=None):
backend_headers, client_chunk_size=None, newest=None):
self.app = app
self.node_iter = node_iter
self.server_type = server_type
@ -632,7 +631,10 @@ class GetOrHeadHandler(object):
self.req_method = req.method
self.req_path = req.path
self.req_query_string = req.query_string
self.newest = config_true_value(req.headers.get('x-newest', 'f'))
if newest is None:
self.newest = config_true_value(req.headers.get('x-newest', 'f'))
else:
self.newest = newest
# populated when finding source
self.statuses = []
@ -640,6 +642,9 @@ class GetOrHeadHandler(object):
self.bodies = []
self.source_headers = []
# populated from response headers
self.start_byte = self.end_byte = self.length = None
def fast_forward(self, num_bytes):
"""
Will skip num_bytes into the current ranges.
@ -648,57 +653,89 @@ class GetOrHeadHandler(object):
this request. This will change the Range header
so that the next req will start where it left off.
:raises NotImplementedError: if this is a multirange request
:raises ValueError: if invalid range header
:raises HTTPRequestedRangeNotSatisfiable: if begin + num_bytes
> end of range
> end of range + 1
:raises RangeAlreadyComplete: if begin + num_bytes == end of range + 1
"""
if 'Range' in self.backend_headers:
req_range = Range(self.backend_headers['Range'])
if len(req_range.ranges) > 1:
raise NotImplementedError()
begin, end = req_range.ranges.pop()
begin, end = req_range.ranges[0]
if begin is None:
# this is a -50 range req (last 50 bytes of file)
end -= num_bytes
else:
begin += num_bytes
if end and begin > end:
if end and begin == end + 1:
# we sent out exactly the first range's worth of bytes, so
# we're done with it
raise RangeAlreadyComplete()
elif end and begin > end:
raise HTTPRequestedRangeNotSatisfiable()
req_range.ranges = [(begin, end)]
elif end and begin:
req_range.ranges = [(begin, end)] + req_range.ranges[1:]
elif end:
req_range.ranges = [(None, end)] + req_range.ranges[1:]
else:
req_range.ranges = [(begin, None)] + req_range.ranges[1:]
self.backend_headers['Range'] = str(req_range)
else:
self.backend_headers['Range'] = 'bytes=%d-' % num_bytes
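# Worked example (not part of the patch): the Range rewrite fast_forward()
# performs, shown on a bare header value via swob.Range (no getter needed).
from swift.common.swob import Range
req_range = Range('bytes=100-199')
begin, end = req_range.ranges[0]
begin += 30                                   # 30 bytes already sent to client
req_range.ranges = [(begin, end)] + req_range.ranges[1:]
assert str(req_range) == 'bytes=130-199'
# A suffix range such as 'bytes=-500' has begin=None, so fast_forward(200)
# shrinks the end value instead, yielding 'bytes=-300'.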
def learn_size_from_content_range(self, start, end):
def pop_range(self):
"""
Remove the first byterange from our Range header.
This is used after a byterange has been completely sent to the
client; this way, should we need to resume the download from another
object server, we do not re-fetch byteranges that the client already
has.
If we have no Range header, this is a no-op.
"""
if 'Range' in self.backend_headers:
req_range = Range(self.backend_headers['Range'])
begin, end = req_range.ranges.pop(0)
if len(req_range.ranges) > 0:
self.backend_headers['Range'] = str(req_range)
else:
self.backend_headers.pop('Range')
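# Worked example (not part of the patch): pop_range() once the first byterange
# of a multi-range request has been fully delivered to the client.
from swift.common.swob import Range
backend_headers = {'Range': 'bytes=0-99,1000-1099'}
req_range = Range(backend_headers['Range'])
req_range.ranges.pop(0)
if req_range.ranges:
    backend_headers['Range'] = str(req_range)    # now 'bytes=1000-1099'
else:
    backend_headers.pop('Range')                 # single range: header removed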
def learn_size_from_content_range(self, start, end, length):
"""
If client_chunk_size is set, makes sure we yield things starting on
chunk boundaries based on the Content-Range header in the response.
Sets our first Range header to the value learned from the
Content-Range header in the response; if we were given a
Sets our Range header's first byterange to the value learned from
the Content-Range header in the response; if we were given a
fully-specified range (e.g. "bytes=123-456"), this is a no-op.
If we were given a half-specified range (e.g. "bytes=123-" or
"bytes=-456"), then this changes the Range header to a
semantically-equivalent one *and* it lets us resume on a proper
boundary instead of just in the middle of a piece somewhere.
If the original request is for more than one range, this does not
affect our backend Range header, since we don't support resuming one
of those anyway.
"""
if length == 0:
return
if self.client_chunk_size:
self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)
if 'Range' in self.backend_headers:
req_range = Range(self.backend_headers['Range'])
try:
req_range = Range(self.backend_headers['Range'])
new_ranges = [(start, end)] + req_range.ranges[1:]
except ValueError:
new_ranges = [(start, end)]
else:
new_ranges = [(start, end)]
if len(req_range.ranges) > 1:
return
self.backend_headers['Range'] = "bytes=%d-%d" % (start, end)
self.backend_headers['Range'] = (
"bytes=" + (",".join("%s-%s" % (s if s is not None else '',
e if e is not None else '')
for s, e in new_ranges)))
def is_good_source(self, src):
"""
@ -712,106 +749,183 @@ class GetOrHeadHandler(object):
return True
return is_success(src.status) or is_redirection(src.status)
def _make_app_iter(self, req, node, source):
"""
Returns an iterator over the contents of the source (via its read
func). There is also quite a bit of cleanup to ensure garbage
collection works and the underlying socket of the source is closed.
def response_parts_iter(self, req):
source, node = self._get_source_and_node()
it = None
if source:
it = self._get_response_parts_iter(req, node, source)
return it
def _get_response_parts_iter(self, req, node, source):
# Someday we can replace this [mess] with python 3's "nonlocal"
source = [source]
node = [node]
:param req: incoming request object
:param source: The httplib.Response object this iterator should read
from.
:param node: The node the source is reading from, for logging purposes.
"""
try:
nchunks = 0
client_chunk_size = self.client_chunk_size
bytes_consumed_from_backend = 0
node_timeout = self.app.node_timeout
if self.server_type == 'Object':
node_timeout = self.app.recoverable_node_timeout
buf = ''
while True:
try:
with ChunkReadTimeout(node_timeout):
chunk = source.read(self.app.object_chunk_size)
nchunks += 1
buf += chunk
except ChunkReadTimeout:
exc_type, exc_value, exc_traceback = exc_info()
if self.newest or self.server_type != 'Object':
raise exc_type, exc_value, exc_traceback
# This is safe; it sets up a generator but does not call next()
# on it, so no IO is performed.
parts_iter = [
http_response_to_document_iters(
source[0], read_chunk_size=self.app.object_chunk_size)]
def get_next_doc_part():
while True:
try:
self.fast_forward(bytes_consumed_from_backend)
except (NotImplementedError, HTTPException, ValueError):
raise exc_type, exc_value, exc_traceback
buf = ''
new_source, new_node = self._get_source_and_node()
if new_source:
self.app.exception_occurred(
node, _('Object'),
_('Trying to read during GET (retrying)'),
level=logging.ERROR, exc_info=(
exc_type, exc_value, exc_traceback))
# Close-out the connection as best as possible.
if getattr(source, 'swift_conn', None):
close_swift_conn(source)
source = new_source
node = new_node
continue
else:
raise exc_type, exc_value, exc_traceback
# This call to next() performs IO when we have a
# multipart/byteranges response; it reads the MIME
# boundary and part headers.
#
# If we don't have a multipart/byteranges response,
# but just a 200 or a single-range 206, then this
# performs no IO, and either just returns source or
# raises StopIteration.
with ChunkReadTimeout(node_timeout):
# if StopIteration is raised, it escapes and is
# handled elsewhere
start_byte, end_byte, length, headers, part = next(
parts_iter[0])
return (start_byte, end_byte, length, headers, part)
except ChunkReadTimeout:
new_source, new_node = self._get_source_and_node()
if new_source:
self.app.exception_occurred(
node[0], _('Object'),
_('Trying to read during GET (retrying)'))
# Close-out the connection as best as possible.
if getattr(source[0], 'swift_conn', None):
close_swift_conn(source[0])
source[0] = new_source
node[0] = new_node
# This is safe; it sets up a generator but does
# not call next() on it, so no IO is performed.
parts_iter[0] = http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)
else:
raise StopIteration()
if buf and self.skip_bytes:
if self.skip_bytes < len(buf):
buf = buf[self.skip_bytes:]
bytes_consumed_from_backend += self.skip_bytes
self.skip_bytes = 0
else:
self.skip_bytes -= len(buf)
bytes_consumed_from_backend += len(buf)
def iter_bytes_from_response_part(part_file):
nchunks = 0
buf = ''
bytes_used_from_backend = 0
while True:
try:
with ChunkReadTimeout(node_timeout):
chunk = part_file.read(self.app.object_chunk_size)
nchunks += 1
buf += chunk
except ChunkReadTimeout:
exc_type, exc_value, exc_traceback = exc_info()
if self.newest or self.server_type != 'Object':
raise exc_type, exc_value, exc_traceback
try:
self.fast_forward(bytes_used_from_backend)
except (HTTPException, ValueError):
raise exc_type, exc_value, exc_traceback
except RangeAlreadyComplete:
break
buf = ''
new_source, new_node = self._get_source_and_node()
if new_source:
self.app.exception_occurred(
node[0], _('Object'),
_('Trying to read during GET (retrying)'))
# Close-out the connection as best as possible.
if getattr(source[0], 'swift_conn', None):
close_swift_conn(source[0])
source[0] = new_source
node[0] = new_node
# This is safe; it just sets up a generator but
# does not call next() on it, so no IO is
# performed.
parts_iter[0] = http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)
if not chunk:
if buf:
with ChunkWriteTimeout(self.app.client_timeout):
bytes_consumed_from_backend += len(buf)
yield buf
buf = ''
break
try:
_junk, _junk, _junk, _junk, part_file = \
get_next_doc_part()
except StopIteration:
# Tried to find a new node from which to
# finish the GET, but failed. There's
# nothing more to do here.
return
else:
raise exc_type, exc_value, exc_traceback
else:
if buf and self.skip_bytes:
if self.skip_bytes < len(buf):
buf = buf[self.skip_bytes:]
bytes_used_from_backend += self.skip_bytes
self.skip_bytes = 0
else:
self.skip_bytes -= len(buf)
bytes_used_from_backend += len(buf)
buf = ''
if client_chunk_size is not None:
while len(buf) >= client_chunk_size:
client_chunk = buf[:client_chunk_size]
buf = buf[client_chunk_size:]
with ChunkWriteTimeout(self.app.client_timeout):
yield client_chunk
bytes_consumed_from_backend += len(client_chunk)
else:
with ChunkWriteTimeout(self.app.client_timeout):
yield buf
bytes_consumed_from_backend += len(buf)
buf = ''
if not chunk:
if buf:
with ChunkWriteTimeout(
self.app.client_timeout):
bytes_used_from_backend += len(buf)
yield buf
buf = ''
break
# This is for fairness; if the network is outpacing the CPU,
# we'll always be able to read and write data without
# encountering an EWOULDBLOCK, and so eventlet will not switch
# greenthreads on its own. We do it manually so that clients
# don't starve.
#
# The number 5 here was chosen by making stuff up. It's not
# every single chunk, but it's not too big either, so it seemed
# like it would probably be an okay choice.
#
# Note that we may trampoline to other greenthreads more often
# than once every 5 chunks, depending on how blocking our
# network IO is; the explicit sleep here simply provides a
# lower bound on the rate of trampolining.
if nchunks % 5 == 0:
sleep()
if client_chunk_size is not None:
while len(buf) >= client_chunk_size:
client_chunk = buf[:client_chunk_size]
buf = buf[client_chunk_size:]
with ChunkWriteTimeout(
self.app.client_timeout):
yield client_chunk
bytes_used_from_backend += len(client_chunk)
else:
with ChunkWriteTimeout(self.app.client_timeout):
yield buf
bytes_used_from_backend += len(buf)
buf = ''
# This is for fairness; if the network is outpacing
# the CPU, we'll always be able to read and write
# data without encountering an EWOULDBLOCK, and so
# eventlet will not switch greenthreads on its own.
# We do it manually so that clients don't starve.
#
# The number 5 here was chosen by making stuff up.
# It's not every single chunk, but it's not too big
# either, so it seemed like it would probably be an
# okay choice.
#
# Note that we may trampoline to other greenthreads
# more often than once every 5 chunks, depending on
# how blocking our network IO is; the explicit sleep
# here simply provides a lower bound on the rate of
# trampolining.
if nchunks % 5 == 0:
sleep()
try:
while True:
start_byte, end_byte, length, headers, part = \
get_next_doc_part()
self.learn_size_from_content_range(
start_byte, end_byte, length)
part_iter = iter_bytes_from_response_part(part)
yield {'start_byte': start_byte, 'end_byte': end_byte,
'entity_length': length, 'headers': headers,
'part_iter': part_iter}
self.pop_range()
except StopIteration:
return
except ChunkReadTimeout:
self.app.exception_occurred(node, _('Object'),
self.app.exception_occurred(node[0], _('Object'),
_('Trying to read during GET'))
raise
except ChunkWriteTimeout:
@ -827,8 +941,22 @@ class GetOrHeadHandler(object):
raise
finally:
# Close-out the connection as best as possible.
if getattr(source, 'swift_conn', None):
close_swift_conn(source)
if getattr(source[0], 'swift_conn', None):
close_swift_conn(source[0])
@property
def last_status(self):
if self.statuses:
return self.statuses[-1]
else:
return None
@property
def last_headers(self):
if self.source_headers:
return self.source_headers[-1]
else:
return None
def _get_source_and_node(self):
self.statuses = []
@ -869,7 +997,7 @@ class GetOrHeadHandler(object):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append('')
self.source_headers.append([])
close_swift_conn(possible_source)
else:
if self.used_source_etag:
@ -883,13 +1011,13 @@ class GetOrHeadHandler(object):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append('')
self.source_headers.append([])
continue
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append('')
self.source_headers.append('')
self.source_headers.append(possible_source.getheaders())
sources.append((possible_source, node))
if not self.newest: # one good source is enough
break
@ -923,6 +1051,44 @@ class GetOrHeadHandler(object):
return source, node
return None, None
class GetOrHeadHandler(ResumingGetter):
def _make_app_iter(self, req, node, source):
"""
Returns an iterator over the contents of the source (via its read
func). There is also quite a bit of cleanup to ensure garbage
collection works and the underlying socket of the source is closed.
:param req: incoming request object
:param source: The httplib.Response object this iterator should read
from.
:param node: The node the source is reading from, for logging purposes.
"""
ct = source.getheader('Content-Type')
if ct:
content_type, content_type_attrs = parse_content_type(ct)
is_multipart = content_type == 'multipart/byteranges'
else:
is_multipart = False
boundary = "dontcare"
if is_multipart:
# we need some MIME boundary; fortunately, the object server has
# furnished one for us, so we'll just re-use it
boundary = dict(content_type_attrs)["boundary"]
parts_iter = self._get_response_parts_iter(req, node, source)
def add_content_type(response_part):
response_part["content_type"] = \
HeaderKeyDict(response_part["headers"]).get("Content-Type")
return response_part
return document_iters_to_http_response_body(
(add_content_type(pi) for pi in parts_iter),
boundary, is_multipart, self.app.logger)
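# Hedged sketch (not part of the patch): the Content-Type inspection performed
# above, shown on an illustrative multipart/byteranges header value.
from swift.common.utils import parse_content_type
ct = 'multipart/byteranges; boundary=c4a14e9ca5e1a369'
content_type, attrs = parse_content_type(ct)
assert content_type == 'multipart/byteranges'
assert dict(attrs)['boundary'] == 'c4a14e9ca5e1a369'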
def get_working_response(self, req):
source, node = self._get_source_and_node()
res = None
@ -932,10 +1098,6 @@ class GetOrHeadHandler(object):
update_headers(res, source.getheaders())
if req.method == 'GET' and \
source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
cr = res.headers.get('Content-Range')
if cr:
start, end, total = parse_content_range(cr)
self.learn_size_from_content_range(start, end)
res.app_iter = self._make_app_iter(req, node, source)
# See NOTE: swift_conn at top of file about this.
res.swift_conn = source.swift_conn
@ -1004,7 +1166,7 @@ class Controller(object):
k.lower() in self._x_remove_headers())
dst_headers.update((k.lower(), v)
for k, v in src_headers.iteritems()
for k, v in src_headers.items()
if k.lower() in self.pass_through_headers or
is_sys_or_user_meta(st, k))
@ -1197,16 +1359,18 @@ class Controller(object):
"""
return quorum_size(n)
def have_quorum(self, statuses, node_count):
def have_quorum(self, statuses, node_count, quorum=None):
"""
Given a list of statuses from several requests, determine if
a quorum response can already be decided.
:param statuses: list of statuses returned
:param node_count: number of nodes being queried (basically ring count)
:param quorum: number of statuses required for quorum
:returns: True or False, depending on if quorum is established
"""
quorum = self._quorum_size(node_count)
if quorum is None:
quorum = self._quorum_size(node_count)
if len(statuses) >= quorum:
for hundred in (HTTP_CONTINUE, HTTP_OK, HTTP_MULTIPLE_CHOICES,
HTTP_BAD_REQUEST):
@ -1324,7 +1488,7 @@ class Controller(object):
# transfer any x-account-sysmeta headers from original request
# to the autocreate PUT
headers.update((k, v)
for k, v in req.headers.iteritems()
for k, v in req.headers.items()
if is_sys_meta('account', k))
resp = self.make_requests(Request.blank('/v1' + path),
self.app.account_ring, partition, 'PUT',
@ -1453,7 +1617,8 @@ class Controller(object):
list_from_csv(req.headers['Access-Control-Request-Headers']))
# Populate the response with the CORS preflight headers
if cors.get('allow_origin', '').strip() == '*':
if cors.get('allow_origin') and \
cors.get('allow_origin').strip() == '*':
headers['access-control-allow-origin'] = '*'
else:
headers['access-control-allow-origin'] = req_origin_value
File diff suppressed because it is too large
View File
@ -7,9 +7,9 @@ hacking>=0.8.0,<0.9
coverage
nose
nosexcover
openstack.nose_plugin
nosehtmloutput
oslosphinx
sphinx>=1.1.2,<1.2
mock>=1.0
python-swiftclient
python-keystoneclient>=1.3.0
View File
@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import httplib
import mock
import os
import sys
@ -52,7 +53,7 @@ from swift.container import server as container_server
from swift.obj import server as object_server, mem_server as mem_object_server
import swift.proxy.controllers.obj
httplib._MAXHEADERS = constraints.MAX_HEADER_COUNT
DEBUG = True
# In order to get the proper blocking behavior of sockets without using
View File
@ -29,10 +29,13 @@ from xml.dom import minidom
from swiftclient import get_auth
from swift.common import constraints
from swift.common.utils import config_true_value
from test import safe_repr
httplib._MAXHEADERS = constraints.MAX_HEADER_COUNT
class AuthenticationFailed(Exception):
pass
@ -68,7 +71,7 @@ class ResponseError(Exception):
def listing_empty(method):
for i in xrange(6):
for i in range(6):
if len(method()) == 0:
return True
@ -181,7 +184,11 @@ class Connection(object):
self.storage_url = str('/%s/%s' % (x[3], x[4]))
self.account_name = str(x[4])
self.auth_user = auth_user
self.storage_token = storage_token
# With v2 keystone, storage_token is unicode.
# We want it to be a str, otherwise requests whose headers already
# contain encoded non-ASCII characters would run into trouble.
self.storage_token = str(storage_token)
self.user_acl = '%s:%s' % (self.account, self.username)
self.http_connect()
@ -330,7 +337,7 @@ class Connection(object):
port=self.storage_port)
#self.connection.set_debuglevel(3)
self.connection.putrequest('PUT', path)
for key, value in headers.iteritems():
for key, value in headers.items():
self.connection.putheader(key, value)
self.connection.endheaders()
@ -847,7 +854,7 @@ class File(Base):
finally:
fobj.close()
def sync_metadata(self, metadata=None, cfg=None):
def sync_metadata(self, metadata=None, cfg=None, parms=None):
if metadata is None:
metadata = {}
if cfg is None:
@ -864,7 +871,8 @@ class File(Base):
else:
headers['Content-Length'] = 0
self.conn.make_request('POST', self.path, hdrs=headers, cfg=cfg)
self.conn.make_request('POST', self.path, hdrs=headers,
parms=parms, cfg=cfg)
if self.conn.response.status not in (201, 202):
raise ResponseError(self.conn.response, 'POST',
View File
@ -21,6 +21,7 @@ from uuid import uuid4
from nose import SkipTest
from string import letters
from six.moves import range
from swift.common.middleware.acl import format_acl
from test.functional import check_response, retry, requires_acls, \
@ -790,13 +791,13 @@ class TestAccount(unittest.TestCase):
resp = retry(post, headers)
headers = {}
for x in xrange(self.max_meta_count):
for x in range(self.max_meta_count):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 204)
headers = {}
for x in xrange(self.max_meta_count + 1):
for x in range(self.max_meta_count + 1):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
@ -827,8 +828,23 @@ class TestAccount(unittest.TestCase):
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 204)
# this POST includes metadata size that is over limit
headers['X-Account-Meta-k'] = \
'v' * (self.max_meta_overall_size - size)
'x' * (self.max_meta_overall_size - size)
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 400)
# this POST would be ok and the aggregate backend metadata
# size is on the border
headers = {'X-Account-Meta-k':
'y' * (self.max_meta_overall_size - size - 1)}
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 204)
# this last POST would be ok by itself but takes the aggregate
# backend metadata size over limit
headers = {'X-Account-Meta-k':
'z' * (self.max_meta_overall_size - size)}
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 400)
View File
@ -24,6 +24,8 @@ from test.functional import check_response, retry, requires_acls, \
load_constraint, requires_policies
import test.functional as tf
from six.moves import range
class TestContainer(unittest.TestCase):
@ -319,7 +321,7 @@ class TestContainer(unittest.TestCase):
name = uuid4().hex
headers = {}
for x in xrange(self.max_meta_count):
for x in range(self.max_meta_count):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
@ -329,7 +331,7 @@ class TestContainer(unittest.TestCase):
self.assertEqual(resp.status, 204)
name = uuid4().hex
headers = {}
for x in xrange(self.max_meta_count + 1):
for x in range(self.max_meta_count + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
@ -412,13 +414,13 @@ class TestContainer(unittest.TestCase):
return check_response(conn)
headers = {}
for x in xrange(self.max_meta_count):
for x in range(self.max_meta_count):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 204)
headers = {}
for x in xrange(self.max_meta_count + 1):
for x in range(self.max_meta_count + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
@ -449,8 +451,23 @@ class TestContainer(unittest.TestCase):
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 204)
# this POST includes metadata size that is over limit
headers['X-Container-Meta-k'] = \
'v' * (self.max_meta_overall_size - size)
'x' * (self.max_meta_overall_size - size)
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 400)
# this POST would be ok and the aggregate backend metadata
# size is on the border
headers = {'X-Container-Meta-k':
'y' * (self.max_meta_overall_size - size - 1)}
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 204)
# this last POST would be ok by itself but takes the aggregate
# backend metadata size over limit
headers = {'X-Container-Meta-k':
'z' * (self.max_meta_overall_size - size)}
resp = retry(post, headers)
resp.read()
self.assertEqual(resp.status, 400)

View File

@ -15,11 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import unittest
from nose import SkipTest
from uuid import uuid4
from swift.common.utils import json
from six.moves import range
from test.functional import check_response, retry, requires_acls, \
requires_policies
@ -746,7 +747,7 @@ class TestObject(unittest.TestCase):
parsed.path, self.container, str(objnum)), segments1[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments1)):
for objnum in range(len(segments1)):
resp = retry(put, objnum)
resp.read()
self.assertEqual(resp.status, 201)
@ -809,7 +810,7 @@ class TestObject(unittest.TestCase):
parsed.path, self.container, str(objnum)), segments2[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments2)):
for objnum in range(len(segments2)):
resp = retry(put, objnum)
resp.read()
self.assertEqual(resp.status, 201)
@ -891,7 +892,7 @@ class TestObject(unittest.TestCase):
parsed.path, acontainer, str(objnum)), segments3[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments3)):
for objnum in range(len(segments3)):
resp = retry(put, objnum)
resp.read()
self.assertEqual(resp.status, 201)
@ -966,7 +967,7 @@ class TestObject(unittest.TestCase):
parsed.path, acontainer, str(objnum)), '',
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments3)):
for objnum in range(len(segments3)):
resp = retry(delete, objnum)
resp.read()
self.assertEqual(resp.status, 204)
@ -977,7 +978,7 @@ class TestObject(unittest.TestCase):
parsed.path, self.container, str(objnum)), '',
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments2)):
for objnum in range(len(segments2)):
resp = retry(delete, objnum)
resp.read()
self.assertEqual(resp.status, 204)
@ -988,7 +989,7 @@ class TestObject(unittest.TestCase):
parsed.path, self.container, str(objnum)), '',
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments1)):
for objnum in range(len(segments1)):
resp = retry(delete, objnum)
resp.read()
self.assertEqual(resp.status, 204)

View File

@ -31,7 +31,7 @@ from nose import SkipTest
from swift.common.http import is_success, is_client_error
from test.functional import normalized_urls, load_constraint, cluster_info
from test.functional import check_response, retry
from test.functional import check_response, retry, requires_acls
import test.functional as tf
from test.functional.swift_test_client import Account, Connection, File, \
ResponseError
@ -55,7 +55,7 @@ class Utils(object):
u'\u3705\u1803\u0902\uF112\uD210\uB30E\u940C\u850B'\
u'\u5608\u3706\u1804\u0903\u03A9\u2603'
return ''.join([random.choice(utf8_chars)
for x in xrange(length)]).encode('utf-8')
for x in range(length)]).encode('utf-8')
create_name = create_ascii_name
@ -393,14 +393,14 @@ class TestContainer(Base):
cont = self.env.account.container(Utils.create_name())
self.assert_(cont.create())
files = sorted([Utils.create_name() for x in xrange(10)])
files = sorted([Utils.create_name() for x in range(10)])
for f in files:
file_item = cont.file(f)
self.assert_(file_item.write_random())
for i in xrange(len(files)):
for i in range(len(files)):
f = files[i]
for j in xrange(1, len(files) - i):
for j in range(1, len(files) - i):
self.assert_(cont.files(parms={'limit': j, 'marker': f}) ==
files[i + 1: i + j + 1])
self.assert_(cont.files(parms={'marker': f}) == files[i + 1:])
@ -2151,6 +2151,16 @@ class TestSloEnv(object):
'manifest-bcd-submanifest')},
seg_info['seg_e']]),
parms={'multipart-manifest': 'put'})
cls.seg_info = seg_info
file_item = cls.container.file("manifest-db")
file_item.write(
json.dumps([
{'path': seg_info['seg_d']['path'], 'etag': None,
'size_bytes': None},
{'path': seg_info['seg_b']['path'], 'etag': None,
'size_bytes': None},
]), parms={'multipart-manifest': 'put'})
class TestSlo(Base):
@ -2259,6 +2269,72 @@ class TestSlo(Base):
else:
self.fail("Expected ResponseError but didn't get it")
def test_slo_unspecified_etag(self):
file_item = self.env.container.file("manifest-a-unspecified-etag")
file_item.write(
json.dumps([{
'size_bytes': 1024 * 1024,
'etag': None,
'path': '/%s/%s' % (self.env.container.name, 'seg_a')}]),
parms={'multipart-manifest': 'put'})
self.assert_status(201)
def test_slo_unspecified_size(self):
file_item = self.env.container.file("manifest-a-unspecified-size")
file_item.write(
json.dumps([{
'size_bytes': None,
'etag': hashlib.md5('a' * 1024 * 1024).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_a')}]),
parms={'multipart-manifest': 'put'})
self.assert_status(201)
def test_slo_missing_etag(self):
file_item = self.env.container.file("manifest-a-missing-etag")
try:
file_item.write(
json.dumps([{
'size_bytes': 1024 * 1024,
'path': '/%s/%s' % (self.env.container.name, 'seg_a')}]),
parms={'multipart-manifest': 'put'})
except ResponseError as err:
self.assertEqual(400, err.status)
else:
self.fail("Expected ResponseError but didn't get it")
def test_slo_missing_size(self):
file_item = self.env.container.file("manifest-a-missing-size")
try:
file_item.write(
json.dumps([{
'etag': hashlib.md5('a' * 1024 * 1024).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_a')}]),
parms={'multipart-manifest': 'put'})
except ResponseError as err:
self.assertEqual(400, err.status)
else:
self.fail("Expected ResponseError but didn't get it")
def test_slo_overwrite_segment_with_manifest(self):
file_item = self.env.container.file("seg_b")
try:
file_item.write(
json.dumps([
{'size_bytes': 1024 * 1024,
'etag': hashlib.md5('a' * 1024 * 1024).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_a')},
{'size_bytes': 1024 * 1024,
'etag': hashlib.md5('b' * 1024 * 1024).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_b')},
{'size_bytes': 1024 * 1024,
'etag': hashlib.md5('c' * 1024 * 1024).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_c')}]),
parms={'multipart-manifest': 'put'})
except ResponseError as err:
self.assertEqual(409, err.status)
else:
self.fail("Expected ResponseError but didn't get it")
def test_slo_copy(self):
file_item = self.env.container.file("manifest-abcde")
file_item.copy(self.env.container.name, "copied-abcde")
@ -2336,6 +2412,58 @@ class TestSlo(Base):
except ValueError:
self.fail("COPY didn't copy the manifest (invalid json on GET)")
def _make_manifest(self):
# To avoid bug 1453807 on fast-post, make a fresh manifest
# for each POST test.
file_item = self.env.container.file("manifest-post")
seg_info = self.env.seg_info
file_item.write(
json.dumps([seg_info['seg_a'], seg_info['seg_b'],
seg_info['seg_c'], seg_info['seg_d'],
seg_info['seg_e']]),
parms={'multipart-manifest': 'put'})
return file_item
def test_slo_post_the_manifest_metadata_update(self):
file_item = self._make_manifest()
# sanity check: confirm the object is an SLO manifest
file_item.info()
file_item.header_fields([('slo', 'x-static-large-object')])
# POST user metadata (i.e. x-object-meta-post)
file_item.sync_metadata({'post': 'update'})
updated = self.env.container.file("manifest-post")
updated.info()
updated.header_fields([('user-meta', 'x-object-meta-post')]) # sanity
updated_contents = updated.read(parms={'multipart-manifest': 'get'})
try:
json.loads(updated_contents)
except ValueError:
self.fail("Unexpected content on GET, expected a json body")
def test_slo_post_the_manifest_metadata_update_with_qs(self):
# multipart-manifest query should be ignored on post
for verb in ('put', 'get', 'delete'):
file_item = self._make_manifest()
# sanity check: confirm the object is an SLO manifest
file_item.info()
file_item.header_fields([('slo', 'x-static-large-object')])
# POST user metadata (i.e. x-object-meta-post)
file_item.sync_metadata(metadata={'post': 'update'},
parms={'multipart-manifest': verb})
updated = self.env.container.file("manifest-post")
updated.info()
updated.header_fields(
[('user-meta', 'x-object-meta-post')]) # sanity
updated_contents = updated.read(
parms={'multipart-manifest': 'get'})
try:
json.loads(updated_contents)
except ValueError:
self.fail(
"Unexpected content on GET, expected a json body")
def test_slo_get_the_manifest(self):
manifest = self.env.container.file("manifest-abcde")
got_body = manifest.read(parms={'multipart-manifest': 'get'})
@ -2347,6 +2475,30 @@ class TestSlo(Base):
except ValueError:
self.fail("GET with multipart-manifest=get got invalid json")
def test_slo_get_the_manifest_with_details_from_server(self):
manifest = self.env.container.file("manifest-db")
got_body = manifest.read(parms={'multipart-manifest': 'get'})
self.assertEqual('application/json; charset=utf-8',
manifest.content_type)
try:
value = json.loads(got_body)
except ValueError:
self.fail("GET with multipart-manifest=get got invalid json")
self.assertEqual(len(value), 2)
self.assertEqual(value[0]['bytes'], 1024 * 1024)
self.assertEqual(value[0]['hash'],
hashlib.md5('d' * 1024 * 1024).hexdigest())
self.assertEqual(value[0]['name'],
'/%s/seg_d' % self.env.container.name.decode("utf-8"))
self.assertEqual(value[1]['bytes'], 1024 * 1024)
self.assertEqual(value[1]['hash'],
hashlib.md5('b' * 1024 * 1024).hexdigest())
self.assertEqual(value[1]['name'],
'/%s/seg_b' % self.env.container.name.decode("utf-8"))
def test_slo_head_the_manifest(self):
manifest = self.env.container.file("manifest-abcde")
got_info = manifest.info(parms={'multipart-manifest': 'get'})
@ -2984,6 +3136,7 @@ class TestContainerTempurl(Base):
parms=parms)
self.assert_status([401])
@requires_acls
def test_tempurl_keys_visible_to_account_owner(self):
if not tf.cluster_info.get('tempauth'):
raise SkipTest('TEMP AUTH SPECIFIC TEST')
@ -2991,6 +3144,7 @@ class TestContainerTempurl(Base):
self.assertEqual(metadata.get('tempurl_key'), self.env.tempurl_key)
self.assertEqual(metadata.get('tempurl_key2'), self.env.tempurl_key2)
@requires_acls
def test_tempurl_keys_hidden_from_acl_readonly(self):
if not tf.cluster_info.get('tempauth'):
raise SkipTest('TEMP AUTH SPECIFIC TEST')

View File

@ -142,7 +142,7 @@ class BrainSplitter(object):
"""
put container with next storage policy
"""
policy = self.policies.next()
policy = next(self.policies)
if policy_index is not None:
policy = POLICIES.get_by_index(int(policy_index))
if not policy:

View File

@ -26,7 +26,7 @@ from swiftclient import get_auth, head_account
from swift.obj.diskfile import get_data_dir
from swift.common.ring import Ring
from swift.common.utils import readconf
from swift.common.utils import readconf, renamer
from swift.common.manager import Manager
from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
@ -39,8 +39,8 @@ for p in POLICIES:
POLICIES_BY_TYPE[p.policy_type].append(p)
def get_server_number(port, port2server):
server_number = port2server[port]
def get_server_number(ipport, ipport2server):
server_number = ipport2server[ipport]
server, number = server_number[:-1], server_number[-1:]
try:
number = int(number)
@ -50,19 +50,19 @@ def get_server_number(port, port2server):
return server, number
def start_server(port, port2server, pids, check=True):
server, number = get_server_number(port, port2server)
def start_server(ipport, ipport2server, pids, check=True):
server, number = get_server_number(ipport, ipport2server)
err = Manager([server]).start(number=number, wait=False)
if err:
raise Exception('unable to start %s' % (
server if not number else '%s%s' % (server, number)))
if check:
return check_server(port, port2server, pids)
return check_server(ipport, ipport2server, pids)
return None
def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
server = port2server[port]
def check_server(ipport, ipport2server, pids, timeout=CHECK_SERVER_TIMEOUT):
server = ipport2server[ipport]
if server[:-1] in ('account', 'container', 'object'):
if int(server[-1]) > 4:
return None
@ -74,7 +74,7 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
try_until = time() + timeout
while True:
try:
conn = HTTPConnection('127.0.0.1', port)
conn = HTTPConnection(*ipport)
conn.request('GET', path)
resp = conn.getresponse()
# 404 because it's a nonsense path (and mount_check is false)
@ -87,14 +87,14 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
if time() > try_until:
print err
print 'Giving up on %s:%s after %s seconds.' % (
server, port, timeout)
server, ipport, timeout)
raise err
sleep(0.1)
else:
try_until = time() + timeout
while True:
try:
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0',
url, token = get_auth('http://%s:%d/auth/v1.0' % ipport,
'test:tester', 'testing')
account = url.split('/')[-1]
head_account(url, token)
@ -108,8 +108,8 @@ def check_server(port, port2server, pids, timeout=CHECK_SERVER_TIMEOUT):
return None
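Editor's note: the probe helpers now key everything on (ip, port) tuples instead of bare ports; a small sketch of how the same tuple feeds both the backend connection and the auth URL (values illustrative):

from six.moves.http_client import HTTPConnection

ipport = ('127.0.0.1', 6010)                  # illustrative tuple

conn = HTTPConnection(*ipport)                # unpacks to (host, port)
auth_url = 'http://%s:%d/auth/v1.0' % ipport  # same tuple formats the URL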
def kill_server(port, port2server, pids):
server, number = get_server_number(port, port2server)
def kill_server(ipport, ipport2server, pids):
server, number = get_server_number(ipport, ipport2server)
err = Manager([server]).kill(number=number)
if err:
raise Exception('unable to kill %s' % (server if not number else
@ -117,47 +117,77 @@ def kill_server(port, port2server, pids):
try_until = time() + 30
while True:
try:
conn = HTTPConnection('127.0.0.1', port)
conn = HTTPConnection(*ipport)
conn.request('GET', '/')
conn.getresponse()
except Exception as err:
break
if time() > try_until:
raise Exception(
'Still answering on port %s after 30 seconds' % port)
'Still answering on %s:%s after 30 seconds' % ipport)
sleep(0.1)
def kill_nonprimary_server(primary_nodes, port2server, pids):
primary_ports = [n['port'] for n in primary_nodes]
for port, server in port2server.iteritems():
if port in primary_ports:
def kill_nonprimary_server(primary_nodes, ipport2server, pids):
primary_ipports = [(n['ip'], n['port']) for n in primary_nodes]
for ipport, server in ipport2server.items():
if ipport in primary_ipports:
server_type = server[:-1]
break
else:
raise Exception('Cannot figure out server type for %r' % primary_nodes)
for port, server in list(port2server.iteritems()):
if server[:-1] == server_type and port not in primary_ports:
kill_server(port, port2server, pids)
return port
for ipport, server in list(ipport2server.items()):
if server[:-1] == server_type and ipport not in primary_ipports:
kill_server(ipport, ipport2server, pids)
return ipport
def build_port_to_conf(server):
# map server to config by port
port_to_config = {}
for server_ in Manager([server]):
for config_path in server_.conf_files():
conf = readconf(config_path,
section_name='%s-replicator' % server_.type)
port_to_config[int(conf['bind_port'])] = conf
return port_to_config
def add_ring_devs_to_ipport2server(ring, server_type, ipport2server,
servers_per_port=0):
# We'll number the servers by order of unique occurrence of:
# IP, if servers_per_port > 0 OR there's more than one IP in the ring
# ipport, otherwise
unique_ip_count = len(set(dev['ip'] for dev in ring.devs if dev))
things_to_number = {}
number = 0
for dev in filter(None, ring.devs):
ip = dev['ip']
ipport = (ip, dev['port'])
unique_by = ip if servers_per_port or unique_ip_count > 1 else ipport
if unique_by not in things_to_number:
number += 1
things_to_number[unique_by] = number
ipport2server[ipport] = '%s%d' % (server_type,
things_to_number[unique_by])
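Editor's note: a self-contained sketch of the numbering rule described in the comments above; this is a hypothetical stand-in with a hard-coded 'object' prefix, not the real helper:

def number_devs(devs, servers_per_port=0):
    # Key the numbering on the IP when servers_per_port is set (or the ring
    # spans several IPs); otherwise each distinct (ip, port) gets its own
    # server number.
    unique_ips = set(d['ip'] for d in devs)
    seen, mapping, n = {}, {}, 0
    for d in devs:
        ipport = (d['ip'], d['port'])
        key = d['ip'] if servers_per_port or len(unique_ips) > 1 else ipport
        if key not in seen:
            n += 1
            seen[key] = n
        mapping[ipport] = 'object%d' % seen[key]
    return mapping

# number_devs([{'ip': '127.0.0.1', 'port': 6010},
#              {'ip': '127.0.0.1', 'port': 6020}])
# -> {('127.0.0.1', 6010): 'object1', ('127.0.0.1', 6020): 'object2'}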
def store_config_paths(name, configs):
for server_name in (name, '%s-replicator' % name):
for server in Manager([server_name]):
for i, conf in enumerate(server.conf_files(), 1):
configs[server.server][i] = conf
def get_ring(ring_name, required_replicas, required_devices,
server=None, force_validate=None):
server=None, force_validate=None, ipport2server=None,
config_paths=None):
if not server:
server = ring_name
ring = Ring('/etc/swift', ring_name=ring_name)
if ipport2server is None:
ipport2server = {} # used internally, even if not passed in
if config_paths is None:
config_paths = defaultdict(dict)
store_config_paths(server, config_paths)
repl_name = '%s-replicator' % server
repl_configs = {i: readconf(c, section_name=repl_name)
for i, c in config_paths[repl_name].items()}
servers_per_port = any(int(c.get('servers_per_port', '0'))
for c in repl_configs.values())
add_ring_devs_to_ipport2server(ring, server, ipport2server,
servers_per_port=servers_per_port)
if not VALIDATE_RSYNC and not force_validate:
return ring
# easy sanity checks
@ -167,10 +197,11 @@ def get_ring(ring_name, required_replicas, required_devices,
if len(ring.devs) != required_devices:
raise SkipTest('%s has %s devices instead of %s' % (
ring.serialized_path, len(ring.devs), required_devices))
port_to_config = build_port_to_conf(server)
for dev in ring.devs:
# verify server is exposing mounted device
conf = port_to_config[dev['port']]
ipport = (dev['ip'], dev['port'])
_, server_number = get_server_number(ipport, ipport2server)
conf = repl_configs[server_number]
for device in os.listdir(conf['devices']):
if device == dev['device']:
dev_path = os.path.join(conf['devices'], device)
@ -185,7 +216,7 @@ def get_ring(ring_name, required_replicas, required_devices,
"unable to find ring device %s under %s's devices (%s)" % (
dev['device'], server, conf['devices']))
# verify server is exposing rsync device
if port_to_config[dev['port']].get('vm_test_mode', False):
if conf.get('vm_test_mode', False):
rsync_export = '%s%s' % (server, dev['replication_port'])
else:
rsync_export = server
@ -235,46 +266,45 @@ class ProbeTest(unittest.TestCase):
Manager(['all']).stop()
self.pids = {}
try:
self.ipport2server = {}
self.configs = defaultdict(dict)
self.account_ring = get_ring(
'account',
self.acct_cont_required_replicas,
self.acct_cont_required_devices)
self.acct_cont_required_devices,
ipport2server=self.ipport2server,
config_paths=self.configs)
self.container_ring = get_ring(
'container',
self.acct_cont_required_replicas,
self.acct_cont_required_devices)
self.acct_cont_required_devices,
ipport2server=self.ipport2server,
config_paths=self.configs)
self.policy = get_policy(**self.policy_requirements)
self.object_ring = get_ring(
self.policy.ring_name,
self.obj_required_replicas,
self.obj_required_devices,
server='object')
server='object',
ipport2server=self.ipport2server,
config_paths=self.configs)
self.servers_per_port = any(
int(readconf(c, section_name='object-replicator').get(
'servers_per_port', '0'))
for c in self.configs['object-replicator'].values())
Manager(['main']).start(wait=False)
self.port2server = {}
for server, port in [('account', 6002), ('container', 6001),
('object', 6000)]:
for number in xrange(1, 9):
self.port2server[port + (number * 10)] = \
'%s%d' % (server, number)
for port in self.port2server:
check_server(port, self.port2server, self.pids)
self.port2server[8080] = 'proxy'
self.url, self.token, self.account = \
check_server(8080, self.port2server, self.pids)
self.configs = defaultdict(dict)
for name in ('account', 'container', 'object'):
for server_name in (name, '%s-replicator' % name):
for server in Manager([server_name]):
for i, conf in enumerate(server.conf_files(), 1):
self.configs[server.server][i] = conf
for ipport in self.ipport2server:
check_server(ipport, self.ipport2server, self.pids)
proxy_ipport = ('127.0.0.1', 8080)
self.ipport2server[proxy_ipport] = 'proxy'
self.url, self.token, self.account = check_server(
proxy_ipport, self.ipport2server, self.pids)
self.replicators = Manager(
['account-replicator', 'container-replicator',
'object-replicator'])
self.updaters = Manager(['container-updater', 'object-updater'])
self.server_port_to_conf = {}
# get some configs backend daemon configs loaded up
for server in ('account', 'container', 'object'):
self.server_port_to_conf[server] = build_port_to_conf(server)
except BaseException:
try:
raise
@ -288,7 +318,11 @@ class ProbeTest(unittest.TestCase):
Manager(['all']).kill()
def device_dir(self, server, node):
conf = self.server_port_to_conf[server][node['port']]
server_type, config_number = get_server_number(
(node['ip'], node['port']), self.ipport2server)
repl_server = '%s-replicator' % server_type
conf = readconf(self.configs[repl_server][config_number],
section_name=repl_server)
return os.path.join(conf['devices'], node['device'])
def storage_dir(self, server, node, part=None, policy=None):
@ -301,9 +335,24 @@ class ProbeTest(unittest.TestCase):
def config_number(self, node):
_server_type, config_number = get_server_number(
node['port'], self.port2server)
(node['ip'], node['port']), self.ipport2server)
return config_number
def is_local_to(self, node1, node2):
"""
Return True if both ring devices are "local" to each other (on the same
"server".
"""
if self.servers_per_port:
return node1['ip'] == node2['ip']
# Without a disambiguating IP, for SAIOs, we have to assume ports
# uniquely identify "servers". SAIOs should be configured to *either*
# have unique IPs per node (e.g. 127.0.0.1, 127.0.0.2, etc.) OR unique
# ports per server (i.e. sdb1 & sdb5 would have same port numbers in
# the 8-disk EC ring).
return node1['port'] == node2['port']
def get_to_final_state(self):
# these .stop()s are probably not strictly necessary,
# but may prevent race conditions
@ -314,6 +363,19 @@ class ProbeTest(unittest.TestCase):
self.updaters.once()
self.replicators.once()
def kill_drive(self, device):
if os.path.ismount(device):
os.system('sudo umount %s' % device)
else:
renamer(device, device + "X")
def revive_drive(self, device):
disabled_name = device + "X"
if os.path.isdir(disabled_name):
renamer(device + "X", device)
else:
os.system('sudo mount %s' % device)
class ReplProbeTest(ProbeTest):

View File

@ -97,8 +97,9 @@ class TestAccountFailures(ReplProbeTest):
self.assert_(found2)
apart, anodes = self.account_ring.get_nodes(self.account)
kill_nonprimary_server(anodes, self.port2server, self.pids)
kill_server(anodes[0]['port'], self.port2server, self.pids)
kill_nonprimary_server(anodes, self.ipport2server, self.pids)
kill_server((anodes[0]['ip'], anodes[0]['port']),
self.ipport2server, self.pids)
# Kill account servers excepting two of the primaries
# Delete container1
@ -146,7 +147,8 @@ class TestAccountFailures(ReplProbeTest):
self.assert_(found2)
# Restart other primary account server
start_server(anodes[0]['port'], self.port2server, self.pids)
start_server((anodes[0]['ip'], anodes[0]['port']),
self.ipport2server, self.pids)
# Assert that server doesn't know about container1's deletion or the
# new container2/object2 yet

View File

@ -53,33 +53,96 @@ class TestAccountReaper(ReplProbeTest):
for node in nodes:
direct_delete_account(node, part, self.account)
# run the reaper
Manager(['account-reaper']).once()
self.get_to_final_state()
for policy, container, obj in all_objects:
# verify that any container deletes were at the same timestamp
cpart, cnodes = self.container_ring.get_nodes(
self.account, container)
delete_times = set()
for cnode in cnodes:
try:
direct_head_container(cnode, cpart, self.account,
container)
except ClientException as err:
self.assertEquals(err.http_status, 404)
delete_time = err.http_headers.get(
'X-Backend-DELETE-Timestamp')
# 'X-Backend-DELETE-Timestamp' confirms it was deleted
self.assertTrue(delete_time)
delete_times.add(delete_time)
else:
self.fail('Found un-reaped /%s/%s on %r' %
(self.account, container, node))
# Container replicas may not yet be deleted if we have a
# policy with object replicas < container replicas, so
# ignore successful HEAD. We'll check for all replicas to
# be deleted again after running the replicators.
pass
self.assertEqual(1, len(delete_times), delete_times)
# verify that all object deletes were at the same timestamp
object_ring = POLICIES.get_object_ring(policy.idx, '/etc/swift/')
part, nodes = object_ring.get_nodes(self.account, container, obj)
headers = {'X-Backend-Storage-Policy-Index': int(policy)}
delete_times = set()
for node in nodes:
try:
direct_get_object(node, part, self.account,
container, obj)
container, obj, headers=headers)
except ClientException as err:
self.assertEquals(err.http_status, 404)
delete_time = err.http_headers.get('X-Backend-Timestamp')
# 'X-Backend-Timestamp' confirms obj was deleted
self.assertTrue(delete_time)
delete_times.add(delete_time)
else:
self.fail('Found un-reaped /%s/%s/%s on %r in %s!' %
(self.account, container, obj, node, policy))
self.assertEqual(1, len(delete_times))
# run replicators and updaters
self.get_to_final_state()
for policy, container, obj in all_objects:
# verify that ALL container replicas are now deleted
cpart, cnodes = self.container_ring.get_nodes(
self.account, container)
delete_times = set()
for cnode in cnodes:
try:
direct_head_container(cnode, cpart, self.account,
container)
except ClientException as err:
self.assertEquals(err.http_status, 404)
delete_time = err.http_headers.get(
'X-Backend-DELETE-Timestamp')
# 'X-Backend-DELETE-Timestamp' confirms it was deleted
self.assertTrue(delete_time)
delete_times.add(delete_time)
else:
self.fail('Found un-reaped /%s/%s on %r' %
(self.account, container, cnode))
# sanity check that object state is still consistent...
object_ring = POLICIES.get_object_ring(policy.idx, '/etc/swift/')
part, nodes = object_ring.get_nodes(self.account, container, obj)
headers = {'X-Backend-Storage-Policy-Index': int(policy)}
delete_times = set()
for node in nodes:
try:
direct_get_object(node, part, self.account,
container, obj, headers=headers)
except ClientException as err:
self.assertEquals(err.http_status, 404)
delete_time = err.http_headers.get('X-Backend-Timestamp')
# 'X-Backend-Timestamp' confirms obj was deleted
self.assertTrue(delete_time)
delete_times.add(delete_time)
else:
self.fail('Found un-reaped /%s/%s/%s on %r in %s!' %
(self.account, container, obj, node, policy))
self.assertEqual(1, len(delete_times))
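Editor's note: a hedged sketch of the policy-aware direct GET pattern this test relies on; the account, container, object and policy index are illustrative:

from swift.common.ring import Ring
from swift.common import direct_client

ring = Ring('/etc/swift', ring_name='object')          # policy-0 object ring
part, nodes = ring.get_nodes('AUTH_test', 'c1', 'o1')
headers = {'X-Backend-Storage-Policy-Index': 0}        # match the ring's policy
# Talks straight to one object server, bypassing the proxy; once the reaper
# has removed the object this raises ClientException with a 404 status.
resp_headers, body = direct_client.direct_get_object(
    nodes[0], part, 'AUTH_test', 'c1', 'o1', headers=headers)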
if __name__ == "__main__":

View File

@ -49,14 +49,16 @@ class TestContainerFailures(ReplProbeTest):
client.put_container(self.url, self.token, container1)
# Kill container1 servers excepting two of the primaries
kill_nonprimary_server(cnodes, self.port2server, self.pids)
kill_server(cnodes[0]['port'], self.port2server, self.pids)
kill_nonprimary_server(cnodes, self.ipport2server, self.pids)
kill_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
# Delete container1
client.delete_container(self.url, self.token, container1)
# Restart other container1 primary server
start_server(cnodes[0]['port'], self.port2server, self.pids)
start_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
# Create container1/object1 (allowed because at least one server thinks the
# container exists)
@ -87,18 +89,23 @@ class TestContainerFailures(ReplProbeTest):
client.put_container(self.url, self.token, container1)
# Kill container1 servers excepting one of the primaries
cnp_port = kill_nonprimary_server(cnodes, self.port2server, self.pids)
kill_server(cnodes[0]['port'], self.port2server, self.pids)
kill_server(cnodes[1]['port'], self.port2server, self.pids)
cnp_ipport = kill_nonprimary_server(cnodes, self.ipport2server,
self.pids)
kill_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
kill_server((cnodes[1]['ip'], cnodes[1]['port']),
self.ipport2server, self.pids)
# Delete container1 directly to the one primary still up
direct_client.direct_delete_container(cnodes[2], cpart, self.account,
container1)
# Restart other container1 servers
start_server(cnodes[0]['port'], self.port2server, self.pids)
start_server(cnodes[1]['port'], self.port2server, self.pids)
start_server(cnp_port, self.port2server, self.pids)
start_server((cnodes[0]['ip'], cnodes[0]['port']),
self.ipport2server, self.pids)
start_server((cnodes[1]['ip'], cnodes[1]['port']),
self.ipport2server, self.pids)
start_server(cnp_ipport, self.ipport2server, self.pids)
# Get to a final state
self.get_to_final_state()

View File

@ -26,7 +26,8 @@ from swiftclient import client
from swift.common import direct_client
from swift.obj.diskfile import get_data_dir
from swift.common.exceptions import ClientException
from test.probe.common import kill_server, ReplProbeTest, start_server
from test.probe.common import (
kill_server, ReplProbeTest, start_server, get_server_number)
from swift.common.utils import readconf
from swift.common.manager import Manager
@ -35,7 +36,8 @@ class TestEmptyDevice(ReplProbeTest):
def _get_objects_dir(self, onode):
device = onode['device']
node_id = (onode['port'] - 6000) / 10
_, node_id = get_server_number((onode['ip'], onode['port']),
self.ipport2server)
obj_server_conf = readconf(self.configs['object-server'][node_id])
devices = obj_server_conf['app:object-server']['devices']
obj_dir = '%s/%s' % (devices, device)
@ -56,7 +58,8 @@ class TestEmptyDevice(ReplProbeTest):
onode = onodes[0]
# Kill one container/obj primary server
kill_server(onode['port'], self.port2server, self.pids)
kill_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Delete the default data directory for objects on the primary server
obj_dir = '%s/%s' % (self._get_objects_dir(onode),
@ -74,7 +77,8 @@ class TestEmptyDevice(ReplProbeTest):
# Kill other two container/obj primary servers
# to ensure GET handoff works
for node in onodes[1:]:
kill_server(node['port'], self.port2server, self.pids)
kill_server((node['ip'], node['port']),
self.ipport2server, self.pids)
# Indirectly through proxy assert we can get container/obj
odata = client.get_object(self.url, self.token, container, obj)[-1]
@ -83,13 +87,14 @@ class TestEmptyDevice(ReplProbeTest):
'returned: %s' % repr(odata))
# Restart those other two container/obj primary servers
for node in onodes[1:]:
start_server(node['port'], self.port2server, self.pids)
start_server((node['ip'], node['port']),
self.ipport2server, self.pids)
self.assertFalse(os.path.exists(obj_dir))
# We've indirectly verified the handoff node has the object, but
# let's directly verify it.
# Directly to handoff server assert we can get container/obj
another_onode = self.object_ring.get_more_nodes(opart).next()
another_onode = next(self.object_ring.get_more_nodes(opart))
odata = direct_client.direct_get_object(
another_onode, opart, self.account, container, obj,
headers={'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
@ -122,7 +127,8 @@ class TestEmptyDevice(ReplProbeTest):
missing)
# Bring the first container/obj primary server back up
start_server(onode['port'], self.port2server, self.pids)
start_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Assert that it doesn't have container/obj yet
self.assertFalse(os.path.exists(obj_dir))
@ -136,21 +142,17 @@ class TestEmptyDevice(ReplProbeTest):
else:
self.fail("Expected ClientException but didn't get it")
try:
port_num = onode['replication_port']
except KeyError:
port_num = onode['port']
try:
another_port_num = another_onode['replication_port']
except KeyError:
another_port_num = another_onode['port']
# Run object replication for first container/obj primary server
num = (port_num - 6000) / 10
_, num = get_server_number(
(onode['ip'], onode.get('replication_port', onode['port'])),
self.ipport2server)
Manager(['object-replicator']).once(number=num)
# Run object replication for handoff node
another_num = (another_port_num - 6000) / 10
_, another_num = get_server_number(
(another_onode['ip'],
another_onode.get('replication_port', another_onode['port'])),
self.ipport2server)
Manager(['object-replicator']).once(number=another_num)
# Assert the first container/obj primary server now has container/obj

View File

@ -41,15 +41,17 @@ class TestObjectAsyncUpdate(ReplProbeTest):
# Kill container servers excepting two of the primaries
cpart, cnodes = self.container_ring.get_nodes(self.account, container)
cnode = cnodes[0]
kill_nonprimary_server(cnodes, self.port2server, self.pids)
kill_server(cnode['port'], self.port2server, self.pids)
kill_nonprimary_server(cnodes, self.ipport2server, self.pids)
kill_server((cnode['ip'], cnode['port']),
self.ipport2server, self.pids)
# Create container/obj
obj = 'object-%s' % uuid4()
client.put_object(self.url, self.token, container, obj, '')
# Restart other primary server
start_server(cnode['port'], self.port2server, self.pids)
start_server((cnode['ip'], cnode['port']),
self.ipport2server, self.pids)
# Assert it does not know about container/obj
self.assert_(not direct_client.direct_get_container(

View File

@ -36,7 +36,7 @@ def get_data_file_path(obj_dir):
files = []
# We might need to try a few times if a request hasn't yet settled. For
# instance, a PUT can return success when just 2 of 3 nodes have completed.
for attempt in xrange(RETRIES + 1):
for attempt in range(RETRIES + 1):
try:
files = sorted(listdir(obj_dir), reverse=True)
break

View File

@ -41,7 +41,8 @@ class TestObjectHandoff(ReplProbeTest):
opart, onodes = self.object_ring.get_nodes(
self.account, container, obj)
onode = onodes[0]
kill_server(onode['port'], self.port2server, self.pids)
kill_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Create container/obj (goes to two primary servers and one handoff)
client.put_object(self.url, self.token, container, obj, 'VERIFY')
@ -53,7 +54,8 @@ class TestObjectHandoff(ReplProbeTest):
# Kill other two container/obj primary servers
# to ensure GET handoff works
for node in onodes[1:]:
kill_server(node['port'], self.port2server, self.pids)
kill_server((node['ip'], node['port']),
self.ipport2server, self.pids)
# Indirectly through proxy assert we can get container/obj
odata = client.get_object(self.url, self.token, container, obj)[-1]
@ -63,11 +65,12 @@ class TestObjectHandoff(ReplProbeTest):
# Restart those other two container/obj primary servers
for node in onodes[1:]:
start_server(node['port'], self.port2server, self.pids)
start_server((node['ip'], node['port']),
self.ipport2server, self.pids)
# We've indirectly verified the handoff node has the container/object,
# but let's directly verify it.
another_onode = self.object_ring.get_more_nodes(opart).next()
another_onode = next(self.object_ring.get_more_nodes(opart))
odata = direct_client.direct_get_object(
another_onode, opart, self.account, container, obj, headers={
'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
@ -90,7 +93,8 @@ class TestObjectHandoff(ReplProbeTest):
(cnode['ip'], cnode['port']))
# Bring the first container/obj primary server back up
start_server(onode['port'], self.port2server, self.pids)
start_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Assert that it doesn't have container/obj yet
try:
@ -138,7 +142,8 @@ class TestObjectHandoff(ReplProbeTest):
# Kill the first container/obj primary server again (we have two
# primaries and the handoff up now)
kill_server(onode['port'], self.port2server, self.pids)
kill_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Delete container/obj
try:
@ -175,7 +180,8 @@ class TestObjectHandoff(ReplProbeTest):
(cnode['ip'], cnode['port']))
# Restart the first container/obj primary server again
start_server(onode['port'], self.port2server, self.pids)
start_server((onode['ip'], onode['port']),
self.ipport2server, self.pids)
# Assert it still has container/obj
direct_client.direct_get_object(

View File

@ -53,7 +53,7 @@ class Body(object):
return self.chunk
def __next__(self):
return self.next()
return next(self)
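Editor's note: a sketch of the usual py2/py3-portable iterator shape that makes next(body) work on both interpreters; a generic pattern, not the exact Body class used in these tests:

class ChunkBody(object):
    # Minimal iterable request body: yields fixed-size chunks until exhausted.
    def __init__(self, chunks=4, chunk=b'x' * 64):
        self.remaining = chunks
        self.chunk = chunk

    def __iter__(self):
        return self

    def __next__(self):        # py3 iterator protocol
        if self.remaining <= 0:
            raise StopIteration
        self.remaining -= 1
        return self.chunk

    next = __next__            # py2 alias, so next(obj) works everywhere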
class TestReconstructorPropDurable(ECProbeTest):

View File

@ -19,12 +19,14 @@ import unittest
import uuid
import shutil
import random
from collections import defaultdict
from test.probe.common import ECProbeTest
from swift.common import direct_client
from swift.common.storage_policy import EC_POLICY
from swift.common.manager import Manager
from swift.obj.reconstructor import _get_partners
from swiftclient import client
@ -52,7 +54,7 @@ class Body(object):
return self.chunk
def __next__(self):
return self.next()
return next(self)
class TestReconstructorRebuild(ECProbeTest):
@ -165,6 +167,61 @@ class TestReconstructorRebuild(ECProbeTest):
self._format_node(onode),
[self._format_node(n) for n in node_list]))
def test_rebuild_partner_down(self):
# create EC container
headers = {'X-Storage-Policy': self.policy.name}
client.put_container(self.url, self.token, self.container_name,
headers=headers)
# PUT object
contents = Body()
client.put_object(self.url, self.token,
self.container_name,
self.object_name,
contents=contents)
opart, onodes = self.object_ring.get_nodes(
self.account, self.container_name, self.object_name)
# find a primary server that only has one of its devices in the
# primary node list
group_nodes_by_config = defaultdict(list)
for n in onodes:
group_nodes_by_config[self.config_number(n)].append(n)
for config_number, node_list in group_nodes_by_config.items():
if len(node_list) == 1:
break
else:
self.fail('ring balancing did not use all available nodes')
primary_node = node_list[0]
# pick one of its partners to fail at random
partner_node = random.choice(_get_partners(
primary_node['index'], onodes))
# 507 the partner device
device_path = self.device_dir('object', partner_node)
self.kill_drive(device_path)
# select another primary sync_to node to fail
failed_primary = [n for n in onodes if n['id'] not in
(primary_node['id'], partner_node['id'])][0]
# ... capture its fragment etag
failed_primary_etag = self.direct_get(failed_primary, opart)
# ... and delete it
part_dir = self.storage_dir('object', failed_primary, part=opart)
shutil.rmtree(part_dir, True)
# reconstruct from the primary, while one of its partners is 507'd
self.reconstructor.once(number=self.config_number(primary_node))
# the other failed primary will get its fragment rebuilt instead
self.assertEqual(failed_primary_etag,
self.direct_get(failed_primary, opart))
# just to be nice
self.revive_drive(device_path)
if __name__ == "__main__":
unittest.main()
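Editor's note: for orientation, a hedged sketch of the 'partner' idea behind test_rebuild_partner_down; the reconstructor keeps a fragment index in sync with its neighbouring primaries on either side. This mirrors, but is not, swift.obj.reconstructor._get_partners:

def get_partners(frag_index, part_nodes):
    # Left and right neighbours of frag_index, wrapping around the primaries.
    return [part_nodes[(frag_index - 1) % len(part_nodes)],
            part_nodes[(frag_index + 1) % len(part_nodes)]]

# e.g. with 6 primaries, get_partners(0, nodes) -> [nodes[5], nodes[1]]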

Some files were not shown because too many files have changed in this diff.