Merge branch 'master' into feature/ec

Change-Id: Ic255786a8470fdd9b9802656017d97b61e0e159a
This commit is contained in:
John Dickinson 2014-07-08 10:35:59 -07:00
commit fd19daee7f
55 changed files with 1163 additions and 493 deletions

View File

@ -520,12 +520,14 @@ log_name object-auditor Label used when logging
log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
log_time 3600 Frequency of status logs in seconds.
files_per_second 20 Maximum files audited per second. Should
be tuned according to individual system
specs. 0 is unlimited.
bytes_per_second 10000000 Maximum bytes audited per second. Should
be tuned according to individual system
specs. 0 is unlimited.
files_per_second 20 Maximum files audited per second per
auditor process. Should be tuned according
to individual system specs. 0 is unlimited.
bytes_per_second 10000000 Maximum bytes audited per second per
auditor process. Should be tuned according
to individual system specs. 0 is unlimited.
concurrency 1 The number of parallel processes to use
for checksum auditing.
================== ============== ==========================================
------------------------------

View File

@ -162,6 +162,11 @@ Getting the code
cd $HOME/python-swiftclient; sudo python setup.py develop; cd -
Ubuntu 12.04 users need to install python-swiftclient's dependencies before the installation of
python-swiftclient. This is due to a bug in an older version of setup tools::
cd $HOME/python-swiftclient; sudo pip install -r requirements.txt; sudo python setup.py develop; cd -
#. Check out the swift repo::
git clone https://github.com/openstack/swift.git

View File

@ -78,8 +78,8 @@ awareness of storage policies to use them; once a container has been created
with a specific policy, all objects stored in it will be done so in accordance
with that policy.
Storage Policies are not implemented as a separate code module but are a core
abstraction of Swift architecture.
The Storage Policies feature is implemented throughout the entire code base so
it is an important concept in understanding Swift architecture.
-------------
Object Server

View File

@ -3,9 +3,9 @@ Storage Policies
================
Storage Policies allow for some level of segmenting the cluster for various
purposes through the creation of multiple object rings. Storage Policies are
not implemented as a separate code module but are an important concept in
understanding Swift architecture.
purposes through the creation of multiple object rings. The Storage Policies
feature is implemented throughout the entire code base so it is an important
concept in understanding Swift architecture.
As described in :doc:`overview_ring`, Swift uses modified hashing rings to
determine where data should reside in the cluster. There is a separate ring
@ -61,10 +61,10 @@ Policy-0 is considered the default). We will be covering the difference
between default and Policy-0 in the next section.
Policies are assigned when a container is created. Once a container has been
assigned a policy, it cannot be changed until the container is deleted. The implications
assigned a policy, it cannot be changed (unless it is deleted/recreated). The implications
on data placement/movement for large datasets would make this a task best left for
applications to perform. Therefore, if a container has an existing policy of,
for example 3x replication, and one wanted to migrate that data to a policy that specifies,
for example 3x replication, and one wanted to migrate that data to a policy that specifies
a different replication level, the application would create another container
specifying the other policy name and then simply move the data from one container
to the other. Policies apply on a per container basis allowing for minimal application
@ -84,8 +84,8 @@ Storage Policies on either side of a network outage at the same time? Furthermo
what would happen if objects were placed in those containers, a whole bunch of them,
and then later the network outage was restored? Well, without special care it would
be a big problem as an application could end up using the wrong ring to try and find
an object. Luckily there is a solution for this problem, a daemon covered in more
detail later, works tirelessly to identify and rectify this potential scenario.
an object. Luckily there is a solution for this problem, a daemon known as the
Container Reconciler works tirelessly to identify and rectify this potential scenario.
--------------------
Container Reconciler
@ -126,19 +126,19 @@ The object rows are ensured to be fully durable during replication using
the normal container replication. After the container
replicator pushes its object rows to available primary nodes any
misplaced object rows are bulk loaded into containers based off the
object timestamp under the ".misplaced_objects" system account. The
object timestamp under the ``.misplaced_objects`` system account. The
rows are initially written to a handoff container on the local node, and
at the end of the replication pass the .misplaced_object containers are
at the end of the replication pass the ``.misplaced_objects`` containers are
replicated to the correct primary nodes.
The container-reconciler processes the .misplaced_objects containers in
The container-reconciler processes the ``.misplaced_objects`` containers in
descending order and reaps its containers as the objects represented by
the rows are successfully reconciled. The container-reconciler will
always validate the correct storage policy for enqueued objects using
direct container HEAD requests which are accelerated via caching.
Because failure of individual storage nodes in aggregate is assumed to
be common at scale the container-reconciler will make forward progress
be common at scale, the container-reconciler will make forward progress
with a simple quorum majority. During a combination of failures and
rebalances it is possible that a quorum could provide an incomplete
record of the correct storage policy - so an object write may have to be
@ -209,7 +209,7 @@ on disk but no longer accessible) and to provide proper messaging to
applications when a policy needs to be retired, the notion of deprecation is
used. :ref:`configure-policy` describes how to deprecate a policy.
Swift's behavior with deprecated policies will change as follows:
Swift's behavior with deprecated policies is as follows:
* The deprecated policy will not appear in /info
* PUT/GET/DELETE/POST/HEAD are still allowed on the pre-existing containers
@ -221,7 +221,7 @@ Swift's behavior with deprecated policies will change as follows:
.. note::
A policy can not be both the default and deprecated. If you deprecate the
A policy cannot be both the default and deprecated. If you deprecate the
default policy, you must specify a new default.
You can also use the deprecated feature to rollout new policies. If you
@ -268,9 +268,9 @@ are a number of rules enforced by Swift when parsing this file:
* Policy names must be unique
* The policy name 'Policy-0' can only be used for the policy with index 0
* If any policies are defined, exactly one policy must be declared default
* Deprecated policies can not be declared the default
* Deprecated policies cannot be declared the default
The following is an example of a properly configured ''swift.conf'' file. See :doc:`policies_saio`
The following is an example of a properly configured ``swift.conf`` file. See :doc:`policies_saio`
for full instructions on setting up an all-in-one with this example configuration.::
[swift-hash]
@ -327,8 +327,8 @@ for policy 1::
Using Policies
--------------
Using policies is very simple, a policy is only specified when a container is
initially created, there are no other API changes. Creating a container can
Using policies is very simple - a policy is only specified when a container is
initially created. There are no other API changes. Creating a container can
be done without any special policy information::
curl -v -X PUT -H 'X-Auth-Token: <your auth token>' \
@ -344,7 +344,7 @@ If we wanted to explicitly state that we wanted policy 'gold' the command
would simply need to include a new header as shown below::
curl -v -X PUT -H 'X-Auth-Token: <your auth token>' \
-H 'X-Storage-Policy: gold' http://127.0.0.1:8080/v1/AUTH_test/myCont1
-H 'X-Storage-Policy: gold' http://127.0.0.1:8080/v1/AUTH_test/myCont0
And that's it! The application does not need to specify the policy name ever
again. There are some illegal operations however:
@ -388,7 +388,7 @@ collection is made up of policies of class :class:`.StoragePolicy`. The
collection class includes handy functions for getting to a policy either by
name or by index , getting info about the policies, etc. There's also one
very important function, :meth:`~.StoragePolicyCollection.get_object_ring`.
Object rings are now members of the :class:`.StoragePolicy` class and are
Object rings are members of the :class:`.StoragePolicy` class and are
actually not instantiated until the :meth:`~.StoragePolicy.load_ring`
method is called. Any caller anywhere in the code base that needs to access
an object ring must use the :data:`.POLICIES` global singleton to access the
@ -408,7 +408,7 @@ and by importing :func:`.get_container_info` to gain access to the policy
index associated with the container in question. From the index it
can then use the :data:`.POLICIES` singleton to grab the right ring. For example,
:ref:`list_endpoints` is policy aware using the means just described. Another
example is :ref:`recon` which will report the md5 sums for all object rings.
example is :ref:`recon` which will report the md5 sums for all of the rings.
Proxy Server
------------
@ -452,7 +452,7 @@ on this later, but for now be aware of the following directory naming convention
* ``/quarantined/objects-N`` maps to the quarantine directory for policy index #N
Note that these directory names are actually owned by the specific Diskfile
Implementation, the names shown above are used by the default Diskfile.
implementation, the names shown above are used by the default Diskfile.
Object Server
-------------
@ -466,7 +466,7 @@ policy index and leaves the actual directory naming/structure mechanisms to
:class:`.Diskfile` being used will assure that data is properly located in the
tree based on its policy.
For the same reason, the :ref:`object-updater` also is policy aware; as previously
For the same reason, the :ref:`object-updater` also is policy aware. As previously
described, different policies use different async pending directories so the
updater needs to know how to scan them appropriately.
@ -476,7 +476,7 @@ handling a replication job for 2x versus 3x is trivial; however, the difference
handling replication between 3x and erasure code is most definitely not. In
fact, the term 'replication' really isn't appropriate for some policies
like erasure code; however, the majority of the framework for collecting and
processing jobs remains the same. Thus, those functions in the replicator are
processing jobs is common. Thus, those functions in the replicator are
leveraged for all policies and then there is policy specific code required for
each policy, added when the policy is defined if needed.
@ -524,8 +524,8 @@ the old table.
The policy index is stored here for use in reporting information
about the container as well as managing split-brain scenario induced
discrepancies between containers and their storage policies. Furthermore,
during split-brain containers must be prepared to track object updates from
multiple policies, so the object table also includes a
during split-brain, containers must be prepared to track object updates from
multiple policies so the object table also includes a
``storage_policy_index`` column. Per-policy object counts and bytes are
updated in the ``policy_stat`` table using ``INSERT`` and ``DELETE`` triggers
similar to the pre-policy triggers that updated ``container_stat`` directly.
@ -535,7 +535,7 @@ schemas as part of its normal consistency checking process when it updates the
``reconciler_sync_point`` entry in the ``container_info`` table. This ensures
that read heavy containers which do not encounter any writes will still get
migrated to be fully compatible with the post-storage-policy queries without
having to fall-back and retry queries with the legacy schema to service
having to fall back and retry queries with the legacy schema to service
container read requests.
The :ref:`container-sync-daemon` functionality only needs to be policy aware in that it
@ -564,7 +564,7 @@ pre-storage-policy accounts by altering the DB schema and populating the
point in time.
The per-storage-policy object and byte counts are not updated with each object
PUT and DELETE request container updates to the account server is performed
PUT and DELETE request, instead container updates to the account server are performed
asynchronously by the ``swift-container-updater``.
.. _upgrade-policy:
@ -582,6 +582,7 @@ an existing cluster that already has lots of data on it and upgrade to Swift wit
Storage Policies. From there you want to go ahead and create a policy and test a
few things out. All you need to do is:
#. Upgrade all of your Swift nodes to a policy-aware version of Swift
#. Define your policies in ``/etc/swift/swift.conf``
#. Create the corresponding object rings
#. Create containers and objects and confirm their placement is as expected

View File

@ -227,6 +227,7 @@ use = egg:swift#recon
# log_address = /dev/log
#
# files_per_second = 20
# concurrency = 1
# bytes_per_second = 10000000
# log_time = 3600
# zero_byte_files_per_second = 50

View File

@ -31,7 +31,7 @@ from swift.common.ring import Ring
from swift.common.utils import get_logger, whataremyips, ismount, \
config_true_value, Timestamp
from swift.common.daemon import Daemon
from swift.common.storage_policy import POLICIES, POLICY_INDEX
from swift.common.storage_policy import POLICIES
class AccountReaper(Daemon):
@ -352,7 +352,7 @@ class AccountReaper(Daemon):
if not objects:
break
try:
policy_index = headers.get(POLICY_INDEX, 0)
policy_index = headers.get('X-Backend-Storage-Policy-Index', 0)
for obj in objects:
if isinstance(obj['name'], unicode):
obj['name'] = obj['name'].encode('utf8')
@ -442,7 +442,7 @@ class AccountReaper(Daemon):
headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,
'X-Container-Partition': str(container_partition),
'X-Container-Device': cnode['device'],
POLICY_INDEX: policy_index})
'X-Backend-Storage-Policy-Index': policy_index})
successes += 1
self.stats_return_codes[2] = \
self.stats_return_codes.get(2, 0) + 1

View File

