Merge remote-tracking branch 'upstream/master' into ec

Conflicts:
	swift/obj/replicator.py
	swift/proxy/controllers/obj.py
	test/unit/proxy/controllers/test_obj.py

Change-Id: I9ebbf4e97a9a964e97484e477cd6dff212c73612
This commit is contained in:
Samuel Merritt 2015-02-10 11:31:51 -08:00
commit e852ca94cc
56 changed files with 4291 additions and 786 deletions

View File

@ -69,3 +69,4 @@ Guang Yee <guang.yee@hp.com> <guang.yee@hp.com>
Jing Liuqing <jing.liuqing@99cloud.net> <jing.liuqing@99cloud.net>
Lorcan Browne <lorcan.browne@hp.com> <lorcan.browne@hp.com>
Eohyung Lee <liquidnuker@gmail.com> <liquid@kt.com>
Harshit Chitalia <harshit@acelio.com> <harshit@acelio.com>

View File

@ -39,6 +39,7 @@ Mahati Chamarthy (mahati.chamarthy@gmail.com)
Zap Chang (zapchang@gmail.com)
François Charlier (francois.charlier@enovance.com)
Ray Chen (oldsharp@163.com)
Harshit Chitalia (harshit@acelio.com)
Brian Cline (bcline@softlayer.com)
Alistair Coles (alistair.coles@hp.com)
Brian Curtin (brian.curtin@rackspace.com)
@ -159,6 +160,7 @@ saranjan (saranjan@cisco.com)
Christian Schwede (info@cschwede.de)
Mark Seger (Mark.Seger@hp.com)
Andrew Clay Shafer (acs@parvuscaptus.com)
Dhriti Shikhar (dhrish20@gmail.com)
Chuck Short (chuck.short@canonical.com)
Michael Shuler (mshuler@gmail.com)
David Moreau Simard (dmsimard@iweb.com)
@ -178,6 +180,7 @@ Rainer Toebbicke (Rainer.Toebbicke@cern.ch)
Fujita Tomonori (fujita.tomonori@lab.ntt.co.jp)
Nirmal Thacker (nirmalthacker@gmail.com)
Kapil Thangavelu (kapil.foss@gmail.com)
Nicolas Trangez (ikke@nicolast.be)
Dean Troyer (dtroyer@gmail.com)
Kota Tsuyuzaki (tsuyuzaki.kota@lab.ntt.co.jp)
Dmitry Ukov (dukov@mirantis.com)

View File

@ -1,3 +1,62 @@
swift (2.2.2)
* Data placement changes
This release has several major changes to data placement in Swift in
order to better handle different deployment patterns. First, with an
unbalance-able ring, less partitions will move if the movement doesn't
result in any better dispersion across failure domains. Also, empty
(partition weight of zero) devices will no longer keep partitions after
rebalancing when there is an unbalance-able ring.
Second, the notion of "overload" has been added to Swift's rings. This
allows devices to take some extra partitions (more than would normally
be allowed by the device weight) so that smaller and unbalanced clusters
will have less data movement between servers, zones, or regions if there
is a failure in the cluster.
Finally, rings have a new metric called "dispersion". This is the
percentage of partitions in the ring that have too many replicas in a
particular failure domain. For example, if you have three servers in a
cluster but two replicas for a partition get placed onto the same
server, that partition will count towards the dispersion metric. A
lower value is better, and the value can be used to find the proper
value for "overload".
The overload and dispersion metrics have been exposed in the
swift-ring-build CLI tools.
See http://docs.openstack.org/developer/swift/overview_ring.html
for more info on how data placement works now.
* Improve replication of large out-of-sync, out-of-date containers.
* Added console logging to swift-drive-audit with a new log_to_console
config option (default False).
* Optimize replication when a device and/or partition is specified.
* Fix dynamic large object manifests getting versioned. This was not
intended and did not work. Now it is properly prevented.
* Fix the GET's response code when there is a missing segment in a
large object manifest.
* Change black/white listing in ratelimit middleware to use sysmeta.
Instead of using the config option, operators can set
"X-Account-Sysmeta-Global-Write-Ratelimit: WHITELIST" or
"X-Account-Sysmeta-Global-Write-Ratelimit: BLACKLIST" on an account to
whitelist or blacklist it for ratelimiting. Note: the existing
config options continue to work.
* Use TCP_NODELAY on outgoing connections.
* Improve object-replicator startup time.
* Implement OPTIONS verb for storage nodes.
* Various other minor bug fixes and improvements.
swift (2.2.1)
* Swift now rejects object names with Unicode surrogates.

View File

@ -126,7 +126,9 @@ def comment_fstab(mount_point):
with open('/etc/fstab.new', 'w') as new_fstab:
for line in fstab:
parts = line.split()
if len(parts) > 2 and line.split()[1] == mount_point:
if len(parts) > 2 \
and parts[1] == mount_point \
and not line.startswith('#'):
new_fstab.write('#' + line)
else:
new_fstab.write(line)

View File

@ -27,5 +27,8 @@ if __name__ == '__main__':
parser.add_option('-p', '--partitions',
help='Replicate only given partitions. '
'Comma-separated list')
parser.add_option('-i', '--policies',
help='Replicate only given policy indices. '
'Comma-separated list')
conf_file, options = parse_options(parser=parser, once=True)
run_daemon(ObjectReplicator, conf_file, **options)

View File

@ -28,11 +28,12 @@ object. The large object is comprised of two types of objects:
the segment objects in JSON format.
**Dynamic large objects**
The manifest object has no content but it has a
``X-Object-Manifest`` metadata header. The value of this header
is ``{container}/{prefix}``, where ``{container}`` is the name of
the container where the segment objects are stored, and
``{prefix}`` is a string that all segment objects have in common.
The manifest object has a ``X-Object-Manifest`` metadata header.
The value of this header is ``{container}/{prefix}``,
where ``{container}`` is the name of the container where the
segment objects are stored, and ``{prefix}`` is a string that all
segment objects have in common. The manifest object should have
no content. However, this is not enforced.
Note
~~~~
@ -288,8 +289,14 @@ both the original and new manifest objects share the same set of segment
objects.
When creating dynamic large objects, the **COPY** operation does not create
a manifest object. To duplicate a manifest object, use the **GET** operation
to read the value of ``X-Object-Manifest`` and use this value in the
``X-Object-Manifest`` request header in a **PUT** operation. This creates
a new manifest object that shares the same set of segment objects as the
original manifest object.
a manifest object but a normal object with content same as what you would
get on a **GET** request to original manifest object.
To duplicate a manifest object:
* Use the **GET** operation to read the value of ``X-Object-Manifest`` and
use this value in the ``X-Object-Manifest`` request header in a **PUT**
operation.
* Alternatively, you can include *``?multipart-manifest=get``* query
string in the **COPY** request.
This creates a new manifest object that shares the same set of segment
objects as the original manifest object.

View File

@ -83,6 +83,7 @@ Storage Backends (DiskFile API implementations)
* `SwiftOnFile <https://github.com/swiftonfile/swiftonfile>`_ - Enables objects created using Swift API to be accessed as files on a POSIX filesystem and vice versa.
* `swift-ceph-backend <https://github.com/stackforge/swift-ceph-backend>`_ - Ceph RADOS object server implementation for Swift.
* `kinetic-swift <https://github.com/swiftstack/kinetic-swift>`_ - Seagate Kinetic Drive as backend for Swift
* `swift-scality-backend <https://github.com/scality/ScalitySproxydSwift>`_ - Scality sproxyd object server implementation for Swift.
Other
-----
@ -95,3 +96,4 @@ Other
* `PyECLib <https://bitbucket.org/kmgreen2/pyeclib>`_ - High Level Erasure Code library used by Swift
* `liberasurecode <http://www.bytebucket.org/tsg-/liberasurecode>`_ - Low Level Erasure Code library used by PyECLib
* `Swift Browser <https://github.com/zerovm/swift-browser>`_ - JavaScript interface for Swift
* `swift-ui <https://github.com/fanatic/swift-ui>`_ - OpenStack Swift web browser

View File

@ -37,19 +37,37 @@ cluster, and the locations for a partition are stored in the mapping maintained
by the ring. The ring is also responsible for determining which devices are
used for handoff in failure scenarios.
Data can be isolated with the concept of zones in the ring. Each replica
of a partition is guaranteed to reside in a different zone. A zone could
represent a drive, a server, a cabinet, a switch, or even a datacenter.
The replicas of each partition will be isolated onto as many distinct regions,
zones, servers and devices as the capacity of these failure domains allow. If
there are less failure domains at a given tier than replicas of the partition
assigned within a tier (e.g. a 3 replica cluster with 2 servers), or the
available capacity across the failure domains within a tier are not well
balanced it will not be possible to achieve both even capacity distribution
(`balance`) as well as complete isolation of replicas across failure domains
(`dispersion`). When this occurs the ring management tools will display a
warning so that the operator can evaluate the cluster topology.
The partitions of the ring are equally divided among all the devices in the
Swift installation. When partitions need to be moved around (for example if a
device is added to the cluster), the ring ensures that a minimum number of
partitions are moved at a time, and only one replica of a partition is moved at
a time.
Data is evenly distributed across the capacity available in the cluster as
described by the devices weight. Weights can be used to balance the
distribution of partitions on drives across the cluster. This can be useful,
for example, when different sized drives are used in a cluster. Device
weights can also be used when adding or removing capacity or failure domains
to control how many partitions are reassigned during a rebalance to be moved
as soon as replication bandwidth allows.
Weights can be used to balance the distribution of partitions on drives
across the cluster. This can be useful, for example, when different sized
drives are used in a cluster.
.. note::
Prior to Swift 2.1.0 it was not possible to restrict partition movement by
device weight when adding new failure domains, and would allow extremely
unbalanced rings. The greedy dispersion algorithm is now subject to the
constraints of the physical capacity in the system, but can be adjusted
with-in reason via the overload option. Artificially unbalancing the
partition assignment without respect to capacity can introduce unexpected
full devices when a given failure domain does not physically support its
share of the used capacity in the tier.
When partitions need to be moved around (for example if a device is added to
the cluster), the ring ensures that a minimum number of partitions are moved
at a time, and only one replica of a partition is moved at a time.
The ring is used by the Proxy server and several background processes
(like replication).

View File

@ -154,29 +154,28 @@ add the configuration for the authtoken middleware::
[filter:authtoken]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
auth_host = keystonehost
auth_port = 35357
auth_protocol = http
auth_uri = http://keystonehost:5000/
identity_uri = http://keystonehost:35357/
admin_tenant_name = service
admin_user = swift
admin_password = password
auth_uri = http://keystonehost:5000/
cache = swift.cache
include_service_catalog = False
delay_auth_decision = True
The actual values for these variables will need to be set depending on
your situation. For more information, please refer to the `Keystone
auth_token middleware documentation
<http://docs.openstack.org/developer/keystonemiddleware/middlewarearchitecture.html#configuration>`_,
but in short:
your situation, but in short:
* Those variables beginning with ``auth_`` point to the Keystone
Admin service. This information is used by the middleware to actually
query Keystone about the validity of the
authentication tokens.
* ``identity_uri`` points to the Keystone Admin service. This information is
used by the middleware to actually query Keystone about the validity of the
authentication tokens. It is not necessary to append any Keystone API version
number to this URI.
* The admin auth credentials (``admin_user``, ``admin_tenant_name``,
``admin_password``) will be used to retrieve an admin token. That
token will be used to authorize user tokens behind the scenes.
* ``auth_uri`` should point to a Keystone service from which users may
retrieve tokens. This value is used in the `WWW-Authenticate` header that
auth_token sends with any denial response.
* ``cache`` is set to ``swift.cache``. This means that the middleware
will get the Swift memcache from the request environment.
* ``include_service_catalog`` defaults to ``True`` if not set. This means
@ -185,16 +184,14 @@ but in short:
use the ``X-Service-Catalog`` header, there is no point in getting
the service catalog. We recommend you set ``include_service_catalog``
to ``False``.
* If you wish to authenticate using Keystone's v3 API you must set the
``auth_version`` option to ``v3.0``.
.. note::
If support is required for unvalidated users (as with anonymous
access or making capabilities requests using :ref:`discoverability`) or
for tempurl/formpost middleware, authtoken will need
to be configured with delay_auth_decision set to 1.
The authtoken config variable ``delay_auth_decision`` must be set to
``True``. The default is ``False``, but that breaks public access,
:ref:`staticweb`, :ref:`formpost`, :ref:`tempurl`, and authenticated
capabilities requests (using :ref:`discoverability`).
and you can finally add the keystoneauth configuration::

View File

@ -40,7 +40,7 @@ So now, the following ``swift`` command would download the entire large object::
swift download test_container large_file
``swift`` uses a strict convention for its segmented object
``swift`` command uses a strict convention for its segmented object
support. In the above example it will upload all the segments into a
second container named test_container_segments. These segments will
have names like large_file/1290206778.25/21474836480/00000000,
@ -66,14 +66,15 @@ Direct API
You can also work with the segments and manifests directly with HTTP
requests instead of having ``swift`` do that for you. You can just
upload the segments like you would any other object and the manifest
is just a zero-byte file with an extra ``X-Object-Manifest`` header.
is just a zero-byte (not enforced) file with an extra
``X-Object-Manifest`` header.
All the object segments need to be in the same container, have a common object
name prefix, and their names sort in the order they should be concatenated.
They don't have to be in the same container as the manifest file will be, which
is useful to keep container listings clean as explained above with ``swift``.
The manifest file is simply a zero-byte file with the extra
The manifest file is simply a zero-byte (not enforced) file with the extra
``X-Object-Manifest: <container>/<prefix>`` header, where ``<container>`` is
the container the object segments are in and ``<prefix>`` is the common prefix
for all the segments.
@ -85,6 +86,17 @@ location and then update the manifest to point to this new location. During the
upload of the new segments, the original manifest will still be available to
download the first set of segments.
.. note::
The manifest file should have no content. However, this is not enforced.
If the manifest path itself conforms to container/prefix specified in
X-Object-Manifest, and if manifest has some content/data in it, it would
also be considered as segment and manifest's content will be part of the
concatenated GET response. The order of concatenation follows the usual DLO
logic which is - the order of concatenation adheres to order returned when
segment names are sorted.
Here's an example using ``curl`` with tiny 1-byte segments::
# First, upload the segments

View File

@ -85,5 +85,5 @@ If you want to disable all functionality, set ``allow_versions`` back to
Disable versioning a versioned container (x is any value except empty)::
curl -i -HPOST -H "X-Auth-Token: <token>" \
curl -i -XPOST -H "X-Auth-Token: <token>" \
-H "X-Remove-Versions-Location: x" http://<storage_url>/container

View File

@ -75,7 +75,7 @@ weight float The relative weight of the device in comparison to other
back into balance a device that has ended up with more or less
data than desired over time. A good average weight of 100.0
allows flexibility in lowering the weight later if necessary.
ip string The IP address of the server containing the device.
ip string The IP address or hostname of the server containing the device.
port int The TCP port the listening server process uses that serves
requests for the device.
device string The on disk name of the device on the server.

View File

@ -16,3 +16,12 @@
#
# Sets the maximum number of connections to each memcached server per worker
# memcache_max_connections = 2
#
# Timeout for connection
# connect_timeout = 0.3
# Timeout for pooled connection
# pool_timeout = 1.0
# number of servers to retry on failures getting a pooled connection
# tries = 3
# Timeout for read and writes
# io_timeout = 2.0

View File

@ -267,14 +267,17 @@ user_test_tester3 = testing3
#
# [filter:authtoken]
# paste.filter_factory = keystonemiddleware.auth_token:filter_factory
# auth_host = keystonehost
# auth_port = 35357
# auth_protocol = http
# identity_uri = http://keystonehost:35357/
# auth_uri = http://keystonehost:5000/
# admin_tenant_name = service
# admin_user = swift
# admin_password = password
# delay_auth_decision = 1
#
# delay_auth_decision defaults to False, but leaving it as false will
# prevent other auth systems, staticweb, tempurl, formpost, and ACLs from
# working. This value must be explicitly set to True.
# delay_auth_decision = False
#
# cache = swift.cache
# include_service_catalog = False
#
@ -348,6 +351,8 @@ use = egg:swift#memcache
#
# Sets the maximum number of connections to each memcached server per worker
# memcache_max_connections = 2
#
# More options documented in memcache.conf-sample
[filter:ratelimit]
use = egg:swift#ratelimit

View File

@ -28,6 +28,7 @@ from swift.common.direct_client import direct_delete_container, \
direct_delete_object, direct_get_container
from swift.common.exceptions import ClientException
from swift.common.ring import Ring
from swift.common.ring.utils import is_local_device
from swift.common.utils import get_logger, whataremyips, ismount, \
config_true_value, Timestamp
from swift.common.daemon import Daemon
@ -58,6 +59,7 @@ class AccountReaper(Daemon):
def __init__(self, conf, logger=None):
self.conf = conf
self.logger = logger or get_logger(conf, log_route='account-reaper')
self.bind_port = conf.get('bind_port', 6002)
self.devices = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.interval = int(conf.get('interval', 3600))
@ -159,7 +161,8 @@ class AccountReaper(Daemon):
if not partition.isdigit():
continue
nodes = self.get_account_ring().get_part_nodes(int(partition))
if nodes[0]['ip'] not in self.myips or \
if not is_local_device(self.myips, self.bind_port,
nodes[0]['ip'], nodes[0]['port']) or \
not os.path.isdir(partition_path):
continue
for suffix in os.listdir(partition_path):

View File

@ -32,6 +32,7 @@ from swift.common.utils import get_logger, hash_path, public, \
from swift.common.constraints import check_mount, valid_timestamp, check_utf8
from swift.common import constraints
from swift.common.db_replicator import ReplicatorRpc
from swift.common.base_storage_server import BaseStorageServer
from swift.common.swob import HTTPAccepted, HTTPBadRequest, \
HTTPCreated, HTTPForbidden, HTTPInternalServerError, \
HTTPMethodNotAllowed, HTTPNoContent, HTTPNotFound, \
@ -40,18 +41,17 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, \
from swift.common.request_helpers import is_sys_or_user_meta
class AccountController(object):
class AccountController(BaseStorageServer):
"""WSGI controller for the account server."""
server_type = 'account-server'
def __init__(self, conf, logger=None):
super(AccountController, self).__init__(conf)
self.logger = logger or get_logger(conf, log_route='account-server')
self.log_requests = config_true_value(conf.get('log_requests', 'true'))
self.root = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
replication_server = conf.get('replication_server', None)
if replication_server is not None:
replication_server = config_true_value(replication_server)
self.replication_server = replication_server
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR, AccountBroker,
self.mount_check,
logger=self.logger)
@ -262,15 +262,12 @@ class AccountController(object):
try:
# disallow methods which are not publicly accessible
try:
method = getattr(self, req.method)
getattr(method, 'publicly_accessible')
replication_method = getattr(method, 'replication', False)
if (self.replication_server is not None and
self.replication_server != replication_method):
if req.method not in self.allowed_methods:
raise AttributeError('Not allowed method.')
except AttributeError:
res = HTTPMethodNotAllowed()
else:
method = getattr(self, req.method)
res = method(req)
except HTTPException as error_response:
res = error_response

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,77 @@
# Copyright (c) 2010-2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from swift import __version__ as swift_version
from swift.common.utils import public, timing_stats, config_true_value
from swift.common.swob import Response
class BaseStorageServer(object):
"""
Implements common OPTIONS method for object, account, container servers.
"""
def __init__(self, conf, **kwargs):
self._allowed_methods = None
replication_server = conf.get('replication_server', None)
if replication_server is not None:
replication_server = config_true_value(replication_server)
self.replication_server = replication_server
@property
def server_type(self):
raise NotImplementedError(
'Storage nodes have not implemented the Server type.')
@property
def allowed_methods(self):
if self._allowed_methods is None:
self._allowed_methods = []
all_methods = inspect.getmembers(self, predicate=callable)
if self.replication_server is True:
for name, m in all_methods:
if (getattr(m, 'publicly_accessible', False) and
getattr(m, 'replication', False)):
self._allowed_methods.append(name)
elif self.replication_server is False:
for name, m in all_methods:
if (getattr(m, 'publicly_accessible', False) and not
getattr(m, 'replication', False)):
self._allowed_methods.append(name)
elif self.replication_server is None:
for name, m in all_methods:
if getattr(m, 'publicly_accessible', False):
self._allowed_methods.append(name)
self._allowed_methods.sort()
return self._allowed_methods
@public
@timing_stats()
def OPTIONS(self, req):
"""
Base handler for OPTIONS requests
:param req: swob.Request object
:returns: swob.Response object
"""
# Prepare the default response
headers = {'Allow': ', '.join(self.allowed_methods),
'Server': '%s/%s' % (self.server_type, swift_version)}
resp = Response(status=200, request=req, headers=headers)
return resp

View File

@ -33,6 +33,7 @@ from swift.common.utils import get_logger, whataremyips, storage_directory, \
renamer, mkdirs, lock_parent_directory, config_true_value, \
unlink_older_than, dump_recon_cache, rsync_ip, ismount, json, Timestamp
from swift.common import ring
from swift.common.ring.utils import is_local_device
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE
from swift.common.bufferedhttp import BufferedHTTPConnection
from swift.common.exceptions import DriveNotMounted
@ -543,8 +544,9 @@ class Replicator(Daemon):
return
self._local_device_ids = set()
for node in self.ring.devs:
if (node and node['replication_ip'] in ips and
node['replication_port'] == self.port):
if node and is_local_device(ips, self.port,
node['replication_ip'],
node['replication_port']):
if self.mount_check and not ismount(
os.path.join(self.root, node['device'])):
self.logger.warn(

View File

@ -129,6 +129,10 @@ class LockTimeout(MessageTimeout):
pass
class ThreadPoolDead(SwiftException):
pass
class RingBuilderError(SwiftException):
pass

View File

@ -16,7 +16,8 @@
import os
from ConfigParser import ConfigParser, NoSectionError, NoOptionError
from swift.common.memcached import MemcacheRing
from swift.common.memcached import (MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT,
IO_TIMEOUT, TRY_COUNT)
class MemcacheMiddleware(object):
@ -36,6 +37,7 @@ class MemcacheMiddleware(object):
except ValueError:
max_conns = 0
memcache_options = {}
if (not self.memcache_servers
or serialization_format is None
or max_conns <= 0):
@ -43,6 +45,12 @@ class MemcacheMiddleware(object):
'memcache.conf')
memcache_conf = ConfigParser()
if memcache_conf.read(path):
# if memcache.conf exists we'll start with those base options
try:
memcache_options = dict(memcache_conf.items('memcache'))
except NoSectionError:
pass
if not self.memcache_servers:
try:
self.memcache_servers = \
@ -65,6 +73,17 @@ class MemcacheMiddleware(object):
except (NoSectionError, NoOptionError, ValueError):
pass
# while memcache.conf options are the base for the memcache
# middleware, if you set the same option also in the filter
# section of the proxy config it is more specific.
memcache_options.update(conf)
connect_timeout = float(memcache_options.get(
'connect_timeout', CONN_TIMEOUT))
pool_timeout = float(memcache_options.get(
'pool_timeout', POOL_TIMEOUT))
tries = int(memcache_options.get('tries', TRY_COUNT))
io_timeout = float(memcache_options.get('io_timeout', IO_TIMEOUT))
if not self.memcache_servers:
self.memcache_servers = '127.0.0.1:11211'
if max_conns <= 0:
@ -76,6 +95,10 @@ class MemcacheMiddleware(object):
self.memcache = MemcacheRing(
[s.strip() for s in self.memcache_servers.split(',') if s.strip()],
connect_timeout=connect_timeout,
pool_timeout=pool_timeout,
tries=tries,
io_timeout=io_timeout,
allow_pickle=(serialization_format == 0),
allow_unpickle=(serialization_format <= 1),
max_conns=max_conns)

View File

@ -595,7 +595,7 @@ class StaticLargeObject(object):
except (ValueError, TypeError):
raise HTTPBadRequest('Invalid Manifest File')
if seg_size < self.min_segment_size and \
len(parsed_data) > 1 and index < len(parsed_data) - 1:
index < len(parsed_data) - 1:
raise HTTPBadRequest(
'Each segment, except the last, must be at least '
'%d bytes.' % self.min_segment_size)

View File

@ -27,7 +27,8 @@ from time import time
from swift.common import exceptions
from swift.common.ring import RingData
from swift.common.ring.utils import tiers_for_dev, build_tier_tree
from swift.common.ring.utils import tiers_for_dev, build_tier_tree, \
validate_and_normalize_address
MAX_BALANCE = 999.99
@ -89,6 +90,9 @@ class RingBuilder(object):
self._last_part_moves = None
self._last_part_gather_start = 0
self._dispersion_graph = {}
self.dispersion = 0.0
self._remove_devs = []
self._ring = None
@ -143,6 +147,8 @@ class RingBuilder(object):
self._last_part_moves_epoch = builder['_last_part_moves_epoch']
self._last_part_moves = builder['_last_part_moves']
self._last_part_gather_start = builder['_last_part_gather_start']
self._dispersion_graph = builder.get('_dispersion_graph', {})
self.dispersion = builder.get('dispersion')
self._remove_devs = builder['_remove_devs']
self._ring = None
@ -170,6 +176,8 @@ class RingBuilder(object):
'_last_part_moves_epoch': self._last_part_moves_epoch,
'_last_part_moves': self._last_part_moves,
'_last_part_gather_start': self._last_part_gather_start,
'_dispersion_graph': self._dispersion_graph,
'dispersion': self.dispersion,
'_remove_devs': self._remove_devs}
def change_min_part_hours(self, min_part_hours):
@ -349,6 +357,7 @@ class RingBuilder(object):
if self._last_part_moves_epoch is None:
self._initial_balance()
self.devs_changed = False
self._build_dispersion_graph()
return self.parts, self.get_balance()
changed_parts = 0
self._update_last_part_moves()
@ -372,12 +381,62 @@ class RingBuilder(object):
self.devs_changed = False
self.version += 1
changed_parts = self._build_dispersion_graph(old_replica2part2dev)
return changed_parts, balance
def _build_dispersion_graph(self, old_replica2part2dev=None):
"""
Build a dict of all tiers in the cluster to a list of the number of
parts with a replica count at each index. The values of the dict will
be lists of length the maximum whole replica + 1 so that the
graph[tier][3] is the number of parts with in the tier with 3 replicas
and graph [tier][0] is the number of parts not assigned in this tier.
i.e.
{
<tier>: [
<number_of_parts_with_0_replicas>,
<number_of_parts_with_1_replicas>,
...
<number_of_parts_with_n_replicas>,
],
...
}
:param old_replica2part2dev: if called from rebalance, the
old_replica2part2dev can be used to count moved moved parts.
:returns: number of parts with different assignments than
old_replica2part2dev if provided
"""
# Since we're going to loop over every replica of every part we'll
# also count up changed_parts if old_replica2part2dev is passed in
old_replica2part2dev = old_replica2part2dev or []
# Compare the partition allocation before and after the rebalance
# Only changed device ids are taken into account; devices might be
# "touched" during the rebalance, but actually not really moved
changed_parts = 0
for rep_id, _rep in enumerate(self._replica2part2dev):
for part_id, new_device in enumerate(_rep):
int_replicas = int(math.ceil(self.replicas))
max_allowed_replicas = self._build_max_replicas_by_tier()
parts_at_risk = 0
tfd = {}
dispersion_graph = {}
# go over all the devices holding each replica part by part
for part_id, dev_ids in enumerate(
itertools.izip(*self._replica2part2dev)):
# count the number of replicas of this part for each tier of each
# device, some devices may have overlapping tiers!
replicas_at_tier = defaultdict(int)
for rep_id, dev in enumerate(iter(
self.devs[dev_id] for dev_id in dev_ids)):
if dev['id'] not in tfd:
tfd[dev['id']] = tiers_for_dev(dev)
for tier in tfd[dev['id']]:
replicas_at_tier[tier] += 1
# IndexErrors will be raised if the replicas are increased or
# decreased, and that actually means the partition has changed
try:
@ -386,9 +445,25 @@ class RingBuilder(object):
changed_parts += 1
continue
if old_device != new_device:
if old_device != dev['id']:
changed_parts += 1
return changed_parts, balance
part_at_risk = False
# update running totals for each tiers' number of parts with a
# given replica count
for tier, replicas in replicas_at_tier.items():
if tier not in dispersion_graph:
dispersion_graph[tier] = [self.parts] + [0] * int_replicas
dispersion_graph[tier][0] -= 1
dispersion_graph[tier][replicas] += 1
if replicas > max_allowed_replicas[tier]:
part_at_risk = True
# this part may be at risk in multiple tiers, but we only count it
# as at_risk once
if part_at_risk:
parts_at_risk += 1
self._dispersion_graph = dispersion_graph
self.dispersion = 100.0 * parts_at_risk / self.parts
return changed_parts
def validate(self, stats=False):
"""
@ -979,11 +1054,11 @@ class RingBuilder(object):
if candidates_with_room:
if len(candidates_with_room) > \
len(candidates_with_replicas):
# There exists at least one tier with room for
# another partition and 0 other replicas already in
# it, so we can use a faster search. The else
# branch's search would work here, but it's
# significantly slower.
# There exists at least one tier with room for
# another partition and 0 other replicas already
# in it, so we can use a faster search. The else
# branch's search would work here, but it's
# significantly slower.
roomiest_tier = max(
(t for t in candidates_with_room
if other_replicas[t] == 0),
@ -1060,7 +1135,8 @@ class RingBuilder(object):
def _build_max_replicas_by_tier(self):
"""
Returns a dict of (tier: replica_count) for all tiers in the ring.
Returns a defaultdict of (tier: replica_count) for all tiers in the
ring excluding zero weight devices.
There will always be a () entry as the root of the structure, whose
replica_count will equal the ring's replica_count.
@ -1108,7 +1184,8 @@ class RingBuilder(object):
"""
# Used by walk_tree to know what entries to create for each recursive
# call.
tier2children = build_tier_tree(self._iter_devs())
tier2children = build_tier_tree(d for d in self._iter_devs() if
d['weight'])
def walk_tree(tier, replica_count):
mr = {tier: replica_count}
@ -1118,7 +1195,9 @@ class RingBuilder(object):
submax = math.ceil(float(replica_count) / len(subtiers))
mr.update(walk_tree(subtier, submax))
return mr
return walk_tree((), self.replicas)
mr = defaultdict(float)
mr.update(walk_tree((), self.replicas))
return mr
def _devs_for_part(self, part):
"""
@ -1184,7 +1263,7 @@ class RingBuilder(object):
builder = RingBuilder(1, 1, 1)
builder.copy_from(builder_dict)
for dev in builder.devs:
#really old rings didn't have meta keys
# really old rings didn't have meta keys
if dev and 'meta' not in dev:
dev['meta'] = ''
# NOTE(akscram): An old ring builder file don't contain
@ -1227,6 +1306,15 @@ class RingBuilder(object):
if key == 'meta':
if value not in dev.get(key):
matched = False
elif key == 'ip' or key == 'replication_ip':
cdev = ''
try:
cdev = validate_and_normalize_address(
dev.get(key, ''))
except ValueError:
pass
if cdev != value:
matched = False
elif dev.get(key) != value:
matched = False
if matched:

View File