@ -38,7 +38,6 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, \
HTTPPreconditionFailed, HTTPConflict, Request, \
HTTPInsufficientStorage, HTTPException
from swift.common.request_helpers import is_sys_or_user_meta
from swift.common.storage_policy import POLICY_INDEX
class AccountController(object):
@ -110,7 +109,8 @@ class AccountController(object):
else:
timestamp = valid_timestamp(req)
pending_timeout = None
container_policy_index = req.headers.get(POLICY_INDEX, 0)
container_policy_index = \
req.headers.get('X-Backend-Storage-Policy-Index', 0)
if 'x-trans-id' in req.headers:
pending_timeout = 3
broker = self._get_account_broker(drive, part, account,

View File

@ -25,7 +25,7 @@ from swift.account.backend import AccountBroker, DATADIR as ABDATADIR
from swift.container.backend import ContainerBroker, DATADIR as CBDATADIR
from swift.obj.diskfile import get_data_dir, read_metadata, DATADIR_BASE, \
extract_policy_index
from swift.common.storage_policy import POLICIES, POLICY_INDEX
from swift.common.storage_policy import POLICIES
class InfoSystemExit(Exception):
@ -101,14 +101,16 @@ def print_ring_locations(ring, datadir, account, container=None, obj=None,
% (node['ip'], node['port'], node['device'], part,
urllib.quote(target))
if policy_index is not None:
cmd += ' -H "%s: %s"' % (POLICY_INDEX, policy_index)
cmd += ' -H "%s: %s"' % ('X-Backend-Storage-Policy-Index',
policy_index)
print cmd
for node in handoff_nodes:
cmd = 'curl -I -XHEAD "http://%s:%s/%s/%s/%s"' \
% (node['ip'], node['port'], node['device'], part,
urllib.quote(target))
if policy_index is not None:
cmd += ' -H "%s: %s"' % (POLICY_INDEX, policy_index)
cmd += ' -H "%s: %s"' % ('X-Backend-Storage-Policy-Index',
policy_index)
cmd += ' # [Handoff]'
print cmd

View File

@ -558,12 +558,18 @@ class SwiftRecon(object):
"""
Generator that yields all values for given key in a recon cache entry.
This is for use with object auditor recon cache entries. If the
object auditor has run in 'once' mode with a subset of devices
specified the checksum auditor section will have an entry of the form:
{'object_auditor_stats_ALL': { 'disk1disk2diskN': {..}}
The same is true of the ZBF auditor cache entry section. We use this
generator to find all instances of a particular key in these multi-
level dictionaries.
object auditor has run in parallel, the recon cache will have entries
of the form: {'object_auditor_stats_ALL': { 'disk1': {..},
'disk2': {..},
'disk3': {..},
...}}
If the object auditor hasn't run in parallel, the recon cache will have
entries of the form: {'object_auditor_stats_ALL': {...}}.
The ZBF auditor doesn't run in parallel. However, if a subset of
devices is selected for auditing, the recon cache will have an entry
of the form: {'object_auditor_stats_ZBF': { 'disk1disk2..diskN': {}}
We use this generator to find all instances of a particular key in
these multi-level dictionaries.
"""
for k, v in recon_entry.items():
if isinstance(v, dict):
@ -597,15 +603,15 @@ class SwiftRecon(object):
zbf_scan[url] = response['object_auditor_stats_ZBF']
if len(all_scan) > 0:
stats = {}
stats[atime] = [(self.nested_get_value(atime, all_scan[i]))
stats[atime] = [sum(self.nested_get_value(atime, all_scan[i]))
for i in all_scan]
stats[bprocessed] = [(self.nested_get_value(bprocessed,
stats[bprocessed] = [sum(self.nested_get_value(bprocessed,
all_scan[i])) for i in all_scan]
stats[passes] = [(self.nested_get_value(passes, all_scan[i]))
stats[passes] = [sum(self.nested_get_value(passes, all_scan[i]))
for i in all_scan]
stats[errors] = [(self.nested_get_value(errors, all_scan[i]))
stats[errors] = [sum(self.nested_get_value(errors, all_scan[i]))
for i in all_scan]
stats[quarantined] = [(self.nested_get_value(quarantined,
stats[quarantined] = [sum(self.nested_get_value(quarantined,
all_scan[i])) for i in all_scan]
for k in stats:
if None in stats[k]:
@ -623,13 +629,13 @@ class SwiftRecon(object):
print("[ALL_auditor] - No hosts returned valid data.")
if len(zbf_scan) > 0:
stats = {}
stats[atime] = [(self.nested_get_value(atime, zbf_scan[i]))
stats[atime] = [sum(self.nested_get_value(atime, zbf_scan[i]))
for i in zbf_scan]
stats[bprocessed] = [(self.nested_get_value(bprocessed,
stats[bprocessed] = [sum(self.nested_get_value(bprocessed,
zbf_scan[i])) for i in zbf_scan]
stats[errors] = [(self.nested_get_value(errors, zbf_scan[i]))
stats[errors] = [sum(self.nested_get_value(errors, zbf_scan[i]))
for i in zbf_scan]
stats[quarantined] = [(self.nested_get_value(quarantined,
stats[quarantined] = [sum(self.nested_get_value(quarantined,
zbf_scan[i])) for i in zbf_scan]
for k in stats:
if None in stats[k]:

View File

@ -728,14 +728,14 @@ class SimpleClient(object):
max_backoff=5, retries=5):
self.url = url
self.token = token
self.attempts = 0
self.starting_backoff = starting_backoff
self.max_backoff = max_backoff
self.retries = retries
def base_request(self, method, container=None, name=None, prefix=None,
headers=None, proxy=None, contents=None,
full_listing=None, logger=None, additional_info=None):
full_listing=None, logger=None, additional_info=None,
timeout=None):
# Common request method
trans_start = time()
url = self.url
@ -756,16 +756,12 @@ class SimpleClient(object):
if prefix:
url += '&prefix=%s' % prefix
req = urllib2.Request(url, headers=headers, data=contents)
if proxy:
proxy = urlparse.urlparse(proxy)
proxy = urllib2.ProxyHandler({proxy.scheme: proxy.netloc})
opener = urllib2.build_opener(proxy)
urllib2.install_opener(opener)
req = urllib2.Request(url, headers=headers, data=contents)
req.set_proxy(proxy.netloc, proxy.scheme)
req.get_method = lambda: method
urllib2.urlopen(req)
conn = urllib2.urlopen(req)
conn = urllib2.urlopen(req, timeout=timeout)
body = conn.read()
try:
body_data = json.loads(body)
@ -799,14 +795,15 @@ class SimpleClient(object):
return [None, body_data]
def retry_request(self, method, **kwargs):
self.attempts = 0
retries = kwargs.pop('retries', self.retries)
attempts = 0
backoff = self.starting_backoff
while self.attempts <= self.retries:
self.attempts += 1
while attempts <= retries:
attempts += 1
try:
return self.base_request(method, **kwargs)
except (socket.error, httplib.HTTPException, urllib2.URLError):
if self.attempts > self.retries:
if attempts > retries:
raise
sleep(backoff)
backoff = min(backoff * 2, self.max_backoff)

View File

@ -30,7 +30,6 @@ from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success, HTTP_SERVICE_UNAVAILABLE
from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable
from swift.common.utils import split_path, validate_device_partition
from swift.common.storage_policy import POLICY_INDEX
from swift.common.wsgi import make_subrequest
@ -91,7 +90,7 @@ def get_name_and_placement(request, minsegs=1, maxsegs=None,
storage_policy_index appended on the end
:raises: HTTPBadRequest
"""
policy_idx = request.headers.get(POLICY_INDEX, '0')
policy_idx = request.headers.get('X-Backend-Storage-Policy-Index', '0')
policy_idx = int(policy_idx)
results = split_and_validate_path(request, minsegs=minsegs,
maxsegs=maxsegs,

View File

@ -15,6 +15,7 @@
import array
import cPickle as pickle
import inspect
from collections import defaultdict
from gzip import GzipFile
from os.path import getmtime
@ -112,10 +113,10 @@ class RingData(object):
# This only works on Python 2.7; on 2.6, we always get the
# current time in the gzip output.
tempf = NamedTemporaryFile(dir=".", prefix=filename, delete=False)
try:
if 'mtime' in inspect.getargspec(GzipFile.__init__).args:
gz_file = GzipFile(filename, mode='wb', fileobj=tempf,
mtime=mtime)
except TypeError:
else:
gz_file = GzipFile(filename, mode='wb', fileobj=tempf)
self.serialize_v1(gz_file)
gz_file.close()

View File

@ -18,8 +18,6 @@ import string
from swift.common.utils import config_true_value, SWIFT_CONF_FILE
from swift.common.ring import Ring
POLICY = 'X-Storage-Policy'
POLICY_INDEX = 'X-Backend-Storage-Policy-Index'
LEGACY_POLICY_NAME = 'Policy-0'
VALID_CHARS = '-' + string.letters + string.digits

View File

@ -64,7 +64,9 @@ utf8_encoder = codecs.getencoder('utf-8')
from swift import gettext_ as _
import swift.common.exceptions
from swift.common.http import is_success, is_redirection, HTTP_NOT_FOUND
from swift.common.http import is_success, is_redirection, HTTP_NOT_FOUND, \
HTTP_PRECONDITION_FAILED, HTTP_REQUESTED_RANGE_NOT_SATISFIABLE
# logging doesn't import patched as cleanly as one would like
from logging.handlers import SysLogHandler
@ -989,6 +991,24 @@ class StatsdClient(object):
sample_rate)
def server_handled_successfully(status_int):
"""
True for successful responses *or* error codes that are not Swift's fault,
False otherwise. For example, 500 is definitely the server's fault, but
412 is an error code (4xx are all errors) that is due to a header the
client sent.
If one is tracking error rates to monitor server health, one would be
advised to use a function like this one, lest a client cause a flurry of
404s or 416s and make a spurious spike in your errors graph.
"""
return (is_success(status_int) or
is_redirection(status_int) or
status_int == HTTP_NOT_FOUND or
status_int == HTTP_PRECONDITION_FAILED or
status_int == HTTP_REQUESTED_RANGE_NOT_SATISFIABLE)
def timing_stats(**dec_kwargs):
"""
Returns a decorator that logs timing events or errors for public methods in
@ -1001,9 +1021,7 @@ def timing_stats(**dec_kwargs):
def _timing_stats(ctrl, *args, **kwargs):
start_time = time.time()
resp = func(ctrl, *args, **kwargs)
if is_success(resp.status_int) or \
is_redirection(resp.status_int) or \
resp.status_int == HTTP_NOT_FOUND:
if server_handled_successfully(resp.status_int):
ctrl.logger.timing_since(method + '.timing',
start_time, **dec_kwargs)
else:
@ -1896,11 +1914,16 @@ def audit_location_generator(devices, datadir, suffix='',
for device in device_dir:
if mount_check and not ismount(os.path.join(devices, device)):
if logger:
logger.debug(
logger.warning(
_('Skipping %s as it is not mounted'), device)
continue
datadir_path = os.path.join(devices, device, datadir)
partitions = listdir(datadir_path)
try:
partitions = listdir(datadir_path)
except OSError as e:
if logger:
logger.warning('Skipping %s because %s', datadir_path, e)
continue
for partition in partitions:
part_path = os.path.join(datadir_path, partition)
try:
@ -2284,7 +2307,8 @@ def put_recon_cache_entry(cache_entry, key, item):
"""
Function that will check if item is a dict, and if so put it under
cache_entry[key]. We use nested recon cache entries when the object
auditor runs in 'once' mode with a specified subset of devices.
auditor runs in parallel or else in 'once' mode with a specified
subset of devices.
"""
if isinstance(item, dict):
if key not in cache_entry or key in cache_entry and not \

View File

@ -25,7 +25,6 @@ from swift.common.direct_client import (
direct_head_container, direct_delete_container_object,
direct_put_container_object, ClientException)
from swift.common.internal_client import InternalClient, UnexpectedResponse
from swift.common.storage_policy import POLICY_INDEX
from swift.common.utils import get_logger, split_path, quorum_size, \
FileLikeIter, Timestamp, last_modified_date_to_timestamp, \
LRUCache
@ -100,7 +99,7 @@ def incorrect_policy_index(info, remote_info):
def translate_container_headers_to_info(headers):
default_timestamp = Timestamp(0).internal
return {
'storage_policy_index': int(headers[POLICY_INDEX]),
'storage_policy_index': int(headers['X-Backend-Storage-Policy-Index']),
'put_timestamp': headers.get('x-backend-put-timestamp',
default_timestamp),
'delete_timestamp': headers.get('x-backend-delete-timestamp',
@ -373,7 +372,7 @@ class ContainerReconciler(Daemon):
an object was manually re-enqued.
"""
q_path = '/%s/%s/%s' % (MISPLACED_OBJECTS_ACCOUNT, container, obj)
x_timestamp = slightly_later_timestamp(q_record)
x_timestamp = slightly_later_timestamp(max(q_record, q_ts))
self.stats_log('pop_queue', 'remove %r (%f) from the queue (%s)',
q_path, q_ts, x_timestamp)
headers = {'X-Timestamp': x_timestamp}

View File

@ -37,7 +37,7 @@ from swift.common import constraints
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.http import HTTP_NOT_FOUND, is_success
from swift.common.storage_policy import POLICIES, POLICY_INDEX
from swift.common.storage_policy import POLICIES
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
HTTPCreated, HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
HTTPPreconditionFailed, HTTPMethodNotAllowed, Request, Response, \
@ -57,7 +57,7 @@ def gen_resp_headers(info, is_deleted=False):
info.get('delete_timestamp', 0)).internal,
'X-Backend-Status-Changed-At': Timestamp(
info.get('status_changed_at', 0)).internal,
POLICY_INDEX: info.get('storage_policy_index', 0),
'X-Backend-Storage-Policy-Index': info.get('storage_policy_index', 0),
}
if not is_deleted:
# base container info on deleted containers is not exposed to client
@ -137,7 +137,7 @@ class ContainerController(object):
:raises: HTTPBadRequest if the supplied index is bogus
"""
policy_index = req.headers.get(POLICY_INDEX, None)
policy_index = req.headers.get('X-Backend-Storage-Policy-Index', None)
if policy_index is None:
return None
@ -202,7 +202,7 @@ class ContainerController(object):
'x-object-count': info['object_count'],
'x-bytes-used': info['bytes_used'],
'x-trans-id': req.headers.get('x-trans-id', '-'),
POLICY_INDEX: info['storage_policy_index'],
'X-Backend-Storage-Policy-Index': info['storage_policy_index'],
'user-agent': 'container-server %s' % os.getpid(),
'referer': req.as_referer()})
if req.headers.get('x-account-override-deleted', 'no').lower() == \

View File

@ -35,7 +35,7 @@ from swift.common.utils import (
whataremyips, Timestamp)
from swift.common.daemon import Daemon
from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
from swift.common.storage_policy import POLICIES, POLICY_INDEX
from swift.common.storage_policy import POLICIES
class ContainerSync(Daemon):
@ -376,7 +376,8 @@ class ContainerSync(Daemon):
looking_for_timestamp = Timestamp(row['created_at'])
timestamp = -1
headers = body = None
headers_out = {POLICY_INDEX: str(info['storage_policy_index'])}
headers_out = {'X-Backend-Storage-Policy-Index':
str(info['storage_policy_index'])}
for node in nodes:
try:
these_headers, this_body = direct_get_object(

View File

@ -33,7 +33,6 @@ from swift.common.utils import get_logger, config_true_value, ismount, \
dump_recon_cache, quorum_size, Timestamp
from swift.common.daemon import Daemon
from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
from swift.common.storage_policy import POLICY_INDEX
class ContainerUpdater(Daemon):
@ -278,7 +277,7 @@ class ContainerUpdater(Daemon):
'X-Object-Count': count,
'X-Bytes-Used': bytes,
'X-Account-Override-Deleted': 'yes',
POLICY_INDEX: storage_policy_index,
'X-Backend-Storage-Policy-Index': storage_policy_index,
'user-agent': self.user_agent}
conn = http_connect(
node['ip'], node['port'], node['device'], part,

View File

@ -17,6 +17,7 @@ import os
import sys
import time
import signal
from random import shuffle
from swift import gettext_ as _
from contextlib import closing
from eventlet import Timeout
@ -72,7 +73,10 @@ class AuditorWorker(object):
description = ''
if device_dirs:
device_dir_str = ','.join(sorted(device_dirs))
description = _(' - %s') % device_dir_str
if self.auditor_type == 'ALL':
description = _(' - parallel, %s') % device_dir_str
else:
description = _(' - %s') % device_dir_str
self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
(mode, self.auditor_type, description))
begin = reported = time.time()
@ -223,6 +227,7 @@ class ObjectAuditor(Daemon):
self.conf = conf
self.logger = get_logger(conf, log_route='object-auditor')
self.devices = conf.get('devices', '/srv/node')
self.concurrency = int(conf.get('concurrency', 1))
self.conf_zero_byte_fps = int(
conf.get('zero_byte_files_per_second', 50))
self.recon_cache_path = conf.get('recon_cache_path',
@ -260,7 +265,7 @@ class ObjectAuditor(Daemon):
sys.exit()
def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
"""Audit loop"""
"""Parallel audit loop"""
self.clear_recon_cache('ALL')
self.clear_recon_cache('ZBF')
kwargs['device_dirs'] = override_devices
@ -272,7 +277,34 @@ class ObjectAuditor(Daemon):
if self.conf_zero_byte_fps:
zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
pids.append(zbf_pid)
pids.append(self.fork_child(**kwargs))
if self.concurrency == 1:
# Audit all devices in 1 process
pids.append(self.fork_child(**kwargs))
else:
# Divide devices amongst parallel processes set by
# self.concurrency. Total number of parallel processes
# is self.concurrency + 1 if zero_byte_fps.
parallel_proc = self.concurrency + 1 if \
self.conf_zero_byte_fps else self.concurrency
device_list = list(override_devices) if override_devices else \
listdir(self.devices)
shuffle(device_list)
while device_list:
pid = None
if len(pids) == parallel_proc:
pid = os.wait()[0]
pids.remove(pid)
# ZBF scanner must be restarted as soon as it finishes
if self.conf_zero_byte_fps and pid == zbf_pid:
kwargs['device_dirs'] = override_devices
# sleep between ZBF scanner forks
self._sleep()
zbf_pid = self.fork_child(zero_byte_fps=True,
**kwargs)
pids.append(zbf_pid)
else:
kwargs['device_dirs'] = [device_list.pop()]
pids.append(self.fork_child(**kwargs))
while pids:
pid = os.wait()[0]
# ZBF scanner must be restarted as soon as it finishes

View File

@ -37,7 +37,7 @@ from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
from swift.obj import ssync_sender
from swift.obj.diskfile import (DiskFileManager, get_hashes, get_data_dir,
get_tmp_dir)
from swift.common.storage_policy import POLICY_INDEX, POLICIES
from swift.common.storage_policy import POLICIES
hubs.use_hub(get_hub())
@ -228,7 +228,7 @@ class ObjectReplicator(Daemon):
if len(suff) == 3 and isdir(join(path, suff))]
self.replication_count += 1
self.logger.increment('partition.delete.count.%s' % (job['device'],))
self.headers[POLICY_INDEX] = job['policy_idx']
self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
begin = time.time()
try:
responses = []
@ -270,7 +270,7 @@ class ObjectReplicator(Daemon):
"""
self.replication_count += 1
self.logger.increment('partition.update.count.%s' % (job['device'],))
self.headers[POLICY_INDEX] = job['policy_idx']
self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
begin = time.time()
try:
hashed, local_hash = tpool_reraise(

View File

@ -46,7 +46,6 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \
HTTPConflict
from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager
from swift.common.storage_policy import POLICY_INDEX
class ObjectController(object):
@ -113,7 +112,7 @@ class ObjectController(object):
# network_chunk_size parameter value instead.
socket._fileobject.default_bufsize = self.network_chunk_size
# Provide further setup sepecific to an object server implemenation.
# Provide further setup specific to an object server implementation.
self.setup(conf)
def setup(self, conf):
@ -240,7 +239,7 @@ class ObjectController(object):
headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
headers_out['referer'] = request.as_referer()
headers_out[POLICY_INDEX] = policy_idx
headers_out['X-Backend-Storage-Policy-Index'] = policy_idx
for conthost, contdevice in updates:
self.async_update(op, account, container, obj, conthost,
contpartition, contdevice, headers_out,
@ -270,7 +269,8 @@ class ObjectController(object):
hosts = contdevices = [None]
headers_in = request.headers
headers_out = HeaderKeyDict({
POLICY_INDEX: 0, # system accounts are always Policy-0
# system accounts are always Policy-0
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': request.timestamp.internal,
'x-trans-id': headers_in.get('x-trans-id', '-'),
'referer': request.as_referer()})

View File

@ -25,8 +25,6 @@ from swift.common import http
from swift.common import swob
from swift.common import utils
from swift.common.storage_policy import POLICY_INDEX
class Receiver(object):
"""
@ -170,7 +168,8 @@ class Receiver(object):
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
self.device, self.partition = utils.split_path(
urllib.unquote(self.request.path), 2, 2, False)
self.policy_idx = int(self.request.headers.get(POLICY_INDEX, 0))
self.policy_idx = \
int(self.request.headers.get('X-Backend-Storage-Policy-Index', 0))
utils.validate_device_partition(self.device, self.partition)
if self.app._diskfile_mgr.mount_check and \
not constraints.check_mount(
@ -354,7 +353,7 @@ class Receiver(object):
subreq_iter())
else:
raise Exception('Invalid subrequest method %s' % method)
subreq.headers[POLICY_INDEX] = self.policy_idx
subreq.headers['X-Backend-Storage-Policy-Index'] = self.policy_idx
subreq.headers['X-Backend-Replication'] = 'True'
if replication_headers:
subreq.headers['X-Backend-Replication-Headers'] = \

View File

@ -18,8 +18,6 @@ from swift.common import bufferedhttp
from swift.common import exceptions
from swift.common import http
from swift.common.storage_policy import POLICY_INDEX
class Sender(object):
"""
@ -100,7 +98,8 @@ class Sender(object):
self.connection.putrequest('REPLICATION', '/%s/%s' % (
self.node['device'], self.job['partition']))
self.connection.putheader('Transfer-Encoding', 'chunked')
self.connection.putheader(POLICY_INDEX, self.policy_idx)
self.connection.putheader('X-Backend-Storage-Policy-Index',
self.policy_idx)
self.connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):

View File

@ -26,7 +26,6 @@ from eventlet import patcher, Timeout
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.storage_policy import POLICY_INDEX
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, ismount
from swift.common.daemon import Daemon
@ -222,7 +221,8 @@ class ObjectUpdater(Daemon):
for node in nodes:
if node['id'] not in successes:
headers = update['headers'].copy()
headers.setdefault(POLICY_INDEX, str(policy_idx))
headers.setdefault('X-Backend-Storage-Policy-Index',
str(policy_idx))
status = self.object_update(node, part, update['op'], obj,
headers)
if not is_success(status) and status != HTTP_NOT_FOUND:

View File

@ -50,7 +50,7 @@ from swift.common.swob import Request, Response, HeaderKeyDict, Range, \
HTTPException, HTTPRequestedRangeNotSatisfiable
from swift.common.request_helpers import strip_sys_meta_prefix, \
strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta
from swift.common.storage_policy import POLICY_INDEX, POLICY, POLICIES
from swift.common.storage_policy import POLICIES
def update_headers(response, headers):
@ -162,7 +162,8 @@ def headers_to_container_info(headers, status_int=HTTP_OK):
'object_count': headers.get('x-container-object-count'),
'bytes': headers.get('x-container-bytes-used'),
'versions': headers.get('x-versions-location'),
'storage_policy': headers.get(POLICY_INDEX.lower(), '0'),
'storage_policy': headers.get('X-Backend-Storage-Policy-Index'.lower(),
'0'),
'cors': {
'allow_origin': meta.get('access-control-allow-origin'),
'expose_headers': meta.get('access-control-expose-headers'),
@ -295,6 +296,7 @@ def get_container_info(env, app, swift_source=None):
swift_source=swift_source)
if not info:
info = headers_to_container_info({}, 0)
info.setdefault('storage_policy', '0')
return info
@ -987,6 +989,7 @@ class Controller(object):
else:
info['partition'] = part
info['nodes'] = nodes
info.setdefault('storage_policy', '0')
return info
def _make_request(self, nodes, part, method, path, headers, query,
@ -1206,14 +1209,18 @@ class Controller(object):
pass
# if a backend policy index is present in resp headers, translate it
# here with the friendly policy name
if POLICY_INDEX in res.headers and is_success(res.status_int):
policy = POLICIES.get_by_index(res.headers[POLICY_INDEX])
if 'X-Backend-Storage-Policy-Index' in res.headers and \
is_success(res.status_int):
policy = \
POLICIES.get_by_index(
res.headers['X-Backend-Storage-Policy-Index'])
if policy:
res.headers[POLICY] = policy.name
res.headers['X-Storage-Policy'] = policy.name
else:
self.app.logger.error(
'Could not translate %s (%r) from %r to policy',
POLICY_INDEX, res.headers[POLICY_INDEX], path)
'X-Backend-Storage-Policy-Index',
res.headers['X-Backend-Storage-Policy-Index'], path)
return res
def is_origin_allowed(self, cors_info, origin):

View File

@ -23,7 +23,7 @@ from swift.common import constraints
from swift.common.http import HTTP_ACCEPTED
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation, clear_info_cache
from swift.common.storage_policy import POLICIES, POLICY, POLICY_INDEX
from swift.common.storage_policy import POLICIES
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
HTTPNotFound
@ -55,7 +55,7 @@ class ContainerController(Controller):
:param req: incoming request
"""
policy_name = req.headers.get(POLICY)
policy_name = req.headers.get('X-Storage-Policy')
if not policy_name:
return
policy = POLICIES.get_by_name(policy_name)
@ -63,7 +63,7 @@ class ContainerController(Controller):
raise HTTPBadRequest(request=req,
content_type="text/plain",
body=("Invalid %s '%s'"
% (POLICY, policy_name)))
% ('X-Storage-Policy', policy_name)))
if policy.is_deprecated:
body = 'Storage Policy %r is deprecated' % (policy.name)
raise HTTPBadRequest(request=req, body=body)
@ -214,7 +214,7 @@ class ContainerController(Controller):
additional['X-Backend-Storage-Policy-Default'] = \
int(POLICIES.default)
else:
additional[POLICY_INDEX] = str(policy_index)
additional['X-Backend-Storage-Policy-Index'] = str(policy_index)
headers = [self.generate_request_headers(req, transfer=True,
additional=additional)
for _junk in range(n_outgoing)]

View File

@ -56,7 +56,6 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, Request, \
HTTPClientDisconnect, HTTPNotImplemented
from swift.common.storage_policy import POLICY_INDEX
from swift.common.request_helpers import is_user_meta
@ -197,10 +196,10 @@ class ObjectController(Controller):
self.account_name, self.container_name, req)
req.acl = container_info['read_acl']
# pass the policy index to storage nodes via req header
policy_index = req.headers.get(POLICY_INDEX,
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
req.headers[POLICY_INDEX] = policy_index
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
@ -303,10 +302,10 @@ class ObjectController(Controller):
else:
delete_at_container = delete_at_part = delete_at_nodes = None
# pass the policy index to storage nodes via req header
policy_index = req.headers.get(POLICY_INDEX,
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
req.headers[POLICY_INDEX] = policy_index
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = Timestamp(time.time()).internal
@ -458,11 +457,11 @@ class ObjectController(Controller):
body='If-None-Match only supports *')
container_info = self.container_info(
self.account_name, self.container_name, req)
policy_index = req.headers.get(POLICY_INDEX,
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
# pass the policy index to storage nodes via req header
req.headers[POLICY_INDEX] = policy_index
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
container_partition = container_info['partition']
containers = container_info['nodes']
req.acl = container_info['write_acl']
@ -500,7 +499,7 @@ class ObjectController(Controller):
(object_versions and not
req.environ.get('swift_versioned_copy')):
# make sure proxy-server uses the right policy index
_headers = {POLICY_INDEX: req.headers[POLICY_INDEX],
_headers = {'X-Backend-Storage-Policy-Index': policy_index,
'X-Newest': 'True'}
hreq = Request.blank(req.path_info, headers=_headers,
environ={'REQUEST_METHOD': 'HEAD'})
@ -586,7 +585,7 @@ class ObjectController(Controller):
src_container_name, src_obj_name)
source_req = req.copy_get()
# make sure the source request uses it's container_info
source_req.headers.pop(POLICY_INDEX, None)
source_req.headers.pop('X-Backend-Storage-Policy-Index', None)
source_req.path_info = source_header
source_req.headers['X-Newest'] = 'true'
orig_obj_name = self.object_name
@ -777,11 +776,11 @@ class ObjectController(Controller):
container_info = self.container_info(
self.account_name, self.container_name, req)
# pass the policy index to storage nodes via req header
policy_index = req.headers.get(POLICY_INDEX,
policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
# pass the policy index to storage nodes via req header
req.headers[POLICY_INDEX] = policy_index
req.headers['X-Backend-Storage-Policy-Index'] = policy_index
container_partition = container_info['partition']
containers = container_info['nodes']
req.acl = container_info['write_acl']
@ -838,7 +837,8 @@ class ObjectController(Controller):
policy_idx = container_info['storage_policy']
obj_ring = self.app.get_object_ring(policy_idx)
# pass the policy index to storage nodes via req header
new_del_req.headers[POLICY_INDEX] = policy_idx
new_del_req.headers['X-Backend-Storage-Policy-Index'] = \
policy_idx
container_partition = container_info['partition']
containers = container_info['nodes']
new_del_req.acl = container_info['write_acl']

View File

@ -28,8 +28,6 @@ import uuid
import eventlet
from nose import SkipTest
from swift.common.storage_policy import POLICY
from test.functional import normalized_urls, load_constraint, cluster_info
import test.functional as tf
from test.functional.swift_test_client import Account, Connection, File, \
@ -2120,13 +2118,13 @@ class TestCrossPolicyObjectVersioningEnv(object):
cls.versions_container = cls.account.container(prefix + "-versions")
if not cls.versions_container.create(
{POLICY: policy['name']}):
{'X-Storage-Policy': policy['name']}):
raise ResponseError(cls.conn.response)
cls.container = cls.account.container(prefix + "-objs")
if not cls.container.create(
hdrs={'X-Versions-Location': cls.versions_container.name,
POLICY: version_policy['name']}):
'X-Storage-Policy': version_policy['name']}):
raise ResponseError(cls.conn.response)
container_info = cls.container.info()

View File

@ -27,7 +27,7 @@ from nose import SkipTest
from swift.common.manager import Manager
from swift.common.internal_client import InternalClient
from swift.common import utils, direct_client, ring
from swift.common.storage_policy import POLICIES, POLICY_INDEX
from swift.common.storage_policy import POLICIES
from swift.common.http import HTTP_NOT_FOUND
from test.probe.common import reset_environment, get_to_final_state
@ -203,8 +203,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = set(metadata[POLICY_INDEX] for
node, metadata in head_responses)
found_policy_indexes = \
set(metadata['X-Backend-Storage-Policy-Index'] for
node, metadata in head_responses)
self.assert_(len(found_policy_indexes) > 1,
'primary nodes did not disagree about policy index %r' %
head_responses)
@ -218,7 +219,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name, headers={POLICY_INDEX: policy_index})
self.object_name,
headers={'X-Backend-Storage-Policy-Index':
policy_index})
except direct_client.ClientException as err:
continue
orig_policy_index = policy_index
@ -237,8 +240,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = set(metadata[POLICY_INDEX] for
node, metadata in head_responses)
found_policy_indexes = \
set(metadata['X-Backend-Storage-Policy-Index'] for
node, metadata in head_responses)
self.assert_(len(found_policy_indexes) == 1,
'primary nodes disagree about policy index %r' %
head_responses)
@ -253,7 +257,7 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name, headers={
POLICY_INDEX: orig_policy_index})
'X-Backend-Storage-Policy-Index': orig_policy_index})
except direct_client.ClientException as err:
if err.http_status == HTTP_NOT_FOUND:
continue
@ -299,8 +303,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
found_policy_indexes = set(metadata[POLICY_INDEX] for
node, metadata in head_responses)
found_policy_indexes = \
set(metadata['X-Backend-Storage-Policy-Index'] for
node, metadata in head_responses)
self.assert_(len(found_policy_indexes) > 1,
'primary nodes did not disagree about policy index %r' %
head_responses)
@ -314,7 +319,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name, headers={POLICY_INDEX: policy_index})
self.object_name,
headers={'X-Backend-Storage-Policy-Index':
policy_index})
except direct_client.ClientException as err:
if 'x-backend-timestamp' in err.http_headers:
ts_policy_index = policy_index
@ -338,11 +345,13 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
metadata = direct_client.direct_head_container(
node, container_part, self.account, self.container_name)
head_responses.append((node, metadata))
new_found_policy_indexes = set(metadata[POLICY_INDEX] for node,
metadata in head_responses)
new_found_policy_indexes = \
set(metadata['X-Backend-Storage-Policy-Index'] for node,
metadata in head_responses)
self.assert_(len(new_found_policy_indexes) == 1,
'primary nodes disagree about policy index %r' %
dict((node['port'], metadata[POLICY_INDEX])
dict((node['port'],
metadata['X-Backend-Storage-Policy-Index'])
for node, metadata in head_responses))
expected_policy_index = new_found_policy_indexes.pop()
self.assertEqual(orig_policy_index, expected_policy_index)
@ -355,7 +364,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
try:
direct_client.direct_head_object(
node, part, self.account, self.container_name,
self.object_name, headers={POLICY_INDEX: policy_index})
self.object_name,
headers={'X-Backend-Storage-Policy-Index':
policy_index})
except direct_client.ClientException as err:
if err.http_status == HTTP_NOT_FOUND:
continue
@ -430,7 +441,7 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
'x-container-device': ','.join(n['device'] for n in
self.container_ring.devs),
'x-container-partition': container_part,
POLICY_INDEX: wrong_policy.idx,
'X-Backend-Storage-Policy-Index': wrong_policy.idx,
'X-Static-Large-Object': 'True',
}
for node in nodes:
@ -575,6 +586,13 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
acceptable_statuses=(4,),
headers={'X-Backend-Storage-Policy-Index': int(old_policy)})
# make sure the queue is settled
get_to_final_state()
for container in client.iter_containers('.misplaced_objects'):
for obj in client.iter_objects('.misplaced_objects',
container['name']):
self.fail('Found unexpected object %r in the queue' % obj)
def main():
options, commands = parser.parse_args()

View File

@ -24,7 +24,6 @@ from uuid import uuid4
from swiftclient import client
from swift.common import direct_client
from swift.common.storage_policy import POLICY_INDEX
from swift.obj.diskfile import get_data_dir
from swift.common.exceptions import ClientException
from test.probe.common import kill_server, kill_servers, reset_environment,\
@ -102,7 +101,7 @@ class TestEmptyDevice(TestCase):
another_onode = self.object_ring.get_more_nodes(opart).next()
odata = direct_client.direct_get_object(
another_onode, opart, self.account, container, obj,
headers={POLICY_INDEX: self.policy.idx})[-1]
headers={'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
if odata != 'VERIFY':
raise Exception('Direct object GET did not return VERIFY, instead '
'it returned: %s' % repr(odata))
@ -134,7 +133,7 @@ class TestEmptyDevice(TestCase):
try:
direct_client.direct_get_object(
onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})
'X-Backend-Storage-Policy-Index': self.policy.idx})
except ClientException as err:
exc = err
self.assertEquals(exc.http_status, 404)
@ -157,7 +156,7 @@ class TestEmptyDevice(TestCase):
odata = direct_client.direct_get_object(
onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})[-1]
'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
if odata != 'VERIFY':
raise Exception('Direct object GET did not return VERIFY, instead '
'it returned: %s' % repr(odata))
@ -165,7 +164,7 @@ class TestEmptyDevice(TestCase):
try:
direct_client.direct_get_object(
another_onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})
'X-Backend-Storage-Policy-Index': self.policy.idx})
except ClientException as err:
exc = err
self.assertEquals(exc.http_status, 404)

View File

@ -23,7 +23,6 @@ from uuid import uuid4
from swiftclient import client
from swift.common import direct_client
from swift.common.storage_policy import POLICY_INDEX
from swift.common.exceptions import ClientException
from swift.common.utils import hash_path, readconf
from swift.obj.diskfile import write_metadata, read_metadata, get_data_dir
@ -90,12 +89,12 @@ class TestObjectFailures(TestCase):
odata = direct_client.direct_get_object(
onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})[-1]
'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
self.assertEquals(odata, 'VERIFY')
try:
direct_client.direct_get_object(
onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})
'X-Backend-Storage-Policy-Index': self.policy.idx})
raise Exception("Did not quarantine object")
except ClientException as err:
self.assertEquals(err.http_status, 404)
@ -109,7 +108,7 @@ class TestObjectFailures(TestCase):
metadata = read_metadata(data_file)
metadata['ETag'] = 'badetag'
write_metadata(data_file, metadata)
base_headers = {POLICY_INDEX: self.policy.idx}
base_headers = {'X-Backend-Storage-Policy-Index': self.policy.idx}
for header, result in [({'Range': 'bytes=0-2'}, 'RAN'),
({'Range': 'bytes=1-11'}, 'ANGE'),
({'Range': 'bytes=0-11'}, 'RANGE')]:
@ -123,7 +122,7 @@ class TestObjectFailures(TestCase):
try:
direct_client.direct_get_object(
onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})
'X-Backend-Storage-Policy-Index': self.policy.idx})
raise Exception("Did not quarantine object")
except ClientException as err:
self.assertEquals(err.http_status, 404)
@ -140,7 +139,8 @@ class TestObjectFailures(TestCase):
try:
direct_client.direct_get_object(
onode, opart, self.account, container, obj, conn_timeout=1,
response_timeout=1, headers={POLICY_INDEX: self.policy.idx})
response_timeout=1, headers={'X-Backend-Storage-Policy-Index':
self.policy.idx})
raise Exception("Did not quarantine object")
except ClientException as err:
self.assertEquals(err.http_status, 404)
@ -157,7 +157,8 @@ class TestObjectFailures(TestCase):
try:
direct_client.direct_head_object(
onode, opart, self.account, container, obj, conn_timeout=1,
response_timeout=1, headers={POLICY_INDEX: self.policy.idx})
response_timeout=1, headers={'X-Backend-Storage-Policy-Index':
self.policy.idx})
raise Exception("Did not quarantine object")
except ClientException as err:
self.assertEquals(err.http_status, 404)
@ -173,7 +174,7 @@ class TestObjectFailures(TestCase):
write_metadata(fpointer, metadata)
try:
headers = {'X-Object-Meta-1': 'One', 'X-Object-Meta-Two': 'Two',
POLICY_INDEX: self.policy.idx}
'X-Backend-Storage-Policy-Index': self.policy.idx}
direct_client.direct_post_object(
onode, opart, self.account,
container, obj,

View File

@ -20,7 +20,6 @@ from uuid import uuid4
from swiftclient import client
from swift.common import direct_client
from swift.common.storage_policy import POLICY_INDEX
from swift.common.exceptions import ClientException
from swift.common.manager import Manager
from test.probe.common import kill_server, kill_servers, reset_environment, \
@ -92,7 +91,7 @@ class TestObjectHandoff(TestCase):
another_onode = self.object_ring.get_more_nodes(opart).next()
odata = direct_client.direct_get_object(
another_onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})[-1]
'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
if odata != 'VERIFY':
raise Exception('Direct object GET did not return VERIFY, instead '
'it returned: %s' % repr(odata))
@ -113,7 +112,7 @@ class TestObjectHandoff(TestCase):
try:
direct_client.direct_get_object(
onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})
'X-Backend-Storage-Policy-Index': self.policy.idx})
except ClientException as err:
exc = err
self.assertEquals(exc.http_status, 404)
@ -133,7 +132,7 @@ class TestObjectHandoff(TestCase):
Manager(['object-replicator']).once(number=another_num)
odata = direct_client.direct_get_object(
onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})[-1]
'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
if odata != 'VERIFY':
raise Exception('Direct object GET did not return VERIFY, instead '
'it returned: %s' % repr(odata))
@ -141,7 +140,7 @@ class TestObjectHandoff(TestCase):
try:
direct_client.direct_get_object(
another_onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})
'X-Backend-Storage-Policy-Index': self.policy.idx})
except ClientException as err:
exc = err
self.assertEquals(exc.http_status, 404)
@ -177,7 +176,7 @@ class TestObjectHandoff(TestCase):
start_server(onode['port'], self.port2server, self.pids)
direct_client.direct_get_object(
onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})
'X-Backend-Storage-Policy-Index': self.policy.idx})
# Run the extra server last so it'll remove its extra partition
for node in onodes:
try:
@ -192,7 +191,7 @@ class TestObjectHandoff(TestCase):
try:
direct_client.direct_get_object(
another_onode, opart, self.account, container, obj, headers={
POLICY_INDEX: self.policy.idx})
'X-Backend-Storage-Policy-Index': self.policy.idx})
except ClientException as err:
exc = err
self.assertEquals(exc.http_status, 404)