@ -13,8 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
from operator import itemgetter
import optparse
import re
import socket
from swift.common.utils import expand_ipv6
def tiers_for_dev(dev):
@ -127,10 +130,136 @@ def build_tier_tree(devices):
return tier2children
def validate_and_normalize_ip(ip):
"""
Return normalized ip if the ip is a valid ip.
Otherwise raise ValueError Exception. The hostname is
normalized to all lower case. IPv6-addresses are converted to
lowercase and fully expanded.
"""
# first convert to lower case
new_ip = ip.lower()
if is_valid_ipv4(new_ip):
return new_ip
elif is_valid_ipv6(new_ip):
return expand_ipv6(new_ip)
else:
raise ValueError('Invalid ip %s' % ip)
def validate_and_normalize_address(address):
"""
Return normalized address if the address is a valid ip or hostname.
Otherwise raise ValueError Exception. The hostname is
normalized to all lower case. IPv6-addresses are converted to
lowercase and fully expanded.
RFC1123 2.1 Host Names and Nubmers
DISCUSSION
This last requirement is not intended to specify the complete
syntactic form for entering a dotted-decimal host number;
that is considered to be a user-interface issue. For
example, a dotted-decimal number must be enclosed within
"[ ]" brackets for SMTP mail (see Section 5.2.17). This
notation could be made universal within a host system,
simplifying the syntactic checking for a dotted-decimal
number.
If a dotted-decimal number can be entered without such
identifying delimiters, then a full syntactic check must be
made, because a segment of a host domain name is now allowed
to begin with a digit and could legally be entirely numeric
(see Section 6.1.2.4). However, a valid host name can never
have the dotted-decimal form #.#.#.#, since at least the
highest-level component label will be alphabetic.
"""
new_address = address.lstrip('[').rstrip(']')
if address.startswith('[') and address.endswith(']'):
return validate_and_normalize_ip(new_address)
new_address = new_address.lower()
if is_valid_ipv4(new_address):
return new_address
elif is_valid_ipv6(new_address):
return expand_ipv6(new_address)
elif is_valid_hostname(new_address):
return new_address
else:
raise ValueError('Invalid address %s' % address)
def is_valid_ip(ip):
"""
Return True if the provided ip is a valid IP-address
"""
return is_valid_ipv4(ip) or is_valid_ipv6(ip)
def is_valid_ipv4(ip):
"""
Return True if the provided ip is a valid IPv4-address
"""
try:
socket.inet_pton(socket.AF_INET, ip)
except socket.error:
return False
return True
def is_valid_ipv6(ip):
"""
Return True if the provided ip is a valid IPv6-address
"""
try:
socket.inet_pton(socket.AF_INET6, ip)
except socket.error: # not a valid address
return False
return True
def is_valid_hostname(hostname):
"""
Return True if the provided hostname is a valid hostname
"""
if len(hostname) < 1 or len(hostname) > 255:
return False
if hostname[-1] == ".":
# strip exactly one dot from the right, if present
hostname = hostname[:-1]
allowed = re.compile("(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
return all(allowed.match(x) for x in hostname.split("."))
def is_local_device(my_ips, my_port, dev_ip, dev_port):
"""
Return True if the provided dev_ip and dev_port are among the IP
addresses specified in my_ips and my_port respectively.
If dev_ip is a hostname then it is first translated to an IP
address before checking it against my_ips.
"""
if not is_valid_ip(dev_ip) and is_valid_hostname(dev_ip):
try:
# get the ip for this host; use getaddrinfo so that
# it works for both ipv4 and ipv6 addresses
addrinfo = socket.getaddrinfo(dev_ip, dev_port)
for addr in addrinfo:
family = addr[0]
dev_ip = addr[4][0] # get the ip-address
if family == socket.AF_INET6:
dev_ip = expand_ipv6(dev_ip)
if dev_ip in my_ips and dev_port == my_port:
return True
return False
except socket.gaierror:
return False
return dev_ip in my_ips and dev_port == my_port
def parse_search_value(search_value):
"""The <search-value> can be of the form::
d<device_id>r<region>z<zone>-<ip>:<port>[R<r_ip>:<r_port>]/
d<device_id>r<region>z<zone>-<ip>:<port>R<r_ip>:<r_port>/
<device_name>_<meta>
Where <r_ip> and <r_port> are replication ip and port.
@ -200,6 +329,12 @@ def parse_search_value(search_value):
i += 1
match['ip'] = search_value[:i].lstrip('[').rstrip(']')
search_value = search_value[i:]
if 'ip' in match:
# ipv6 addresses are converted to all lowercase
# and use the fully expanded representation
match['ip'] = validate_and_normalize_ip(match['ip'])
if search_value.startswith(':'):
i = 1
while i < len(search_value) and search_value[i].isdigit():
@ -223,6 +358,13 @@ def parse_search_value(search_value):
i += 1
match['replication_ip'] = search_value[:i].lstrip('[').rstrip(']')
search_value = search_value[i:]
if 'replication_ip' in match:
# ipv6 addresses are converted to all lowercase
# and use the fully expanded representation
match['replication_ip'] = \
validate_and_normalize_ip(match['replication_ip'])
if search_value.startswith(':'):
i = 1
while i < len(search_value) and search_value[i].isdigit():
@ -244,11 +386,68 @@ def parse_search_value(search_value):
return match
def parse_search_values_from_opts(opts):
"""
Convert optparse style options into a dictionary for searching.
:param opts: optparse style options
:returns: a dictonary with search values to filter devices,
supported parameters are id, region, zone, ip, port,
replication_ip, replication_port, device, weight, meta
"""
search_values = {}
for key in ('id', 'region', 'zone', 'ip', 'port', 'replication_ip',
'replication_port', 'device', 'weight', 'meta'):
value = getattr(opts, key, None)
if value:
if key == 'ip' or key == 'replication_ip':
value = validate_and_normalize_address(value)
search_values[key] = value
return search_values
def parse_change_values_from_opts(opts):
"""
Convert optparse style options into a dictionary for changing.
:param opts: optparse style options
:returns: a dictonary with change values to filter devices,
supported parameters are ip, port, replication_ip,
replication_port
"""
change_values = {}
for key in ('change_ip', 'change_port', 'change_replication_ip',
'change_replication_port', 'change_device', 'change_meta'):
value = getattr(opts, key, None)
if value:
if key == 'change_ip' or key == 'change_replication_ip':
value = validate_and_normalize_address(value)
change_values[key.replace('change_', '')] = value
return change_values
def validate_args(argvish):
"""
Build OptionParse and validate it whether the format is new command-line
format or not.
"""
opts, args = parse_args(argvish)
new_cmd_format = opts.id or opts.region or opts.zone or \
opts.ip or opts.port or \
opts.replication_ip or opts.replication_port or \
opts.device or opts.weight or opts.meta
return (new_cmd_format, opts, args)
def parse_args(argvish):
"""
Build OptionParser and evaluate command line arguments.
"""
parser = optparse.OptionParser()
parser.add_option('-u', '--id', type="int",
help="Device ID")
parser.add_option('-r', '--region', type="int",
help="Region")
parser.add_option('-z', '--zone', type="int",
@ -267,6 +466,18 @@ def parse_args(argvish):
help="Device weight")
parser.add_option('-m', '--meta', type="string", default="",
help="Extra device info (just a string)")
parser.add_option('-I', '--change-ip', type="string",
help="IP address for change")
parser.add_option('-P', '--change-port', type="int",
help="Port number for change")
parser.add_option('-J', '--change-replication-ip', type="string",
help="Replication IP address for change")
parser.add_option('-Q', '--change-replication-port', type="int",
help="Replication port number for change")
parser.add_option('-D', '--change-device', type="string",
help="Device name (e.g. md0, sdb1) for change")
parser.add_option('-M', '--change-meta', type="string", default="",
help="Extra device info (just a string) for change")
return parser.parse_args(argvish)
@ -299,35 +510,62 @@ def build_dev_from_opts(opts):
raise ValueError('Required argument %s/%s not specified.' %
(shortopt, longopt))
replication_ip = opts.replication_ip or opts.ip
ip = validate_and_normalize_address(opts.ip)
replication_ip = validate_and_normalize_address(
(opts.replication_ip or opts.ip))
replication_port = opts.replication_port or opts.port
return {'region': opts.region, 'zone': opts.zone, 'ip': opts.ip,
return {'region': opts.region, 'zone': opts.zone, 'ip': ip,
'port': opts.port, 'device': opts.device, 'meta': opts.meta,
'replication_ip': replication_ip,
'replication_port': replication_port, 'weight': opts.weight}
def find_parts(builder, argv):
devs = []
for arg in argv[3:]:
devs.extend(builder.search_devs(parse_search_value(arg)) or [])
def dispersion_report(builder, search_filter=None, verbose=False):
if not builder._dispersion_graph:
builder._build_dispersion_graph()
max_allowed_replicas = builder._build_max_replicas_by_tier()
worst_tier = None
max_dispersion = 0.0
sorted_graph = []
for tier, replica_counts in sorted(builder._dispersion_graph.items()):
tier_name = get_tier_name(tier, builder)
if search_filter and not re.match(search_filter, tier_name):
continue
max_replicas = int(max_allowed_replicas[tier])
at_risk_parts = sum(replica_counts[max_replicas + 1:])
placed_parts = sum(replica_counts[1:])
tier_dispersion = 100.0 * at_risk_parts / placed_parts
if tier_dispersion > max_dispersion:
max_dispersion = tier_dispersion
worst_tier = tier_name
max_dispersion = max(max_dispersion, tier_dispersion)
if not verbose:
continue
devs = [d['id'] for d in devs]
tier_report = {
'max_replicas': max_replicas,
'placed_parts': placed_parts,
'dispersion': tier_dispersion,
'replicas': replica_counts,
}
sorted_graph.append((tier_name, tier_report))
if not devs:
return None
return {
'max_dispersion': max_dispersion,
'worst_tier': worst_tier,
'graph': sorted_graph,
}
partition_count = {}
for replica in builder._replica2part2dev:
for partition, device in enumerate(replica):
if device in devs:
if partition not in partition_count:
partition_count[partition] = 0
partition_count[partition] += 1
# Sort by number of found replicas to keep the output format
sorted_partition_count = sorted(
partition_count.iteritems(), key=itemgetter(1), reverse=True)
return sorted_partition_count
def get_tier_name(tier, builder):
if len(tier) == 1:
return "r%s" % (tier[0], )
if len(tier) == 2:
return "r%sz%s" % (tier[0], tier[1])
if len(tier) == 3:
return "r%sz%s-%s" % (tier[0], tier[1], tier[2])
if len(tier) == 4:
device = builder.devs[tier[3]] or {}
return "r%sz%s-%s/%s" % (tier[0], tier[1], tier[2],
device.get('device', 'IDd%s' % tier[3]))

View File

@ -907,7 +907,7 @@ class NullLogger(object):
"""A no-op logger for eventlet wsgi."""
def write(self, *args):
#"Logs" the args to nowhere
# "Logs" the args to nowhere
pass
@ -1069,6 +1069,7 @@ class LoggingHandlerWeakRef(weakref.ref):
Like a weak reference, but passes through a couple methods that logging
handlers need.
"""
def close(self):
referent = self()
try:
@ -1542,6 +1543,17 @@ def parse_options(parser=None, once=False, test_args=None):
return config, options
def expand_ipv6(address):
"""
Expand ipv6 address.
:param address: a string indicating valid ipv6 address
:returns: a string indicating fully expanded ipv6 address
"""
packed_ip = socket.inet_pton(socket.AF_INET6, address)
return socket.inet_ntop(socket.AF_INET6, packed_ip)
def whataremyips():
"""
Get the machine's ip addresses
@ -1561,7 +1573,7 @@ def whataremyips():
# If we have an ipv6 address remove the
# %ether_interface at the end
if family == netifaces.AF_INET6:
addr = addr.split('%')[0]
addr = expand_ipv6(addr.split('%')[0])
addresses.append(addr)
except ValueError:
pass
@ -2388,7 +2400,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2):
if existing_entry:
cache_entry = json.loads(existing_entry)
except ValueError:
#file doesn't have a valid entry, we'll recreate it
# file doesn't have a valid entry, we'll recreate it
pass
for cache_key, cache_value in cache_dict.items():
put_recon_cache_entry(cache_entry, cache_key, cache_value)
@ -2728,19 +2740,21 @@ def tpool_reraise(func, *args, **kwargs):
class ThreadPool(object):
BYTE = 'a'.encode('utf-8')
"""
Perform blocking operations in background threads.
Call its methods from within greenlets to green-wait for results without
blocking the eventlet reactor (hopefully).
"""
BYTE = 'a'.encode('utf-8')
def __init__(self, nthreads=2):
self.nthreads = nthreads
self._run_queue = Queue()
self._result_queue = Queue()
self._threads = []
self._alive = True
if nthreads <= 0:
return
@ -2788,6 +2802,8 @@ class ThreadPool(object):
"""
while True:
item = work_queue.get()
if item is None:
break
ev, func, args, kwargs = item
try:
result = func(*args, **kwargs)
@ -2842,6 +2858,9 @@ class ThreadPool(object):
:returns: result of calling func
:raises: whatever func raises
"""
if not self._alive:
raise swift.common.exceptions.ThreadPoolDead()
if self.nthreads <= 0:
result = func(*args, **kwargs)
sleep()
@ -2886,11 +2905,38 @@ class ThreadPool(object):
:returns: result of calling func
:raises: whatever func raises
"""
if not self._alive:
raise swift.common.exceptions.ThreadPoolDead()
if self.nthreads <= 0:
return self._run_in_eventlet_tpool(func, *args, **kwargs)
else:
return self.run_in_thread(func, *args, **kwargs)
def terminate(self):
"""
Releases the threadpool's resources (OS threads, greenthreads, pipes,
etc.) and renders it unusable.
Don't call run_in_thread() or force_run_in_thread() after calling
terminate().
"""
self._alive = False
if self.nthreads <= 0:
return
for _junk in range(self.nthreads):
self._run_queue.put(None)
for thr in self._threads:
thr.join()
self._threads = []
self.nthreads = 0
greenthread.kill(self._consumer_coro)
self.rpipe.close()
os.close(self.wpipe)
def ismount(path):
"""

View File

@ -38,6 +38,7 @@ from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.http import HTTP_NOT_FOUND, is_success
from swift.common.storage_policy import POLICIES
from swift.common.base_storage_server import BaseStorageServer
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
HTTPCreated, HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
HTTPPreconditionFailed, HTTPMethodNotAllowed, Request, Response, \
@ -71,24 +72,22 @@ def gen_resp_headers(info, is_deleted=False):
return headers
class ContainerController(object):
class ContainerController(BaseStorageServer):
"""WSGI Controller for the container server."""
# Ensure these are all lowercase
save_headers = ['x-container-read', 'x-container-write',
'x-container-sync-key', 'x-container-sync-to']
server_type = 'container-server'
def __init__(self, conf, logger=None):
super(ContainerController, self).__init__(conf)
self.logger = logger or get_logger(conf, log_route='container-server')
self.log_requests = config_true_value(conf.get('log_requests', 'true'))
self.root = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.node_timeout = int(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
replication_server = conf.get('replication_server', None)
if replication_server is not None:
replication_server = config_true_value(replication_server)
self.replication_server = replication_server
#: ContainerSyncCluster instance for validating sync-to values.
self.realms_conf = ContainerSyncRealms(
os.path.join(
@ -564,15 +563,12 @@ class ContainerController(object):
try:
# disallow methods which have not been marked 'public'
try:
method = getattr(self, req.method)
getattr(method, 'publicly_accessible')
replication_method = getattr(method, 'replication', False)
if (self.replication_server is not None and
self.replication_server != replication_method):
if req.method not in self.allowed_methods:
raise AttributeError('Not allowed method.')
except AttributeError:
res = HTTPMethodNotAllowed()
else:
method = getattr(self, req.method)
res = method(req)
except HTTPException as error_response:
res = error_response

View File

@ -29,6 +29,7 @@ from swift.common.direct_client import direct_get_object
from swift.common.internal_client import delete_object, put_object
from swift.common.exceptions import ClientException
from swift.common.ring import Ring
from swift.common.ring.utils import is_local_device
from swift.common.utils import (
audit_location_generator, clean_content_type, config_true_value,
FileLikeIter, get_logger, hash_path, quote, urlparse, validate_sync_to,
@ -239,7 +240,8 @@ class ContainerSync(Daemon):
x, nodes = self.container_ring.get_nodes(info['account'],
info['container'])
for ordinal, node in enumerate(nodes):
if node['ip'] in self._myips and node['port'] == self._myport:
if is_local_device(self._myips, self._myport,
node['ip'], node['port']):
break
else:
return

View File

@ -6,9 +6,9 @@
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: swift 2.2.1.post18\n"
"Project-Id-Version: swift 2.2.2.post26\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-01-06 06:13+0000\n"
"POT-Creation-Date: 2015-02-06 06:10+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@ -153,13 +153,13 @@ msgstr ""
msgid "Exception with objects for container %(container)s for account %(account)s"
msgstr ""
#: swift/account/server.py:278 swift/container/server.py:580
#: swift/obj/server.py:726
#: swift/account/server.py:275 swift/container/server.py:576
#: swift/obj/server.py:723
#, python-format
msgid "ERROR __call__ error with %(method)s %(path)s "
msgstr ""
#: swift/common/bufferedhttp.py:154
#: swift/common/bufferedhttp.py:157
#, python-format
msgid "Error encoding to UTF-8: %s"
msgstr ""
@ -214,54 +214,54 @@ msgstr ""
msgid "ERROR rsync failed with %(code)s: %(args)s"
msgstr ""
#: swift/common/db_replicator.py:292
#: swift/common/db_replicator.py:293
#, python-format
msgid "ERROR Bad response %(status)s from %(host)s"
msgstr ""
#: swift/common/db_replicator.py:449 swift/common/db_replicator.py:673
#: swift/common/db_replicator.py:452 swift/common/db_replicator.py:676
#, python-format
msgid "Quarantining DB %s"
msgstr ""
#: swift/common/db_replicator.py:452
#: swift/common/db_replicator.py:455
#, python-format
msgid "ERROR reading db %s"
msgstr ""
#: swift/common/db_replicator.py:483
#: swift/common/db_replicator.py:486
#, python-format
msgid "ERROR Remote drive not mounted %s"
msgstr ""
#: swift/common/db_replicator.py:485
#: swift/common/db_replicator.py:488
#, python-format
msgid "ERROR syncing %(file)s with node %(node)s"
msgstr ""
#: swift/common/db_replicator.py:513
#: swift/common/db_replicator.py:516
#, python-format
msgid "ERROR while trying to clean up %s"
msgstr ""
#: swift/common/db_replicator.py:539
#: swift/common/db_replicator.py:542
msgid "ERROR Failed to get my own IPs?"
msgstr ""
#: swift/common/db_replicator.py:548
#: swift/common/db_replicator.py:551
#, python-format
msgid "Skipping %(device)s as it is not mounted"
msgstr ""
#: swift/common/db_replicator.py:557
#: swift/common/db_replicator.py:560
msgid "Beginning replication run"
msgstr ""
#: swift/common/db_replicator.py:562
#: swift/common/db_replicator.py:565
msgid "Replication run OVER"
msgstr ""
#: swift/common/db_replicator.py:575
#: swift/common/db_replicator.py:578
msgid "ERROR trying to replicate"
msgstr ""
@ -494,24 +494,24 @@ msgstr ""
msgid "Following CNAME chain for %(given_domain)s to %(found_domain)s"
msgstr ""
#: swift/common/middleware/ratelimit.py:233
#: swift/common/middleware/ratelimit.py:247
#, python-format
msgid "Returning 497 because of blacklisting: %s"
msgstr ""
#: swift/common/middleware/ratelimit.py:249
#: swift/common/middleware/ratelimit.py:262
#, python-format
msgid "Ratelimit sleep log: %(sleep)s for %(account)s/%(container)s/%(object)s"
msgstr ""
#: swift/common/middleware/ratelimit.py:257
#: swift/common/middleware/ratelimit.py:270
#, python-format
msgid ""
"Returning 498 for %(meth)s to %(acc)s/%(cont)s/%(obj)s . Ratelimit (Max "
"Sleep) %(e)s"
msgstr ""
#: swift/common/middleware/ratelimit.py:279
#: swift/common/middleware/ratelimit.py:292
msgid "Warning: Cannot ratelimit without a memcached client"
msgstr ""
@ -621,21 +621,21 @@ msgstr ""
msgid "ERROR Could not get container info %s"
msgstr ""
#: swift/container/server.py:181
#: swift/container/server.py:180
#, python-format
msgid ""
"ERROR Account update failed: different numbers of hosts and devices in "
"request: \"%s\" vs \"%s\""
msgstr ""
#: swift/container/server.py:222
#: swift/container/server.py:221
#, python-format
msgid ""
"ERROR Account update failed with %(ip)s:%(port)s/%(device)s (will retry "
"later): Response %(status)s %(reason)s"
msgstr ""
#: swift/container/server.py:231
#: swift/container/server.py:230
#, python-format
msgid ""
"ERROR account update failed with %(ip)s:%(port)s/%(device)s (will retry "
@ -697,8 +697,8 @@ msgstr ""
msgid "ERROR: Failed to get paths to drive partitions: %s"
msgstr ""
#: swift/container/updater.py:91 swift/obj/replicator.py:423
#: swift/obj/replicator.py:505
#: swift/container/updater.py:91 swift/obj/replicator.py:428
#: swift/obj/replicator.py:512
#, python-format
msgid "%s is not mounted"
msgstr ""
@ -829,17 +829,17 @@ msgstr ""
msgid "Quarantined %(object_path)s to %(quar_path)s because it is not a directory"
msgstr ""
#: swift/obj/diskfile.py:858
#: swift/obj/diskfile.py:867
#, python-format
msgid "Problem cleaning up %s"
msgstr ""
#: swift/obj/diskfile.py:1157
#: swift/obj/diskfile.py:1166
#, python-format
msgid "ERROR DiskFile %(data_file)s close failure: %(exc)s : %(stack)s"
msgstr ""
#: swift/obj/diskfile.py:1438
#: swift/obj/diskfile.py:1447
#, python-format
msgid ""
"Client path %(client)s does not match path stored in object metadata "
@ -958,32 +958,32 @@ msgstr ""
msgid "Lockup detected.. killing live coros."
msgstr ""
#: swift/obj/replicator.py:508
#: swift/obj/replicator.py:515
msgid "Ring change detected. Aborting current replication pass."
msgstr ""
#: swift/obj/replicator.py:518
#: swift/obj/replicator.py:536
msgid "Exception in top-level replication loop"
msgstr ""
#: swift/obj/replicator.py:527
#: swift/obj/replicator.py:545
msgid "Running object replicator in script mode."
msgstr ""
#: swift/obj/replicator.py:535
#: swift/obj/replicator.py:563
#, python-format
msgid "Object replication complete (once). (%.02f minutes)"
msgstr ""
#: swift/obj/replicator.py:542
#: swift/obj/replicator.py:570
msgid "Starting object replicator in daemon mode."
msgstr ""
#: swift/obj/replicator.py:546
#: swift/obj/replicator.py:574
msgid "Starting object replication pass."
msgstr ""
#: swift/obj/replicator.py:551
#: swift/obj/replicator.py:579
#, python-format
msgid "Object replication complete. (%.02f minutes)"
msgstr ""
@ -1079,55 +1079,55 @@ msgstr ""
msgid "Account"
msgstr ""
#: swift/proxy/controllers/base.py:696 swift/proxy/controllers/base.py:729
#: swift/proxy/controllers/obj.py:191 swift/proxy/controllers/obj.py:321
#: swift/proxy/controllers/obj.py:361 swift/proxy/controllers/obj.py:379
#: swift/proxy/controllers/obj.py:507
#: swift/proxy/controllers/base.py:697 swift/proxy/controllers/base.py:730
#: swift/proxy/controllers/obj.py:191 swift/proxy/controllers/obj.py:318
#: swift/proxy/controllers/obj.py:358 swift/proxy/controllers/obj.py:376
#: swift/proxy/controllers/obj.py:502
msgid "Object"
msgstr ""
#: swift/proxy/controllers/base.py:697
#: swift/proxy/controllers/base.py:698
msgid "Trying to read during GET (retrying)"
msgstr ""
#: swift/proxy/controllers/base.py:730
#: swift/proxy/controllers/base.py:731
msgid "Trying to read during GET"
msgstr ""
#: swift/proxy/controllers/base.py:734
#: swift/proxy/controllers/base.py:735
#, python-format
msgid "Client did not read from proxy within %ss"
msgstr ""
#: swift/proxy/controllers/base.py:739
#: swift/proxy/controllers/base.py:740
msgid "Client disconnected on read"
msgstr ""
#: swift/proxy/controllers/base.py:741
#: swift/proxy/controllers/base.py:742
msgid "Trying to send to client"
msgstr ""
#: swift/proxy/controllers/base.py:778 swift/proxy/controllers/base.py:1048
#: swift/proxy/controllers/base.py:779 swift/proxy/controllers/base.py:1049
#, python-format
msgid "Trying to %(method)s %(path)s"
msgstr ""
#: swift/proxy/controllers/base.py:815 swift/proxy/controllers/base.py:1036
#: swift/proxy/controllers/obj.py:353 swift/proxy/controllers/obj.py:393
#: swift/proxy/controllers/base.py:816 swift/proxy/controllers/base.py:1037
#: swift/proxy/controllers/obj.py:350 swift/proxy/controllers/obj.py:390
msgid "ERROR Insufficient Storage"
msgstr ""
#: swift/proxy/controllers/base.py:818
#: swift/proxy/controllers/base.py:819
#, python-format
msgid "ERROR %(status)d %(body)s From %(type)s Server"
msgstr ""
#: swift/proxy/controllers/base.py:1039
#: swift/proxy/controllers/base.py:1040
#, python-format
msgid "ERROR %(status)d Trying to %(method)s %(path)sFrom Container Server"
msgstr ""
#: swift/proxy/controllers/base.py:1151
#: swift/proxy/controllers/base.py:1152
#, python-format
msgid "%(type)s returning 503 for %(statuses)s"
msgstr ""
@ -1136,67 +1136,72 @@ msgstr ""
msgid "Container"
msgstr ""
#: swift/proxy/controllers/obj.py:322
#: swift/proxy/controllers/obj.py:319
#, python-format
msgid "Trying to write to %s"
msgstr ""
#: swift/proxy/controllers/obj.py:356
#: swift/proxy/controllers/obj.py:353
#, python-format
msgid "ERROR %(status)d Expect: 100-continue From Object Server"
msgstr ""
#: swift/proxy/controllers/obj.py:362
#: swift/proxy/controllers/obj.py:359
#, python-format
msgid "Expect: 100-continue on %s"
msgstr ""
#: swift/proxy/controllers/obj.py:380
#: swift/proxy/controllers/obj.py:377
#, python-format
msgid "Trying to get final status of PUT to %s"
msgstr ""
#: swift/proxy/controllers/obj.py:397
#: swift/proxy/controllers/obj.py:394
#, python-format
msgid "ERROR %(status)d %(body)s From Object Server re: %(path)s"
msgstr ""
#: swift/proxy/controllers/obj.py:673
#: swift/proxy/controllers/obj.py:665
#, python-format
msgid "Object PUT returning 412, %(statuses)r"
msgstr ""
#: swift/proxy/controllers/obj.py:679
#: swift/proxy/controllers/obj.py:674
#, python-format
msgid "Object PUT returning 202 for 409: %(req_timestamp)s <= %(timestamps)r"
msgstr ""
#: swift/proxy/controllers/obj.py:682
#, python-format
msgid "Object PUT returning 503, %(conns)s/%(nodes)s required connections"
msgstr ""
#: swift/proxy/controllers/obj.py:710
#: swift/proxy/controllers/obj.py:713
#, python-format
msgid ""
"Object PUT exceptions during send, %(conns)s/%(nodes)s required "
"connections"
msgstr ""
#: swift/proxy/controllers/obj.py:721
#: swift/proxy/controllers/obj.py:724
#, python-format
msgid "ERROR Client read timeout (%ss)"
msgstr ""
#: swift/proxy/controllers/obj.py:726
#: swift/proxy/controllers/obj.py:729
msgid "ERROR Exception causing client disconnect"
msgstr ""
#: swift/proxy/controllers/obj.py:731
#: swift/proxy/controllers/obj.py:734
msgid "Client disconnected without sending enough data"
msgstr ""
#: swift/proxy/controllers/obj.py:740
#: swift/proxy/controllers/obj.py:743
#, python-format
msgid "Object servers returned %s mismatched etags"
msgstr ""
#: swift/proxy/controllers/obj.py:744
#: swift/proxy/controllers/obj.py:747
msgid "Object PUT"
msgstr ""

View File

@ -8,8 +8,8 @@ msgid ""
msgstr ""
"Project-Id-Version: Swift\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-01-06 06:13+0000\n"
"PO-Revision-Date: 2015-01-05 16:24+0000\n"
"POT-Creation-Date: 2015-02-06 06:10+0000\n"
"PO-Revision-Date: 2015-02-05 16:52+0000\n"
"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
"Language-Team: Chinese (China) "
"(http://www.transifex.com/projects/p/swift/language/zh_CN/)\n"
@ -155,13 +155,13 @@ msgstr "%(ip)s:%(port)s/%(device)s出现异常"
msgid "Exception with objects for container %(container)s for account %(account)s"
msgstr "账号%(account)s容器%(container)s的对象出现异常"
#: swift/account/server.py:278 swift/container/server.py:580
#: swift/obj/server.py:726
#: swift/account/server.py:275 swift/container/server.py:576
#: swift/obj/server.py:723
#, python-format
msgid "ERROR __call__ error with %(method)s %(path)s "
msgstr "%(method)s %(path)s出现错误__call__ error"
#: swift/common/bufferedhttp.py:154
#: swift/common/bufferedhttp.py:157
#, python-format
msgid "Error encoding to UTF-8: %s"
msgstr "UTF-8编码错误%s"
@ -216,54 +216,54 @@ msgstr "%(success)s成功%(failure)s失败"
msgid "ERROR rsync failed with %(code)s: %(args)s"
msgstr "错误 rsync失败 %(code)s: %(args)s"
#: swift/common/db_replicator.py:292
#: swift/common/db_replicator.py:293
#, python-format
msgid "ERROR Bad response %(status)s from %(host)s"
msgstr "失败响应错误%(status)s来自%(host)s"
#: swift/common/db_replicator.py:449 swift/common/db_replicator.py:673
#: swift/common/db_replicator.py:452 swift/common/db_replicator.py:676
#, python-format
msgid "Quarantining DB %s"
msgstr "隔离DB%s"
#: swift/common/db_replicator.py:452
#: swift/common/db_replicator.py:455
#, python-format
msgid "ERROR reading db %s"
msgstr "错误 读取db %s"
#: swift/common/db_replicator.py:483
#: swift/common/db_replicator.py:486
#, python-format
msgid "ERROR Remote drive not mounted %s"
msgstr "错误 远程驱动器无法挂载 %s"
#: swift/common/db_replicator.py:485
#: swift/common/db_replicator.py:488
#, python-format
msgid "ERROR syncing %(file)s with node %(node)s"
msgstr "错误 同步 %(file)s 和 节点%(node)s"
#: swift/common/db_replicator.py:513
#: swift/common/db_replicator.py:516
#, python-format
msgid "ERROR while trying to clean up %s"
msgstr "清理时出现错误%s"
#: swift/common/db_replicator.py:539
#: swift/common/db_replicator.py:542
msgid "ERROR Failed to get my own IPs?"
msgstr "错误 无法获得我方IPs?"
#: swift/common/db_replicator.py:548
#: swift/common/db_replicator.py:551
#, python-format
msgid "Skipping %(device)s as it is not mounted"
msgstr "因无法挂载跳过%(device)s"
#: swift/common/db_replicator.py:557
#: swift/common/db_replicator.py:560
msgid "Beginning replication run"
msgstr "开始运行复制"
#: swift/common/db_replicator.py:562
#: swift/common/db_replicator.py:565
msgid "Replication run OVER"
msgstr "复制运行结束"
#: swift/common/db_replicator.py:575
#: swift/common/db_replicator.py:578
msgid "ERROR trying to replicate"
msgstr "尝试复制时发生错误"
@ -498,17 +498,17 @@ msgstr "集合%(given_domain)s到%(found_domain)s"
msgid "Following CNAME chain for %(given_domain)s to %(found_domain)s"
msgstr "跟随CNAME链从%(given_domain)s到%(found_domain)s"
#: swift/common/middleware/ratelimit.py:233
#: swift/common/middleware/ratelimit.py:247
#, python-format
msgid "Returning 497 because of blacklisting: %s"
msgstr "返回497因为黑名单%s"
#: swift/common/middleware/ratelimit.py:249
#: swift/common/middleware/ratelimit.py:262
#, python-format
msgid "Ratelimit sleep log: %(sleep)s for %(account)s/%(container)s/%(object)s"
msgstr "流量控制休眠日志:%(sleep)s for %(account)s/%(container)s/%(object)s"
#: swift/common/middleware/ratelimit.py:257
#: swift/common/middleware/ratelimit.py:270
#, python-format
msgid ""
"Returning 498 for %(meth)s to %(acc)s/%(cont)s/%(obj)s . Ratelimit (Max "
@ -517,7 +517,7 @@ msgstr ""
"返还498从%(meth)s到%(acc)s/%(cont)s/%(obj)s流量控制(Max \"\n"
"\"Sleep) %(e)s"
#: swift/common/middleware/ratelimit.py:279
#: swift/common/middleware/ratelimit.py:292
msgid "Warning: Cannot ratelimit without a memcached client"
msgstr "警告:缺失缓存客户端 无法控制流量 "
@ -627,21 +627,21 @@ msgstr "容器审计\"once\"模式完成:%.02fs"
msgid "ERROR Could not get container info %s"
msgstr "错误:无法获取容器%s信息"
#: swift/container/server.py:181
#: swift/container/server.py:180
#, python-format
msgid ""
"ERROR Account update failed: different numbers of hosts and devices in "
"request: \"%s\" vs \"%s\""
msgstr "出现错误 账号更新失败:本机数量与设备数量不符: \"%s\" vs \"%s\""
#: swift/container/server.py:222
#: swift/container/server.py:221
#, python-format
msgid ""
"ERROR Account update failed with %(ip)s:%(port)s/%(device)s (will retry "
"later): Response %(status)s %(reason)s"
msgstr "出现错误 账号更新失败: %(ip)s:%(port)s/%(device)s (稍后尝试): 回应 %(status)s %(reason)s"
#: swift/container/server.py:231
#: swift/container/server.py:230
#, python-format
msgid ""
"ERROR account update failed with %(ip)s:%(port)s/%(device)s (will retry "
@ -705,8 +705,8 @@ msgstr "同步错误 %(db_file)s %(row)s"
msgid "ERROR: Failed to get paths to drive partitions: %s"
msgstr "%s未挂载"
#: swift/container/updater.py:91 swift/obj/replicator.py:423
#: swift/obj/replicator.py:505
#: swift/container/updater.py:91 swift/obj/replicator.py:428
#: swift/obj/replicator.py:512
#, python-format
msgid "%s is not mounted"
msgstr "%s未挂载"
@ -847,17 +847,17 @@ msgstr "目录%s无法映射到一个有效的policy"
msgid "Quarantined %(object_path)s to %(quar_path)s because it is not a directory"
msgstr "隔离%(object_path)s和%(quar_path)s因为非目录"
#: swift/obj/diskfile.py:858
#: swift/obj/diskfile.py:867
#, python-format
msgid "Problem cleaning up %s"
msgstr "问题清除%s"
#: swift/obj/diskfile.py:1157
#: swift/obj/diskfile.py:1166
#, python-format
msgid "ERROR DiskFile %(data_file)s close failure: %(exc)s : %(stack)s"
msgstr "磁盘文件错误%(data_file)s关闭失败: %(exc)s : %(stack)s"
#: swift/obj/diskfile.py:1438
#: swift/obj/diskfile.py:1447
#, python-format
msgid ""
"Client path %(client)s does not match path stored in object metadata "
@ -978,32 +978,32 @@ msgstr "%s秒无复制"
msgid "Lockup detected.. killing live coros."
msgstr "检测到lockup。终止正在执行的coros"
#: swift/obj/replicator.py:508
#: swift/obj/replicator.py:515
msgid "Ring change detected. Aborting current replication pass."
msgstr "Ring改变被检测到。退出现有的复制通过"
#: swift/obj/replicator.py:518
#: swift/obj/replicator.py:536
msgid "Exception in top-level replication loop"
msgstr "top-level复制圈出现异常"
#: swift/obj/replicator.py:527
#: swift/obj/replicator.py:545
msgid "Running object replicator in script mode."
msgstr "在加密模式下执行对象复制"
#: swift/obj/replicator.py:535
#: swift/obj/replicator.py:563
#, python-format
msgid "Object replication complete (once). (%.02f minutes)"
msgstr "对象复制完成(一次)。(%.02f minutes)"
#: swift/obj/replicator.py:542
#: swift/obj/replicator.py:570
msgid "Starting object replicator in daemon mode."
msgstr "在守护模式下开始对象复制"
#: swift/obj/replicator.py:546
#: swift/obj/replicator.py:574
msgid "Starting object replication pass."
msgstr "开始通过对象复制"
#: swift/obj/replicator.py:551
#: swift/obj/replicator.py:579
#, python-format
msgid "Object replication complete. (%.02f minutes)"
msgstr "对象复制完成。(%.02f minutes)"
@ -1099,55 +1099,55 @@ msgstr "%(type)s服务器发生错误 %(ip)s:%(port)s/%(device)s re: %(info)s"
msgid "Account"
msgstr "账号"
#: swift/proxy/controllers/base.py:696 swift/proxy/controllers/base.py:729
#: swift/proxy/controllers/obj.py:191 swift/proxy/controllers/obj.py:321
#: swift/proxy/controllers/obj.py:361 swift/proxy/controllers/obj.py:379
#: swift/proxy/controllers/obj.py:507
#: swift/proxy/controllers/base.py:697 swift/proxy/controllers/base.py:730
#: swift/proxy/controllers/obj.py:191 swift/proxy/controllers/obj.py:318
#: swift/proxy/controllers/obj.py:358 swift/proxy/controllers/obj.py:376
#: swift/proxy/controllers/obj.py:502
msgid "Object"
msgstr "对象"
#: swift/proxy/controllers/base.py:697
#: swift/proxy/controllers/base.py:698
msgid "Trying to read during GET (retrying)"
msgstr "执行GET时尝试读取(重新尝试)"
#: swift/proxy/controllers/base.py:730
#: swift/proxy/controllers/base.py:731
msgid "Trying to read during GET"
msgstr "执行GET时尝试读取"
#: swift/proxy/controllers/base.py:734
#: swift/proxy/controllers/base.py:735
#, python-format
msgid "Client did not read from proxy within %ss"
msgstr "客户尚未从代理处读取%ss"
#: swift/proxy/controllers/base.py:739
#: swift/proxy/controllers/base.py:740
msgid "Client disconnected on read"
msgstr "客户读取时中断"
#: swift/proxy/controllers/base.py:741
#: swift/proxy/controllers/base.py:742
msgid "Trying to send to client"
msgstr "尝试发送到客户端"
#: swift/proxy/controllers/base.py:778 swift/proxy/controllers/base.py:1048
#: swift/proxy/controllers/base.py:779 swift/proxy/controllers/base.py:1049
#, python-format
msgid "Trying to %(method)s %(path)s"
msgstr "尝试执行%(method)s %(path)s"
#: swift/proxy/controllers/base.py:815 swift/proxy/controllers/base.py:1036
#: swift/proxy/controllers/obj.py:353 swift/proxy/controllers/obj.py:393
#: swift/proxy/controllers/base.py:816 swift/proxy/controllers/base.py:1037
#: swift/proxy/controllers/obj.py:350 swift/proxy/controllers/obj.py:390
msgid "ERROR Insufficient Storage"
msgstr "错误 存储空间不足"
#: swift/proxy/controllers/base.py:818
#: swift/proxy/controllers/base.py:819
#, python-format
msgid "ERROR %(status)d %(body)s From %(type)s Server"
msgstr "错误 %(status)d %(body)s 来自 %(type)s 服务器"
#: swift/proxy/controllers/base.py:1039
#: swift/proxy/controllers/base.py:1040
#, python-format
msgid "ERROR %(status)d Trying to %(method)s %(path)sFrom Container Server"
msgstr ""
#: swift/proxy/controllers/base.py:1151
#: swift/proxy/controllers/base.py:1152
#, python-format
msgid "%(type)s returning 503 for %(statuses)s"
msgstr "%(type)s 返回 503 在 %(statuses)s"
@ -1156,67 +1156,72 @@ msgstr "%(type)s 返回 503 在 %(statuses)s"
msgid "Container"
msgstr "容器"
#: swift/proxy/controllers/obj.py:322
#: swift/proxy/controllers/obj.py:319
#, python-format
msgid "Trying to write to %s"
msgstr "尝试执行书写%s"
#: swift/proxy/controllers/obj.py:356
#: swift/proxy/controllers/obj.py:353
#, python-format
msgid "ERROR %(status)d Expect: 100-continue From Object Server"
msgstr ""
#: swift/proxy/controllers/obj.py:362
#: swift/proxy/controllers/obj.py:359
#, python-format
msgid "Expect: 100-continue on %s"
msgstr "已知100-continue on %s"
#: swift/proxy/controllers/obj.py:380
#: swift/proxy/controllers/obj.py:377
#, python-format
msgid "Trying to get final status of PUT to %s"
msgstr "尝试执行获取最后的PUT状态%s"
#: swift/proxy/controllers/obj.py:397
#: swift/proxy/controllers/obj.py:394
#, python-format
msgid "ERROR %(status)d %(body)s From Object Server re: %(path)s"
msgstr "错误 %(status)d %(body)s 来自 对象服务器 re: %(path)s"
#: swift/proxy/controllers/obj.py:673
#: swift/proxy/controllers/obj.py:665
#, python-format
msgid "Object PUT returning 412, %(statuses)r"
msgstr "对象PUT返还 412%(statuses)r "
#: swift/proxy/controllers/obj.py:679
#: swift/proxy/controllers/obj.py:674
#, python-format
msgid "Object PUT returning 202 for 409: %(req_timestamp)s <= %(timestamps)r"
msgstr ""
#: swift/proxy/controllers/obj.py:682
#, python-format
msgid "Object PUT returning 503, %(conns)s/%(nodes)s required connections"
msgstr "对象PUT返回503%(conns)s/%(nodes)s 请求连接"
#: swift/proxy/controllers/obj.py:710
#: swift/proxy/controllers/obj.py:713
#, python-format
msgid ""
"Object PUT exceptions during send, %(conns)s/%(nodes)s required "
"connections"
msgstr "对象PUT发送时出现异常%(conns)s/%(nodes)s请求连接"
#: swift/proxy/controllers/obj.py:721
#: swift/proxy/controllers/obj.py:724
#, python-format
msgid "ERROR Client read timeout (%ss)"
msgstr "错误 客户读取超时(%ss)"
#: swift/proxy/controllers/obj.py:726
#: swift/proxy/controllers/obj.py:729
msgid "ERROR Exception causing client disconnect"
msgstr "错误 异常导致客户端中断连接"
#: swift/proxy/controllers/obj.py:731
#: swift/proxy/controllers/obj.py:734
msgid "Client disconnected without sending enough data"
msgstr "客户中断 尚未发送足够"
#: swift/proxy/controllers/obj.py:740
#: swift/proxy/controllers/obj.py:743
#, python-format
msgid "Object servers returned %s mismatched etags"
msgstr "对象服务器返还%s不匹配etags"
#: swift/proxy/controllers/obj.py:744
#: swift/proxy/controllers/obj.py:747
msgid "Object PUT"
msgstr "对象上传"

View File

@ -27,6 +27,7 @@ from eventlet import GreenPool, tpool, Timeout, sleep, hubs
from eventlet.green import subprocess
from eventlet.support.greenlets import GreenletExit
from swift.common.ring.utils import is_local_device
from swift.common.utils import whataremyips, unlink_older_than, \
compute_eta, get_logger, dump_recon_cache, ismount, \
rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \
@ -406,16 +407,23 @@ class ObjectReplicator(Daemon):
self.kill_coros()
self.last_replication_count = self.replication_count
def build_replication_jobs(self, policy, jobs, ips):
def build_replication_jobs(self, policy, ips, override_devices=None,
override_partitions=None):
"""
Helper function for collect_jobs to build jobs for replication
using replication style storage policy
"""
jobs = []
obj_ring = self.get_object_ring(policy.idx)
data_dir = get_data_dir(policy.idx)
for local_dev in [dev for dev in obj_ring.devs
if dev and dev['replication_ip'] in ips and
dev['replication_port'] == self.port]:
if (dev
and is_local_device(ips,
self.port,
dev['replication_ip'],
dev['replication_port'])
and (override_devices is None
or dev['device'] in override_devices))]:
dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, data_dir)
tmp_path = join(dev_path, get_tmp_dir(int(policy)))
@ -430,16 +438,12 @@ class ObjectReplicator(Daemon):
self.logger.exception('ERROR creating %s' % obj_path)
continue
for partition in os.listdir(obj_path):
if (override_partitions is not None
and partition not in override_partitions):
continue
try:
job_path = join(obj_path, partition)
if isfile(job_path):
# Clean up any (probably zero-byte) files where a
# partition should be.
self.logger.warning(
'Removing partition directory '
'which was a file: %s', job_path)
os.remove(job_path)
continue
part_nodes = obj_ring.get_part_nodes(int(partition))
nodes = [node for node in part_nodes
if node['id'] != local_dev['id']]
@ -451,20 +455,34 @@ class ObjectReplicator(Daemon):
policy_idx=policy.idx,
partition=partition,
object_ring=obj_ring))
except (ValueError, OSError):
except ValueError:
continue
return jobs
def collect_jobs(self):
def collect_jobs(self, override_devices=None, override_partitions=None,
override_policies=None):
"""
Returns a sorted list of jobs (dictionaries) that specify the
partitions, nodes, etc to be rsynced.
:param override_devices: if set, only jobs on these devices
will be returned
:param override_partitions: if set, only jobs on these partitions
will be returned
:param override_policies: if set, only jobs in these storage
policies will be returned
"""
jobs = []
ips = whataremyips()
for policy in POLICIES:
if policy.policy_type == REPL_POLICY:
self.build_replication_jobs(policy, jobs, ips)
if (policy.policy_type == REPL_POLICY
and override_policies is not None
and str(policy.idx) not in override_policies):
continue
# may need to branch here for future policy types
jobs += self.build_replication_jobs(
policy, ips, override_devices=override_devices,
override_partitions=override_partitions)
random.shuffle(jobs)
if self.handoffs_first:
# Move the handoff parts to the front of the list
@ -472,7 +490,8 @@ class ObjectReplicator(Daemon):
self.job_count = len(jobs)
return jobs
def replicate(self, override_devices=None, override_partitions=None):
def replicate(self, override_devices=None, override_partitions=None,
override_policies=None):
"""Run a replication pass"""
self.start = time.time()
self.suffix_count = 0
@ -482,24 +501,16 @@ class ObjectReplicator(Daemon):
self.last_replication_count = -1
self.partition_times = []
if override_devices is None:
override_devices = []
if override_partitions is None:
override_partitions = []
stats = eventlet.spawn(self.heartbeat)
lockup_detector = eventlet.spawn(self.detect_lockups)
eventlet.sleep() # Give spawns a cycle
try:
self.run_pool = GreenPool(size=self.concurrency)
jobs = self.collect_jobs()
jobs = self.collect_jobs(override_devices=override_devices,
override_partitions=override_partitions,
override_policies=override_policies)
for job in jobs:
if override_devices and job['device'] not in override_devices:
continue
if override_partitions and \
job['partition'] not in override_partitions:
continue
dev_path = join(self.devices_dir, job['device'])
if self.mount_check and not ismount(dev_path):
self.logger.warn(_('%s is not mounted'), job['device'])
@ -508,6 +519,17 @@ class ObjectReplicator(Daemon):
self.logger.info(_("Ring change detected. Aborting "
"current replication pass."))
return
try:
if isfile(job['path']):
# Clean up any (probably zero-byte) files where a
# partition should be.
self.logger.warning(
'Removing partition directory '
'which was a file: %s', job['path'])
os.remove(job['path'])
continue
except OSError:
continue
if job['delete']:
self.run_pool.spawn(self.update_deleted, job)
else:
@ -525,11 +547,21 @@ class ObjectReplicator(Daemon):
def run_once(self, *args, **kwargs):
start = time.time()
self.logger.info(_("Running object replicator in script mode."))
override_devices = list_from_csv(kwargs.get('devices'))
override_partitions = list_from_csv(kwargs.get('partitions'))
override_policies = list_from_csv(kwargs.get('policies'))
if not override_devices:
override_devices = None
if not override_partitions:
override_partitions = None
if not override_policies:
override_policies = None
self.replicate(
override_devices=override_devices,
override_partitions=override_partitions)
override_partitions=override_partitions,
override_policies=override_policies)
total = (time.time() - start) / 60
self.logger.info(
_("Object replication complete (once). (%.02f minutes)"), total)

View File

@ -42,6 +42,7 @@ from swift.common.exceptions import ConnectionTimeout, DiskFileQuarantined, \
DiskFileXattrNotSupported
from swift.obj import ssync_receiver
from swift.common.http import is_success
from swift.common.base_storage_server import BaseStorageServer
from swift.common.request_helpers import get_name_and_placement, \
is_user_meta, is_sys_or_user_meta
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
@ -80,9 +81,11 @@ class EventletPlungerString(str):
return wsgi.MINIMUM_CHUNK_SIZE + 1
class ObjectController(object):
class ObjectController(BaseStorageServer):
"""Implements the WSGI application for the Swift Object Server."""
server_type = 'object-server'
def __init__(self, conf, logger=None):
"""
Creates a new WSGI application for the Swift Object Server. An
@ -90,6 +93,7 @@ class ObjectController(object):
<source-dir>/etc/object-server.conf-sample or
/etc/swift/object-server.conf-sample.
"""
super(ObjectController, self).__init__(conf)
self.logger = logger or get_logger(conf, log_route='object-server')
self.node_timeout = int(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
@ -101,10 +105,6 @@ class ObjectController(object):
self.slow = int(conf.get('slow', 0))
self.keep_cache_private = \
config_true_value(conf.get('keep_cache_private', 'false'))
replication_server = conf.get('replication_server', None)
if replication_server is not None:
replication_server = config_true_value(replication_server)
self.replication_server = replication_server
default_allowed_headers = '''
content-disposition,
@ -808,15 +808,12 @@ class ObjectController(object):
try:
# disallow methods which have not been marked 'public'
try:
method = getattr(self, req.method)
getattr(method, 'publicly_accessible')
replication_method = getattr(method, 'replication', False)
if (self.replication_server is not None and
self.replication_server != replication_method):
if req.method not in self.allowed_methods:
raise AttributeError('Not allowed method.')
except AttributeError:
res = HTTPMethodNotAllowed()
else:
method = getattr(self, req.method)
res = method(req)
except DiskFileCollision:
res = HTTPForbidden(request=req)

View File

@ -185,7 +185,8 @@ def headers_to_object_info(headers, status_int=HTTP_OK):
'length': headers.get('content-length'),
'type': headers.get('content-type'),
'etag': headers.get('etag'),
'meta': meta
'meta': meta,
'sysmeta': sysmeta
}
return info

View File

@ -55,7 +55,7 @@ from swift.common.http import (
is_success, is_client_error, is_server_error, HTTP_CREATED,
HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_INTERNAL_SERVER_ERROR,
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
HTTP_PRECONDITION_FAILED)
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT)
from swift.common.storage_policy import POLICIES
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation
@ -293,7 +293,7 @@ class Putter(object):
conn.node = node
conn.resp = None
if is_success(resp.status):
if is_success(resp.status) or resp.status == HTTP_CONFLICT:
conn.resp = resp
elif (headers.get('If-None-Match', None) is not None and
resp.status == HTTP_PRECONDITION_FAILED):
@ -498,9 +498,6 @@ class ObjectController(Controller):
headers = [self.generate_request_headers(req, additional=req.headers)
for _junk in range(n_outgoing)]
for header in headers:
header['Connection'] = 'close'
for i, container in enumerate(containers):
i = i % len(headers)
@ -747,10 +744,8 @@ class ObjectController(Controller):
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
# do a HEAD request for container sync and checking object versions
if 'x-timestamp' in req.headers or \
(object_versions and not
req.environ.get('swift_versioned_copy')):
# do a HEAD request for checking object versions
if object_versions and not req.environ.get('swift_versioned_copy'):
# make sure proxy-server uses the right policy index
_headers = {'X-Backend-Storage-Policy-Index': policy_index,
'X-Newest': 'True'}
@ -764,9 +759,6 @@ class ObjectController(Controller):
if 'x-timestamp' in req.headers:
try:
req_timestamp = Timestamp(req.headers['X-Timestamp'])
if hresp.environ and 'swift_x_timestamp' in hresp.environ and \
hresp.environ['swift_x_timestamp'] >= req_timestamp:
return HTTPAccepted(request=req)
except ValueError:
return HTTPBadRequest(
request=req, content_type='text/plain',
@ -824,8 +816,8 @@ class ObjectController(Controller):
else:
src_account_name = acct
src_container_name, src_obj_name = check_copy_from_header(req)
source_header = '/%s/%s/%s/%s' % (ver, src_account_name,
src_container_name, src_obj_name)
source_header = '/%s/%s/%s/%s' % (
ver, src_account_name, src_container_name, src_obj_name)
source_req = req.copy_get()
# make sure the source request uses it's container_info
@ -939,14 +931,25 @@ class ObjectController(Controller):
chunk_hashers[p.hshr_index] = (
None if policy.stores_objects_verbatim else md5())
if req.if_none_match is not None and '*' in req.if_none_match:
statuses = [p.current_status() for p in putters]
if HTTP_PRECONDITION_FAILED in statuses:
# If we find any copy of the file, it shouldn't be uploaded
self.app.logger.debug(
_('Object PUT returning 412, %(statuses)r'),
{'statuses': statuses})
return HTTPPreconditionFailed(request=req)
statuses = [p.current_status() for p in putters]
if (req.if_none_match is not None
and '*' in req.if_none_match
and HTTP_PRECONDITION_FAILED in statuses):
# If we find any copy of the file, it shouldn't be uploaded
self.app.logger.debug(
_('Object PUT returning 412, %(statuses)r'),
{'statuses': statuses})
return HTTPPreconditionFailed(request=req)
if HTTP_CONFLICT in statuses:
timestamps = [HeaderKeyDict(p.resp.getheaders()).get(
'X-Backend-Timestamp') for p in putters if p.resp]
self.app.logger.debug(
_('Object PUT returning 202 for 409: '
'%(req_timestamp)s <= %(timestamps)r'),
{'req_timestamp': req.timestamp.internal,
'timestamps': ', '.join(timestamps)})
return HTTPAccepted(request=req)
if len(putters) < min_puts:
self.app.logger.error(

View File

@ -666,13 +666,13 @@ def requires_acls(f):
def wrapper(*args, **kwargs):
global skip, cluster_info
if skip or not cluster_info:
raise SkipTest
raise SkipTest('Requires account ACLs')
# Determine whether this cluster has account ACLs; if not, skip test
if not cluster_info.get('tempauth', {}).get('account_acls'):
raise SkipTest
if 'keystoneauth' in cluster_info:
raise SkipTest('Requires account ACLs')
if swift_test_auth_version != '1':
# remove when keystoneauth supports account acls
raise SkipTest
raise SkipTest('Requires account ACLs')
reset_acl()
try:
rv = f(*args, **kwargs)

View File

@ -722,8 +722,10 @@ class File(Base):
['content_type', 'content-type'],
['last_modified', 'last-modified'],
['etag', 'etag']]
optional_fields = [['x_object_manifest', 'x-object-manifest']]
header_fields = self.header_fields(fields)
header_fields = self.header_fields(fields,
optional_fields=optional_fields)
header_fields['etag'] = header_fields['etag'].strip('"')
return header_fields

View File

@ -1805,6 +1805,9 @@ class TestDlo(Base):
file_item = self.env.container.file('man1')
file_contents = file_item.read(parms={'multipart-manifest': 'get'})
self.assertEqual(file_contents, "man1-contents")
self.assertEqual(file_item.info()['x_object_manifest'],
"%s/%s/seg_lower" %
(self.env.container.name, self.env.segment_prefix))
def test_get_range(self):
file_item = self.env.container.file('man1')
@ -1839,6 +1842,8 @@ class TestDlo(Base):
self.assertEqual(
file_contents,
"aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffff")
# The copied object must not have X-Object-Manifest
self.assertTrue("x_object_manifest" not in file_item.info())
def test_copy_account(self):
# dlo use same account and same container only
@ -1863,9 +1868,12 @@ class TestDlo(Base):
self.assertEqual(
file_contents,
"aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeeeffffffffff")
# The copied object must not have X-Object-Manifest
self.assertTrue("x_object_manifest" not in file_item.info())
def test_copy_manifest(self):
# Copying the manifest should result in another manifest
# Copying the manifest with multipart-manifest=get query string
# should result in another manifest
try:
man1_item = self.env.container.file('man1')
man1_item.copy(self.env.container.name, "copied-man1",
@ -1879,6 +1887,8 @@ class TestDlo(Base):
self.assertEqual(
copied_contents,
"aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee")
self.assertEqual(man1_item.info()['x_object_manifest'],
copied.info()['x_object_manifest'])
finally:
# try not to leave this around for other tests to stumble over
self.env.container.file("copied-man1").delete()

View File

@ -26,6 +26,8 @@ from swift.common.http import HTTP_NOT_FOUND
from swiftclient import client, get_auth, ClientException
from test.probe.common import ENABLED_POLICIES
TIMEOUT = 60
@ -73,7 +75,7 @@ class BrainSplitter(object):
self.object_name = object_name
server_list = ['%s-server' % server_type] if server_type else ['all']
self.servers = Manager(server_list)
policies = list(POLICIES)
policies = list(ENABLED_POLICIES)
random.shuffle(policies)
self.policies = itertools.cycle(policies)

View File

@ -31,6 +31,9 @@ from swift.common.storage_policy import POLICIES
from test.probe import CHECK_SERVER_TIMEOUT, VALIDATE_RSYNC
ENABLED_POLICIES = [p for p in POLICIES if not p.is_deprecated]
def get_server_number(port, port2server):
server_number = port2server[port]
server, number = server_number[:-1], server_number[-1:]

View File

@ -22,7 +22,7 @@ from swift.common.manager import Manager
from swift.common.direct_client import direct_delete_account, \
direct_get_object, direct_head_container, ClientException
from test.probe.common import kill_servers, reset_environment, \
get_to_final_state
get_to_final_state, ENABLED_POLICIES
class TestAccountReaper(unittest.TestCase):
@ -38,7 +38,7 @@ class TestAccountReaper(unittest.TestCase):
def test_sync(self):
all_objects = []
# upload some containers
for policy in POLICIES:
for policy in ENABLED_POLICIES:
container = 'container-%s-%s' % (policy.name, uuid.uuid4())
client.put_container(self.url, self.token, container,
headers={'X-Storage-Policy': policy.name})
@ -52,11 +52,11 @@ class TestAccountReaper(unittest.TestCase):
headers = client.head_account(self.url, self.token)
self.assertEqual(int(headers['x-account-container-count']),
len(POLICIES))
len(ENABLED_POLICIES))
self.assertEqual(int(headers['x-account-object-count']),
len(POLICIES))
len(ENABLED_POLICIES))
self.assertEqual(int(headers['x-account-bytes-used']),
len(POLICIES) * len(body))
len(ENABLED_POLICIES) * len(body))
part, nodes = self.account_ring.get_nodes(self.account)
for node in nodes:

View File

@ -26,7 +26,8 @@ from swift.common import utils, direct_client
from swift.common.storage_policy import POLICIES
from swift.common.http import HTTP_NOT_FOUND
from test.probe.brain import BrainSplitter
from test.probe.common import reset_environment, get_to_final_state
from test.probe.common import reset_environment, get_to_final_state, \
ENABLED_POLICIES
from swiftclient import client, ClientException
@ -36,7 +37,7 @@ TIMEOUT = 60
class TestContainerMergePolicyIndex(unittest.TestCase):
def setUp(self):
if len(POLICIES) < 2:
if len(ENABLED_POLICIES) < 2:
raise SkipTest()
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.policy, self.url, self.token,
@ -252,7 +253,7 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
# get an old container stashed
self.brain.stop_primary_half()
policy = random.choice(list(POLICIES))
policy = random.choice(ENABLED_POLICIES)
self.brain.put_container(policy.idx)
self.brain.start_primary_half()
# write some parts
@ -260,7 +261,8 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
write_part(i)
self.brain.stop_handoff_half()
wrong_policy = random.choice([p for p in POLICIES if p is not policy])
wrong_policy = random.choice([p for p in ENABLED_POLICIES
if p is not policy])
self.brain.put_container(wrong_policy.idx)
# write some more parts
for i in range(10, 20):
@ -347,8 +349,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
def test_reconciler_move_object_twice(self):
# select some policies
old_policy = random.choice(list(POLICIES))
new_policy = random.choice([p for p in POLICIES if p != old_policy])
old_policy = random.choice(ENABLED_POLICIES)
new_policy = random.choice([p for p in ENABLED_POLICIES
if p != old_policy])
# setup a split brain
self.brain.stop_handoff_half()

View File

@ -20,9 +20,8 @@ from nose import SkipTest
from swiftclient import client
from swift.common.storage_policy import POLICIES
from swift.common.manager import Manager
from test.probe.common import kill_servers, reset_environment
from test.probe.common import kill_servers, reset_environment, ENABLED_POLICIES
def get_current_realm_cluster(url):
@ -62,8 +61,8 @@ class TestContainerSync(unittest.TestCase):
dest_container = 'dest-container-%s' % uuid.uuid4()
dest_headers = base_headers.copy()
dest_policy = None
if len(POLICIES) > 1:
dest_policy = random.choice(list(POLICIES))
if len(ENABLED_POLICIES) > 1:
dest_policy = random.choice(ENABLED_POLICIES)
dest_headers['X-Storage-Policy'] = dest_policy.name
client.put_container(self.url, self.token, dest_container,
headers=dest_headers)
@ -75,7 +74,7 @@ class TestContainerSync(unittest.TestCase):
dest_container)
source_headers['X-Container-Sync-To'] = sync_to
if dest_policy:
source_policy = random.choice([p for p in POLICIES
source_policy = random.choice([p for p in ENABLED_POLICIES
if p is not dest_policy])
source_headers['X-Storage-Policy'] = source_policy.name
client.put_container(self.url, self.token, source_container,

View File

@ -20,10 +20,10 @@ from nose import SkipTest
from swift.common.internal_client import InternalClient
from swift.common.manager import Manager
from swift.common.storage_policy import POLICIES
from swift.common.utils import Timestamp
from test.probe.common import reset_environment, get_to_final_state
from test.probe.common import reset_environment, get_to_final_state, \
ENABLED_POLICIES
from test.probe.test_container_merge_policy_index import BrainSplitter
from swiftclient import client
@ -32,7 +32,7 @@ from swiftclient import client
class TestObjectExpirer(unittest.TestCase):
def setUp(self):
if len(POLICIES) < 2:
if len(ENABLED_POLICIES) < 2:
raise SkipTest('Need more than one policy')
self.expirer = Manager(['object-expirer'])
@ -56,8 +56,9 @@ class TestObjectExpirer(unittest.TestCase):
self.object_name)
def test_expirer_object_split_brain(self):
old_policy = random.choice(list(POLICIES))
wrong_policy = random.choice([p for p in POLICIES if p != old_policy])
old_policy = random.choice(ENABLED_POLICIES)
wrong_policy = random.choice([p for p in ENABLED_POLICIES
if p != old_policy])
# create an expiring object and a container with the wrong policy
self.brain.stop_primary_half()
self.brain.put_container(int(old_policy))
@ -114,7 +115,7 @@ class TestObjectExpirer(unittest.TestCase):
# and validate object is tombstoned
found_in_policy = None
for policy in POLICIES:
for policy in ENABLED_POLICIES:
metadata = self.client.get_object_metadata(
self.account, self.container_name, self.object_name,
acceptable_statuses=(4,),

View File

@ -29,8 +29,7 @@ from eventlet.green import socket
from tempfile import mkdtemp
from shutil import rmtree
from test import get_config
from swift.common import swob
from swift.common.utils import config_true_value, LogAdapter
from swift.common import swob, utils
from swift.common.ring import Ring, RingData
from hashlib import md5
from eventlet import sleep, Timeout
@ -43,6 +42,11 @@ import cPickle as pickle
from gzip import GzipFile
import mock as mocklib
# try not to import this module from swift
if not os.path.basename(sys.argv[0]).startswith('swift'):
# never patch HASH_PATH_SUFFIX AGAIN!
utils.HASH_PATH_SUFFIX = 'endcap'
def patch_policies(thing_or_policies=None, legacy_only=False):
if legacy_only:
@ -496,7 +500,7 @@ class DebugLogger(FakeLogger):
print self.formatter.format(record)
class DebugLogAdapter(LogAdapter):
class DebugLogAdapter(utils.LogAdapter):
def _send_to_logger(name):
def stub_fn(self, *args, **kwargs):
@ -538,7 +542,8 @@ def fake_syslog_handler():
logging.handlers.SysLogHandler = FakeLogger
if config_true_value(get_config('unit_test').get('fake_syslog', 'False')):
if utils.config_true_value(
get_config('unit_test').get('fake_syslog', 'False')):
fake_syslog_handler()

View File

@ -96,15 +96,15 @@ class FakeRing(object):
def __init__(self):
self.nodes = [{'id': '1',
'ip': '10.10.10.1',
'port': None,
'port': 6002,
'device': None},
{'id': '2',
'ip': '10.10.10.1',
'port': None,
'port': 6002,
'device': None},
{'id': '3',
'ip': '10.10.10.1',
'port': None,
'port': 6002,
'device': None},
]

View File

@ -28,6 +28,7 @@ import random
import simplejson
import xml.dom.minidom
from swift import __version__ as swift_version
from swift.common.swob import Request
from swift.common import constraints
from swift.account.server import AccountController
@ -56,6 +57,20 @@ class TestAccountController(unittest.TestCase):
if err.errno != errno.ENOENT:
raise
def test_OPTIONS(self):
server_handler = AccountController(
{'devices': self.testdir, 'mount_check': 'false'})
req = Request.blank('/sda1/p/a/c/o', {'REQUEST_METHOD': 'OPTIONS'})
req.content_length = 0
resp = server_handler.OPTIONS(req)
self.assertEquals(200, resp.status_int)
for verb in 'OPTIONS GET POST PUT DELETE HEAD REPLICATE'.split():
self.assertTrue(
verb in resp.headers['Allow'].split(', '))
self.assertEquals(len(resp.headers['Allow'].split(', ')), 7)
self.assertEquals(resp.headers['Server'],
(server_handler.server_type + '/' + swift_version))
def test_DELETE_not_found(self):
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'DELETE',
'HTTP_X_TIMESTAMP': '0'})
@ -1601,7 +1616,7 @@ class TestAccountController(unittest.TestCase):
with mock.patch.object(self.controller, method,
new=mock_method):
mock_method.replication = False
response = self.controller.__call__(env, start_response)
response = self.controller(env, start_response)
self.assertEqual(response, method_res)
def test_not_allowed_method(self):
@ -1643,6 +1658,38 @@ class TestAccountController(unittest.TestCase):
response = self.controller.__call__(env, start_response)
self.assertEqual(response, answer)
def test_call_incorrect_replication_method(self):
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
self.controller = AccountController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'true'})
def start_response(*args):
"""Sends args to outbuf"""
outbuf.writelines(args)
obj_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST', 'OPTIONS']
for method in obj_methods:
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
self.controller(env, start_response)
self.assertEquals(errbuf.getvalue(), '')
self.assertEquals(outbuf.getvalue()[:4], '405 ')
def test_GET_log_requests_true(self):
self.controller.logger = FakeLogger()
self.controller.log_requests = True

File diff suppressed because it is too large Load Diff

View File

@ -13,12 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from textwrap import dedent
import unittest
from ConfigParser import NoSectionError, NoOptionError
import mock
from swift.common.middleware import memcache
from swift.common.memcached import MemcacheRing
from swift.common.swob import Request
from swift.common.wsgi import loadapp
from test.unit import with_tempdir, patch_policies
class FakeApp(object):
@ -49,6 +56,16 @@ def get_config_parser(memcache_servers='1.2.3.4:5',
class SetConfigParser(object):
def items(self, section_name):
if section_name != section:
raise NoSectionError(section_name)
return {
'memcache_servers': memcache_servers,
'memcache_serialization_support':
memcache_serialization_support,
'memcache_max_connections': memcache_max_connections,
}
def read(self, path):
return True
@ -295,5 +312,120 @@ class TestCacheMiddleware(unittest.TestCase):
self.assertEquals(
thefilter.memcache._client_cache['10.10.10.10:10'].max_size, 3)
@patch_policies
def _loadapp(self, proxy_config_path):
"""
Load a proxy from an app.conf to get the memcache_ring
:returns: the memcache_ring of the memcache middleware filter
"""
with mock.patch('swift.proxy.server.Ring'):
app = loadapp(proxy_config_path)
memcache_ring = None
while True:
memcache_ring = getattr(app, 'memcache', None)
if memcache_ring:
break
app = app.app
return memcache_ring
@with_tempdir
def test_real_config(self, tempdir):
config = """
[pipeline:main]
pipeline = cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
[filter:cache]
use = egg:swift#memcache
"""
config_path = os.path.join(tempdir, 'test.conf')
with open(config_path, 'w') as f:
f.write(dedent(config))
memcache_ring = self._loadapp(config_path)
# only one server by default
self.assertEqual(memcache_ring._client_cache.keys(),
['127.0.0.1:11211'])
# extra options
self.assertEqual(memcache_ring._connect_timeout, 0.3)
self.assertEqual(memcache_ring._pool_timeout, 1.0)
# tries is limited to server count
self.assertEqual(memcache_ring._tries, 1)
self.assertEqual(memcache_ring._io_timeout, 2.0)
@with_tempdir
def test_real_config_with_options(self, tempdir):
config = """
[pipeline:main]
pipeline = cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
[filter:cache]
use = egg:swift#memcache
memcache_servers = 10.0.0.1:11211,10.0.0.2:11211,10.0.0.3:11211,
10.0.0.4:11211
connect_timeout = 1.0
pool_timeout = 0.5
tries = 4
io_timeout = 1.0
"""
config_path = os.path.join(tempdir, 'test.conf')
with open(config_path, 'w') as f:
f.write(dedent(config))
memcache_ring = self._loadapp(config_path)
self.assertEqual(sorted(memcache_ring._client_cache.keys()),
['10.0.0.%d:11211' % i for i in range(1, 5)])
# extra options
self.assertEqual(memcache_ring._connect_timeout, 1.0)
self.assertEqual(memcache_ring._pool_timeout, 0.5)
# tries is limited to server count
self.assertEqual(memcache_ring._tries, 4)
self.assertEqual(memcache_ring._io_timeout, 1.0)
@with_tempdir
def test_real_memcache_config(self, tempdir):
proxy_config = """
[DEFAULT]
swift_dir = %s
[pipeline:main]
pipeline = cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
[filter:cache]
use = egg:swift#memcache
connect_timeout = 1.0
""" % tempdir
proxy_config_path = os.path.join(tempdir, 'test.conf')
with open(proxy_config_path, 'w') as f:
f.write(dedent(proxy_config))
memcache_config = """
[memcache]
memcache_servers = 10.0.0.1:11211,10.0.0.2:11211,10.0.0.3:11211,
10.0.0.4:11211
connect_timeout = 0.5
io_timeout = 1.0
"""
memcache_config_path = os.path.join(tempdir, 'memcache.conf')
with open(memcache_config_path, 'w') as f:
f.write(dedent(memcache_config))
memcache_ring = self._loadapp(proxy_config_path)
self.assertEqual(sorted(memcache_ring._client_cache.keys()),
['10.0.0.%d:11211' % i for i in range(1, 5)])
# proxy option takes precedence
self.assertEqual(memcache_ring._connect_timeout, 1.0)
# default tries are not limited by servers
self.assertEqual(memcache_ring._tries, 3)
# memcache conf options are defaults
self.assertEqual(memcache_ring._io_timeout, 1.0)
if __name__ == '__main__':
unittest.main()

View File

@ -184,7 +184,7 @@ class TestRingBuilder(unittest.TestCase):
'ip': '127.0.0.1', 'port': 6000})
self.assertEquals(rb.devs[0]['id'], 0)
self.assertEqual(dev_id, 0)
#test add another dev with no id
# test add another dev with no id
dev_id = rb.add_dev({'zone': 3, 'region': 2, 'weight': 1,
'ip': '127.0.0.1', 'port': 6000})
self.assertEquals(rb.devs[1]['id'], 1)
@ -1186,7 +1186,7 @@ class TestRingBuilder(unittest.TestCase):
io_error_generic = IOError()
io_error_generic.errno = errno.EOPNOTSUPP
try:
#test a legit builder
# test a legit builder
fake_pickle = mock.Mock(return_value=rb)
pickle.load = fake_pickle
builder = ring.RingBuilder.load('fake.builder', open=fake_open)
@ -1195,7 +1195,7 @@ class TestRingBuilder(unittest.TestCase):
self.assertEquals(builder, rb)
fake_pickle.reset_mock()
#test old style builder
# test old style builder
fake_pickle.return_value = rb.to_dict()
pickle.load = fake_pickle
builder = ring.RingBuilder.load('fake.builder', open=fake_open)
@ -1203,7 +1203,7 @@ class TestRingBuilder(unittest.TestCase):
self.assertEquals(builder.devs, rb.devs)
fake_pickle.reset_mock()
#test old devs but no meta
# test old devs but no meta
no_meta_builder = rb
for dev in no_meta_builder.devs:
del(dev['meta'])
@ -1213,21 +1213,21 @@ class TestRingBuilder(unittest.TestCase):
fake_open.assert_has_calls([mock.call('fake.builder', 'rb')])
self.assertEquals(builder.devs, rb.devs)
#test an empty builder
# test an empty builder
fake_pickle.side_effect = EOFError
pickle.load = fake_pickle
self.assertRaises(exceptions.UnPicklingError,
ring.RingBuilder.load, 'fake.builder',
open=fake_open)
#test a corrupted builder
# test a corrupted builder
fake_pickle.side_effect = pickle.UnpicklingError
pickle.load = fake_pickle
self.assertRaises(exceptions.UnPicklingError,
ring.RingBuilder.load, 'fake.builder',
open=fake_open)
#test some error
# test some error
fake_pickle.side_effect = AttributeError
pickle.load = fake_pickle
self.assertRaises(exceptions.UnPicklingError,
@ -1236,19 +1236,19 @@ class TestRingBuilder(unittest.TestCase):
finally:
pickle.load = real_pickle
#test non existent builder file
# test non existent builder file
fake_open.side_effect = io_error_not_found
self.assertRaises(exceptions.FileNotFoundError,
ring.RingBuilder.load, 'fake.builder',
open=fake_open)
#test non accessible builder file
# test non accessible builder file
fake_open.side_effect = io_error_no_perm
self.assertRaises(exceptions.PermissionError,
ring.RingBuilder.load, 'fake.builder',
open=fake_open)
#test an error other then ENOENT and ENOPERM
# test an error other then ENOENT and ENOPERM
fake_open.side_effect = io_error_generic
self.assertRaises(IOError,
ring.RingBuilder.load, 'fake.builder',
@ -1467,6 +1467,70 @@ class TestRingBuilder(unittest.TestCase):
key=operator.itemgetter('id'))
self.assertEqual(part_devs, [rb.devs[0], rb.devs[1]])
def test_dispersion_with_zero_weight_devices(self):
rb = ring.RingBuilder(8, 3.0, 0)
# add two devices to a single server in a single zone
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdb'})
# and a zero weight device
rb.add_dev({'id': 2, 'region': 0, 'zone': 0, 'weight': 0,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdc'})
rb.rebalance()
self.assertEqual(rb.dispersion, 0.0)
self.assertEqual(rb._dispersion_graph, {
(0,): [0, 0, 0, 256],
(0, 0): [0, 0, 0, 256],
(0, 0, '127.0.0.1:10000'): [0, 0, 0, 256],
(0, 0, '127.0.0.1:10000', 0): [0, 128, 128, 0],
(0, 0, '127.0.0.1:10000', 1): [0, 128, 128, 0],
})
def test_dispersion_with_zero_weight_devices_with_parts(self):
rb = ring.RingBuilder(8, 3.0, 1)
# add three devices to a single server in a single zone
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdc'})
rb.rebalance(seed=1)
self.assertEqual(rb.dispersion, 0.0)
self.assertEqual(rb._dispersion_graph, {
(0,): [0, 0, 0, 256],
(0, 0): [0, 0, 0, 256],
(0, 0, '127.0.0.1:10000'): [0, 0, 0, 256],
(0, 0, '127.0.0.1:10000', 0): [0, 256, 0, 0],
(0, 0, '127.0.0.1:10000', 1): [0, 256, 0, 0],
(0, 0, '127.0.0.1:10000', 2): [0, 256, 0, 0],
})
# now mark a device 2 for decom
rb.set_dev_weight(2, 0.0)
# we'll rebalance but can't move any parts
rb.rebalance(seed=1)
# zero weight tier has one copy of *every* part
self.assertEqual(rb.dispersion, 100.0)
self.assertEqual(rb._dispersion_graph, {
(0,): [0, 0, 0, 256],
(0, 0): [0, 0, 0, 256],
(0, 0, '127.0.0.1:10000'): [0, 0, 0, 256],
(0, 0, '127.0.0.1:10000', 0): [0, 256, 0, 0],
(0, 0, '127.0.0.1:10000', 1): [0, 256, 0, 0],
(0, 0, '127.0.0.1:10000', 2): [0, 256, 0, 0],
})
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=3)
self.assertEqual(rb.dispersion, 0.0)
self.assertEqual(rb._dispersion_graph, {
(0,): [0, 0, 0, 256],
(0, 0): [0, 0, 0, 256],
(0, 0, '127.0.0.1:10000'): [0, 0, 0, 256],
(0, 0, '127.0.0.1:10000', 0): [0, 128, 128, 0],
(0, 0, '127.0.0.1:10000', 1): [0, 128, 128, 0],
})
if __name__ == '__main__':
unittest.main()

View File

@ -16,10 +16,17 @@
import unittest
from swift.common import ring
from swift.common.ring.utils import (build_tier_tree, tiers_for_dev,
parse_search_value, parse_args,
build_dev_from_opts, find_parts,
parse_builder_ring_filename_args)
from swift.common.ring.utils import (tiers_for_dev, build_tier_tree,
validate_and_normalize_ip,
validate_and_normalize_address,
is_valid_ip, is_valid_ipv4,
is_valid_ipv6, is_valid_hostname,
is_local_device, parse_search_value,
parse_search_values_from_opts,
parse_change_values_from_opts,
validate_args, parse_args,
parse_builder_ring_filename_args,
build_dev_from_opts, dispersion_report)
class TestUtils(unittest.TestCase):
@ -94,6 +101,121 @@ class TestUtils(unittest.TestCase):
(1, 2, '192.168.2.2:6000', 10),
(1, 2, '192.168.2.2:6000', 11)]))
def test_is_valid_ip(self):
self.assertTrue(is_valid_ip("127.0.0.1"))
self.assertTrue(is_valid_ip("10.0.0.1"))
ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
self.assertTrue(is_valid_ip(ipv6))
ipv6 = "fe80:0:0:0:204:61ff:fe9d:f156"
self.assertTrue(is_valid_ip(ipv6))
ipv6 = "fe80::204:61ff:fe9d:f156"
self.assertTrue(is_valid_ip(ipv6))
ipv6 = "fe80:0000:0000:0000:0204:61ff:254.157.241.86"
self.assertTrue(is_valid_ip(ipv6))
ipv6 = "fe80:0:0:0:0204:61ff:254.157.241.86"
self.assertTrue(is_valid_ip(ipv6))
ipv6 = "fe80::204:61ff:254.157.241.86"
self.assertTrue(is_valid_ip(ipv6))
ipv6 = "fe80::"
self.assertTrue(is_valid_ip(ipv6))
ipv6 = "::1"
self.assertTrue(is_valid_ip(ipv6))
not_ipv6 = "3ffe:0b00:0000:0001:0000:0000:000a"
self.assertFalse(is_valid_ip(not_ipv6))
not_ipv6 = "1:2:3:4:5:6::7:8"
self.assertFalse(is_valid_ip(not_ipv6))
def test_is_valid_ipv4(self):
self.assertTrue(is_valid_ipv4("127.0.0.1"))
self.assertTrue(is_valid_ipv4("10.0.0.1"))
ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
self.assertFalse(is_valid_ipv4(ipv6))
ipv6 = "fe80:0:0:0:204:61ff:fe9d:f156"
self.assertFalse(is_valid_ipv4(ipv6))
ipv6 = "fe80::204:61ff:fe9d:f156"
self.assertFalse(is_valid_ipv4(ipv6))
ipv6 = "fe80:0000:0000:0000:0204:61ff:254.157.241.86"
self.assertFalse(is_valid_ipv4(ipv6))
ipv6 = "fe80:0:0:0:0204:61ff:254.157.241.86"
self.assertFalse(is_valid_ipv4(ipv6))
ipv6 = "fe80::204:61ff:254.157.241.86"
self.assertFalse(is_valid_ipv4(ipv6))
ipv6 = "fe80::"
self.assertFalse(is_valid_ipv4(ipv6))
ipv6 = "::1"
self.assertFalse(is_valid_ipv4(ipv6))
not_ipv6 = "3ffe:0b00:0000:0001:0000:0000:000a"
self.assertFalse(is_valid_ipv4(not_ipv6))
not_ipv6 = "1:2:3:4:5:6::7:8"
self.assertFalse(is_valid_ipv4(not_ipv6))
def test_is_valid_ipv6(self):
self.assertFalse(is_valid_ipv6("127.0.0.1"))
self.assertFalse(is_valid_ipv6("10.0.0.1"))
ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
self.assertTrue(is_valid_ipv6(ipv6))
ipv6 = "fe80:0:0:0:204:61ff:fe9d:f156"
self.assertTrue(is_valid_ipv6(ipv6))
ipv6 = "fe80::204:61ff:fe9d:f156"
self.assertTrue(is_valid_ipv6(ipv6))
ipv6 = "fe80:0000:0000:0000:0204:61ff:254.157.241.86"
self.assertTrue(is_valid_ipv6(ipv6))
ipv6 = "fe80:0:0:0:0204:61ff:254.157.241.86"
self.assertTrue(is_valid_ipv6(ipv6))
ipv6 = "fe80::204:61ff:254.157.241.86"
self.assertTrue(is_valid_ipv6(ipv6))
ipv6 = "fe80::"
self.assertTrue(is_valid_ipv6(ipv6))
ipv6 = "::1"
self.assertTrue(is_valid_ipv6(ipv6))
not_ipv6 = "3ffe:0b00:0000:0001:0000:0000:000a"
self.assertFalse(is_valid_ipv6(not_ipv6))
not_ipv6 = "1:2:3:4:5:6::7:8"
self.assertFalse(is_valid_ipv6(not_ipv6))
def test_is_valid_hostname(self):
self.assertTrue(is_valid_hostname("local"))
self.assertTrue(is_valid_hostname("test.test.com"))
hostname = "test." * 51
self.assertTrue(is_valid_hostname(hostname))
hostname = hostname.rstrip('.')
self.assertTrue(is_valid_hostname(hostname))
hostname = hostname + "00"
self.assertFalse(is_valid_hostname(hostname))
self.assertFalse(is_valid_hostname("$blah#"))
def test_is_local_device(self):
my_ips = ["127.0.0.1",
"0000:0000:0000:0000:0000:0000:0000:0001"]
my_port = 6000
self.assertTrue(is_local_device(my_ips, my_port,
"localhost",
my_port))
def test_validate_and_normalize_ip(self):
ipv4 = "10.0.0.1"
self.assertEqual(ipv4, validate_and_normalize_ip(ipv4))
ipv6 = "fe80::204:61ff:fe9d:f156"
self.assertEqual(ipv6, validate_and_normalize_ip(ipv6.upper()))
hostname = "test.test.com"
self.assertRaises(ValueError,
validate_and_normalize_ip, hostname)
hostname = "$blah#"
self.assertRaises(ValueError,
validate_and_normalize_ip, hostname)
def test_validate_and_normalize_address(self):
ipv4 = "10.0.0.1"
self.assertEqual(ipv4, validate_and_normalize_address(ipv4))
ipv6 = "fe80::204:61ff:fe9d:f156"
self.assertEqual(ipv6, validate_and_normalize_address(ipv6.upper()))
hostname = "test.test.com"
self.assertEqual(hostname,
validate_and_normalize_address(hostname.upper()))
hostname = "$blah#"
self.assertRaises(ValueError,
validate_and_normalize_address, hostname)
def test_parse_search_value(self):
res = parse_search_value('r0')
self.assertEqual(res, {'region': 0})
@ -107,6 +229,8 @@ class TestUtils(unittest.TestCase):
self.assertEqual(res, {'zone': 1})
res = parse_search_value('-127.0.0.1')
self.assertEqual(res, {'ip': '127.0.0.1'})
res = parse_search_value('127.0.0.1')
self.assertEqual(res, {'ip': '127.0.0.1'})
res = parse_search_value('-[127.0.0.1]:10001')
self.assertEqual(res, {'ip': '127.0.0.1', 'port': 10001})
res = parse_search_value(':10001')
@ -124,22 +248,268 @@ class TestUtils(unittest.TestCase):
self.assertEqual(res, {'meta': 'meta1'})
self.assertRaises(ValueError, parse_search_value, 'OMGPONIES')
def test_replication_defaults(self):
args = '-r 1 -z 1 -i 127.0.0.1 -p 6010 -d d1 -w 100'.split()
opts, _ = parse_args(args)
device = build_dev_from_opts(opts)
def test_parse_search_values_from_opts(self):
argv = \
["--id", "1", "--region", "2", "--zone", "3",
"--ip", "test.test.com",
"--port", "6000",
"--replication-ip", "r.test.com",
"--replication-port", "7000",
"--device", "sda3",
"--meta", "some meta data",
"--weight", "3.14159265359",
"--change-ip", "change.test.test.com",
"--change-port", "6001",
"--change-replication-ip", "change.r.test.com",
"--change-replication-port", "7001",
"--change-device", "sdb3",
"--change-meta", "some meta data for change"]
expected = {
'device': 'd1',
'ip': '127.0.0.1',
'meta': '',
'port': 6010,
'region': 1,
'replication_ip': '127.0.0.1',
'replication_port': 6010,
'weight': 100.0,
'zone': 1,
'id': 1,
'region': 2,
'zone': 3,
'ip': "test.test.com",
'port': 6000,
'replication_ip': "r.test.com",
'replication_port': 7000,
'device': "sda3",
'meta': "some meta data",
'weight': 3.14159265359,
}
self.assertEquals(device, expected)
new_cmd_format, opts, args = validate_args(argv)
search_values = parse_search_values_from_opts(opts)
self.assertEquals(search_values, expected)
argv = \
["--id", "1", "--region", "2", "--zone", "3",
"--ip", "127.0.0.1",
"--port", "6000",
"--replication-ip", "127.0.0.10",
"--replication-port", "7000",
"--device", "sda3",
"--meta", "some meta data",
"--weight", "3.14159265359",
"--change-ip", "127.0.0.2",
"--change-port", "6001",
"--change-replication-ip", "127.0.0.20",
"--change-replication-port", "7001",
"--change-device", "sdb3",
"--change-meta", "some meta data for change"]
expected = {
'id': 1,
'region': 2,
'zone': 3,
'ip': "127.0.0.1",
'port': 6000,
'replication_ip': "127.0.0.10",
'replication_port': 7000,
'device': "sda3",
'meta': "some meta data",
'weight': 3.14159265359,
}
new_cmd_format, opts, args = validate_args(argv)
search_values = parse_search_values_from_opts(opts)
self.assertEquals(search_values, expected)
argv = \
["--id", "1", "--region", "2", "--zone", "3",
"--ip", "[127.0.0.1]",
"--port", "6000",
"--replication-ip", "[127.0.0.10]",
"--replication-port", "7000",
"--device", "sda3",
"--meta", "some meta data",
"--weight", "3.14159265359",
"--change-ip", "[127.0.0.2]",
"--change-port", "6001",
"--change-replication-ip", "[127.0.0.20]",
"--change-replication-port", "7001",
"--change-device", "sdb3",
"--change-meta", "some meta data for change"]
new_cmd_format, opts, args = validate_args(argv)
search_values = parse_search_values_from_opts(opts)
self.assertEquals(search_values, expected)
def test_parse_change_values_from_opts(self):
argv = \
["--id", "1", "--region", "2", "--zone", "3",
"--ip", "test.test.com",
"--port", "6000",
"--replication-ip", "r.test.com",
"--replication-port", "7000",
"--device", "sda3",
"--meta", "some meta data",
"--weight", "3.14159265359",
"--change-ip", "change.test.test.com",
"--change-port", "6001",
"--change-replication-ip", "change.r.test.com",
"--change-replication-port", "7001",
"--change-device", "sdb3",
"--change-meta", "some meta data for change"]
expected = {
'ip': "change.test.test.com",
'port': 6001,
'replication_ip': "change.r.test.com",
'replication_port': 7001,
'device': "sdb3",
'meta': "some meta data for change",
}
new_cmd_format, opts, args = validate_args(argv)
search_values = parse_change_values_from_opts(opts)
self.assertEquals(search_values, expected)
argv = \
["--id", "1", "--region", "2", "--zone", "3",
"--ip", "127.0.0.1",
"--port", "6000",
"--replication-ip", "127.0.0.10",
"--replication-port", "7000",
"--device", "sda3",
"--meta", "some meta data",
"--weight", "3.14159265359",
"--change-ip", "127.0.0.2",
"--change-port", "6001",
"--change-replication-ip", "127.0.0.20",
"--change-replication-port", "7001",
"--change-device", "sdb3",
"--change-meta", "some meta data for change"]
expected = {
'ip': "127.0.0.2",
'port': 6001,
'replication_ip': "127.0.0.20",
'replication_port': 7001,
'device': "sdb3",
'meta': "some meta data for change",
}
new_cmd_format, opts, args = validate_args(argv)
search_values = parse_change_values_from_opts(opts)
self.assertEquals(search_values, expected)
argv = \
["--id", "1", "--region", "2", "--zone", "3",
"--ip", "[127.0.0.1]",
"--port", "6000",
"--replication-ip", "[127.0.0.10]",
"--replication-port", "7000",
"--device", "sda3",
"--meta", "some meta data",
"--weight", "3.14159265359",
"--change-ip", "[127.0.0.2]",
"--change-port", "6001",
"--change-replication-ip", "[127.0.0.20]",
"--change-replication-port", "7001",
"--change-device", "sdb3",
"--change-meta", "some meta data for change"]
new_cmd_format, opts, args = validate_args(argv)
search_values = parse_change_values_from_opts(opts)
self.assertEquals(search_values, expected)
def test_validate_args(self):
argv = \
["--id", "1", "--region", "2", "--zone", "3",
"--ip", "test.test.com",
"--port", "6000",
"--replication-ip", "r.test.com",
"--replication-port", "7000",
"--device", "sda3",
"--meta", "some meta data",
"--weight", "3.14159265359",
"--change-ip", "change.test.test.com",
"--change-port", "6001",
"--change-replication-ip", "change.r.test.com",
"--change-replication-port", "7001",
"--change-device", "sdb3",
"--change-meta", "some meta data for change"]
new_cmd_format, opts, args = validate_args(argv)
self.assertTrue(new_cmd_format)
self.assertEqual(opts.id, 1)
self.assertEqual(opts.region, 2)
self.assertEqual(opts.zone, 3)
self.assertEqual(opts.ip, "test.test.com")
self.assertEqual(opts.port, 6000)
self.assertEqual(opts.replication_ip, "r.test.com")
self.assertEqual(opts.replication_port, 7000)
self.assertEqual(opts.device, "sda3")
self.assertEqual(opts.meta, "some meta data")
self.assertEqual(opts.weight, 3.14159265359)
self.assertEqual(opts.change_ip, "change.test.test.com")
self.assertEqual(opts.change_port, 6001)
self.assertEqual(opts.change_replication_ip, "change.r.test.com")
self.assertEqual(opts.change_replication_port, 7001)
self.assertEqual(opts.change_device, "sdb3")
self.assertEqual(opts.change_meta, "some meta data for change")
argv = \
["--id", "0", "--region", "0", "--zone", "0",
"--ip", "",
"--port", "0",
"--replication-ip", "",
"--replication-port", "0",
"--device", "",
"--meta", "",
"--weight", "0",
"--change-ip", "",
"--change-port", "0",
"--change-replication-ip", "",
"--change-replication-port", "0",
"--change-device", "",
"--change-meta", ""]
new_cmd_format, opts, args = validate_args(argv)
self.assertFalse(new_cmd_format)
argv = \
["--id", "0", "--region", "0", "--zone", "0",
"--ip", "",
"--port", "0",
"--replication-ip", "",
"--replication-port", "0",
"--device", "",
"--meta", "",
"--weight", "0",
"--change-ip", "change.test.test.com",
"--change-port", "6001",
"--change-replication-ip", "change.r.test.com",
"--change-replication-port", "7001",
"--change-device", "sdb3",
"--change-meta", "some meta data for change"]
new_cmd_format, opts, args = validate_args(argv)
self.assertFalse(new_cmd_format)
def test_parse_args(self):
argv = \
["--id", "1", "--region", "2", "--zone", "3",
"--ip", "test.test.com",
"--port", "6000",
"--replication-ip", "r.test.com",
"--replication-port", "7000",
"--device", "sda3",
"--meta", "some meta data",
"--weight", "3.14159265359",
"--change-ip", "change.test.test.com",
"--change-port", "6001",
"--change-replication-ip", "change.r.test.com",
"--change-replication-port", "7001",
"--change-device", "sdb3",
"--change-meta", "some meta data for change"]
opts, args = parse_args(argv)
self.assertEqual(opts.id, 1)
self.assertEqual(opts.region, 2)
self.assertEqual(opts.zone, 3)
self.assertEqual(opts.ip, "test.test.com")
self.assertEqual(opts.port, 6000)
self.assertEqual(opts.replication_ip, "r.test.com")
self.assertEqual(opts.replication_port, 7000)
self.assertEqual(opts.device, "sda3")
self.assertEqual(opts.meta, "some meta data")
self.assertEqual(opts.weight, 3.14159265359)
self.assertEqual(opts.change_ip, "change.test.test.com")
self.assertEqual(opts.change_port, 6001)
self.assertEqual(opts.change_replication_ip, "change.r.test.com")
self.assertEqual(opts.change_replication_port, 7001)
self.assertEqual(opts.change_device, "sdb3")
self.assertEqual(opts.change_meta, "some meta data for change")
self.assertEqual(len(args), 0)
def test_parse_builder_ring_filename_args(self):
args = 'swift-ring-builder object.builder write_ring'
@ -160,33 +530,147 @@ class TestUtils(unittest.TestCase):
'my.file.name', 'my.file.name.ring.gz'
), parse_builder_ring_filename_args(args.split()))
def test_find_parts(self):
def test_build_dev_from_opts(self):
argv = \
["--region", "2", "--zone", "3",
"--ip", "test.test.com",
"--port", "6000",
"--replication-ip", "r.test.com",
"--replication-port", "7000",
"--device", "sda3",
"--meta", "some meta data",
"--weight", "3.14159265359"]
expected = {
'region': 2,
'zone': 3,
'ip': "test.test.com",
'port': 6000,
'replication_ip': "r.test.com",
'replication_port': 7000,
'device': "sda3",
'meta': "some meta data",
'weight': 3.14159265359,
}
opts, args = parse_args(argv)
device = build_dev_from_opts(opts)
self.assertEquals(device, expected)
argv = \
["--region", "2", "--zone", "3",
"--ip", "[test.test.com]",
"--port", "6000",
"--replication-ip", "[r.test.com]",
"--replication-port", "7000",
"--device", "sda3",
"--meta", "some meta data",
"--weight", "3.14159265359"]
opts, args = parse_args(argv)
self.assertRaises(ValueError, build_dev_from_opts, opts)
argv = \
["--region", "2", "--zone", "3",
"--ip", "[test.test.com]",
"--port", "6000",
"--replication-ip", "[r.test.com]",
"--replication-port", "7000",
"--meta", "some meta data",
"--weight", "3.14159265359"]
opts, args = parse_args(argv)
self.assertRaises(ValueError, build_dev_from_opts, opts)
def test_replication_defaults(self):
args = '-r 1 -z 1 -i 127.0.0.1 -p 6010 -d d1 -w 100'.split()
opts, _ = parse_args(args)
device = build_dev_from_opts(opts)
expected = {
'device': 'd1',
'ip': '127.0.0.1',
'meta': '',
'port': 6010,
'region': 1,
'replication_ip': '127.0.0.1',
'replication_port': 6010,
'weight': 100.0,
'zone': 1,
}
self.assertEquals(device, expected)
args = '-r 1 -z 1 -i test.com -p 6010 -d d1 -w 100'.split()
opts, _ = parse_args(args)
device = build_dev_from_opts(opts)
expected = {
'device': 'd1',
'ip': 'test.com',
'meta': '',
'port': 6010,
'region': 1,
'replication_ip': 'test.com',
'replication_port': 6010,
'weight': 100.0,
'zone': 1,
}
self.assertEquals(device, expected)
def test_dispersion_report(self):
rb = ring.RingBuilder(8, 3, 0)
rb.add_dev({'id': 0, 'region': 1, 'zone': 0, 'weight': 100,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda1'})
rb.add_dev({'id': 1, 'region': 1, 'zone': 1, 'weight': 100,
rb.add_dev({'id': 1, 'region': 1, 'zone': 1, 'weight': 200,
'ip': '127.0.0.1', 'port': 10001, 'device': 'sda1'})
rb.add_dev({'id': 2, 'region': 1, 'zone': 2, 'weight': 100,
rb.add_dev({'id': 2, 'region': 1, 'zone': 1, 'weight': 200,
'ip': '127.0.0.1', 'port': 10002, 'device': 'sda1'})
rb.rebalance()
rb.rebalance(seed=10)
rb.add_dev({'id': 3, 'region': 2, 'zone': 1, 'weight': 10,
'ip': '127.0.0.1', 'port': 10004, 'device': 'sda1'})
rb.pretend_min_part_hours_passed()
rb.rebalance()
self.assertEqual(rb.dispersion, 39.84375)
report = dispersion_report(rb)
self.assertEqual(report['worst_tier'], 'r1z1')
self.assertEqual(report['max_dispersion'], 39.84375)
argv = ['swift-ring-builder', 'object.builder',
'list_parts', '127.0.0.1']
sorted_partition_count = find_parts(rb, argv)
# Each node should store 256 partitions to avoid multiple replicas
# 2/5 of total weight * 768 ~= 307 -> 51 partitions on each node in
# zone 1 are stored at least twice on the nodes
expected = [
['r1z1', 2, '0', '154', '102'],
['r1z1-127.0.0.1:10001', 1, '205', '51', '0'],
['r1z1-127.0.0.1:10001/sda1', 1, '205', '51', '0'],
['r1z1-127.0.0.1:10002', 1, '205', '51', '0'],
['r1z1-127.0.0.1:10002/sda1', 1, '205', '51', '0']]
# Expect 256 partitions in the output
self.assertEqual(256, len(sorted_partition_count))
def build_tier_report(max_replicas, placed_parts, dispersion,
replicas):
return {
'max_replicas': max_replicas,
'placed_parts': placed_parts,
'dispersion': dispersion,
'replicas': replicas,
}
expected = [
['r1z1', build_tier_report(
2, 256, 39.84375, [0, 0, 154, 102])],
['r1z1-127.0.0.1:10001', build_tier_report(
1, 256, 19.921875, [0, 205, 51, 0])],
['r1z1-127.0.0.1:10001/sda1', build_tier_report(
1, 256, 19.921875, [0, 205, 51, 0])],
['r1z1-127.0.0.1:10002', build_tier_report(
1, 256, 19.921875, [0, 205, 51, 0])],
['r1z1-127.0.0.1:10002/sda1', build_tier_report(
1, 256, 19.921875, [0, 205, 51, 0])],
]
report = dispersion_report(rb, 'r1z1.*', verbose=True)
graph = report['graph']
for i in range(len(expected)):
self.assertEqual(expected[i][0], graph[i][0])
self.assertEqual(expected[i][1], graph[i][1])
# Each partitions should have 3 replicas
for partition, count in sorted_partition_count:
self.assertEqual(
3, count, "Partition %d has only %d replicas" %
(partition, count))
# overcompensate in r1z0
rb.add_dev({'id': 3, 'region': 1, 'zone': 0, 'weight': 500,
'ip': '127.0.0.1', 'port': 10003, 'device': 'sda1'})
rb.rebalance(seed=10)
report = dispersion_report(rb)
self.assertEqual(rb.dispersion, 40.234375)
self.assertEqual(report['worst_tier'], 'r1z0-127.0.0.1:10003')
self.assertEqual(report['max_dispersion'], 30.078125)
if __name__ == '__main__':

View File

@ -0,0 +1,122 @@
# Copyright (c) 2010-2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
from swift.common.base_storage_server import BaseStorageServer
from tempfile import mkdtemp
from swift import __version__ as swift_version
from swift.common.swob import Request
from swift.common.utils import get_logger, public
from shutil import rmtree
class FakeOPTIONS(BaseStorageServer):
server_type = 'test-server'
def __init__(self, conf, logger=None):
super(FakeOPTIONS, self).__init__(conf)
self.logger = logger or get_logger(conf, log_route='test-server')
class FakeANOTHER(FakeOPTIONS):
@public
def ANOTHER(self):
"""this is to test adding to allowed_methods"""
pass
class TestBaseStorageServer(unittest.TestCase):
"""Test swift.common.base_storage_server"""
def setUp(self):
self.tmpdir = mkdtemp()
self.testdir = os.path.join(self.tmpdir,
'tmp_test_base_storage_server')
def tearDown(self):
"""Tear down for testing swift.common.base_storage_server"""
rmtree(self.tmpdir)
def test_server_type(self):
conf = {'devices': self.testdir, 'mount_check': 'false'}
baseserver = BaseStorageServer(conf)
msg = 'Storage nodes have not implemented the Server type.'
try:
baseserver.server_type
except NotImplementedError as e:
self.assertEquals(e.message, msg)
def test_allowed_methods(self):
conf = {'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false'}
# test what's available in the base class
allowed_methods_test = FakeOPTIONS(conf).allowed_methods
self.assertEquals(allowed_methods_test, ['OPTIONS'])
# test that a subclass can add allowed methods
allowed_methods_test = FakeANOTHER(conf).allowed_methods
allowed_methods_test.sort()
self.assertEquals(allowed_methods_test, ['ANOTHER', 'OPTIONS'])
conf = {'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'true'}
# test what's available in the base class
allowed_methods_test = FakeOPTIONS(conf).allowed_methods
self.assertEquals(allowed_methods_test, [])
# test that a subclass can add allowed methods
allowed_methods_test = FakeANOTHER(conf).allowed_methods
self.assertEquals(allowed_methods_test, [])
conf = {'devices': self.testdir, 'mount_check': 'false'}
# test what's available in the base class
allowed_methods_test = FakeOPTIONS(conf).allowed_methods
self.assertEquals(allowed_methods_test, ['OPTIONS'])
# test that a subclass can add allowed methods
allowed_methods_test = FakeANOTHER(conf).allowed_methods
allowed_methods_test.sort()
self.assertEquals(allowed_methods_test, ['ANOTHER', 'OPTIONS'])
def test_OPTIONS_error(self):
msg = 'Storage nodes have not implemented the Server type.'
conf = {'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false'}
baseserver = BaseStorageServer(conf)
req = Request.blank('/sda1/p/a/c/o', {'REQUEST_METHOD': 'OPTIONS'})
req.content_length = 0
try:
baseserver.OPTIONS(req)
except NotImplementedError as e:
self.assertEquals(e.message, msg)
def test_OPTIONS(self):
conf = {'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'false'}
req = Request.blank('/sda1/p/a/c/o', {'REQUEST_METHOD': 'OPTIONS'})
req.content_length = 0
resp = FakeOPTIONS(conf).OPTIONS(req)
self.assertEquals(resp.headers['Allow'], 'OPTIONS')
self.assertEquals(resp.headers['Server'],
'test-server/' + swift_version)

View File

@ -28,6 +28,7 @@ import mock
import random
import re
import socket
import stat
import sys
import json
import math
@ -55,7 +56,7 @@ from mock import MagicMock, patch
from swift.common.exceptions import (Timeout, MessageTimeout,
ConnectionTimeout, LockTimeout,
ReplicationLockTimeout,
MimeInvalid)
MimeInvalid, ThreadPoolDead)
from swift.common import utils
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.swob import Request, Response
@ -1454,6 +1455,15 @@ class TestUtils(unittest.TestCase):
self.assertEquals(utils.storage_directory('objects', '1', 'ABCDEF'),
'objects/1/DEF/ABCDEF')
def test_expand_ipv6(self):
expanded_ipv6 = "fe80::204:61ff:fe9d:f156"
upper_ipv6 = "fe80:0000:0000:0000:0204:61ff:fe9d:f156"
self.assertEqual(expanded_ipv6, utils.expand_ipv6(upper_ipv6))
omit_ipv6 = "fe80:0000:0000::0204:61ff:fe9d:f156"
self.assertEqual(expanded_ipv6, utils.expand_ipv6(omit_ipv6))
less_num_ipv6 = "fe80:0:00:000:0204:61ff:fe9d:f156"
self.assertEqual(expanded_ipv6, utils.expand_ipv6(less_num_ipv6))
def test_whataremyips(self):
myips = utils.whataremyips()
self.assert_(len(myips) > 1)
@ -2935,9 +2945,9 @@ class TestSwiftInfo(unittest.TestCase):
utils._swift_info = {'swift': {'foo': 'bar'},
'cap1': cap1}
# expect no exceptions
info = utils.get_swift_info(disallowed_sections=
['cap2.cap1_foo', 'cap1.no_match',
'cap1.cap1_foo.no_match.no_match'])
info = utils.get_swift_info(
disallowed_sections=['cap2.cap1_foo', 'cap1.no_match',
'cap1.cap1_foo.no_match.no_match'])
self.assertEquals(info['cap1'], cap1)
@ -3748,7 +3758,28 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
self.assertEquals(called, [12345])
class TestThreadpool(unittest.TestCase):
class TestThreadPool(unittest.TestCase):
def setUp(self):
self.tp = None
def tearDown(self):
if self.tp:
self.tp.terminate()
def _pipe_count(self):
# Counts the number of pipes that this process owns.
fd_dir = "/proc/%d/fd" % os.getpid()
def is_pipe(path):
try:
stat_result = os.stat(path)
return stat.S_ISFIFO(stat_result.st_mode)
except OSError:
return False
return len([fd for fd in os.listdir(fd_dir)
if is_pipe(os.path.join(fd_dir, fd))])
def _thread_id(self):
return threading.current_thread().ident
@ -3760,7 +3791,7 @@ class TestThreadpool(unittest.TestCase):
return int('fishcakes')
def test_run_in_thread_with_threads(self):
tp = utils.ThreadPool(1)
tp = self.tp = utils.ThreadPool(1)
my_id = self._thread_id()
other_id = tp.run_in_thread(self._thread_id)
@ -3779,7 +3810,7 @@ class TestThreadpool(unittest.TestCase):
def test_force_run_in_thread_with_threads(self):
# with nthreads > 0, force_run_in_thread looks just like run_in_thread
tp = utils.ThreadPool(1)
tp = self.tp = utils.ThreadPool(1)
my_id = self._thread_id()
other_id = tp.force_run_in_thread(self._thread_id)
@ -3829,7 +3860,7 @@ class TestThreadpool(unittest.TestCase):
def alpha():
return beta()
tp = utils.ThreadPool(1)
tp = self.tp = utils.ThreadPool(1)
try:
tp.run_in_thread(alpha)
except ZeroDivisionError:
@ -3847,6 +3878,44 @@ class TestThreadpool(unittest.TestCase):
self.assertEqual(tb_func[1], "run_in_thread")
self.assertEqual(tb_func[0], "test_preserving_stack_trace_from_thread")
def test_terminate(self):
initial_thread_count = threading.activeCount()
initial_pipe_count = self._pipe_count()
tp = utils.ThreadPool(4)
# do some work to ensure any lazy initialization happens
tp.run_in_thread(os.path.join, 'foo', 'bar')
tp.run_in_thread(os.path.join, 'baz', 'quux')
# 4 threads in the ThreadPool, plus one pipe for IPC; this also
# serves as a sanity check that we're actually allocating some
# resources to free later
self.assertEqual(initial_thread_count, threading.activeCount() - 4)
self.assertEqual(initial_pipe_count, self._pipe_count() - 2)
tp.terminate()
self.assertEqual(initial_thread_count, threading.activeCount())
self.assertEqual(initial_pipe_count, self._pipe_count())
def test_cant_run_after_terminate(self):
tp = utils.ThreadPool(0)
tp.terminate()
self.assertRaises(ThreadPoolDead, tp.run_in_thread, lambda: 1)
self.assertRaises(ThreadPoolDead, tp.force_run_in_thread, lambda: 1)
def test_double_terminate_doesnt_crash(self):
tp = utils.ThreadPool(0)
tp.terminate()
tp.terminate()
tp = utils.ThreadPool(1)
tp.terminate()
tp.terminate()
def test_terminate_no_threads_doesnt_crash(self):
tp = utils.ThreadPool(0)
tp.terminate()
class TestAuditLocationGenerator(unittest.TestCase):

View File

@ -31,6 +31,7 @@ import random
from eventlet import spawn, Timeout, listen
import simplejson
from swift import __version__ as swift_version
from swift.common.swob import Request, HeaderKeyDict
import swift.container
from swift.container import server as container_server
@ -302,6 +303,20 @@ class TestContainerController(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 400)
def test_OPTIONS(self):
server_handler = container_server.ContainerController(
{'devices': self.testdir, 'mount_check': 'false'})
req = Request.blank('/sda1/p/a/c/o', {'REQUEST_METHOD': 'OPTIONS'})
req.content_length = 0
resp = server_handler.OPTIONS(req)
self.assertEquals(200, resp.status_int)
for verb in 'OPTIONS GET POST PUT DELETE HEAD REPLICATE'.split():
self.assertTrue(
verb in resp.headers['Allow'].split(', '))
self.assertEquals(len(resp.headers['Allow'].split(', ')), 7)
self.assertEquals(resp.headers['Server'],
(self.controller.server_type + '/' + swift_version))
def test_PUT(self):
req = Request.blank(
'/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT',
@ -2475,7 +2490,7 @@ class TestContainerController(unittest.TestCase):
method_res = mock.MagicMock()
mock_method = public(lambda x: mock.MagicMock(return_value=method_res))
with mock.patch.object(self.controller, method, new=mock_method):
response = self.controller.__call__(env, start_response)
response = self.controller(env, start_response)
self.assertEqual(response, method_res)
def test_not_allowed_method(self):
@ -2516,6 +2531,38 @@ class TestContainerController(unittest.TestCase):
response = self.controller.__call__(env, start_response)
self.assertEqual(response, answer)
def test_call_incorrect_replication_method(self):
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
self.controller = container_server.ContainerController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'true'})
def start_response(*args):
"""Sends args to outbuf"""
outbuf.writelines(args)
obj_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST', 'OPTIONS']
for method in obj_methods:
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
self.controller(env, start_response)
self.assertEquals(errbuf.getvalue(), '')
self.assertEquals(outbuf.getvalue()[:4], '405 ')
def test_GET_log_requests_true(self):
self.controller.logger = FakeLogger()
self.controller.log_requests = True

View File

@ -266,8 +266,7 @@ class TestObjectReplicator(unittest.TestCase):
def blowup_mkdirs(path):
raise OSError('Ow!')
mkdirs_orig = object_replicator.mkdirs
try:
with mock.patch.object(object_replicator, 'mkdirs', blowup_mkdirs):
rmtree(self.objects, ignore_errors=1)
object_replicator.mkdirs = blowup_mkdirs
self.replicator.collect_jobs()
@ -280,8 +279,6 @@ class TestObjectReplicator(unittest.TestCase):
self.assertTrue(exc_args[0].startswith('ERROR creating '))
self.assertEquals(exc_kwargs, {})
self.assertEquals(exc_str, 'Ow!')
finally:
object_replicator.mkdirs = mkdirs_orig
def test_collect_jobs(self):
jobs = self.replicator.collect_jobs()
@ -326,64 +323,57 @@ class TestObjectReplicator(unittest.TestCase):
self.assertTrue(jobs[0]['delete'])
self.assertEquals('1', jobs[0]['partition'])
def test_collect_jobs_removes_zbf(self):
"""
After running xfs_repair, a partition directory could become a
zero-byte file. If this happens, collect_jobs() should clean it up and
*not* create a job which will hit an exception as it tries to listdir()
a file.
"""
# Surprise! Partition dir 1 is actually a zero-byte-file
part_1_path = os.path.join(self.objects, '1')
rmtree(part_1_path)
with open(part_1_path, 'w'):
pass
self.assertTrue(os.path.isfile(part_1_path)) # sanity check
part_1_path_1 = os.path.join(self.objects_1, '1')
rmtree(part_1_path_1)
with open(part_1_path_1, 'w'):
pass
self.assertTrue(os.path.isfile(part_1_path_1)) # sanity check
def test_replicator_skips_bogus_partition_dirs(self):
# A directory in the wrong place shouldn't crash the replicator
rmtree(self.objects)
rmtree(self.objects_1)
os.mkdir(self.objects)
os.mkdir(self.objects_1)
os.mkdir(os.path.join(self.objects, "burrito"))
jobs = self.replicator.collect_jobs()
jobs_to_delete = [j for j in jobs if j['delete']]
jobs_by_pol_part = {}
for job in jobs:
jobs_by_pol_part[str(job['policy_idx']) + job['partition']] = job
self.assertEquals(len(jobs_to_delete), 0)
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['00']['nodes']], [1, 2])
self.assertFalse('1' in jobs_by_pol_part)
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['02']['nodes']], [2, 3])
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['03']['nodes']], [3, 1])
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['10']['nodes']], [1, 2])
self.assertFalse('1' in jobs_by_pol_part)
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['12']['nodes']], [2, 3])
self.assertEquals(
[node['id'] for node in jobs_by_pol_part['13']['nodes']], [3, 1])
for part in ['00', '02', '03']:
for node in jobs_by_pol_part[part]['nodes']:
self.assertEquals(node['device'], 'sda')
self.assertEquals(jobs_by_pol_part[part]['path'],
os.path.join(self.objects, part[1:]))
self.assertFalse(os.path.exists(part_1_path))
expected = sorted(self.replicator.logger.log_dict['warning'])
self.assertEqual(len(jobs), 0)
def test_replicator_removes_zbf(self):
# After running xfs_repair, a partition directory could become a
# zero-byte file. If this happens, the replicator should clean it
# up, log something, and move on to the next partition.
# Surprise! Partition dir 1 is actually a zero-byte file.
pol_0_part_1_path = os.path.join(self.objects, '1')
rmtree(pol_0_part_1_path)
with open(pol_0_part_1_path, 'w'):
pass
self.assertTrue(os.path.isfile(pol_0_part_1_path)) # sanity check
# Policy 1's partition dir 1 is also a zero-byte file.
pol_1_part_1_path = os.path.join(self.objects_1, '1')
rmtree(pol_1_part_1_path)
with open(pol_1_part_1_path, 'w'):
pass
self.assertTrue(os.path.isfile(pol_1_part_1_path)) # sanity check
# Don't delete things in collect_jobs(); all the stat() calls would
# make replicator startup really slow.
self.replicator.collect_jobs()
self.assertTrue(os.path.exists(pol_0_part_1_path))
self.assertTrue(os.path.exists(pol_1_part_1_path))
# After a replication pass, the files should be gone
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
self.replicator.run_once()
self.assertFalse(os.path.exists(pol_0_part_1_path))
self.assertFalse(os.path.exists(pol_1_part_1_path))
logged_warnings = sorted(self.replicator.logger.log_dict['warning'])
self.assertEquals(
(('Removing partition directory which was a file: %s',
part_1_path), {}), expected[1])
# policy 1
for part in ['10', '12', '13']:
for node in jobs_by_pol_part[part]['nodes']:
self.assertEquals(node['device'], 'sda')
self.assertEquals(jobs_by_pol_part[part]['path'],
os.path.join(self.objects_1, part[1:]))
self.assertFalse(os.path.exists(part_1_path_1))
pol_1_part_1_path), {}), logged_warnings[0])
self.assertEquals(
(('Removing partition directory which was a file: %s',
part_1_path_1), {}), expected[0])
pol_0_part_1_path), {}), logged_warnings[1])
def test_delete_partition(self):
with mock.patch('swift.obj.replicator.http_connect',
@ -558,6 +548,27 @@ class TestObjectReplicator(unittest.TestCase):
override_partitions=['1'])
self.assertFalse(os.access(part_path, os.F_OK))
def test_delete_policy_override_params(self):
df0 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o')
df1 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o',
policy_idx=1)
mkdirs(df0._datadir)
mkdirs(df1._datadir)
pol0_part_path = os.path.join(self.objects, '99')
pol1_part_path = os.path.join(self.objects_1, '99')
# sanity checks
self.assertTrue(os.access(pol0_part_path, os.F_OK))
self.assertTrue(os.access(pol1_part_path, os.F_OK))
# a bogus policy index doesn't bother the replicator any more than a
# bogus device or partition does
self.replicator.run_once(policies='1,2,5')
self.assertFalse(os.access(pol1_part_path, os.F_OK))
self.assertTrue(os.access(pol0_part_path, os.F_OK))
def test_run_once_recover_from_failure(self):
conf = dict(swift_dir=self.testdir, devices=self.devices,
mount_check='false', timeout='300', stats_interval='1')

View File

@ -39,6 +39,7 @@ from eventlet.green import httplib
from nose import SkipTest
from swift import __version__ as swift_version
from test.unit import FakeLogger, debug_logger, mocked_http_conn
from test.unit import connect_tcp, readuntil2crlfs, patch_policies
from swift.obj import server as object_server
@ -1351,6 +1352,23 @@ class TestObjectController(unittest.TestCase):
os.path.basename(os.path.dirname(disk_file._data_file)))
self.assertEquals(os.listdir(quar_dir)[0], file_name)
def test_OPTIONS(self):
conf = {'devices': self.testdir, 'mount_check': 'false'}
server_handler = object_server.ObjectController(
conf, logger=debug_logger())
req = Request.blank('/sda1/p/a/c/o', {'REQUEST_METHOD': 'OPTIONS'})
req.content_length = 0
resp = server_handler.OPTIONS(req)
self.assertEquals(200, resp.status_int)
print resp.headers['Allow']
for verb in 'OPTIONS GET POST PUT DELETE HEAD REPLICATE \
RUGGEDIZE'.split():
self.assertTrue(
verb in resp.headers['Allow'].split(', '))
self.assertEquals(len(resp.headers['Allow'].split(', ')), 8)
self.assertEquals(resp.headers['Server'],
(server_handler.server_type + '/' + swift_version))
def test_GET(self):
# Test swift.obj.server.ObjectController.GET
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
@ -4341,7 +4359,7 @@ class TestObjectController(unittest.TestCase):
mock.MagicMock(return_value=method_res))
with mock.patch.object(self.object_controller, method,
new=mock_method):
response = self.object_controller.__call__(env, start_response)
response = self.object_controller(env, start_response)
self.assertEqual(response, method_res)
def test_not_allowed_method(self):
@ -4398,6 +4416,38 @@ class TestObjectController(unittest.TestCase):
' 1234',),
{})])
def test_call_incorrect_replication_method(self):
inbuf = StringIO()
errbuf = StringIO()
outbuf = StringIO()
self.object_controller = object_server.ObjectController(
{'devices': self.testdir, 'mount_check': 'false',
'replication_server': 'true'}, logger=FakeLogger())
def start_response(*args):
"""Sends args to outbuf"""
outbuf.writelines(args)
obj_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST', 'OPTIONS']
for method in obj_methods:
env = {'REQUEST_METHOD': method,
'SCRIPT_NAME': '',
'PATH_INFO': '/sda1/p/a/c',
'SERVER_NAME': '127.0.0.1',
'SERVER_PORT': '8080',
'SERVER_PROTOCOL': 'HTTP/1.0',
'CONTENT_LENGTH': '0',
'wsgi.version': (1, 0),
'wsgi.url_scheme': 'http',
'wsgi.input': inbuf,
'wsgi.errors': errbuf,
'wsgi.multithread': False,
'wsgi.multiprocess': False,
'wsgi.run_once': False}
self.object_controller(env, start_response)
self.assertEquals(errbuf.getvalue(), '')
self.assertEquals(outbuf.getvalue()[:4], '405 ')
def test_not_utf8_and_not_logging_requests(self):
inbuf = WsgiStringIO()
errbuf = StringIO()

View File

@ -527,6 +527,15 @@ class TestFuncs(unittest.TestCase):
self.assertEquals(resp['meta']['whatevs'], 14)
self.assertEquals(resp['meta']['somethingelse'], 0)
def test_headers_to_object_info_sys_meta(self):
prefix = get_sys_meta_prefix('object')
headers = {'%sWhatevs' % prefix: 14,
'%ssomethingelse' % prefix: 0}
resp = headers_to_object_info(headers.items(), 200)
self.assertEquals(len(resp['sysmeta']), 2)
self.assertEquals(resp['sysmeta']['whatevs'], 14)
self.assertEquals(resp['sysmeta']['somethingelse'], 0)
def test_headers_to_object_info_values(self):
headers = {
'content-length': '1024',
@ -622,7 +631,8 @@ class TestFuncs(unittest.TestCase):
req = Request.blank('/v1/a/c/o', headers=src_headers)
dst_headers = base.generate_request_headers(req, transfer=True)
expected_headers = {'x-base-meta-owner': '',
'x-base-meta-size': '151M'}
'x-base-meta-size': '151M',
'connection': 'close'}
for k, v in expected_headers.iteritems():
self.assertTrue(k in dst_headers)
self.assertEqual(v, dst_headers[k])

View File

@ -543,12 +543,8 @@ class TestObjController(unittest.TestCase):
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': put_timestamp})
ts_iter = itertools.repeat(put_timestamp)
head_resp = [404] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes, timestamps=ts_iter):
codes = [201] * self.obj_ring.replicas
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
@ -562,9 +558,7 @@ class TestObjController(unittest.TestCase):
'Content-Length': 0,
'X-Timestamp': put_timestamp})
ts_iter = itertools.repeat(put_timestamp)
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
codes = head_resp
codes = [409] * self.obj_ring.replicas
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
@ -579,9 +573,7 @@ class TestObjController(unittest.TestCase):
'Content-Length': 0,
'X-Timestamp': ts.next().internal})
ts_iter = itertools.repeat(ts.next().internal)
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
codes = head_resp
codes = [409] * self.obj_ring.replicas
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
@ -596,10 +588,7 @@ class TestObjController(unittest.TestCase):
'Content-Length': 0,
'X-Timestamp': ts.next().internal})
ts_iter = itertools.repeat(orig_timestamp)
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
codes = [201] * self.obj_ring.replicas
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
@ -623,13 +612,53 @@ class TestObjController(unittest.TestCase):
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': ts.next().internal})
head_resp = [404] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [409] + [201] * (self.obj_ring.replicas - 1)
codes = head_resp + put_resp
with set_http_connect(*codes):
ts_iter = iter([ts.next().internal, None, None])
codes = [409] + [201] * (self.obj_ring.replicas - 1)
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
self.assertEqual(resp.status_int, 202)
def test_container_sync_put_x_timestamp_race(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
put_timestamp = ts.next().internal
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': put_timestamp})
# object nodes they respond 409 because another in-flight request
# finished and now the on disk timestamp is equal to the request.
put_ts = [put_timestamp] * self.obj_ring.replicas
codes = [409] * self.obj_ring.replicas
ts_iter = iter(put_ts)
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
def test_container_sync_put_x_timestamp_unsynced_race(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
put_timestamp = ts.next().internal
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': put_timestamp})
# only one in-flight request finished
put_ts = [None] * (self.obj_ring.replicas - 1)
put_resp = [201] * (self.obj_ring.replicas - 1)
put_ts += [put_timestamp]
put_resp += [409]
ts_iter = iter(put_ts)
codes = put_resp
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
def test_COPY_simple(self):
req = swift.common.swob.Request.blank(