View File

@ -41,16 +41,18 @@ import cPickle as pickle
from gzip import GzipFile
import mock as mocklib
DEFAULT_PATCH_POLICIES = [storage_policy.StoragePolicy(0, 'nulo', True),
storage_policy.StoragePolicy(1, 'unu')]
LEGACY_PATCH_POLICIES = [storage_policy.StoragePolicy(0, 'legacy', True)]
def patch_policies(thing_or_policies=None, legacy_only=False):
if legacy_only:
default_policies = LEGACY_PATCH_POLICIES
default_policies = [storage_policy.StoragePolicy(
0, 'legacy', True, object_ring=FakeRing())]
else:
default_policies = DEFAULT_PATCH_POLICIES
default_policies = [
storage_policy.StoragePolicy(
0, 'nulo', True, object_ring=FakeRing()),
storage_policy.StoragePolicy(
1, 'unu', object_ring=FakeRing()),
]
thing_or_policies = thing_or_policies or default_policies
@ -138,6 +140,7 @@ class FakeRing(Ring):
self._reload()
def get_part(self, *args, **kwargs):
# always call the real method, even if the fake ignores the result
real_part = super(FakeRing, self).get_part(*args, **kwargs)
if self._part_shift == 32:
return 1
@ -604,12 +607,27 @@ def fake_http_connect(*code_iter, **kwargs):
class FakeConn(object):
def __init__(self, status, etag=None, body='', timestamp='1',
expect_status=None, headers=None):
self.status = status
if expect_status is None:
self.expect_status = self.status
headers=None):
# connect exception
if isinstance(status, (Exception, Timeout)):
raise status
if isinstance(status, tuple):
self.expect_status, self.status = status
else:
self.expect_status = expect_status
self.expect_status, self.status = (None, status)
if not self.expect_status:
# when a swift backend service returns a status before reading
# from the body (mostly an error response) eventlet.wsgi will
# respond with that status line immediately instead of 100
# Continue, even if the client sent the Expect 100 header.
# BufferedHttp and the proxy both see these error statuses
# when they call getexpect, so our FakeConn tries to act like
# our backend services and return certain types of responses
# as expect statuses just like a real backend server would do.
if self.status in (507, 412, 409):
self.expect_status = status
else:
self.expect_status = 100
self.reason = 'Fake'
self.host = '1.2.3.4'
self.port = '1234'
@ -626,9 +644,11 @@ def fake_http_connect(*code_iter, **kwargs):
self._next_sleep = None
def getresponse(self):
if isinstance(self.status, (Exception, Timeout)):
raise self.status
exc = kwargs.get('raise_exc')
if exc:
if isinstance(exc, Exception):
if isinstance(exc, (Exception, Timeout)):
raise exc
raise Exception('test')
if kwargs.get('raise_timeout_exc'):
@ -636,15 +656,9 @@ def fake_http_connect(*code_iter, **kwargs):
return self
def getexpect(self):
if self.expect_status == -2:
raise HTTPException()
if self.expect_status == -3:
return FakeConn(507)
if self.expect_status == -4:
return FakeConn(201)
if self.expect_status == 412:
return FakeConn(412)
return FakeConn(100)
if isinstance(self.expect_status, (Exception, Timeout)):
raise self.expect_status
return FakeConn(self.expect_status)
def getheaders(self):
etag = self.etag
@ -737,10 +751,6 @@ def fake_http_connect(*code_iter, **kwargs):
if 'give_connect' in kwargs:
kwargs['give_connect'](*args, **ckwargs)
status = code_iter.next()
if isinstance(status, tuple):
status, expect_status = status
else:
expect_status = status
etag = etag_iter.next()
headers = headers_iter.next()
timestamp = timestamps_iter.next()
@ -752,7 +762,7 @@ def fake_http_connect(*code_iter, **kwargs):
else:
body = body_iter.next()
return FakeConn(status, etag, body=body, timestamp=timestamp,
expect_status=expect_status, headers=headers)
headers=headers)
connect.code_iter = code_iter

View File

@ -30,7 +30,7 @@ from swift.common.exceptions import ClientException
from swift.common.utils import normalize_timestamp
from test import unit
from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
from swift.common.storage_policy import StoragePolicy, POLICIES
class FakeLogger(object):
@ -291,7 +291,7 @@ class TestReaper(unittest.TestCase):
'X-Container-Host': host,
'X-Container-Partition': 'partition',
'X-Container-Device': device,
POLICY_INDEX: policy.idx
'X-Backend-Storage-Policy-Index': policy.idx
}
ring = r.get_object_ring(policy.idx)
expected = call(ring.devs[i], 1, 'a', 'c', 'o',
@ -322,7 +322,7 @@ class TestReaper(unittest.TestCase):
direct_get_container=DEFAULT,
direct_delete_object=DEFAULT,
direct_delete_container=DEFAULT) as mocks:
headers = {POLICY_INDEX: policy.idx}
headers = {'X-Backend-Storage-Policy-Index': policy.idx}
obj_listing = [{'name': 'o'}]
def fake_get_container(*args, **kwargs):
@ -340,7 +340,8 @@ class TestReaper(unittest.TestCase):
self.assertEqual(3, len(mock_calls))
for call_args in mock_calls:
_args, kwargs = call_args
self.assertEqual(kwargs['headers'][POLICY_INDEX],
self.assertEqual(kwargs['headers']
['X-Backend-Storage-Policy-Index'],
policy.idx)
self.assertEquals(mocks['direct_delete_container'].call_count, 3)
@ -542,7 +543,7 @@ class TestReaper(unittest.TestCase):
with patch('swift.account.reaper.random.random', fake_random):
try:
r.run_forever()
except Exception, err:
except Exception as err:
pass
self.assertEqual(self.val, 1)
self.assertEqual(str(err), 'exit')

View File

@ -34,7 +34,7 @@ from swift.account.server import AccountController
from swift.common.utils import normalize_timestamp, replication, public
from swift.common.request_helpers import get_sys_meta_prefix
from test.unit import patch_policies
from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
from swift.common.storage_policy import StoragePolicy, POLICIES
@patch_policies
@ -1723,7 +1723,7 @@ class TestAccountController(unittest.TestCase):
'X-Delete-Timestamp': '0',
'X-Object-Count': '2',
'X-Bytes-Used': '4',
POLICY_INDEX: policy.idx,
'X-Backend-Storage-Policy-Index': policy.idx,
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201)
@ -1780,7 +1780,7 @@ class TestAccountController(unittest.TestCase):
'X-Delete-Timestamp': '0',
'X-Object-Count': '2',
'X-Bytes-Used': '4',
POLICY_INDEX: policy.idx,
'X-Backend-Storage-Policy-Index': policy.idx,
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201)
@ -1812,7 +1812,7 @@ class TestAccountController(unittest.TestCase):
'X-Delete-Timestamp': '0',
'X-Object-Count': count,
'X-Bytes-Used': count,
POLICY_INDEX: policy.idx,
'X-Backend-Storage-Policy-Index': policy.idx,
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201)

View File

@ -79,7 +79,7 @@ class TestCliInfoBase(unittest.TestCase):
def assertRaisesMessage(self, exc, msg, func, *args, **kwargs):
try:
func(*args, **kwargs)
except Exception, e:
except Exception as e:
self.assertTrue(msg in str(e),
"Expected %r in %r" % (msg, str(e)))
self.assertTrue(isinstance(e, exc),

View File

@ -258,3 +258,48 @@ class TestReconCommands(unittest.TestCase):
self.assertTrue("1/2 hosts matched" in output)
self.assertTrue("http://10.2.2.2:10000/recon/swiftconfmd5 (bogus) "
"doesn't match on disk md5sum" in output)
def test_object_auditor_check(self):
# Recon middleware response from an object server
def dummy_request(*args, **kwargs):
values = {
'passes': 0, 'errors': 0, 'audit_time': 0,
'start_time': 0, 'quarantined': 0, 'bytes_processed': 0}
return [('http://127.0.0.1:6010/recon/auditor/object', {
'object_auditor_stats_ALL': values,
'object_auditor_stats_ZBF': values,
}, 200)]
response = {}
def catch_print(computed):
response[computed.get('name')] = computed
cli = recon.SwiftRecon()
cli.pool.imap = dummy_request
cli._print_stats = catch_print
cli.object_auditor_check([('127.0.0.1', 6010)])
# Now check that output contains all keys and names
keys = ['average', 'number_none', 'high',
'reported', 'low', 'total', 'perc_none']
names = [
'ALL_audit_time_last_path',
'ALL_quarantined_last_path',
'ALL_errors_last_path',
'ALL_passes_last_path',
'ALL_bytes_processed_last_path',
'ZBF_audit_time_last_path',
'ZBF_quarantined_last_path',
'ZBF_errors_last_path',
'ZBF_bytes_processed_last_path'
]
for name in names:
computed = response.get(name)
self.assertTrue(computed)
for key in keys:
self.assertTrue(key in computed)

View File

@ -662,15 +662,23 @@ class TestReconSuccess(TestCase):
"files_processed": 2310,
"quarantined": 0}})
def test_get_auditor_info_object_once(self):
def test_get_auditor_info_object_parallel_once(self):
from_cache_response = {
"object_auditor_stats_ALL": {'disk1disk2': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ALL": {
'disk1': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0},
'disk2': {
"audit_time": 115,
"bytes_processed": 234660,
"completed": 115,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ZBF": {'disk1disk2': {
"audit_time": 45.877294063568115,
"bytes_processed": 0,
@ -686,13 +694,21 @@ class TestReconSuccess(TestCase):
'object_auditor_stats_ZBF'],
'/var/cache/swift/object.recon'), {})])
self.assertEquals(rv, {
"object_auditor_stats_ALL": {'disk1disk2': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ALL": {
'disk1': {
"audit_time": 115.14418768882751,
"bytes_processed": 234660,
"completed": 115.4512460231781,
"errors": 0,
"files_processed": 2310,
"quarantined": 0},
'disk2': {
"audit_time": 115,
"bytes_processed": 234660,
"completed": 115,
"errors": 0,
"files_processed": 2310,
"quarantined": 0}},
"object_auditor_stats_ZBF": {'disk1disk2': {
"audit_time": 45.877294063568115,
"bytes_processed": 0,

View File

@ -821,8 +821,8 @@ class TestSloGetManifest(SloTestCase):
headers)
try:
resp_data = json.loads(body)
except json.JSONDecodeError:
resp_data = None
except ValueError:
self.fail("Invalid JSON in manifest GET: %r" % body)
self.assertEqual(
resp_data,

View File

@ -27,7 +27,7 @@ from swift.common import direct_client
from swift.common.exceptions import ClientException
from swift.common.utils import json, Timestamp
from swift.common.swob import HeaderKeyDict, RESPONSE_REASONS
from swift.common.storage_policy import POLICY_INDEX, POLICIES
from swift.common.storage_policy import POLICIES
from test.unit import patch_policies
@ -134,9 +134,11 @@ class TestDirectClient(unittest.TestCase):
for add_ts in (True, False):
now = time.time()
headers = direct_client.gen_headers(
{POLICY_INDEX: policy.idx}, add_ts=add_ts)
{'X-Backend-Storage-Policy-Index': policy.idx},
add_ts=add_ts)
self.assertEqual(headers['user-agent'], stub_user_agent)
self.assertEqual(headers[POLICY_INDEX], str(policy.idx))
self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
str(policy.idx))
expected_header_count = 2
if add_ts:
expected_header_count += 1

View File

@ -23,9 +23,11 @@ from textwrap import dedent
import os
from test.unit import FakeLogger
import eventlet
from eventlet.green import urllib2
from swift.common import internal_client
from swift.common import swob
from swift.common.storage_policy import StoragePolicy
from test.unit import with_tempdir, write_fake_ring, patch_policies
from test.unit.common.middleware.helpers import FakeSwift
@ -202,7 +204,6 @@ class TestCompressingfileReader(unittest.TestCase):
class TestInternalClient(unittest.TestCase):
@patch_policies(legacy_only=True)
@mock.patch('swift.common.utils.HASH_PATH_SUFFIX', new='endcap')
@with_tempdir
def test_load_from_config(self, tempdir):
@ -232,7 +233,8 @@ class TestInternalClient(unittest.TestCase):
write_fake_ring(container_ring_path)
object_ring_path = os.path.join(tempdir, 'object.ring.gz')
write_fake_ring(object_ring_path)
client = internal_client.InternalClient(conf_path, 'test', 1)
with patch_policies([StoragePolicy(0, 'legacy', True)]):
client = internal_client.InternalClient(conf_path, 'test', 1)
self.assertEqual(client.account_ring, client.app.app.app.account_ring)
self.assertEqual(client.account_ring.serialized_path,
account_ring_path)
@ -1130,6 +1132,7 @@ class TestSimpleClient(unittest.TestCase):
logger = FakeLogger()
retval = sc.retry_request(
'GET', headers={'content-length': '123'}, logger=logger)
self.assertEqual(urlopen.call_count, 1)
request.assert_called_with('http://127.0.0.1?format=json',
headers={'content-length': '123'},
data=None)
@ -1181,29 +1184,107 @@ class TestSimpleClient(unittest.TestCase):
def test_get_with_retries_all_failed(self, request, urlopen):
# Simulate a failing request, ensure retries done
request.return_value.get_type.return_value = "http"
request.side_effect = urllib2.URLError('')
urlopen.return_value.read.return_value = ''
urlopen.side_effect = urllib2.URLError('')
sc = internal_client.SimpleClient(url='http://127.0.0.1', retries=1)
self.assertRaises(urllib2.URLError, sc.retry_request, 'GET')
with mock.patch('swift.common.internal_client.sleep') as mock_sleep:
self.assertRaises(urllib2.URLError, sc.retry_request, 'GET')
self.assertEqual(mock_sleep.call_count, 1)
self.assertEqual(request.call_count, 2)
self.assertEqual(urlopen.call_count, 2)
@mock.patch('eventlet.green.urllib2.urlopen')
@mock.patch('eventlet.green.urllib2.Request')
def test_get_with_retries(self, request, urlopen):
# First request fails, retry successful
request.return_value.get_type.return_value = "http"
urlopen.return_value.read.return_value = ''
req = urllib2.Request('http://127.0.0.1', method='GET')
request.side_effect = [urllib2.URLError(''), req]
mock_resp = mock.MagicMock()
mock_resp.read.return_value = ''
urlopen.side_effect = [urllib2.URLError(''), mock_resp]
sc = internal_client.SimpleClient(url='http://127.0.0.1', retries=1,
token='token')
retval = sc.retry_request('GET')
self.assertEqual(request.call_count, 3)
with mock.patch('swift.common.internal_client.sleep') as mock_sleep:
retval = sc.retry_request('GET')
self.assertEqual(mock_sleep.call_count, 1)
self.assertEqual(request.call_count, 2)
self.assertEqual(urlopen.call_count, 2)
request.assert_called_with('http://127.0.0.1?format=json', data=None,
headers={'X-Auth-Token': 'token'})
self.assertEqual([None, None], retval)
@mock.patch('eventlet.green.urllib2.urlopen')
def test_get_with_retries_param(self, mock_urlopen):
mock_response = mock.MagicMock()
mock_response.read.return_value = ''
mock_urlopen.side_effect = internal_client.httplib.BadStatusLine('')
c = internal_client.SimpleClient(url='http://127.0.0.1', token='token')
self.assertEqual(c.retries, 5)
# first without retries param
with mock.patch('swift.common.internal_client.sleep') as mock_sleep:
self.assertRaises(internal_client.httplib.BadStatusLine,
c.retry_request, 'GET')
self.assertEqual(mock_sleep.call_count, 5)
self.assertEqual(mock_urlopen.call_count, 6)
# then with retries param
mock_urlopen.reset_mock()
with mock.patch('swift.common.internal_client.sleep') as mock_sleep:
self.assertRaises(internal_client.httplib.BadStatusLine,
c.retry_request, 'GET', retries=2)
self.assertEqual(mock_sleep.call_count, 2)
self.assertEqual(mock_urlopen.call_count, 3)
# and this time with a real response
mock_urlopen.reset_mock()
mock_urlopen.side_effect = [internal_client.httplib.BadStatusLine(''),
mock_response]
with mock.patch('swift.common.internal_client.sleep') as mock_sleep:
retval = c.retry_request('GET', retries=1)
self.assertEqual(mock_sleep.call_count, 1)
self.assertEqual(mock_urlopen.call_count, 2)
self.assertEqual([None, None], retval)
def test_proxy(self):
running = True
def handle(sock):
while running:
try:
with eventlet.Timeout(0.1):
(conn, addr) = sock.accept()
except eventlet.Timeout:
continue
else:
conn.send('HTTP/1.1 503 Server Error')
conn.close()
sock.close()
sock = eventlet.listen(('', 0))
port = sock.getsockname()[1]
proxy = 'http://127.0.0.1:%s' % port
url = 'https://127.0.0.1:1/a'
server = eventlet.spawn(handle, sock)
try:
headers = {'Content-Length': '0'}
with mock.patch('swift.common.internal_client.sleep'):
try:
internal_client.put_object(
url, container='c', name='o1', headers=headers,
contents='', proxy=proxy, timeout=0.1, retries=0)
except urllib2.HTTPError as e:
self.assertEqual(e.code, 503)
except urllib2.URLError as e:
if 'ECONNREFUSED' in str(e):
self.fail(
"Got %s which probably means the http proxy "
"settings were not used" % e)
else:
raise e
else:
self.fail('Unexpected successful response')
finally:
running = False
server.wait()
if __name__ == '__main__':
unittest.main()

View File

@ -3050,6 +3050,20 @@ class TestStatsdLogging(unittest.TestCase):
self.assertEquals(mock_controller.args[0], 'METHOD.timing')
self.assert_(mock_controller.args[1] > 0)
mock_controller = MockController(412)
METHOD(mock_controller)
self.assertEquals(len(mock_controller.args), 2)
self.assertEquals(mock_controller.called, 'timing')
self.assertEquals(mock_controller.args[0], 'METHOD.timing')
self.assert_(mock_controller.args[1] > 0)
mock_controller = MockController(416)
METHOD(mock_controller)
self.assertEquals(len(mock_controller.args), 2)
self.assertEquals(mock_controller.called, 'timing')
self.assertEquals(mock_controller.args[0], 'METHOD.timing')
self.assert_(mock_controller.args[1] > 0)
mock_controller = MockController(401)
METHOD(mock_controller)
self.assertEquals(len(mock_controller.args), 2)
@ -3683,8 +3697,112 @@ class TestThreadpool(unittest.TestCase):
class TestAuditLocationGenerator(unittest.TestCase):
def test_drive_tree_access(self):
orig_listdir = utils.listdir
def _mock_utils_listdir(path):
if 'bad_part' in path:
raise OSError(errno.EACCES)
elif 'bad_suffix' in path:
raise OSError(errno.EACCES)
elif 'bad_hash' in path:
raise OSError(errno.EACCES)
else:
return orig_listdir(path)
#Check Raise on Bad partition
tmpdir = mkdtemp()
data = os.path.join(tmpdir, "drive", "data")
os.makedirs(data)
obj_path = os.path.join(data, "bad_part")
with open(obj_path, "w"):
pass
part1 = os.path.join(data, "partition1")
os.makedirs(part1)
part2 = os.path.join(data, "partition2")
os.makedirs(part2)
with patch('swift.common.utils.listdir', _mock_utils_listdir):
audit = lambda: list(utils.audit_location_generator(
tmpdir, "data", mount_check=False))
self.assertRaises(OSError, audit)
#Check Raise on Bad Suffix
tmpdir = mkdtemp()
data = os.path.join(tmpdir, "drive", "data")
os.makedirs(data)
part1 = os.path.join(data, "partition1")
os.makedirs(part1)
part2 = os.path.join(data, "partition2")
os.makedirs(part2)
obj_path = os.path.join(part1, "bad_suffix")
with open(obj_path, 'w'):
pass
suffix = os.path.join(part2, "suffix")
os.makedirs(suffix)
with patch('swift.common.utils.listdir', _mock_utils_listdir):
audit = lambda: list(utils.audit_location_generator(
tmpdir, "data", mount_check=False))
self.assertRaises(OSError, audit)
#Check Raise on Bad Hash
tmpdir = mkdtemp()
data = os.path.join(tmpdir, "drive", "data")
os.makedirs(data)
part1 = os.path.join(data, "partition1")
os.makedirs(part1)
suffix = os.path.join(part1, "suffix")
os.makedirs(suffix)
hash1 = os.path.join(suffix, "hash1")
os.makedirs(hash1)
obj_path = os.path.join(suffix, "bad_hash")
with open(obj_path, 'w'):
pass
with patch('swift.common.utils.listdir', _mock_utils_listdir):
audit = lambda: list(utils.audit_location_generator(
tmpdir, "data", mount_check=False))
self.assertRaises(OSError, audit)
def test_non_dir_drive(self):
with temptree([]) as tmpdir:
logger = FakeLogger()
data = os.path.join(tmpdir, "drive", "data")
os.makedirs(data)
#Create a file, that represents a non-dir drive
open(os.path.join(tmpdir, 'asdf'), 'w')
locations = utils.audit_location_generator(
tmpdir, "data", mount_check=False, logger=logger
)
self.assertEqual(list(locations), [])
self.assertEqual(1, len(logger.get_lines_for_level('warning')))
#Test without the logger
locations = utils.audit_location_generator(
tmpdir, "data", mount_check=False
)
self.assertEqual(list(locations), [])
def test_mount_check_drive(self):
with temptree([]) as tmpdir:
logger = FakeLogger()
data = os.path.join(tmpdir, "drive", "data")
os.makedirs(data)
#Create a file, that represents a non-dir drive
open(os.path.join(tmpdir, 'asdf'), 'w')
locations = utils.audit_location_generator(
tmpdir, "data", mount_check=True, logger=logger
)
self.assertEqual(list(locations), [])
self.assertEqual(2, len(logger.get_lines_for_level('warning')))
#Test without the logger
locations = utils.audit_location_generator(
tmpdir, "data", mount_check=True
)
self.assertEqual(list(locations), [])
def test_non_dir_contents(self):
with temptree([]) as tmpdir:
logger = FakeLogger()
data = os.path.join(tmpdir, "drive", "data")
os.makedirs(data)
with open(os.path.join(data, "partition1"), "w"):
@ -3698,31 +3816,49 @@ class TestAuditLocationGenerator(unittest.TestCase):
with open(os.path.join(suffix, "hash1"), "w"):
pass
locations = utils.audit_location_generator(
tmpdir, "data", mount_check=False
tmpdir, "data", mount_check=False, logger=logger
)
self.assertEqual(list(locations), [])
def test_find_objects(self):
with temptree([]) as tmpdir:
expected_objs = list()
logger = FakeLogger()
data = os.path.join(tmpdir, "drive", "data")
os.makedirs(data)
#Create a file, that represents a non-dir drive
open(os.path.join(tmpdir, 'asdf'), 'w')
partition = os.path.join(data, "partition1")
os.makedirs(partition)
suffix = os.path.join(partition, "suffix")
os.makedirs(suffix)
hash_path = os.path.join(suffix, "hash")
os.makedirs(hash_path)
obj_path = os.path.join(hash_path, "obj1.db")
with open(obj_path, "w"):
pass
expected_objs.append((obj_path, 'drive', 'partition1'))
partition = os.path.join(data, "partition2")
os.makedirs(partition)
suffix = os.path.join(partition, "suffix2")
os.makedirs(suffix)
hash_path = os.path.join(suffix, "hash2")
os.makedirs(hash_path)
obj_path = os.path.join(hash_path, "obj1.dat")
obj_path = os.path.join(hash_path, "obj2.db")
with open(obj_path, "w"):
pass
expected_objs.append((obj_path, 'drive', 'partition2'))
locations = utils.audit_location_generator(
tmpdir, "data", ".dat", mount_check=False
tmpdir, "data", mount_check=False, logger=logger
)
self.assertEqual(list(locations),
[(obj_path, "drive", "partition2")])
got_objs = list(locations)
self.assertEqual(len(got_objs), len(expected_objs))
self.assertEqual(sorted(got_objs), sorted(expected_objs))
self.assertEqual(1, len(logger.get_lines_for_level('warning')))
def test_ignore_metadata(self):
with temptree([]) as tmpdir:
logger = FakeLogger()
data = os.path.join(tmpdir, "drive", "data")
os.makedirs(data)
partition = os.path.join(data, "partition2")
@ -3738,7 +3874,7 @@ class TestAuditLocationGenerator(unittest.TestCase):
with open(meta_path, "w"):
pass
locations = utils.audit_location_generator(
tmpdir, "data", ".dat", mount_check=False
tmpdir, "data", ".dat", mount_check=False, logger=logger
)
self.assertEqual(list(locations),
[(obj_path, "drive", "partition2")])

View File

@ -40,8 +40,7 @@ import swift.container.server as container_server
import swift.account.server as account_server
from swift.common.swob import Request
from swift.common import wsgi, utils
from swift.common.storage_policy import StoragePolicy, \
StoragePolicyCollection
from swift.common.storage_policy import POLICIES
from test.unit import temptree, with_tempdir, write_fake_ring, patch_policies
@ -51,16 +50,15 @@ from paste.deploy import loadwsgi
def _fake_rings(tmpdir):
write_fake_ring(os.path.join(tmpdir, 'account.ring.gz'))
write_fake_ring(os.path.join(tmpdir, 'container.ring.gz'))
# Some storage-policy-specific fake rings.
policy = [StoragePolicy(0, 'zero'),
StoragePolicy(1, 'one', is_default=True)]
policies = StoragePolicyCollection(policy)
for pol in policies:
for policy in POLICIES:
obj_ring_path = \
os.path.join(tmpdir, pol.ring_name + '.ring.gz')
os.path.join(tmpdir, policy.ring_name + '.ring.gz')
write_fake_ring(obj_ring_path)
# make sure there's no other ring cached on this policy
policy.object_ring = None
@patch_policies
class TestWSGI(unittest.TestCase):
"""Tests for swift.common.wsgi"""
@ -757,6 +755,7 @@ class TestPipelineWrapper(unittest.TestCase):
"<unknown> catch_errors tempurl proxy-server")
@patch_policies
@mock.patch('swift.common.utils.HASH_PATH_SUFFIX', new='endcap')
class TestPipelineModification(unittest.TestCase):
def pipeline_modules(self, app):
@ -1012,7 +1011,6 @@ class TestPipelineModification(unittest.TestCase):
'swift.common.middleware.dlo',
'swift.proxy.server'])
@patch_policies
@with_tempdir
def test_loadapp_proxy(self, tempdir):
conf_path = os.path.join(tempdir, 'proxy-server.conf')
@ -1034,24 +1032,23 @@ class TestPipelineModification(unittest.TestCase):
""" % tempdir
with open(conf_path, 'w') as f:
f.write(dedent(conf_body))
_fake_rings(tempdir)
account_ring_path = os.path.join(tempdir, 'account.ring.gz')
write_fake_ring(account_ring_path)
container_ring_path = os.path.join(tempdir, 'container.ring.gz')
write_fake_ring(container_ring_path)
object_ring_path = os.path.join(tempdir, 'object.ring.gz')
write_fake_ring(object_ring_path)
object_1_ring_path = os.path.join(tempdir, 'object-1.ring.gz')
write_fake_ring(object_1_ring_path)
object_ring_paths = {}
for policy in POLICIES:
object_ring_paths[int(policy)] = os.path.join(
tempdir, policy.ring_name + '.ring.gz')
app = wsgi.loadapp(conf_path)
proxy_app = app.app.app.app.app
self.assertEqual(proxy_app.account_ring.serialized_path,
account_ring_path)
self.assertEqual(proxy_app.container_ring.serialized_path,
container_ring_path)
self.assertEqual(proxy_app.get_object_ring(0).serialized_path,
object_ring_path)
self.assertEqual(proxy_app.get_object_ring(1).serialized_path,
object_1_ring_path)
for policy_index, expected_path in object_ring_paths.items():
object_ring = proxy_app.get_object_ring(policy_index)
self.assertEqual(expected_path, object_ring.serialized_path)
@with_tempdir
def test_loadapp_storage(self, tempdir):

View File

@ -38,8 +38,7 @@ from swift.common import constraints
from swift.common.utils import (Timestamp, mkdirs, public, replication,
lock_parent_directory, json)
from test.unit import fake_http_connect
from swift.common.storage_policy import (POLICY_INDEX, POLICIES,
StoragePolicy)
from swift.common.storage_policy import (POLICIES, StoragePolicy)
from swift.common.request_helpers import get_sys_meta_prefix
from test.unit import patch_policies
@ -87,7 +86,8 @@ class TestContainerController(unittest.TestCase):
req = Request.blank(req.path, method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(204, resp.status_int)
self.assertEqual(str(policy_index), resp.headers[POLICY_INDEX])
self.assertEqual(str(policy_index),
resp.headers['X-Backend-Storage-Policy-Index'])
def test_get_and_validate_policy_index(self):
# no policy is OK
@ -100,7 +100,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c_%s' % policy, method='PUT',
headers={
'X-Timestamp': '0',
POLICY_INDEX: policy
'X-Backend-Storage-Policy-Index': policy
})
resp = req.get_response(self.controller)
self.assertEqual(400, resp.status_int)
@ -111,7 +111,8 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c_%s' % policy.name, method='PUT',
headers={
'X-Timestamp': '0',
POLICY_INDEX: policy.idx,
'X-Backend-Storage-Policy-Index':
policy.idx,
})
self._check_put_container_storage_policy(req, policy.idx)
@ -204,7 +205,8 @@ class TestContainerController(unittest.TestCase):
Timestamp(start).normal)
# backend headers
self.assertEqual(int(response.headers[POLICY_INDEX]),
self.assertEqual(int(response.headers
['X-Backend-Storage-Policy-Index']),
int(POLICIES.default))
self.assert_(
Timestamp(response.headers['x-backend-timestamp']) >= start)
@ -219,7 +221,8 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 404)
self.assertEqual(int(resp.headers[POLICY_INDEX]), 0)
self.assertEqual(int(resp.headers['X-Backend-Storage-Policy-Index']),
0)
self.assertEqual(resp.headers['x-backend-timestamp'],
Timestamp(0).internal)
self.assertEqual(resp.headers['x-backend-put-timestamp'],
@ -252,7 +255,8 @@ class TestContainerController(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 404)
# backend headers
self.assertEqual(int(resp.headers[POLICY_INDEX]),
self.assertEqual(int(resp.headers[
'X-Backend-Storage-Policy-Index']),
int(POLICIES.default))
self.assert_(Timestamp(resp.headers['x-backend-timestamp']) >=
Timestamp(request_method_times['PUT']))
@ -363,14 +367,16 @@ class TestContainerController(unittest.TestCase):
# Set metadata header
req = Request.blank('/sda1/p/a/c', method='PUT',
headers={'X-Timestamp': Timestamp(1).internal,
POLICY_INDEX: policy.idx})
'X-Backend-Storage-Policy-Index':
policy.idx})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
# now make sure we read it back
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
str(policy.idx))
def test_PUT_no_policy_specified(self):
# Set metadata header
@ -382,14 +388,14 @@ class TestContainerController(unittest.TestCase):
# now make sure the default was used (pol 1)
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.headers.get(POLICY_INDEX),
self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
str(POLICIES.default.idx))
def test_PUT_bad_policy_specified(self):
# Set metadata header
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': Timestamp(1).internal,
POLICY_INDEX: 'nada'})
'X-Backend-Storage-Policy-Index': 'nada'})
resp = req.get_response(self.controller)
# make sure we get bad response
self.assertEquals(resp.status_int, 400)
@ -400,20 +406,21 @@ class TestContainerController(unittest.TestCase):
# Set metadata header
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: policy.idx})
'X-Backend-Storage-Policy-Index': policy.idx})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/sda1/p/a/c')
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
# make sure we get the right index back
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
str(policy.idx))
# now try to update w/o changing the policy
for method in ('POST', 'PUT'):
req = Request.blank('/sda1/p/a/c', method=method, headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: policy.idx
'X-Backend-Storage-Policy-Index': policy.idx
})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int // 100, 2)
@ -421,7 +428,8 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
str(policy.idx))
def test_PUT_bad_policy_change(self):
ts = (Timestamp(t).internal for t in itertools.count(time.time()))
@ -429,21 +437,22 @@ class TestContainerController(unittest.TestCase):
# Set metadata header
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: policy.idx})
'X-Backend-Storage-Policy-Index': policy.idx})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/sda1/p/a/c')
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
# make sure we get the right index back
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
str(policy.idx))
other_policies = [p for p in POLICIES if p != policy]
for other_policy in other_policies:
# now try to change it and make sure we get a conflict
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: other_policy.idx
'X-Backend-Storage-Policy-Index': other_policy.idx
})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 409)
@ -453,28 +462,30 @@ class TestContainerController(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
# make sure we get the right index back
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
str(policy.idx))
def test_POST_ignores_policy_change(self):
ts = (Timestamp(t).internal for t in itertools.count(time.time()))
policy = random.choice(list(POLICIES))
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: policy.idx})
'X-Backend-Storage-Policy-Index': policy.idx})
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 201)
req = Request.blank('/sda1/p/a/c')
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
# make sure we get the right index back
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
str(policy.idx))
other_policies = [p for p in POLICIES if p != policy]
for other_policy in other_policies:
# now try to change it and make sure we get a conflict
req = Request.blank('/sda1/p/a/c', method='POST', headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: other_policy.idx
'X-Backend-Storage-Policy-Index': other_policy.idx
})
resp = req.get_response(self.controller)
# valid request
@ -485,7 +496,9 @@ class TestContainerController(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEquals(resp.status_int, 204)
# make sure we get the right index back
self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
self.assertEquals(resp.headers.get
('X-Backend-Storage-Policy-Index'),
str(policy.idx))
def test_PUT_no_policy_for_existing_default(self):
ts = (Timestamp(t).internal for t in
@ -501,7 +514,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(resp.headers[POLICY_INDEX],
self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
str(POLICIES.default.idx))
# put again without specifying the storage policy
@ -515,7 +528,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(resp.headers[POLICY_INDEX],
self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
str(POLICIES.default.idx))
def test_PUT_proxy_default_no_policy_for_existing_default(self):
@ -537,7 +550,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(int(resp.headers[POLICY_INDEX]),
self.assertEqual(int(resp.headers['X-Backend-Storage-Policy-Index']),
int(proxy_default))
# put again without proxy specifying the different default
@ -552,7 +565,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(int(resp.headers[POLICY_INDEX]),
self.assertEqual(int(resp.headers['X-Backend-Storage-Policy-Index']),
int(proxy_default))
def test_PUT_no_policy_for_existing_non_default(self):
@ -561,7 +574,7 @@ class TestContainerController(unittest.TestCase):
# create a container with the non-default storage policy
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: non_default_policy.idx,
'X-Backend-Storage-Policy-Index': non_default_policy.idx,
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
@ -570,7 +583,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(resp.headers[POLICY_INDEX],
self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
str(non_default_policy.idx))
# put again without specifiying the storage policy
@ -584,7 +597,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(resp.headers[POLICY_INDEX],
self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
str(non_default_policy.idx))
def test_PUT_GET_metadata(self):
@ -1244,7 +1257,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank(
'/sda1/p/a/c', method='PUT',
headers={'X-Timestamp': ts.next(),
POLICY_INDEX: policy.idx})
'X-Backend-Storage-Policy-Index': policy.idx})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
@ -1261,14 +1274,14 @@ class TestContainerController(unittest.TestCase):
# state, so changing the policy index is perfectly acceptable
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: other_policy.idx})
'X-Backend-Storage-Policy-Index': other_policy.idx})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
req = Request.blank(
'/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.headers[POLICY_INDEX],
self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
str(other_policy.idx))
def test_change_to_default_storage_policy_via_DELETE_then_PUT(self):
@ -1278,7 +1291,7 @@ class TestContainerController(unittest.TestCase):
if not p.is_default])
req = Request.blank('/sda1/p/a/c', method='PUT', headers={
'X-Timestamp': ts.next(),
POLICY_INDEX: non_default_policy.idx,
'X-Backend-Storage-Policy-Index': non_default_policy.idx,
})
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 201) # sanity check
@ -1299,7 +1312,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.headers[POLICY_INDEX],
self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
str(POLICIES.default.idx))
def test_DELETE_object(self):
@ -1347,7 +1360,7 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/sda1/p/a/c', method='HEAD')
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
self.assertEqual(int(resp.headers[POLICY_INDEX]),
self.assertEqual(int(resp.headers['X-Backend-Storage-Policy-Index']),
int(POLICIES.default))
# create object
obj_timestamp = ts.next()
@ -2381,7 +2394,7 @@ class TestContainerController(unittest.TestCase):
'x-delete-timestamp': '0',
'x-object-count': 0,
'x-put-timestamp': Timestamp(12345).internal,
POLICY_INDEX: '%s' % POLICIES.default.idx,
'X-Backend-Storage-Policy-Index': '%s' % POLICIES.default.idx,
'referer': 'PUT http://localhost/sda1/p/a/c',
'user-agent': 'container-server %d' % os.getpid(),
'x-trans-id': '-'})})
@ -2399,7 +2412,7 @@ class TestContainerController(unittest.TestCase):
'x-delete-timestamp': '0',
'x-object-count': 0,
'x-put-timestamp': Timestamp(12345).internal,
POLICY_INDEX: '%s' % POLICIES.default.idx,
'X-Backend-Storage-Policy-Index': '%s' % POLICIES.default.idx,
'referer': 'PUT http://localhost/sda1/p/a/c',
'user-agent': 'container-server %d' % os.getpid(),
'x-trans-id': '-'})})
@ -2551,10 +2564,11 @@ class TestNonLegacyDefaultStoragePolicy(TestContainerController):
def _update_object_put_headers(self, req):
"""
Add policy index headers for containers created with default POLICY
Add policy index headers for containers created with default policy
- which in this TestCase is 1.
"""
req.headers[POLICY_INDEX] = str(POLICIES.default.idx)
req.headers['X-Backend-Storage-Policy-Index'] = \
str(POLICIES.default.idx)
if __name__ == '__main__':

View File

@ -23,7 +23,7 @@ from test.unit import FakeLogger
from swift.container import sync
from swift.common import utils
from swift.common.exceptions import ClientException
from swift.common.storage_policy import StoragePolicy, POLICY_INDEX
from swift.common.storage_policy import StoragePolicy
from test.unit import patch_policies
utils.HASH_PATH_SUFFIX = 'endcap'
@ -789,7 +789,8 @@ class TestContainerSync(unittest.TestCase):
def fake_direct_get_object(node, part, account, container, obj,
headers, resp_chunk_size=1):
self.assertEquals(headers[POLICY_INDEX], '0')
self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
'0')
return ({'other-header': 'other header value',
'etag': '"etagvalue"', 'x-timestamp': '1.2',
'content-type': 'text/plain; swift_bytes=123'},
@ -807,7 +808,8 @@ class TestContainerSync(unittest.TestCase):
def fake_direct_get_object(node, part, account, container, obj,
headers, resp_chunk_size=1):
self.assertEquals(headers[POLICY_INDEX], '0')
self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
'0')
return ({'date': 'date value',
'last-modified': 'last modified value',
'x-timestamp': '1.2',
@ -833,7 +835,8 @@ class TestContainerSync(unittest.TestCase):
def fake_direct_get_object(node, part, account, container, obj,
headers, resp_chunk_size=1):
self.assertEquals(headers[POLICY_INDEX], '0')
self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
'0')
exc.append(Exception('test exception'))
raise exc[-1]
@ -854,7 +857,8 @@ class TestContainerSync(unittest.TestCase):
def fake_direct_get_object(node, part, account, container, obj,
headers, resp_chunk_size=1):
self.assertEquals(headers[POLICY_INDEX], '0')
self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
'0')
if len(exc) == 0:
exc.append(Exception('test other exception'))
else:
@ -878,7 +882,8 @@ class TestContainerSync(unittest.TestCase):
def fake_direct_get_object(node, part, account, container, obj,
headers, resp_chunk_size=1):
self.assertEquals(headers[POLICY_INDEX], '0')
self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
'0')
return ({'other-header': 'other header value',
'x-timestamp': '1.2', 'etag': '"etagvalue"'},
iter('contents'))

View File

@ -278,6 +278,24 @@ class TestAuditor(unittest.TestCase):
self.assertEquals(auditor_worker.stats_buckets[10240], 0)
self.assertEquals(auditor_worker.stats_buckets['OVER'], 1)
def test_object_run_logging(self):
logger = FakeLogger()
auditor_worker = auditor.AuditorWorker(self.conf, logger,
self.rcache, self.devices)
auditor_worker.audit_all_objects(device_dirs=['sda'])
log_lines = logger.get_lines_for_level('info')
self.assertTrue(len(log_lines) > 0)
self.assertTrue(log_lines[0].index('ALL - parallel, sda'))
logger = FakeLogger()
auditor_worker = auditor.AuditorWorker(self.conf, logger,
self.rcache, self.devices,
zero_byte_only_at_fps=50)
auditor_worker.audit_all_objects(device_dirs=['sda'])
log_lines = logger.get_lines_for_level('info')
self.assertTrue(len(log_lines) > 0)
self.assertTrue(log_lines[0].index('ZBF - sda'))
def test_object_run_once_no_sda(self):
auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
self.rcache, self.devices)
@ -444,15 +462,14 @@ class TestAuditor(unittest.TestCase):
self.assertTrue(os.path.exists(ts_file_path))
def test_sleeper(self):
auditor.SLEEP_BETWEEN_AUDITS = 0.10
my_auditor = auditor.ObjectAuditor(self.conf)
start = time.time()
my_auditor._sleep()
delta_t = time.time() - start
self.assert_(delta_t > 0.08)
self.assert_(delta_t < 0.12)
with mock.patch(
'time.sleep', mock.MagicMock()) as mock_sleep:
auditor.SLEEP_BETWEEN_AUDITS = 0.10
my_auditor = auditor.ObjectAuditor(self.conf)
my_auditor._sleep()
mock_sleep.assert_called_with(auditor.SLEEP_BETWEEN_AUDITS)
def test_run_audit(self):
def test_run_parallel_audit(self):
class StopForever(Exception):
pass
@ -500,7 +517,9 @@ class TestAuditor(unittest.TestCase):
my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
mount_check='false',
zero_byte_files_per_second=89))
zero_byte_files_per_second=89,
concurrency=1))
mocker = ObjectAuditorMock()
my_auditor.logger.exception = mock.MagicMock()
real_audit_loop = my_auditor.audit_loop
@ -555,12 +574,17 @@ class TestAuditor(unittest.TestCase):
my_auditor._sleep = mocker.mock_sleep_continue
my_auditor.concurrency = 2
mocker.fork_called = 0
mocker.wait_called = 0
my_auditor.run_once()
# Fork is called 3 times since the zbf process is forked twice
self.assertEquals(mocker.fork_called, 3)
self.assertEquals(mocker.wait_called, 3)
# Fork is called no. of devices + (no. of devices)/2 + 1 times
# since zbf process is forked (no.of devices)/2 + 1 times
no_devices = len(os.listdir(self.devices))
self.assertEquals(mocker.fork_called, no_devices + no_devices / 2
+ 1)
self.assertEquals(mocker.wait_called, no_devices + no_devices / 2
+ 1)
finally:
os.fork = was_fork

View File

@ -31,7 +31,7 @@ from swift.common import utils
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
from swift.common import ring
from swift.obj import diskfile, replicator as object_replicator
from swift.common.storage_policy import StoragePolicy, POLICY_INDEX, POLICIES
from swift.common.storage_policy import StoragePolicy, POLICIES
def _ips():
@ -706,7 +706,7 @@ class TestObjectReplicator(unittest.TestCase):
for job in jobs:
set_default(self)
ring = self.replicator.get_object_ring(job['policy_idx'])
self.headers[POLICY_INDEX] = job['policy_idx']
self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
self.replicator.update(job)
self.assertTrue(error in mock_logger.error.call_args[0][0])
self.assertTrue(expect in mock_logger.exception.call_args[0][0])
@ -792,7 +792,7 @@ class TestObjectReplicator(unittest.TestCase):
set_default(self)
# with only one set of headers make sure we speicy index 0 here
# as otherwise it may be different from earlier tests
self.headers[POLICY_INDEX] = 0
self.headers['X-Backend-Storage-Policy-Index'] = 0
self.replicator.update(repl_job)
reqs = []
for node in repl_job['nodes']:

View File

@ -44,7 +44,7 @@ from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
NullLogger, storage_directory, public, replication
from swift.common import constraints
from swift.common.swob import Request, HeaderKeyDict
from swift.common.storage_policy import POLICY_INDEX, POLICIES
from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileDeviceUnavailable
@ -2311,7 +2311,8 @@ class TestObjectController(unittest.TestCase):
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': 'set',
POLICY_INDEX: policy.idx}, 'sda1', policy.idx)
'X-Backend-Storage-Policy-Index': policy.idx}, 'sda1',
policy.idx)
finally:
object_server.http_connect = orig_http_connect
self.assertEquals(
@ -2319,7 +2320,7 @@ class TestObjectController(unittest.TestCase):
['127.0.0.1', '1234', 'sdc1', 1, 'PUT', '/a/c/o', {
'x-timestamp': '1', 'x-out': 'set',
'user-agent': 'obj-server %s' % os.getpid(),
POLICY_INDEX: policy.idx}])
'X-Backend-Storage-Policy-Index': policy.idx}])
@patch_policies([storage_policy.StoragePolicy(0, 'zero', True),
storage_policy.StoragePolicy(1, 'one'),
@ -2363,7 +2364,7 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': '12345',
'Content-Type': 'application/burrito',
'Content-Length': '0',
POLICY_INDEX: policy.idx,
'X-Backend-Storage-Policy-Index': policy.idx,
'X-Container-Partition': '20',
'X-Container-Host': '1.2.3.4:5',
'X-Container-Device': 'sdb1',
@ -2396,10 +2397,10 @@ class TestObjectController(unittest.TestCase):
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-size': '0',
'x-timestamp': utils.Timestamp('12345').internal,
POLICY_INDEX: '37',
'X-Backend-Storage-Policy-Index': '37',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
POLICY_INDEX: policy.idx,
'X-Backend-Storage-Policy-Index': policy.idx,
'x-trans-id': '-'})})
self.assertEquals(
http_connect_args[1],
@ -2417,7 +2418,8 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': utils.Timestamp('12345').internal,
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
POLICY_INDEX: 0, # system account storage policy is 0
# system account storage policy is 0
'X-Backend-Storage-Policy-Index': 0,
'x-trans-id': '-'})})
self.assertEquals(
http_connect_args[2],
@ -2435,7 +2437,8 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': utils.Timestamp('12345').internal,
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
POLICY_INDEX: 0, # system account storage policy is 0
# system account storage policy is 0
'X-Backend-Storage-Policy-Index': 0,
'x-trans-id': '-'})})
@patch_policies([storage_policy.StoragePolicy(0, 'zero', True),
@ -2476,7 +2479,7 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': '12345',
'Content-Type': 'application/burrito',
'Content-Length': '0',
POLICY_INDEX: '26',
'X-Backend-Storage-Policy-Index': '26',
'X-Container-Partition': '20',
'X-Container-Host': '1.2.3.4:5, 6.7.8.9:10',
'X-Container-Device': 'sdb1, sdf1'})
@ -2502,7 +2505,7 @@ class TestObjectController(unittest.TestCase):
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-size': '0',
'x-timestamp': utils.Timestamp('12345').internal,
POLICY_INDEX: '26',
'X-Backend-Storage-Policy-Index': '26',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
'x-trans-id': '-'})})
@ -2520,7 +2523,7 @@ class TestObjectController(unittest.TestCase):
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-size': '0',
'x-timestamp': utils.Timestamp('12345').internal,
POLICY_INDEX: '26',
'X-Backend-Storage-Policy-Index': '26',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
'x-trans-id': '-'})})
@ -2553,7 +2556,7 @@ class TestObjectController(unittest.TestCase):
'X-Delete-At-Partition': 'p',
'X-Delete-At-Host': '10.0.0.2:6002',
'X-Delete-At-Device': 'sda1',
POLICY_INDEX: int(policy),
'X-Backend-Storage-Policy-Index': int(policy),
})
with mocked_http_conn(
500, 500, give_connect=capture_updates) as fake_conn:
@ -2571,7 +2574,8 @@ class TestObjectController(unittest.TestCase):
(delete_at_container, delete_at_timestamp))
expected = {
'X-Timestamp': put_timestamp,
POLICY_INDEX: 0, # system account is always 0
# system account storage policy is 0
'X-Backend-Storage-Policy-Index': 0,
}
for key, value in expected.items():
self.assertEqual(headers[key], str(value))
@ -2583,7 +2587,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(path, '/sda1/p/a/c/o')
expected = {
'X-Timestamp': put_timestamp,
POLICY_INDEX: int(policy),
'X-Backend-Storage-Policy-Index': int(policy),
}
for key, value in expected.items():
self.assertEqual(headers[key], str(value))
@ -2598,10 +2602,12 @@ class TestObjectController(unittest.TestCase):
data = pickle.load(open(async_file))
if data['account'] == 'a':
self.assertEquals(
int(data['headers'][POLICY_INDEX]), policy.idx)
int(data['headers']
['X-Backend-Storage-Policy-Index']), policy.idx)
elif data['account'] == '.expiring_objects':
self.assertEquals(
int(data['headers'][POLICY_INDEX]), 0)
int(data['headers']
['X-Backend-Storage-Policy-Index']), 0)
else:
self.fail('unexpected async pending data')
self.assertEqual(2, len(found_files))
@ -2621,7 +2627,8 @@ class TestObjectController(unittest.TestCase):
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': 'set',
POLICY_INDEX: policy.idx}, 'sda1', policy.idx)
'X-Backend-Storage-Policy-Index': policy.idx}, 'sda1',
policy.idx)
finally:
object_server.http_connect = orig_http_connect
utils.HASH_PATH_PREFIX = _prefix
@ -2633,7 +2640,7 @@ class TestObjectController(unittest.TestCase):
utils.Timestamp(1).internal))),
{'headers': {'x-timestamp': '1', 'x-out': 'set',
'user-agent': 'obj-server %s' % os.getpid(),
POLICY_INDEX: policy.idx},
'X-Backend-Storage-Policy-Index': policy.idx},
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
def test_async_update_saves_on_non_2xx(self):
@ -2664,7 +2671,8 @@ class TestObjectController(unittest.TestCase):
self.object_controller.async_update(
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
{'x-timestamp': '1', 'x-out': str(status),
POLICY_INDEX: policy.idx}, 'sda1', policy.idx)
'X-Backend-Storage-Policy-Index': policy.idx}, 'sda1',
policy.idx)
async_dir = diskfile.get_async_dir(policy.idx)
self.assertEquals(
pickle.load(open(os.path.join(
@ -2673,7 +2681,8 @@ class TestObjectController(unittest.TestCase):
utils.Timestamp(1).internal))),
{'headers': {'x-timestamp': '1', 'x-out': str(status),
'user-agent': 'obj-server %s' % os.getpid(),
POLICY_INDEX: policy.idx},
'X-Backend-Storage-Policy-Index':
policy.idx},
'account': 'a', 'container': 'c', 'obj': 'o',
'op': 'PUT'})
finally:
@ -2799,7 +2808,7 @@ class TestObjectController(unittest.TestCase):
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain',
'x-timestamp': utils.Timestamp(1).internal,
POLICY_INDEX: '0', # default when not given
'X-Backend-Storage-Policy-Index': '0', # default when not given
'x-trans-id': '123',
'referer': 'PUT http://localhost/sda1/0/a/c/o'}))
@ -2900,7 +2909,7 @@ class TestObjectController(unittest.TestCase):
'DELETE', '.expiring_objects', '0000000000',
'0000000002-a/c/o', None, None, None,
HeaderKeyDict({
POLICY_INDEX: 0,
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '123',
'referer': 'PUT http://localhost/v1/a/c/o'}),
@ -2927,7 +2936,7 @@ class TestObjectController(unittest.TestCase):
'DELETE', '.expiring_objects', '0000000000', '0000000000-a/c/o',
None, None, None,
HeaderKeyDict({
POLICY_INDEX: 0,
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '1234',
'referer': 'PUT http://localhost/v1/a/c/o'}),
@ -2954,7 +2963,7 @@ class TestObjectController(unittest.TestCase):
'DELETE', '.expiring_objects', '9999936000', '9999999999-a/c/o',
None, None, None,
HeaderKeyDict({
POLICY_INDEX: 0,
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '1234',
'referer': 'PUT http://localhost/v1/a/c/o'}),
@ -2986,7 +2995,7 @@ class TestObjectController(unittest.TestCase):
'PUT', '.expiring_objects', '0000000000', '0000000002-a/c/o',
'127.0.0.1:1234',
'3', 'sdc1', HeaderKeyDict({
POLICY_INDEX: 0,
'X-Backend-Storage-Policy-Index': 0,
'x-size': '0',
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain',
@ -3040,7 +3049,7 @@ class TestObjectController(unittest.TestCase):
'DELETE', '.expiring_objects', '0000000000',
'0000000002-a/c/o', None, None,
None, HeaderKeyDict({
POLICY_INDEX: 0,
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '1234',
'referer': 'DELETE http://localhost/v1/a/c/o'}),
@ -4065,7 +4074,7 @@ class TestObjectController(unittest.TestCase):
'Content-Type': 'application/x-test',
'Foo': 'fooheader',
'Baz': 'bazheader',
POLICY_INDEX: 1,
'X-Backend-Storage-Policy-Index': 1,
'X-Object-Meta-1': 'One',
'X-Object-Meta-Two': 'Two'})
req.body = 'VERIFY'

View File

@ -27,7 +27,6 @@ from swift.common import constraints
from swift.common import exceptions
from swift.common import swob
from swift.common import utils
from swift.common.storage_policy import POLICY_INDEX
from swift.obj import diskfile
from swift.obj import server
from swift.obj import ssync_receiver
@ -1066,7 +1065,7 @@ class TestReceiver(unittest.TestCase):
'Content-Encoding': 'gzip',
'Specialty-Header': 'value',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp x-object-meta-test1 '
@ -1117,7 +1116,7 @@ class TestReceiver(unittest.TestCase):
'Content-Encoding': 'gzip',
'Specialty-Header': 'value',
'Host': 'localhost:80',
POLICY_INDEX: '1',
'X-Backend-Storage-Policy-Index': '1',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp x-object-meta-test1 '
@ -1155,7 +1154,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(req.headers, {
'X-Timestamp': '1364456113.76334',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': 'x-timestamp'})
@ -1256,7 +1255,7 @@ class TestReceiver(unittest.TestCase):
'Content-Encoding': 'gzip',
'Specialty-Header': 'value',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp x-object-meta-test1 '
@ -1268,7 +1267,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(req.headers, {
'X-Timestamp': '1364456113.00002',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': 'x-timestamp'})
req = _requests.pop(0)
@ -1279,7 +1278,7 @@ class TestReceiver(unittest.TestCase):
'Content-Length': '3',
'X-Timestamp': '1364456113.00003',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp')})
@ -1292,7 +1291,7 @@ class TestReceiver(unittest.TestCase):
'Content-Length': '4',
'X-Timestamp': '1364456113.00004',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp')})
@ -1303,7 +1302,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(req.headers, {
'X-Timestamp': '1364456113.00005',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': 'x-timestamp'})
req = _requests.pop(0)
@ -1312,7 +1311,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(req.headers, {
'X-Timestamp': '1364456113.00006',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': 'x-timestamp'})
self.assertEqual(_requests, [])
@ -1376,7 +1375,7 @@ class TestReceiver(unittest.TestCase):
'Content-Length': '3',
'X-Timestamp': '1364456113.00001',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp')})
@ -1388,7 +1387,7 @@ class TestReceiver(unittest.TestCase):
'Content-Length': '1',
'X-Timestamp': '1364456113.00002',
'Host': 'localhost:80',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
'X-Backend-Replication': 'True',
'X-Backend-Replication-Headers': (
'content-length x-timestamp')})

View File

@ -26,7 +26,6 @@ import mock
from swift.common import exceptions, utils
from swift.obj import ssync_sender, diskfile
from swift.common.storage_policy import POLICY_INDEX
from test.unit import DebugLogger, patch_policies
@ -228,7 +227,7 @@ class TestSender(unittest.TestCase):
],
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call(POLICY_INDEX, 1),
mock.call('X-Backend-Storage-Policy-Index', 1),
],
'endheaders': [mock.call()],
}

View File

@ -37,7 +37,7 @@ from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
write_pickle
from swift.common import swob
from test.unit import debug_logger, patch_policies, mocked_http_conn
from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
from swift.common.storage_policy import StoragePolicy, POLICIES
_mocked_policies = [StoragePolicy(0, 'zero', False),
@ -288,7 +288,8 @@ class TestObjectUpdater(unittest.TestCase):
line.split(':')[1].strip()
line = inc.readline()
self.assertTrue('x-container-timestamp' in headers)
self.assertTrue(POLICY_INDEX in headers)
self.assertTrue('X-Backend-Storage-Policy-Index' in
headers)
except BaseException as err:
return err
return None
@ -392,7 +393,8 @@ class TestObjectUpdater(unittest.TestCase):
for request_args, request_kwargs in request_log:
ip, part, method, path, headers, qs, ssl = request_args
self.assertEqual(method, op)
self.assertEqual(headers[POLICY_INDEX], str(policy.idx))
self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
str(policy.idx))
self.assertEqual(daemon.logger.get_increment_counts(),
{'successes': 1, 'unlinks': 1,
'async_pendings': 1})
@ -420,7 +422,7 @@ class TestObjectUpdater(unittest.TestCase):
'x-content-type': 'text/plain',
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-timestamp': ts.next(),
POLICY_INDEX: policy.idx,
'X-Backend-Storage-Policy-Index': policy.idx,
})
data = {'op': op, 'account': account, 'container': container,
'obj': obj, 'headers': headers_out}
@ -444,7 +446,8 @@ class TestObjectUpdater(unittest.TestCase):
for request_args, request_kwargs in request_log:
ip, part, method, path, headers, qs, ssl = request_args
self.assertEqual(method, 'PUT')
self.assertEqual(headers[POLICY_INDEX], str(policy.idx))
self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
str(policy.idx))
self.assertEqual(daemon.logger.get_increment_counts(),
{'successes': 1, 'unlinks': 1, 'async_pendings': 1})

View File

@ -315,6 +315,7 @@ class TestFuncs(unittest.TestCase):
req = Request.blank("/v1/AUTH_account/cont",
environ={'swift.cache': FakeCache({})})
resp = get_container_info(req.environ, FakeApp())
self.assertEquals(resp['storage_policy'], '0')
self.assertEquals(resp['bytes'], 6666)
self.assertEquals(resp['object_count'], 1000)
@ -342,6 +343,7 @@ class TestFuncs(unittest.TestCase):
req = Request.blank("/v1/account/cont",
environ={'swift.cache': FakeCache(cache_stub)})
resp = get_container_info(req.environ, FakeApp())
self.assertEquals(resp['storage_policy'], '0')
self.assertEquals(resp['bytes'], 3333)
self.assertEquals(resp['object_count'], 10)
self.assertEquals(resp['status'], 404)

View File

@ -14,33 +14,50 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import time
import unittest
from contextlib import contextmanager
import mock
import swift
from swift.common import utils, swob
from swift.proxy import server as proxy_server
from swift.common.swob import HTTPException
from test.unit import FakeRing, FakeMemcache, fake_http_connect, debug_logger
from swift.common.storage_policy import StoragePolicy
from swift.common.storage_policy import StoragePolicy, POLICIES
from test.unit import patch_policies
from test.unit import FakeRing, FakeMemcache, fake_http_connect, \
debug_logger, patch_policies
@contextmanager
def set_http_connect(*args, **kwargs):
old_connect = swift.proxy.controllers.base.http_connect
new_connect = fake_http_connect(*args, **kwargs)
swift.proxy.controllers.base.http_connect = new_connect
swift.proxy.controllers.obj.http_connect = new_connect
swift.proxy.controllers.account.http_connect = new_connect
swift.proxy.controllers.container.http_connect = new_connect
yield new_connect
swift.proxy.controllers.base.http_connect = old_connect
swift.proxy.controllers.obj.http_connect = old_connect
swift.proxy.controllers.account.http_connect = old_connect
swift.proxy.controllers.container.http_connect = old_connect
try:
swift.proxy.controllers.base.http_connect = new_connect
swift.proxy.controllers.obj.http_connect = new_connect
swift.proxy.controllers.account.http_connect = new_connect
swift.proxy.controllers.container.http_connect = new_connect
yield new_connect
left_over_status = list(new_connect.code_iter)
if left_over_status:
raise AssertionError('left over status %r' % left_over_status)
finally:
swift.proxy.controllers.base.http_connect = old_connect
swift.proxy.controllers.obj.http_connect = old_connect
swift.proxy.controllers.account.http_connect = old_connect
swift.proxy.controllers.container.http_connect = old_connect
class PatchedObjControllerApp(proxy_server.Application):
object_controller = proxy_server.ObjectController
def handle_request(self, req):
with mock.patch('swift.proxy.server.ObjectController',
new=self.object_controller):
return super(PatchedObjControllerApp, self).handle_request(req)
@patch_policies([StoragePolicy(0, 'zero', True,
@ -90,129 +107,314 @@ class TestObjControllerWriteAffinity(unittest.TestCase):
def test_connect_put_node_timeout(self):
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
self.app.conn_timeout = 0.05
with set_http_connect(200, slow_connect=True):
with set_http_connect(slow_connect=True):
nodes = [dict(ip='', port='', device='')]
res = controller._connect_put_node(nodes, '', '', {}, ('', ''))
self.assertTrue(res is None)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
@patch_policies([
StoragePolicy(0, 'zero', True),
StoragePolicy(1, 'one'),
StoragePolicy(2, 'two'),
])
class TestObjController(unittest.TestCase):
container_info = {
'partition': 1,
'nodes': [
{'ip': '127.0.0.1', 'port': '1', 'device': 'sda'},
{'ip': '127.0.0.1', 'port': '2', 'device': 'sda'},
{'ip': '127.0.0.1', 'port': '3', 'device': 'sda'},
],
'write_acl': None,
'read_acl': None,
'storage_policy': None,
'sync_key': None,
'versions': None,
}
def setUp(self):
# setup fake rings with handoffs
self.obj_ring = FakeRing(max_more_nodes=3)
for policy in POLICIES:
policy.object_ring = self.obj_ring
logger = debug_logger('proxy-server')
logger.thread_locals = ('txn1', '127.0.0.2')
self.app = proxy_server.Application(
self.app = PatchedObjControllerApp(
None, FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), logger=logger)
self.controller = proxy_server.ObjectController(self.app,
'a', 'c', 'o')
self.controller.container_info = mock.MagicMock(return_value={
'partition': 1,
'nodes': [
{'ip': '127.0.0.1', 'port': '1', 'device': 'sda'},
{'ip': '127.0.0.1', 'port': '2', 'device': 'sda'},
{'ip': '127.0.0.1', 'port': '3', 'device': 'sda'},
],
'write_acl': None,
'read_acl': None,
'storage_policy': None,
'sync_key': None,
'versions': None})
class FakeContainerInfoObjController(proxy_server.ObjectController):
def container_info(controller, *args, **kwargs):
patch_path = 'swift.proxy.controllers.base.get_info'
with mock.patch(patch_path) as mock_get_info:
mock_get_info.return_value = dict(self.container_info)
return super(FakeContainerInfoObjController,
controller).container_info(*args, **kwargs)
self.app.object_controller = FakeContainerInfoObjController
def test_PUT_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['content-length'] = '0'
with set_http_connect(201, 201, 201):
resp = self.controller.PUT(req)
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 201)
def test_PUT_if_none_match(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['if-none-match'] = '*'
req.headers['content-length'] = '0'
with set_http_connect(201, 201, 201):
resp = self.controller.PUT(req)
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 201)
def test_PUT_if_none_match_denied(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['if-none-match'] = '*'
req.headers['content-length'] = '0'
with set_http_connect(201, (412, 412), 201):
resp = self.controller.PUT(req)
with set_http_connect(201, 412, 201):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 412)
def test_PUT_if_none_match_not_star(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['if-none-match'] = 'somethingelse'
req.headers['content-length'] = '0'
with set_http_connect(201, 201, 201):
resp = self.controller.PUT(req)
with set_http_connect():
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 400)
def test_GET_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(200):
resp = self.controller.GET(req)
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 200)
def test_DELETE_simple(self):
def test_GET_error(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(503, 200):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 200)
def test_GET_handoff(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
codes = [503] * self.obj_ring.replicas + [200]
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 200)
def test_GET_not_found(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
codes = [404] * (self.obj_ring.replicas +
self.obj_ring.max_more_nodes)
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 404)
def test_DELETE_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
with set_http_connect(204, 204, 204):
resp = self.controller.DELETE(req)
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 204)
def test_DELETE_missing_one(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
with set_http_connect(404, 204, 204):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 204)
def test_DELETE_not_found(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
with set_http_connect(404, 404, 204):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 404)
def test_DELETE_handoff(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
codes = [204] * self.obj_ring.replicas
with set_http_connect(507, *codes):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 204)
def test_POST_as_COPY_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(200, 200, 200, 201, 201, 201) as fake_conn:
resp = self.controller.POST(req)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
req = swift.common.swob.Request.blank('/v1/a/c/o', method='POST')
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 202)
def test_container_sync_put_x_timestamp_not_found(self):
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
self.container_info['storage_policy'] = policy_index
put_timestamp = utils.Timestamp(time.time()).normal
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': put_timestamp})
ts_iter = itertools.repeat(put_timestamp)
head_resp = [404] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
def test_container_sync_put_x_timestamp_match(self):
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
self.container_info['storage_policy'] = policy_index
put_timestamp = utils.Timestamp(time.time()).normal
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': put_timestamp})
ts_iter = itertools.repeat(put_timestamp)
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
codes = head_resp
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
def test_container_sync_put_x_timestamp_older(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
self.container_info['storage_policy'] = policy_index
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': ts.next().internal})
ts_iter = itertools.repeat(ts.next().internal)
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
codes = head_resp
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
def test_container_sync_put_x_timestamp_newer(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
orig_timestamp = ts.next().internal
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': ts.next().internal})
ts_iter = itertools.repeat(orig_timestamp)
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
def test_container_sync_delete(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
test_indexes = [None] + [int(p) for p in POLICIES]
for policy_index in test_indexes:
req = swob.Request.blank(
'/v1/a/c/o', method='DELETE', headers={
'X-Timestamp': ts.next().internal})
codes = [409] * self.obj_ring.replicas
ts_iter = itertools.repeat(ts.next().internal)
with set_http_connect(*codes, timestamps=ts_iter):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 409)
def test_put_x_timestamp_conflict(self):
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
req = swob.Request.blank(
'/v1/a/c/o', method='PUT', headers={
'Content-Length': 0,
'X-Timestamp': ts.next().internal})
head_resp = [404] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [409] + [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
def test_COPY_simple(self):
req = swift.common.swob.Request.blank(
'/v1/a/c/o', headers={'Content-Length': 0,
'Destination': 'c/o-copy'})
with set_http_connect(200, 200, 200, 201, 201, 201) as fake_conn:
resp = self.controller.COPY(req)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
'/v1/a/c/o', method='COPY',
headers={'Content-Length': 0,
'Destination': 'c/o-copy'})
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 201)
def test_HEAD_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD')
with set_http_connect(200):
resp = self.controller.HEAD(req)
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 200)
def test_HEAD_x_newest(self):
req = swift.common.swob.Request.blank('/v1/a/c/o',
req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD',
headers={'X-Newest': 'true'})
with set_http_connect(200, 200, 200) as fake_conn:
resp = self.controller.HEAD(req)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
with set_http_connect(200, 200, 200):
resp = req.get_response(self.app)
self.assertEquals(resp.status_int, 200)
def test_PUT_log_info(self):
# mock out enough to get to the area of the code we want to test
with mock.patch('swift.proxy.controllers.obj.check_object_creation',
mock.MagicMock(return_value=None)):
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.headers['x-copy-from'] = 'somewhere'
try:
self.controller.PUT(req)
except HTTPException:
pass
self.assertEquals(
req.environ.get('swift.log_info'), ['x-copy-from:somewhere'])
# and then check that we don't do that for originating POSTs
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.method = 'POST'
req.headers['x-copy-from'] = 'elsewhere'
try:
self.controller.PUT(req)
except HTTPException:
pass
self.assertEquals(req.environ.get('swift.log_info'), None)
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
req.headers['x-copy-from'] = 'some/where'
req.headers['Content-Length'] = 0
# override FakeConn default resp headers to keep log_info clean
resp_headers = {'x-delete-at': None}
head_resp = [200] * self.obj_ring.replicas + \
[404] * self.obj_ring.max_more_nodes
put_resp = [201] * self.obj_ring.replicas
codes = head_resp + put_resp
with set_http_connect(*codes, headers=resp_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
self.assertEquals(
req.environ.get('swift.log_info'), ['x-copy-from:some/where'])
# and then check that we don't do that for originating POSTs
req = swift.common.swob.Request.blank('/v1/a/c/o')
req.method = 'POST'
req.headers['x-copy-from'] = 'else/where'
with set_http_connect(*codes, headers=resp_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
self.assertEquals(req.environ.get('swift.log_info'), None)
@patch_policies([
StoragePolicy(0, 'zero', True),
StoragePolicy(1, 'one'),
StoragePolicy(2, 'two'),
])
class TestObjControllerLegacyCache(TestObjController):
"""
This test pretends like memcache returned a stored value that should
resemble whatever "old" format. It catches KeyErrors you'd get if your
code was expecting some new format during a rolling upgrade.
"""
container_info = {
'read_acl': None,
'write_acl': None,
'sync_key': None,
'versions': None,
}
if __name__ == '__main__':

View File

@ -34,7 +34,7 @@ import re
import random
import mock
from eventlet import sleep, spawn, wsgi, listen
from eventlet import sleep, spawn, wsgi, listen, Timeout
from swift.common.utils import json
from test.unit import (
@ -59,7 +59,7 @@ from swift.common.swob import Request, Response, HTTPUnauthorized, \
HTTPException
from swift.common import storage_policy
from swift.common.storage_policy import StoragePolicy, \
StoragePolicyCollection, POLICIES, POLICY, POLICY_INDEX
StoragePolicyCollection, POLICIES
from swift.common.request_helpers import get_sys_meta_prefix
# mocks
@ -1138,11 +1138,20 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
# The (201, -4) tuples in there have the effect of letting the
# initial connect succeed, after which getexpect() gets called and
# then the -4 makes the response of that actually be 201 instead of
# 100. Perfectly straightforward.
set_http_connect(200, 200, (201, -4), (201, -4), (201, -4),
# The (201, Exception('test')) tuples in there have the effect of
# changing the status of the initial expect response. The default
# expect response from FakeConn for 201 is 100.
# But the object server won't send a 100 continue line if the
# client doesn't send a expect 100 header (as is the case with
# zero byte PUTs as validated by this test), nevertheless the
# object controller calls getexpect without prejudice. In this
# case the status from the response shows up early in getexpect
# instead of having to wait until getresponse. The Exception is
# in there to ensure that the object controller also *uses* the
# result of getexpect instead of calling getresponse in which case
# our FakeConn will blow up.
success_codes = [(201, Exception('test'))] * 3
set_http_connect(200, 200, *success_codes,
give_connect=test_connect)
req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 0
@ -1165,7 +1174,12 @@ class TestObjectController(unittest.TestCase):
with save_globals():
controller = \
proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg')
set_http_connect(200, 200, 201, 201, 201,
# the (100, 201) tuples in there are just being extra explicit
# about the FakeConn returning the 100 Continue status when the
# object controller calls getexpect. Which is FakeConn's default
# for 201 if no expect_status is specified.
success_codes = [(100, 201)] * 3
set_http_connect(200, 200, *success_codes,
give_connect=test_connect)
req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 1
@ -1585,11 +1599,19 @@ class TestObjectController(unittest.TestCase):
res = controller.PUT(req)
expected = str(expected)
self.assertEquals(res.status[:len(expected)], expected)
test_status_map((200, 200, 201, 201, -1), 201)
test_status_map((200, 200, 201, 201, -2), 201) # expect timeout
test_status_map((200, 200, 201, 201, -3), 201) # error limited
test_status_map((200, 200, 201, -1, -1), 503)
test_status_map((200, 200, 503, 503, -1), 503)
test_status_map((200, 200, 201, 201, -1), 201) # connect exc
# connect errors
test_status_map((200, 200, Timeout(), 201, 201, ), 201)
test_status_map((200, 200, 201, 201, Exception()), 201)
# expect errors
test_status_map((200, 200, (Timeout(), None), 201, 201), 201)
test_status_map((200, 200, (Exception(), None), 201, 201), 201)
# response errors
test_status_map((200, 200, (100, Timeout()), 201, 201), 201)
test_status_map((200, 200, (100, Exception()), 201, 201), 201)
test_status_map((200, 200, 507, 201, 201), 201) # error limited
test_status_map((200, 200, -1, 201, -1), 503)
test_status_map((200, 200, 503, -1, 503), 503)
def test_PUT_send_exceptions(self):
with save_globals():
@ -1715,31 +1737,39 @@ class TestObjectController(unittest.TestCase):
check_request(account_request, method='HEAD', path='/sda/1/a')
container_request = backend_requests.pop(0)
check_request(container_request, method='HEAD', path='/sda/1/a/c')
for i, (device, request) in enumerate(zip(('sda', 'sdb', 'sdc'),
backend_requests)):
# make sure backend requests included expected container headers
container_headers = {}
for request in backend_requests:
req_headers = request[2]
device = req_headers['x-container-device']
host = req_headers['x-container-host']
container_headers[device] = host
expectations = {
'method': 'POST',
'path': '/%s/1/a/c/o' % device,
'path': '/1/a/c/o',
'headers': {
'X-Container-Host': '10.0.0.%d:100%d' % (i, i),
'X-Container-Partition': '1',
'Connection': 'close',
'User-Agent': 'proxy-server %s' % os.getpid(),
'Host': 'localhost:80',
'X-Container-Device': device,
'Referer': 'POST http://localhost/v1/a/c/o',
'X-Object-Meta-Color': 'Blue',
POLICY_INDEX: '1'
'X-Backend-Storage-Policy-Index': '1'
},
}
check_request(request, **expectations)
expected = {}
for i, device in enumerate(['sda', 'sdb', 'sdc']):
expected[device] = '10.0.0.%d:100%d' % (i, i)
self.assertEqual(container_headers, expected)
# and again with policy override
self.app.memcache.store = {}
backend_requests = []
req = Request.blank('/v1/a/c/o', {}, method='POST',
headers={'X-Object-Meta-Color': 'Blue',
POLICY_INDEX: 0})
'X-Backend-Storage-Policy-Index': 0})
with mocked_http_conn(
200, 200, 202, 202, 202,
headers=resp_headers, give_connect=capture_requests
@ -1754,7 +1784,7 @@ class TestObjectController(unittest.TestCase):
'path': '/1/a/c/o', # ignore device bit
'headers': {
'X-Object-Meta-Color': 'Blue',
POLICY_INDEX: '0',
'X-Backend-Storage-Policy-Index': '0',
}
}
check_request(request, **expectations)
@ -1765,7 +1795,7 @@ class TestObjectController(unittest.TestCase):
backend_requests = []
req = Request.blank('/v1/a/c/o', {}, method='POST',
headers={'X-Object-Meta-Color': 'Blue',
POLICY_INDEX: 0})
'X-Backend-Storage-Policy-Index': 0})
with mocked_http_conn(
200, 200, 200, 200, 200, 201, 201, 201,
headers=resp_headers, give_connect=capture_requests
@ -1774,8 +1804,8 @@ class TestObjectController(unittest.TestCase):
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 202)
self.assertEqual(len(backend_requests), 8)
policy0 = {POLICY_INDEX: '0'}
policy1 = {POLICY_INDEX: '1'}
policy0 = {'X-Backend-Storage-Policy-Index': '0'}
policy1 = {'X-Backend-Storage-Policy-Index': '1'}
expected = [
# account info
{'method': 'HEAD', 'path': '/1/a'},
@ -4234,8 +4264,10 @@ class TestObjectController(unittest.TestCase):
headers=None, query_string=None):
if method == "HEAD":
self.assertEquals(path, '/a/c/o.jpg')
self.assertNotEquals(None, headers[POLICY_INDEX])
self.assertEquals(1, int(headers[POLICY_INDEX]))
self.assertNotEquals(None,
headers['X-Backend-Storage-Policy-Index'])
self.assertEquals(1, int(headers
['X-Backend-Storage-Policy-Index']))
def fake_container_info(account, container, req):
return {'status': 200, 'sync_key': None, 'storage_policy': '1',
@ -4333,7 +4365,8 @@ class TestObjectController(unittest.TestCase):
expected_storage_policy = 0
else:
continue
storage_policy_index = int(headers[POLICY_INDEX])
storage_policy_index = \
int(headers['X-Backend-Storage-Policy-Index'])
self.assertEqual(
expected_storage_policy, storage_policy_index,
'Unexpected %s request for %s '
@ -4829,7 +4862,7 @@ class TestContainerController(unittest.TestCase):
for name, index in expected.items():
req = Request.blank('/a/c', headers={'Content-Length': '0',
'Content-Type': 'text/plain',
POLICY: name})
'X-Storage-Policy': name})
self.assertEqual(controller._convert_policy_to_index(req), index)
# default test
req = Request.blank('/a/c', headers={'Content-Length': '0',
@ -4837,13 +4870,14 @@ class TestContainerController(unittest.TestCase):
self.assertEqual(controller._convert_policy_to_index(req), None)
# negative test
req = Request.blank('/a/c', headers={'Content-Length': '0',
'Content-Type': 'text/plain', POLICY: 'nada'})
'Content-Type': 'text/plain',
'X-Storage-Policy': 'nada'})
self.assertRaises(HTTPException, controller._convert_policy_to_index,
req)
# storage policy two is deprecated
req = Request.blank('/a/c', headers={'Content-Length': '0',
'Content-Type': 'text/plain',
POLICY: 'two'})
'X-Storage-Policy': 'two'})
self.assertRaises(HTTPException, controller._convert_policy_to_index,
req)
@ -4852,33 +4886,34 @@ class TestContainerController(unittest.TestCase):
req = Request.blank('/v1/a/c')
with mocked_http_conn(
200, 200,
headers={POLICY_INDEX: int(policy)},
headers={'X-Backend-Storage-Policy-Index': int(policy)},
) as fake_conn:
resp = req.get_response(self.app)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.headers[POLICY], policy.name)
self.assertEqual(resp.headers['X-Storage-Policy'], policy.name)
def test_no_convert_index_to_name_when_container_not_found(self):
policy = random.choice(list(POLICIES))
req = Request.blank('/v1/a/c')
with mocked_http_conn(
200, 404, 404, 404,
headers={POLICY_INDEX: int(policy)}) as fake_conn:
headers={'X-Backend-Storage-Policy-Index':
int(policy)}) as fake_conn:
resp = req.get_response(self.app)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 404)
self.assertEqual(resp.headers[POLICY], None)
self.assertEqual(resp.headers['X-Storage-Policy'], None)
def test_error_convert_index_to_name(self):
req = Request.blank('/v1/a/c')
with mocked_http_conn(
200, 200,
headers={POLICY_INDEX: '-1'}) as fake_conn:
headers={'X-Backend-Storage-Policy-Index': '-1'}) as fake_conn:
resp = req.get_response(self.app)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.headers[POLICY], None)
self.assertEqual(resp.headers['X-Storage-Policy'], None)
error_lines = self.app.logger.get_lines_for_level('error')
self.assertEqual(2, len(error_lines))
for msg in error_lines:
@ -5010,7 +5045,7 @@ class TestContainerController(unittest.TestCase):
headers={'Content-Length': 0})
if requested_policy:
expected_policy = requested_policy
req.headers[POLICY] = policy.name
req.headers['X-Storage-Policy'] = policy.name
else:
expected_policy = POLICIES.default
res = req.get_response(self.app)
@ -5028,15 +5063,18 @@ class TestContainerController(unittest.TestCase):
len(backend_requests))
for headers in backend_requests:
if not requested_policy:
self.assertFalse(POLICY_INDEX in headers)
self.assertFalse('X-Backend-Storage-Policy-Index' in
headers)
self.assertTrue(
'X-Backend-Storage-Policy-Default' in headers)
self.assertEqual(
int(expected_policy),
int(headers['X-Backend-Storage-Policy-Default']))
else:
self.assertTrue(POLICY_INDEX in headers)
self.assertEqual(int(headers[POLICY_INDEX]),
self.assertTrue('X-Backend-Storage-Policy-Index' in
headers)
self.assertEqual(int(headers
['X-Backend-Storage-Policy-Index']),
policy.idx)
# make sure all mocked responses are consumed
self.assertRaises(StopIteration, mock_conn.code_iter.next)

View File

@ -59,6 +59,6 @@ commands = {posargs}
# H501 -> don't use locals() for str formatting
# H903 -> \n not \r\n
ignore = H
select = F,E,W,H102,H103,H501,H903
select = F,E,W,H102,H103,H501,H903,H231
exclude = .venv,.tox,dist,doc,*egg
show-source = True