Merge branch 'master' into feature/ec

Change-Id: Ic255786a8470fdd9b9802656017d97b61e0e159a

commit fd19daee7f
@@ -520,12 +520,14 @@ log_name object-auditor Label used when logging
 log_facility       LOG_LOCAL0     Syslog log facility
 log_level          INFO           Logging level
 log_time           3600           Frequency of status logs in seconds.
-files_per_second   20             Maximum files audited per second. Should
-                                  be tuned according to individual system
-                                  specs. 0 is unlimited.
-bytes_per_second   10000000       Maximum bytes audited per second. Should
-                                  be tuned according to individual system
-                                  specs. 0 is unlimited.
+files_per_second   20             Maximum files audited per second per
+                                  auditor process. Should be tuned according
+                                  to individual system specs. 0 is unlimited.
+bytes_per_second   10000000       Maximum bytes audited per second per
+                                  auditor process. Should be tuned according
+                                  to individual system specs. 0 is unlimited.
+concurrency        1              The number of parallel processes to use
+                                  for checksum auditing.
 ================== ============== ==========================================

 ------------------------------

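The table above makes ``files_per_second`` and ``bytes_per_second`` per-process limits, so the effective cluster-wide audit rate scales with ``concurrency``. A minimal sketch of that arithmetic (the helper name is illustrative, not part of Swift):

```python
def audit_rates(files_per_second, concurrency):
    """Per-process sleep interval between audited files, and the aggregate
    audit rate across all auditor processes. files_per_second is enforced
    per process, so `concurrency` parallel processes multiply the total
    rate; 0 means unlimited, as in the config table above. Illustrative
    helper, not Swift code.
    """
    if files_per_second <= 0:
        return 0.0, float('inf')
    # Sleeping 1/rate seconds between files enforces the per-process cap.
    return 1.0 / files_per_second, files_per_second * concurrency
```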
@@ -162,6 +162,11 @@ Getting the code

        cd $HOME/python-swiftclient; sudo python setup.py develop; cd -

+   Ubuntu 12.04 users need to install python-swiftclient's dependencies before the installation of
+   python-swiftclient. This is due to a bug in an older version of setup tools::
+
+       cd $HOME/python-swiftclient; sudo pip install -r requirements.txt; sudo python setup.py develop; cd -
+
 #. Check out the swift repo::

        git clone https://github.com/openstack/swift.git

@@ -78,8 +78,8 @@ awareness of storage policies to use them; once a container has been created
 with a specific policy, all objects stored in it will be done so in accordance
 with that policy.

-Storage Policies are not implemented as a separate code module but are a core
-abstraction of Swift architecture.
+The Storage Policies feature is implemented throughout the entire code base so
+it is an important concept in understanding Swift architecture.

 -------------
 Object Server

@@ -3,9 +3,9 @@ Storage Policies
 ================

 Storage Policies allow for some level of segmenting the cluster for various
-purposes through the creation of multiple object rings. Storage Policies are
-not implemented as a separate code module but are an important concept in
-understanding Swift architecture.
+purposes through the creation of multiple object rings. The Storage Policies
+feature is implemented throughout the entire code base so it is an important
+concept in understanding Swift architecture.

 As described in :doc:`overview_ring`, Swift uses modified hashing rings to
 determine where data should reside in the cluster. There is a separate ring

@@ -61,10 +61,10 @@ Policy-0 is considered the default). We will be covering the difference
 between default and Policy-0 in the next section.

 Policies are assigned when a container is created. Once a container has been
-assigned a policy, it cannot be changed until the container is deleted. The implications
+assigned a policy, it cannot be changed (unless it is deleted/recreated). The implications
 on data placement/movement for large datasets would make this a task best left for
 applications to perform. Therefore, if a container has an existing policy of,
-for example 3x replication, and one wanted to migrate that data to a policy that specifies,
+for example 3x replication, and one wanted to migrate that data to a policy that specifies
 a different replication level, the application would create another container
 specifying the other policy name and then simply move the data from one container
 to the other. Policies apply on a per container basis allowing for minimal application

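The migration recipe described above, create a container with the new policy and copy the data over, can be sketched as follows. The ``client`` here is any object exposing swiftclient-style ``put_container``/``get_container``/``get_object``/``put_object`` methods; the function itself is hypothetical, not part of Swift:

```python
def migrate_container(client, src, dst, policy='gold'):
    """Copy every object from src into dst, where dst is created with the
    desired storage policy; Swift then places the copies according to that
    policy. Illustrative sketch only.
    """
    # The policy can only be chosen at container creation time.
    client.put_container(dst, headers={'X-Storage-Policy': policy})
    _headers, listing = client.get_container(src)
    for entry in listing:
        name = entry['name']
        _obj_headers, body = client.get_object(src, name)
        client.put_object(dst, name, body)
    return len(listing)
```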
@@ -84,8 +84,8 @@ Storage Policies on either side of a network outage at the same time? Furthermore,
 what would happen if objects were placed in those containers, a whole bunch of them,
 and then later the network outage was restored? Well, without special care it would
 be a big problem as an application could end up using the wrong ring to try and find
-an object. Luckily there is a solution for this problem, a daemon covered in more
-detail later, works tirelessly to identify and rectify this potential scenario.
+an object. Luckily there is a solution for this problem, a daemon known as the
+Container Reconciler works tirelessly to identify and rectify this potential scenario.

 --------------------
 Container Reconciler

@@ -126,19 +126,19 @@ The object rows are ensured to be fully durable during replication using
 the normal container replication. After the container
 replicator pushes its object rows to available primary nodes any
 misplaced object rows are bulk loaded into containers based off the
-object timestamp under the ".misplaced_objects" system account. The
+object timestamp under the ``.misplaced_objects`` system account. The
 rows are initially written to a handoff container on the local node, and
-at the end of the replication pass the .misplaced_object containers are
+at the end of the replication pass the ``.misplaced_objects`` containers are
 replicated to the correct primary nodes.

-The container-reconciler processes the .misplaced_objects containers in
+The container-reconciler processes the ``.misplaced_objects`` containers in
 descending order and reaps its containers as the objects represented by
 the rows are successfully reconciled. The container-reconciler will
 always validate the correct storage policy for enqueued objects using
 direct container HEAD requests which are accelerated via caching.

 Because failure of individual storage nodes in aggregate is assumed to
-be common at scale the container-reconciler will make forward progress
+be common at scale, the container-reconciler will make forward progress
 with a simple quorum majority. During a combination of failures and
 rebalances it is possible that a quorum could provide an incomplete
 record of the correct storage policy - so an object write may have to be

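The quorum-majority rule described above can be sketched as a standalone helper; the name and signature are illustrative (Swift's reconciler actually works from direct container HEAD responses):

```python
from collections import Counter

def quorum_policy_index(policy_indexes):
    """Given the storage policy index reported by each primary container
    node, return the index agreed on by a simple quorum majority, or None
    when no majority exists (e.g. during a combination of failures and a
    rebalance). Hypothetical helper illustrating the rule above.
    """
    if not policy_indexes:
        return None
    quorum = len(policy_indexes) // 2 + 1
    index, count = Counter(policy_indexes).most_common(1)[0]
    return index if count >= quorum else None
```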
@@ -209,7 +209,7 @@ on disk but no longer accessible) and to provide proper messaging to
 applications when a policy needs to be retired, the notion of deprecation is
 used. :ref:`configure-policy` describes how to deprecate a policy.

-Swift's behavior with deprecated policies will change as follows:
+Swift's behavior with deprecated policies is as follows:

 * The deprecated policy will not appear in /info
 * PUT/GET/DELETE/POST/HEAD are still allowed on the pre-existing containers

@@ -221,7 +221,7 @@ Swift's behavior with deprecated policies will change as follows:

 .. note::

-    A policy can not be both the default and deprecated. If you deprecate the
+    A policy cannot be both the default and deprecated. If you deprecate the
     default policy, you must specify a new default.

 You can also use the deprecated feature to rollout new policies. If you

@@ -268,9 +268,9 @@ are a number of rules enforced by Swift when parsing this file:

 * Policy names must be unique
 * The policy name 'Policy-0' can only be used for the policy with index 0
 * If any policies are defined, exactly one policy must be declared default
-* Deprecated policies can not be declared the default
+* Deprecated policies cannot be declared the default

-The following is an example of a properly configured ''swift.conf'' file. See :doc:`policies_saio`
+The following is an example of a properly configured ``swift.conf`` file. See :doc:`policies_saio`
 for full instructions on setting up an all-in-one with this example configuration.::

     [swift-hash]

@@ -327,8 +327,8 @@ for policy 1::

 Using Policies
 --------------

-Using policies is very simple, a policy is only specified when a container is
-initially created, there are no other API changes. Creating a container can
+Using policies is very simple - a policy is only specified when a container is
+initially created. There are no other API changes. Creating a container can
 be done without any special policy information::

     curl -v -X PUT -H 'X-Auth-Token: <your auth token>' \

@@ -344,7 +344,7 @@ If we wanted to explicitly state that we wanted policy 'gold' the command
 would simply need to include a new header as shown below::

     curl -v -X PUT -H 'X-Auth-Token: <your auth token>' \
-    -H 'X-Storage-Policy: gold' http://127.0.0.1:8080/v1/AUTH_test/myCont1
+    -H 'X-Storage-Policy: gold' http://127.0.0.1:8080/v1/AUTH_test/myCont0

 And that's it! The application does not need to specify the policy name ever
 again. There are some illegal operations however:

@@ -388,7 +388,7 @@ collection is made up of policies of class :class:`.StoragePolicy`. The
 collection class includes handy functions for getting to a policy either by
 name or by index, getting info about the policies, etc. There's also one
 very important function, :meth:`~.StoragePolicyCollection.get_object_ring`.
-Object rings are now members of the :class:`.StoragePolicy` class and are
+Object rings are members of the :class:`.StoragePolicy` class and are
 actually not instantiated until the :meth:`~.StoragePolicy.load_ring`
 method is called. Any caller anywhere in the code base that needs to access
 an object ring must use the :data:`.POLICIES` global singleton to access the

@@ -408,7 +408,7 @@ and by importing :func:`.get_container_info` to gain access to the policy
 index associated with the container in question. From the index it
 can then use the :data:`.POLICIES` singleton to grab the right ring. For example,
 :ref:`list_endpoints` is policy aware using the means just described. Another
-example is :ref:`recon` which will report the md5 sums for all object rings.
+example is :ref:`recon` which will report the md5 sums for all of the rings.

 Proxy Server
 ------------

@@ -452,7 +452,7 @@ on this later, but for now be aware of the following directory naming convention

 * ``/quarantined/objects-N`` maps to the quarantine directory for policy index #N

 Note that these directory names are actually owned by the specific Diskfile
-Implementation, the names shown above are used by the default Diskfile.
+implementation, the names shown above are used by the default Diskfile.

 Object Server
 -------------

@@ -466,7 +466,7 @@ policy index and leaves the actual directory naming/structure mechanisms to
 :class:`.Diskfile` being used will assure that data is properly located in the
 tree based on its policy.

-For the same reason, the :ref:`object-updater` also is policy aware; as previously
+For the same reason, the :ref:`object-updater` also is policy aware. As previously
 described, different policies use different async pending directories so the
 updater needs to know how to scan them appropriately.

@@ -476,7 +476,7 @@ handling a replication job for 2x versus 3x is trivial; however, the difference
 handling replication between 3x and erasure code is most definitely not. In
 fact, the term 'replication' really isn't appropriate for some policies
 like erasure code; however, the majority of the framework for collecting and
-processing jobs remains the same. Thus, those functions in the replicator are
+processing jobs is common. Thus, those functions in the replicator are
 leveraged for all policies and then there is policy specific code required for
 each policy, added when the policy is defined if needed.

@@ -524,8 +524,8 @@ the old table.
 The policy index is stored here for use in reporting information
 about the container as well as managing split-brain scenario induced
 discrepancies between containers and their storage policies. Furthermore,
-during split-brain containers must be prepared to track object updates from
-multiple policies, so the object table also includes a
+during split-brain, containers must be prepared to track object updates from
+multiple policies so the object table also includes a
 ``storage_policy_index`` column. Per-policy object counts and bytes are
 updated in the ``policy_stat`` table using ``INSERT`` and ``DELETE`` triggers
 similar to the pre-policy triggers that updated ``container_stat`` directly.

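The ``policy_stat`` trigger scheme described above can be illustrated with a stripped-down sqlite3 schema (simplified columns, not Swift's actual DDL):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE object (name TEXT, size INTEGER,
                         storage_policy_index INTEGER DEFAULT 0);
    CREATE TABLE policy_stat (storage_policy_index INTEGER PRIMARY KEY,
                              object_count INTEGER, bytes_used INTEGER);

    -- On INSERT, bump the counters for the row's policy.
    CREATE TRIGGER object_insert AFTER INSERT ON object
    BEGIN
        INSERT OR IGNORE INTO policy_stat
            VALUES (new.storage_policy_index, 0, 0);
        UPDATE policy_stat
           SET object_count = object_count + 1,
               bytes_used = bytes_used + new.size
         WHERE storage_policy_index = new.storage_policy_index;
    END;

    -- On DELETE, decrement them again.
    CREATE TRIGGER object_delete AFTER DELETE ON object
    BEGIN
        UPDATE policy_stat
           SET object_count = object_count - 1,
               bytes_used = bytes_used - old.size
         WHERE storage_policy_index = old.storage_policy_index;
    END;
""")
conn.execute("INSERT INTO object VALUES ('a', 10, 0)")
conn.execute("INSERT INTO object VALUES ('b', 30, 1)")
conn.execute("DELETE FROM object WHERE name = 'a'")
stats = dict((p, (c, b)) for p, c, b in
             conn.execute("SELECT * FROM policy_stat"))
```

After the insert/delete sequence, `stats` holds per-policy counts maintained entirely by the triggers, mirroring how the container DB keeps ``policy_stat`` in step with the object table.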
@@ -535,7 +535,7 @@ schemas as part of its normal consistency checking process when it updates the
 ``reconciler_sync_point`` entry in the ``container_info`` table. This ensures
 that read heavy containers which do not encounter any writes will still get
 migrated to be fully compatible with the post-storage-policy queries without
-having to fall-back and retry queries with the legacy schema to service
+having to fall back and retry queries with the legacy schema to service
 container read requests.

 The :ref:`container-sync-daemon` functionality only needs to be policy aware in that it

@@ -564,7 +564,7 @@ pre-storage-policy accounts by altering the DB schema and populating the
 point in time.

 The per-storage-policy object and byte counts are not updated with each object
-PUT and DELETE request container updates to the account server is performed
+PUT and DELETE request, instead container updates to the account server are performed
 asynchronously by the ``swift-container-updater``.

 .. _upgrade-policy:

@@ -582,6 +582,7 @@ an existing cluster that already has lots of data on it and upgrade to Swift with
 Storage Policies. From there you want to go ahead and create a policy and test a
 few things out. All you need to do is:

+#. Upgrade all of your Swift nodes to a policy-aware version of Swift
 #. Define your policies in ``/etc/swift/swift.conf``
 #. Create the corresponding object rings
 #. Create containers and objects and confirm their placement is as expected

@@ -227,6 +227,7 @@ use = egg:swift#recon
 # log_address = /dev/log
 #
 # files_per_second = 20
+# concurrency = 1
 # bytes_per_second = 10000000
 # log_time = 3600
 # zero_byte_files_per_second = 50

@@ -31,7 +31,7 @@ from swift.common.ring import Ring
 from swift.common.utils import get_logger, whataremyips, ismount, \
     config_true_value, Timestamp
 from swift.common.daemon import Daemon
-from swift.common.storage_policy import POLICIES, POLICY_INDEX
+from swift.common.storage_policy import POLICIES


 class AccountReaper(Daemon):

@@ -352,7 +352,7 @@ class AccountReaper(Daemon):
                 if not objects:
                     break
                 try:
-                    policy_index = headers.get(POLICY_INDEX, 0)
+                    policy_index = headers.get('X-Backend-Storage-Policy-Index', 0)
                     for obj in objects:
                         if isinstance(obj['name'], unicode):
                             obj['name'] = obj['name'].encode('utf8')

@@ -442,7 +442,7 @@ class AccountReaper(Daemon):
                     headers={'X-Container-Host': '%(ip)s:%(port)s' % cnode,
                              'X-Container-Partition': str(container_partition),
                              'X-Container-Device': cnode['device'],
-                             POLICY_INDEX: policy_index})
+                             'X-Backend-Storage-Policy-Index': policy_index})
                 successes += 1
                 self.stats_return_codes[2] = \
                     self.stats_return_codes.get(2, 0) + 1

@@ -38,7 +38,6 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, \
     HTTPPreconditionFailed, HTTPConflict, Request, \
     HTTPInsufficientStorage, HTTPException
 from swift.common.request_helpers import is_sys_or_user_meta
-from swift.common.storage_policy import POLICY_INDEX


 class AccountController(object):

@@ -110,7 +109,8 @@ class AccountController(object):
         else:
             timestamp = valid_timestamp(req)
         pending_timeout = None
-        container_policy_index = req.headers.get(POLICY_INDEX, 0)
+        container_policy_index = \
+            req.headers.get('X-Backend-Storage-Policy-Index', 0)
         if 'x-trans-id' in req.headers:
             pending_timeout = 3
         broker = self._get_account_broker(drive, part, account,

@@ -25,7 +25,7 @@ from swift.account.backend import AccountBroker, DATADIR as ABDATADIR
 from swift.container.backend import ContainerBroker, DATADIR as CBDATADIR
 from swift.obj.diskfile import get_data_dir, read_metadata, DATADIR_BASE, \
     extract_policy_index
-from swift.common.storage_policy import POLICIES, POLICY_INDEX
+from swift.common.storage_policy import POLICIES


 class InfoSystemExit(Exception):

@@ -101,14 +101,16 @@ def print_ring_locations(ring, datadir, account, container=None, obj=None,
               % (node['ip'], node['port'], node['device'], part,
                  urllib.quote(target))
         if policy_index is not None:
-            cmd += ' -H "%s: %s"' % (POLICY_INDEX, policy_index)
+            cmd += ' -H "%s: %s"' % ('X-Backend-Storage-Policy-Index',
+                                     policy_index)
         print cmd
     for node in handoff_nodes:
         cmd = 'curl -I -XHEAD "http://%s:%s/%s/%s/%s"' \
               % (node['ip'], node['port'], node['device'], part,
                  urllib.quote(target))
         if policy_index is not None:
-            cmd += ' -H "%s: %s"' % (POLICY_INDEX, policy_index)
+            cmd += ' -H "%s: %s"' % ('X-Backend-Storage-Policy-Index',
+                                     policy_index)
         cmd += ' # [Handoff]'
         print cmd

@@ -558,12 +558,18 @@ class SwiftRecon(object):
         """
         Generator that yields all values for given key in a recon cache entry.
         This is for use with object auditor recon cache entries. If the
-        object auditor has run in 'once' mode with a subset of devices
-        specified the checksum auditor section will have an entry of the form:
-        {'object_auditor_stats_ALL': { 'disk1disk2diskN': {..}}
-        The same is true of the ZBF auditor cache entry section. We use this
-        generator to find all instances of a particular key in these multi-
-        level dictionaries.
+        object auditor has run in parallel, the recon cache will have entries
+        of the form: {'object_auditor_stats_ALL': { 'disk1': {..},
+                                                    'disk2': {..},
+                                                    'disk3': {..},
+                                                    ...}}
+        If the object auditor hasn't run in parallel, the recon cache will have
+        entries of the form: {'object_auditor_stats_ALL': {...}}.
+        The ZBF auditor doesn't run in parallel. However, if a subset of
+        devices is selected for auditing, the recon cache will have an entry
+        of the form: {'object_auditor_stats_ZBF': { 'disk1disk2..diskN': {}}
+        We use this generator to find all instances of a particular key in
+        these multi-level dictionaries.
         """
         for k, v in recon_entry.items():
             if isinstance(v, dict):

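The multi-level lookup that docstring describes boils down to a small recursive generator; this is a simplified illustration of the pattern, not Swift's exact implementation:

```python
def nested_get_value(key, entry):
    """Yield every value stored under `key` anywhere in a possibly nested
    recon cache entry, whether the auditor wrote one flat stats dict or
    one sub-dict per disk. Simplified sketch for illustration.
    """
    for k, v in entry.items():
        if k == key:
            yield v
        elif isinstance(v, dict):
            # Recurse into per-disk sub-dicts.
            for found in nested_get_value(key, v):
                yield found
```

Callers can then `sum()` the generator to aggregate per-disk counters, which is exactly what the recon changes further below do.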
@@ -597,15 +603,15 @@ class SwiftRecon(object):
                 zbf_scan[url] = response['object_auditor_stats_ZBF']
         if len(all_scan) > 0:
             stats = {}
-            stats[atime] = [(self.nested_get_value(atime, all_scan[i]))
+            stats[atime] = [sum(self.nested_get_value(atime, all_scan[i]))
                             for i in all_scan]
-            stats[bprocessed] = [(self.nested_get_value(bprocessed,
+            stats[bprocessed] = [sum(self.nested_get_value(bprocessed,
                                  all_scan[i])) for i in all_scan]
-            stats[passes] = [(self.nested_get_value(passes, all_scan[i]))
+            stats[passes] = [sum(self.nested_get_value(passes, all_scan[i]))
                              for i in all_scan]
-            stats[errors] = [(self.nested_get_value(errors, all_scan[i]))
+            stats[errors] = [sum(self.nested_get_value(errors, all_scan[i]))
                              for i in all_scan]
-            stats[quarantined] = [(self.nested_get_value(quarantined,
+            stats[quarantined] = [sum(self.nested_get_value(quarantined,
                                   all_scan[i])) for i in all_scan]
             for k in stats:
                 if None in stats[k]:

@@ -623,13 +629,13 @@ class SwiftRecon(object):
             print("[ALL_auditor] - No hosts returned valid data.")
         if len(zbf_scan) > 0:
             stats = {}
-            stats[atime] = [(self.nested_get_value(atime, zbf_scan[i]))
+            stats[atime] = [sum(self.nested_get_value(atime, zbf_scan[i]))
                             for i in zbf_scan]
-            stats[bprocessed] = [(self.nested_get_value(bprocessed,
+            stats[bprocessed] = [sum(self.nested_get_value(bprocessed,
                                  zbf_scan[i])) for i in zbf_scan]
-            stats[errors] = [(self.nested_get_value(errors, zbf_scan[i]))
+            stats[errors] = [sum(self.nested_get_value(errors, zbf_scan[i]))
                             for i in zbf_scan]
-            stats[quarantined] = [(self.nested_get_value(quarantined,
+            stats[quarantined] = [sum(self.nested_get_value(quarantined,
                                  zbf_scan[i])) for i in zbf_scan]
             for k in stats:
                 if None in stats[k]:

@@ -728,14 +728,14 @@ class SimpleClient(object):
                  max_backoff=5, retries=5):
         self.url = url
         self.token = token
-        self.attempts = 0
         self.starting_backoff = starting_backoff
         self.max_backoff = max_backoff
         self.retries = retries

     def base_request(self, method, container=None, name=None, prefix=None,
                      headers=None, proxy=None, contents=None,
-                     full_listing=None, logger=None, additional_info=None):
+                     full_listing=None, logger=None, additional_info=None,
+                     timeout=None):
         # Common request method
         trans_start = time()
         url = self.url

@@ -756,16 +756,12 @@ class SimpleClient(object):
         if prefix:
             url += '&prefix=%s' % prefix

-        req = urllib2.Request(url, headers=headers, data=contents)
         if proxy:
             proxy = urlparse.urlparse(proxy)
-            proxy = urllib2.ProxyHandler({proxy.scheme: proxy.netloc})
-            opener = urllib2.build_opener(proxy)
-            urllib2.install_opener(opener)
-
+        req = urllib2.Request(url, headers=headers, data=contents)
+        if proxy:
+            req.set_proxy(proxy.netloc, proxy.scheme)
         req.get_method = lambda: method
-        urllib2.urlopen(req)
-        conn = urllib2.urlopen(req)
+        conn = urllib2.urlopen(req, timeout=timeout)
         body = conn.read()
         try:
             body_data = json.loads(body)

@@ -799,14 +795,15 @@ class SimpleClient(object):
         return [None, body_data]

     def retry_request(self, method, **kwargs):
-        self.attempts = 0
+        retries = kwargs.pop('retries', self.retries)
+        attempts = 0
         backoff = self.starting_backoff
-        while self.attempts <= self.retries:
-            self.attempts += 1
+        while attempts <= retries:
+            attempts += 1
             try:
                 return self.base_request(method, **kwargs)
             except (socket.error, httplib.HTTPException, urllib2.URLError):
-                if self.attempts > self.retries:
+                if attempts > retries:
                     raise
                 sleep(backoff)
                 backoff = min(backoff * 2, self.max_backoff)

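The retry loop above, with its doubling-and-capped backoff and per-call ``retries`` override, reduces to this standalone sketch (the function and parameter names are illustrative, not swiftclient's API):

```python
import socket
import time

def retry_request(request, retries=5, starting_backoff=1, max_backoff=5,
                  sleep=time.sleep):
    """Call `request` (a zero-argument callable), retrying on socket
    errors with an exponentially growing delay that is capped at
    max_backoff. Re-raises once the retry budget is exhausted.
    """
    attempts = 0
    backoff = starting_backoff
    while attempts <= retries:
        attempts += 1
        try:
            return request()
        except socket.error:
            if attempts > retries:
                raise
            sleep(backoff)
            # Double the delay each time, but never beyond the cap.
            backoff = min(backoff * 2, max_backoff)
```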
@@ -30,7 +30,6 @@ from swift.common.exceptions import ListingIterError, SegmentError
 from swift.common.http import is_success, HTTP_SERVICE_UNAVAILABLE
 from swift.common.swob import HTTPBadRequest, HTTPNotAcceptable
 from swift.common.utils import split_path, validate_device_partition
-from swift.common.storage_policy import POLICY_INDEX
 from swift.common.wsgi import make_subrequest

@@ -91,7 +90,7 @@ def get_name_and_placement(request, minsegs=1, maxsegs=None,
                            storage_policy_index appended on the end
     :raises: HTTPBadRequest
     """
-    policy_idx = request.headers.get(POLICY_INDEX, '0')
+    policy_idx = request.headers.get('X-Backend-Storage-Policy-Index', '0')
     policy_idx = int(policy_idx)
     results = split_and_validate_path(request, minsegs=minsegs,
                                       maxsegs=maxsegs,

@@ -15,6 +15,7 @@

 import array
 import cPickle as pickle
+import inspect
 from collections import defaultdict
 from gzip import GzipFile
 from os.path import getmtime

@@ -112,10 +113,10 @@ class RingData(object):
         # This only works on Python 2.7; on 2.6, we always get the
         # current time in the gzip output.
         tempf = NamedTemporaryFile(dir=".", prefix=filename, delete=False)
-        try:
+        if 'mtime' in inspect.getargspec(GzipFile.__init__).args:
             gz_file = GzipFile(filename, mode='wb', fileobj=tempf,
                                mtime=mtime)
-        except TypeError:
+        else:
             gz_file = GzipFile(filename, mode='wb', fileobj=tempf)
         self.serialize_v1(gz_file)
         gz_file.close()

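The change above exists so that serializing the same ring twice yields identical gzip bytes (Python 2.6's ``GzipFile`` lacked ``mtime``, hence the feature detection). On modern Pythons ``mtime`` is always accepted, so the idea reduces to pinning it; a sketch, not Swift's serializer:

```python
import gzip
import io

def deterministic_gzip(data, mtime=0):
    """Compress `data` with a fixed mtime so repeated serializations of
    the same payload produce byte-identical output, which is the goal of
    the GzipFile change above.
    """
    buf = io.BytesIO()
    # Without a fixed mtime, gzip embeds the current time in its header
    # and two runs over identical data would differ.
    with gzip.GzipFile(fileobj=buf, mode='wb', mtime=mtime) as gz:
        gz.write(data)
    return buf.getvalue()
```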
@@ -18,8 +18,6 @@ import string
 from swift.common.utils import config_true_value, SWIFT_CONF_FILE
 from swift.common.ring import Ring

-POLICY = 'X-Storage-Policy'
-POLICY_INDEX = 'X-Backend-Storage-Policy-Index'
 LEGACY_POLICY_NAME = 'Policy-0'
 VALID_CHARS = '-' + string.letters + string.digits

@@ -64,7 +64,9 @@ utf8_encoder = codecs.getencoder('utf-8')

 from swift import gettext_ as _
 import swift.common.exceptions
-from swift.common.http import is_success, is_redirection, HTTP_NOT_FOUND
+from swift.common.http import is_success, is_redirection, HTTP_NOT_FOUND, \
+    HTTP_PRECONDITION_FAILED, HTTP_REQUESTED_RANGE_NOT_SATISFIABLE


 # logging doesn't import patched as cleanly as one would like
 from logging.handlers import SysLogHandler

@@ -989,6 +991,24 @@ class StatsdClient(object):
                               sample_rate)


+def server_handled_successfully(status_int):
+    """
+    True for successful responses *or* error codes that are not Swift's fault,
+    False otherwise. For example, 500 is definitely the server's fault, but
+    412 is an error code (4xx are all errors) that is due to a header the
+    client sent.
+
+    If one is tracking error rates to monitor server health, one would be
+    advised to use a function like this one, lest a client cause a flurry of
+    404s or 416s and make a spurious spike in your errors graph.
+    """
+    return (is_success(status_int) or
+            is_redirection(status_int) or
+            status_int == HTTP_NOT_FOUND or
+            status_int == HTTP_PRECONDITION_FAILED or
+            status_int == HTTP_REQUESTED_RANGE_NOT_SATISFIABLE)
+
+
 def timing_stats(**dec_kwargs):
     """
     Returns a decorator that logs timing events or errors for public methods in

@@ -1001,9 +1021,7 @@ def timing_stats(**dec_kwargs):
         def _timing_stats(ctrl, *args, **kwargs):
             start_time = time.time()
             resp = func(ctrl, *args, **kwargs)
-            if is_success(resp.status_int) or \
-                    is_redirection(resp.status_int) or \
-                    resp.status_int == HTTP_NOT_FOUND:
+            if server_handled_successfully(resp.status_int):
                 ctrl.logger.timing_since(method + '.timing',
                                          start_time, **dec_kwargs)
             else:

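For readers outside the Swift tree, the classification the new helper performs can be mimicked with plain status ranges; this is a self-contained approximation, not the real ``swift.common.http`` predicates:

```python
def server_handled_successfully(status_int):
    """Approximate the helper added above: success and redirection codes
    count as healthy, as do the client-caused 'errors' 404, 412 and 416,
    while genuine server faults such as 500 do not.
    """
    return (200 <= status_int < 400 or       # success and redirection
            status_int in (404, 412, 416))   # client's fault, not ours
```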
@@ -1896,11 +1914,16 @@ def audit_location_generator(devices, datadir, suffix='',
     for device in device_dir:
         if mount_check and not ismount(os.path.join(devices, device)):
             if logger:
-                logger.debug(
+                logger.warning(
                     _('Skipping %s as it is not mounted'), device)
             continue
         datadir_path = os.path.join(devices, device, datadir)
-        partitions = listdir(datadir_path)
+        try:
+            partitions = listdir(datadir_path)
+        except OSError as e:
+            if logger:
+                logger.warning('Skipping %s because %s', datadir_path, e)
+            continue
         for partition in partitions:
             part_path = os.path.join(datadir_path, partition)
             try:

@@ -2284,7 +2307,8 @@ def put_recon_cache_entry(cache_entry, key, item):
     """
     Function that will check if item is a dict, and if so put it under
     cache_entry[key]. We use nested recon cache entries when the object
-    auditor runs in 'once' mode with a specified subset of devices.
+    auditor runs in parallel or else in 'once' mode with a specified
+    subset of devices.
     """
     if isinstance(item, dict):
         if key not in cache_entry or key in cache_entry and not \

@@ -25,7 +25,6 @@ from swift.common.direct_client import (
     direct_head_container, direct_delete_container_object,
     direct_put_container_object, ClientException)
 from swift.common.internal_client import InternalClient, UnexpectedResponse
-from swift.common.storage_policy import POLICY_INDEX
 from swift.common.utils import get_logger, split_path, quorum_size, \
     FileLikeIter, Timestamp, last_modified_date_to_timestamp, \
     LRUCache

@@ -100,7 +99,7 @@ def incorrect_policy_index(info, remote_info):
 def translate_container_headers_to_info(headers):
     default_timestamp = Timestamp(0).internal
     return {
-        'storage_policy_index': int(headers[POLICY_INDEX]),
+        'storage_policy_index': int(headers['X-Backend-Storage-Policy-Index']),
         'put_timestamp': headers.get('x-backend-put-timestamp',
                                      default_timestamp),
         'delete_timestamp': headers.get('x-backend-delete-timestamp',

@@ -373,7 +372,7 @@ class ContainerReconciler(Daemon):
         an object was manually re-enqueued.
         """
         q_path = '/%s/%s/%s' % (MISPLACED_OBJECTS_ACCOUNT, container, obj)
-        x_timestamp = slightly_later_timestamp(q_record)
+        x_timestamp = slightly_later_timestamp(max(q_record, q_ts))
         self.stats_log('pop_queue', 'remove %r (%f) from the queue (%s)',
                        q_path, q_ts, x_timestamp)
         headers = {'X-Timestamp': x_timestamp}

@@ -37,7 +37,7 @@ from swift.common import constraints
 from swift.common.bufferedhttp import http_connect
 from swift.common.exceptions import ConnectionTimeout
 from swift.common.http import HTTP_NOT_FOUND, is_success
-from swift.common.storage_policy import POLICIES, POLICY_INDEX
+from swift.common.storage_policy import POLICIES
 from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
     HTTPCreated, HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
     HTTPPreconditionFailed, HTTPMethodNotAllowed, Request, Response, \

@@ -57,7 +57,7 @@ def gen_resp_headers(info, is_deleted=False):
             info.get('delete_timestamp', 0)).internal,
         'X-Backend-Status-Changed-At': Timestamp(
             info.get('status_changed_at', 0)).internal,
-        POLICY_INDEX: info.get('storage_policy_index', 0),
+        'X-Backend-Storage-Policy-Index': info.get('storage_policy_index', 0),
     }
     if not is_deleted:
         # base container info on deleted containers is not exposed to client

@@ -137,7 +137,7 @@ class ContainerController(object):
         :raises: HTTPBadRequest if the supplied index is bogus
         """

-        policy_index = req.headers.get(POLICY_INDEX, None)
+        policy_index = req.headers.get('X-Backend-Storage-Policy-Index', None)
         if policy_index is None:
             return None

@@ -202,7 +202,7 @@ class ContainerController(object):
                 'x-object-count': info['object_count'],
                 'x-bytes-used': info['bytes_used'],
                 'x-trans-id': req.headers.get('x-trans-id', '-'),
-                POLICY_INDEX: info['storage_policy_index'],
+                'X-Backend-Storage-Policy-Index': info['storage_policy_index'],
                 'user-agent': 'container-server %s' % os.getpid(),
                 'referer': req.as_referer()})
         if req.headers.get('x-account-override-deleted', 'no').lower() == \

@@ -35,7 +35,7 @@ from swift.common.utils import (
     whataremyips, Timestamp)
 from swift.common.daemon import Daemon
 from swift.common.http import HTTP_UNAUTHORIZED, HTTP_NOT_FOUND
-from swift.common.storage_policy import POLICIES, POLICY_INDEX
+from swift.common.storage_policy import POLICIES


 class ContainerSync(Daemon):

@@ -376,7 +376,8 @@ class ContainerSync(Daemon):
                 looking_for_timestamp = Timestamp(row['created_at'])
                 timestamp = -1
                 headers = body = None
-                headers_out = {POLICY_INDEX: str(info['storage_policy_index'])}
+                headers_out = {'X-Backend-Storage-Policy-Index':
+                               str(info['storage_policy_index'])}
                 for node in nodes:
                     try:
                         these_headers, this_body = direct_get_object(

@@ -33,7 +33,6 @@ from swift.common.utils import get_logger, config_true_value, ismount, \
     dump_recon_cache, quorum_size, Timestamp
 from swift.common.daemon import Daemon
 from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
-from swift.common.storage_policy import POLICY_INDEX


 class ContainerUpdater(Daemon):

@@ -278,7 +277,7 @@ class ContainerUpdater(Daemon):
                 'X-Object-Count': count,
                 'X-Bytes-Used': bytes,
                 'X-Account-Override-Deleted': 'yes',
-                POLICY_INDEX: storage_policy_index,
+                'X-Backend-Storage-Policy-Index': storage_policy_index,
                 'user-agent': self.user_agent}
             conn = http_connect(
                 node['ip'], node['port'], node['device'], part,

@@ -17,6 +17,7 @@ import os
 import sys
 import time
 import signal
+from random import shuffle
 from swift import gettext_ as _
 from contextlib import closing
 from eventlet import Timeout

@@ -72,7 +73,10 @@ class AuditorWorker(object):
         description = ''
         if device_dirs:
             device_dir_str = ','.join(sorted(device_dirs))
-            description = _(' - %s') % device_dir_str
+            if self.auditor_type == 'ALL':
+                description = _(' - parallel, %s') % device_dir_str
+            else:
+                description = _(' - %s') % device_dir_str
         self.logger.info(_('Begin object audit "%s" mode (%s%s)') %
                          (mode, self.auditor_type, description))
         begin = reported = time.time()

@@ -223,6 +227,7 @@ class ObjectAuditor(Daemon):
         self.conf = conf
         self.logger = get_logger(conf, log_route='object-auditor')
         self.devices = conf.get('devices', '/srv/node')
+        self.concurrency = int(conf.get('concurrency', 1))
         self.conf_zero_byte_fps = int(
             conf.get('zero_byte_files_per_second', 50))
         self.recon_cache_path = conf.get('recon_cache_path',

@@ -260,7 +265,7 @@ class ObjectAuditor(Daemon):
             sys.exit()

     def audit_loop(self, parent, zbo_fps, override_devices=None, **kwargs):
-        """Audit loop"""
+        """Parallel audit loop"""
         self.clear_recon_cache('ALL')
         self.clear_recon_cache('ZBF')
         kwargs['device_dirs'] = override_devices

@@ -272,7 +277,34 @@ class ObjectAuditor(Daemon):
         if self.conf_zero_byte_fps:
             zbf_pid = self.fork_child(zero_byte_fps=True, **kwargs)
             pids.append(zbf_pid)
-        pids.append(self.fork_child(**kwargs))
+        if self.concurrency == 1:
+            # Audit all devices in 1 process
+            pids.append(self.fork_child(**kwargs))
+        else:
+            # Divide devices amongst parallel processes set by
+            # self.concurrency.  Total number of parallel processes
+            # is self.concurrency + 1 if zero_byte_fps.
+            parallel_proc = self.concurrency + 1 if \
+                self.conf_zero_byte_fps else self.concurrency
+            device_list = list(override_devices) if override_devices else \
+                listdir(self.devices)
+            shuffle(device_list)
+            while device_list:
+                pid = None
+                if len(pids) == parallel_proc:
+                    pid = os.wait()[0]
+                    pids.remove(pid)
+                # ZBF scanner must be restarted as soon as it finishes
+                if self.conf_zero_byte_fps and pid == zbf_pid:
+                    kwargs['device_dirs'] = override_devices
+                    # sleep between ZBF scanner forks
+                    self._sleep()
+                    zbf_pid = self.fork_child(zero_byte_fps=True,
+                                              **kwargs)
+                    pids.append(zbf_pid)
+                else:
+                    kwargs['device_dirs'] = [device_list.pop()]
+                    pids.append(self.fork_child(**kwargs))
         while pids:
             pid = os.wait()[0]
             # ZBF scanner must be restarted as soon as it finishes

@@ -37,7 +37,7 @@ from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
 from swift.obj import ssync_sender
 from swift.obj.diskfile import (DiskFileManager, get_hashes, get_data_dir,
                                 get_tmp_dir)
-from swift.common.storage_policy import POLICY_INDEX, POLICIES
+from swift.common.storage_policy import POLICIES


 hubs.use_hub(get_hub())

@@ -228,7 +228,7 @@ class ObjectReplicator(Daemon):
                 if len(suff) == 3 and isdir(join(path, suff))]
         self.replication_count += 1
         self.logger.increment('partition.delete.count.%s' % (job['device'],))
-        self.headers[POLICY_INDEX] = job['policy_idx']
+        self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
         begin = time.time()
         try:
             responses = []

@@ -270,7 +270,7 @@ class ObjectReplicator(Daemon):
         """
         self.replication_count += 1
         self.logger.increment('partition.update.count.%s' % (job['device'],))
-        self.headers[POLICY_INDEX] = job['policy_idx']
+        self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
         begin = time.time()
         try:
             hashed, local_hash = tpool_reraise(

@@ -46,7 +46,6 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
     HTTPInsufficientStorage, HTTPForbidden, HTTPException, HeaderKeyDict, \
     HTTPConflict
 from swift.obj.diskfile import DATAFILE_SYSTEM_META, DiskFileManager
-from swift.common.storage_policy import POLICY_INDEX


 class ObjectController(object):

@@ -113,7 +112,7 @@ class ObjectController(object):
         # network_chunk_size parameter value instead.
         socket._fileobject.default_bufsize = self.network_chunk_size

-        # Provide further setup sepecific to an object server implemenation.
+        # Provide further setup specific to an object server implementation.
         self.setup(conf)

     def setup(self, conf):

@@ -240,7 +239,7 @@ class ObjectController(object):

         headers_out['x-trans-id'] = headers_in.get('x-trans-id', '-')
         headers_out['referer'] = request.as_referer()
-        headers_out[POLICY_INDEX] = policy_idx
+        headers_out['X-Backend-Storage-Policy-Index'] = policy_idx
         for conthost, contdevice in updates:
             self.async_update(op, account, container, obj, conthost,
                               contpartition, contdevice, headers_out,

@@ -270,7 +269,8 @@ class ObjectController(object):
             hosts = contdevices = [None]
         headers_in = request.headers
         headers_out = HeaderKeyDict({
-            POLICY_INDEX: 0,  # system accounts are always Policy-0
+            # system accounts are always Policy-0
+            'X-Backend-Storage-Policy-Index': 0,
             'x-timestamp': request.timestamp.internal,
             'x-trans-id': headers_in.get('x-trans-id', '-'),
             'referer': request.as_referer()})

@@ -25,8 +25,6 @@ from swift.common import http
 from swift.common import swob
 from swift.common import utils

-from swift.common.storage_policy import POLICY_INDEX
-

 class Receiver(object):
     """

@@ -170,7 +168,8 @@ class Receiver(object):
             self.request.environ['eventlet.minimum_write_chunk_size'] = 0
             self.device, self.partition = utils.split_path(
                 urllib.unquote(self.request.path), 2, 2, False)
-            self.policy_idx = int(self.request.headers.get(POLICY_INDEX, 0))
+            self.policy_idx = \
+                int(self.request.headers.get('X-Backend-Storage-Policy-Index', 0))
             utils.validate_device_partition(self.device, self.partition)
             if self.app._diskfile_mgr.mount_check and \
                     not constraints.check_mount(

@@ -354,7 +353,7 @@ class Receiver(object):
                     subreq_iter())
             else:
                 raise Exception('Invalid subrequest method %s' % method)
-            subreq.headers[POLICY_INDEX] = self.policy_idx
+            subreq.headers['X-Backend-Storage-Policy-Index'] = self.policy_idx
             subreq.headers['X-Backend-Replication'] = 'True'
             if replication_headers:
                 subreq.headers['X-Backend-Replication-Headers'] = \

@@ -18,8 +18,6 @@ from swift.common import bufferedhttp
 from swift.common import exceptions
 from swift.common import http

-from swift.common.storage_policy import POLICY_INDEX
-

 class Sender(object):
     """

@@ -100,7 +98,8 @@ class Sender(object):
             self.connection.putrequest('REPLICATION', '/%s/%s' % (
                 self.node['device'], self.job['partition']))
             self.connection.putheader('Transfer-Encoding', 'chunked')
-            self.connection.putheader(POLICY_INDEX, self.policy_idx)
+            self.connection.putheader('X-Backend-Storage-Policy-Index',
+                                      self.policy_idx)
             self.connection.endheaders()
             with exceptions.MessageTimeout(
                     self.daemon.node_timeout, 'connect receive'):

@@ -26,7 +26,6 @@ from eventlet import patcher, Timeout
 from swift.common.bufferedhttp import http_connect
 from swift.common.exceptions import ConnectionTimeout
 from swift.common.ring import Ring
-from swift.common.storage_policy import POLICY_INDEX
 from swift.common.utils import get_logger, renamer, write_pickle, \
     dump_recon_cache, config_true_value, ismount
 from swift.common.daemon import Daemon

@@ -222,7 +221,8 @@ class ObjectUpdater(Daemon):
             for node in nodes:
                 if node['id'] not in successes:
                     headers = update['headers'].copy()
-                    headers.setdefault(POLICY_INDEX, str(policy_idx))
+                    headers.setdefault('X-Backend-Storage-Policy-Index',
+                                       str(policy_idx))
                     status = self.object_update(node, part, update['op'], obj,
                                                 headers)
                     if not is_success(status) and status != HTTP_NOT_FOUND:

@@ -50,7 +50,7 @@ from swift.common.swob import Request, Response, HeaderKeyDict, Range, \
     HTTPException, HTTPRequestedRangeNotSatisfiable
 from swift.common.request_helpers import strip_sys_meta_prefix, \
     strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta
-from swift.common.storage_policy import POLICY_INDEX, POLICY, POLICIES
+from swift.common.storage_policy import POLICIES


 def update_headers(response, headers):

@@ -162,7 +162,8 @@ def headers_to_container_info(headers, status_int=HTTP_OK):
         'object_count': headers.get('x-container-object-count'),
         'bytes': headers.get('x-container-bytes-used'),
         'versions': headers.get('x-versions-location'),
-        'storage_policy': headers.get(POLICY_INDEX.lower(), '0'),
+        'storage_policy': headers.get('X-Backend-Storage-Policy-Index'.lower(),
+                                      '0'),
         'cors': {
             'allow_origin': meta.get('access-control-allow-origin'),
             'expose_headers': meta.get('access-control-expose-headers'),

@@ -295,6 +296,7 @@ def get_container_info(env, app, swift_source=None):
                                 swift_source=swift_source)
     if not info:
         info = headers_to_container_info({}, 0)
+    info.setdefault('storage_policy', '0')
     return info


@@ -987,6 +989,7 @@ class Controller(object):
         else:
             info['partition'] = part
             info['nodes'] = nodes
+        info.setdefault('storage_policy', '0')
         return info

     def _make_request(self, nodes, part, method, path, headers, query,

@@ -1206,14 +1209,18 @@ class Controller(object):
             pass
         # if a backend policy index is present in resp headers, translate it
         # here with the friendly policy name
-        if POLICY_INDEX in res.headers and is_success(res.status_int):
-            policy = POLICIES.get_by_index(res.headers[POLICY_INDEX])
+        if 'X-Backend-Storage-Policy-Index' in res.headers and \
+                is_success(res.status_int):
+            policy = \
+                POLICIES.get_by_index(
+                    res.headers['X-Backend-Storage-Policy-Index'])
             if policy:
-                res.headers[POLICY] = policy.name
+                res.headers['X-Storage-Policy'] = policy.name
             else:
                 self.app.logger.error(
                     'Could not translate %s (%r) from %r to policy',
-                    POLICY_INDEX, res.headers[POLICY_INDEX], path)
+                    'X-Backend-Storage-Policy-Index',
+                    res.headers['X-Backend-Storage-Policy-Index'], path)
         return res

     def is_origin_allowed(self, cors_info, origin):

@@ -23,7 +23,7 @@ from swift.common import constraints
 from swift.common.http import HTTP_ACCEPTED
 from swift.proxy.controllers.base import Controller, delay_denial, \
     cors_validation, clear_info_cache
-from swift.common.storage_policy import POLICIES, POLICY, POLICY_INDEX
+from swift.common.storage_policy import POLICIES
 from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
     HTTPNotFound


@@ -55,7 +55,7 @@ class ContainerController(Controller):

         :param req: incoming request
         """
-        policy_name = req.headers.get(POLICY)
+        policy_name = req.headers.get('X-Storage-Policy')
         if not policy_name:
             return
         policy = POLICIES.get_by_name(policy_name)

@@ -63,7 +63,7 @@ class ContainerController(Controller):
             raise HTTPBadRequest(request=req,
                                  content_type="text/plain",
                                  body=("Invalid %s '%s'"
-                                       % (POLICY, policy_name)))
+                                       % ('X-Storage-Policy', policy_name)))
         if policy.is_deprecated:
             body = 'Storage Policy %r is deprecated' % (policy.name)
             raise HTTPBadRequest(request=req, body=body)

@@ -214,7 +214,7 @@ class ContainerController(Controller):
             additional['X-Backend-Storage-Policy-Default'] = \
                 int(POLICIES.default)
         else:
-            additional[POLICY_INDEX] = str(policy_index)
+            additional['X-Backend-Storage-Policy-Index'] = str(policy_index)
         headers = [self.generate_request_headers(req, transfer=True,
                                                  additional=additional)
                    for _junk in range(n_outgoing)]

@@ -56,7 +56,6 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
     HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
     HTTPServerError, HTTPServiceUnavailable, Request, \
     HTTPClientDisconnect, HTTPNotImplemented
-from swift.common.storage_policy import POLICY_INDEX
 from swift.common.request_helpers import is_user_meta


@@ -197,10 +196,10 @@ class ObjectController(Controller):
             self.account_name, self.container_name, req)
         req.acl = container_info['read_acl']
         # pass the policy index to storage nodes via req header
-        policy_index = req.headers.get(POLICY_INDEX,
+        policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
                                        container_info['storage_policy'])
         obj_ring = self.app.get_object_ring(policy_index)
-        req.headers[POLICY_INDEX] = policy_index
+        req.headers['X-Backend-Storage-Policy-Index'] = policy_index
         if 'swift.authorize' in req.environ:
             aresp = req.environ['swift.authorize'](req)
             if aresp:

@@ -303,10 +302,10 @@ class ObjectController(Controller):
         else:
             delete_at_container = delete_at_part = delete_at_nodes = None
         # pass the policy index to storage nodes via req header
-        policy_index = req.headers.get(POLICY_INDEX,
+        policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
                                        container_info['storage_policy'])
         obj_ring = self.app.get_object_ring(policy_index)
-        req.headers[POLICY_INDEX] = policy_index
+        req.headers['X-Backend-Storage-Policy-Index'] = policy_index
         partition, nodes = obj_ring.get_nodes(
             self.account_name, self.container_name, self.object_name)
         req.headers['X-Timestamp'] = Timestamp(time.time()).internal

@@ -458,11 +457,11 @@ class ObjectController(Controller):
                                   body='If-None-Match only supports *')
         container_info = self.container_info(
             self.account_name, self.container_name, req)
-        policy_index = req.headers.get(POLICY_INDEX,
+        policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
                                        container_info['storage_policy'])
         obj_ring = self.app.get_object_ring(policy_index)
         # pass the policy index to storage nodes via req header
-        req.headers[POLICY_INDEX] = policy_index
+        req.headers['X-Backend-Storage-Policy-Index'] = policy_index
         container_partition = container_info['partition']
         containers = container_info['nodes']
         req.acl = container_info['write_acl']

@@ -500,7 +499,7 @@ class ObjectController(Controller):
                 (object_versions and not
                  req.environ.get('swift_versioned_copy')):
             # make sure proxy-server uses the right policy index
-            _headers = {POLICY_INDEX: req.headers[POLICY_INDEX],
+            _headers = {'X-Backend-Storage-Policy-Index': policy_index,
                         'X-Newest': 'True'}
             hreq = Request.blank(req.path_info, headers=_headers,
                                  environ={'REQUEST_METHOD': 'HEAD'})

@@ -586,7 +585,7 @@ class ObjectController(Controller):
                     src_container_name, src_obj_name)
             source_req = req.copy_get()
             # make sure the source request uses it's container_info
-            source_req.headers.pop(POLICY_INDEX, None)
+            source_req.headers.pop('X-Backend-Storage-Policy-Index', None)
             source_req.path_info = source_header
             source_req.headers['X-Newest'] = 'true'
             orig_obj_name = self.object_name

@@ -777,11 +776,11 @@ class ObjectController(Controller):
         container_info = self.container_info(
             self.account_name, self.container_name, req)
         # pass the policy index to storage nodes via req header
-        policy_index = req.headers.get(POLICY_INDEX,
+        policy_index = req.headers.get('X-Backend-Storage-Policy-Index',
                                        container_info['storage_policy'])
         obj_ring = self.app.get_object_ring(policy_index)
         # pass the policy index to storage nodes via req header
-        req.headers[POLICY_INDEX] = policy_index
+        req.headers['X-Backend-Storage-Policy-Index'] = policy_index
         container_partition = container_info['partition']
         containers = container_info['nodes']
         req.acl = container_info['write_acl']

@@ -838,7 +837,8 @@ class ObjectController(Controller):
             policy_idx = container_info['storage_policy']
             obj_ring = self.app.get_object_ring(policy_idx)
             # pass the policy index to storage nodes via req header
-            new_del_req.headers[POLICY_INDEX] = policy_idx
+            new_del_req.headers['X-Backend-Storage-Policy-Index'] = \
+                policy_idx
             container_partition = container_info['partition']
             containers = container_info['nodes']
             new_del_req.acl = container_info['write_acl']

@@ -28,8 +28,6 @@ import uuid
 import eventlet
 from nose import SkipTest

-from swift.common.storage_policy import POLICY
-
 from test.functional import normalized_urls, load_constraint, cluster_info
 import test.functional as tf
 from test.functional.swift_test_client import Account, Connection, File, \

@@ -2120,13 +2118,13 @@ class TestCrossPolicyObjectVersioningEnv(object):

         cls.versions_container = cls.account.container(prefix + "-versions")
         if not cls.versions_container.create(
-                {POLICY: policy['name']}):
+                {'X-Storage-Policy': policy['name']}):
             raise ResponseError(cls.conn.response)

         cls.container = cls.account.container(prefix + "-objs")
         if not cls.container.create(
                 hdrs={'X-Versions-Location': cls.versions_container.name,
-                      POLICY: version_policy['name']}):
+                      'X-Storage-Policy': version_policy['name']}):
             raise ResponseError(cls.conn.response)

         container_info = cls.container.info()

@@ -27,7 +27,7 @@ from nose import SkipTest
 from swift.common.manager import Manager
 from swift.common.internal_client import InternalClient
 from swift.common import utils, direct_client, ring
-from swift.common.storage_policy import POLICIES, POLICY_INDEX
+from swift.common.storage_policy import POLICIES
 from swift.common.http import HTTP_NOT_FOUND
 from test.probe.common import reset_environment, get_to_final_state

@@ -203,8 +203,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
             metadata = direct_client.direct_head_container(
                 node, container_part, self.account, self.container_name)
             head_responses.append((node, metadata))
-        found_policy_indexes = set(metadata[POLICY_INDEX] for
-                                   node, metadata in head_responses)
+        found_policy_indexes = \
+            set(metadata['X-Backend-Storage-Policy-Index'] for
+                node, metadata in head_responses)
         self.assert_(len(found_policy_indexes) > 1,
                      'primary nodes did not disagree about policy index %r' %
                      head_responses)

@@ -218,7 +219,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
             try:
                 direct_client.direct_head_object(
                     node, part, self.account, self.container_name,
-                    self.object_name, headers={POLICY_INDEX: policy_index})
+                    self.object_name,
+                    headers={'X-Backend-Storage-Policy-Index':
+                             policy_index})
             except direct_client.ClientException as err:
                 continue
             orig_policy_index = policy_index

@@ -237,8 +240,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
             metadata = direct_client.direct_head_container(
                 node, container_part, self.account, self.container_name)
             head_responses.append((node, metadata))
-        found_policy_indexes = set(metadata[POLICY_INDEX] for
-                                   node, metadata in head_responses)
+        found_policy_indexes = \
+            set(metadata['X-Backend-Storage-Policy-Index'] for
+                node, metadata in head_responses)
         self.assert_(len(found_policy_indexes) == 1,
                      'primary nodes disagree about policy index %r' %
                      head_responses)

@@ -253,7 +257,7 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
                 direct_client.direct_head_object(
                     node, part, self.account, self.container_name,
                     self.object_name, headers={
-                        POLICY_INDEX: orig_policy_index})
+                        'X-Backend-Storage-Policy-Index': orig_policy_index})
             except direct_client.ClientException as err:
                 if err.http_status == HTTP_NOT_FOUND:
                     continue

@@ -299,8 +303,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
             metadata = direct_client.direct_head_container(
                 node, container_part, self.account, self.container_name)
             head_responses.append((node, metadata))
-        found_policy_indexes = set(metadata[POLICY_INDEX] for
-                                   node, metadata in head_responses)
+        found_policy_indexes = \
+            set(metadata['X-Backend-Storage-Policy-Index'] for
+                node, metadata in head_responses)
         self.assert_(len(found_policy_indexes) > 1,
                      'primary nodes did not disagree about policy index %r' %
                      head_responses)

@@ -314,7 +319,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
            try:
                 direct_client.direct_head_object(
                     node, part, self.account, self.container_name,
-                    self.object_name, headers={POLICY_INDEX: policy_index})
+                    self.object_name,
+                    headers={'X-Backend-Storage-Policy-Index':
+                             policy_index})
             except direct_client.ClientException as err:
                 if 'x-backend-timestamp' in err.http_headers:
                     ts_policy_index = policy_index

@@ -338,11 +345,13 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
             metadata = direct_client.direct_head_container(
                 node, container_part, self.account, self.container_name)
             head_responses.append((node, metadata))
-        new_found_policy_indexes = set(metadata[POLICY_INDEX] for node,
-                                       metadata in head_responses)
+        new_found_policy_indexes = \
+            set(metadata['X-Backend-Storage-Policy-Index'] for node,
+                metadata in head_responses)
         self.assert_(len(new_found_policy_indexes) == 1,
                      'primary nodes disagree about policy index %r' %
-                     dict((node['port'], metadata[POLICY_INDEX])
+                     dict((node['port'],
+                           metadata['X-Backend-Storage-Policy-Index'])
                           for node, metadata in head_responses))
         expected_policy_index = new_found_policy_indexes.pop()
         self.assertEqual(orig_policy_index, expected_policy_index)

@@ -355,7 +364,9 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
             try:
                 direct_client.direct_head_object(
                     node, part, self.account, self.container_name,
-                    self.object_name, headers={POLICY_INDEX: policy_index})
+                    self.object_name,
+                    headers={'X-Backend-Storage-Policy-Index':
+                             policy_index})
             except direct_client.ClientException as err:
                 if err.http_status == HTTP_NOT_FOUND:
                     continue

@@ -430,7 +441,7 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
             'x-container-device': ','.join(n['device'] for n in
                                            self.container_ring.devs),
             'x-container-partition': container_part,
-            POLICY_INDEX: wrong_policy.idx,
+            'X-Backend-Storage-Policy-Index': wrong_policy.idx,
             'X-Static-Large-Object': 'True',
         }
         for node in nodes:

@@ -575,6 +586,13 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
             acceptable_statuses=(4,),
             headers={'X-Backend-Storage-Policy-Index': int(old_policy)})

+        # make sure the queue is settled
+        get_to_final_state()
+        for container in client.iter_containers('.misplaced_objects'):
+            for obj in client.iter_objects('.misplaced_objects',
+                                           container['name']):
+                self.fail('Found unexpected object %r in the queue' % obj)
+

 def main():
     options, commands = parser.parse_args()

@@ -24,7 +24,6 @@ from uuid import uuid4
 from swiftclient import client

 from swift.common import direct_client
-from swift.common.storage_policy import POLICY_INDEX
 from swift.obj.diskfile import get_data_dir
 from swift.common.exceptions import ClientException
 from test.probe.common import kill_server, kill_servers, reset_environment,\

@@ -102,7 +101,7 @@ class TestEmptyDevice(TestCase):
         another_onode = self.object_ring.get_more_nodes(opart).next()
         odata = direct_client.direct_get_object(
             another_onode, opart, self.account, container, obj,
-            headers={POLICY_INDEX: self.policy.idx})[-1]
+            headers={'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
         if odata != 'VERIFY':
             raise Exception('Direct object GET did not return VERIFY, instead '
                             'it returned: %s' % repr(odata))

@@ -134,7 +133,7 @@ class TestEmptyDevice(TestCase):
         try:
             direct_client.direct_get_object(
                 onode, opart, self.account, container, obj, headers={
-                    POLICY_INDEX: self.policy.idx})
+                    'X-Backend-Storage-Policy-Index': self.policy.idx})
         except ClientException as err:
             exc = err
         self.assertEquals(exc.http_status, 404)

@@ -157,7 +156,7 @@ class TestEmptyDevice(TestCase):

         odata = direct_client.direct_get_object(
             onode, opart, self.account, container, obj, headers={
-                POLICY_INDEX: self.policy.idx})[-1]
+                'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
         if odata != 'VERIFY':
             raise Exception('Direct object GET did not return VERIFY, instead '
                             'it returned: %s' % repr(odata))

@@ -165,7 +164,7 @@ class TestEmptyDevice(TestCase):
         try:
             direct_client.direct_get_object(
                 another_onode, opart, self.account, container, obj, headers={
-                    POLICY_INDEX: self.policy.idx})
+                    'X-Backend-Storage-Policy-Index': self.policy.idx})
         except ClientException as err:
             exc = err
         self.assertEquals(exc.http_status, 404)

@@ -23,7 +23,6 @@ from uuid import uuid4
 from swiftclient import client

 from swift.common import direct_client
-from swift.common.storage_policy import POLICY_INDEX
 from swift.common.exceptions import ClientException
 from swift.common.utils import hash_path, readconf
 from swift.obj.diskfile import write_metadata, read_metadata, get_data_dir

@@ -90,12 +89,12 @@ class TestObjectFailures(TestCase):

         odata = direct_client.direct_get_object(
             onode, opart, self.account, container, obj, headers={
-                POLICY_INDEX: self.policy.idx})[-1]
+                'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
         self.assertEquals(odata, 'VERIFY')
         try:
             direct_client.direct_get_object(
                 onode, opart, self.account, container, obj, headers={
-                    POLICY_INDEX: self.policy.idx})
+                    'X-Backend-Storage-Policy-Index': self.policy.idx})
             raise Exception("Did not quarantine object")
         except ClientException as err:
             self.assertEquals(err.http_status, 404)

@@ -109,7 +108,7 @@ class TestObjectFailures(TestCase):
         metadata = read_metadata(data_file)
         metadata['ETag'] = 'badetag'
         write_metadata(data_file, metadata)
-        base_headers = {POLICY_INDEX: self.policy.idx}
+        base_headers = {'X-Backend-Storage-Policy-Index': self.policy.idx}
         for header, result in [({'Range': 'bytes=0-2'}, 'RAN'),
                                ({'Range': 'bytes=1-11'}, 'ANGE'),
                                ({'Range': 'bytes=0-11'}, 'RANGE')]:

@@ -123,7 +122,7 @@ class TestObjectFailures(TestCase):
         try:
             direct_client.direct_get_object(
                 onode, opart, self.account, container, obj, headers={
-                    POLICY_INDEX: self.policy.idx})
+                    'X-Backend-Storage-Policy-Index': self.policy.idx})
             raise Exception("Did not quarantine object")
         except ClientException as err:
             self.assertEquals(err.http_status, 404)

@@ -140,7 +139,8 @@ class TestObjectFailures(TestCase):
         try:
             direct_client.direct_get_object(
                 onode, opart, self.account, container, obj, conn_timeout=1,
-                response_timeout=1, headers={POLICY_INDEX: self.policy.idx})
+                response_timeout=1, headers={'X-Backend-Storage-Policy-Index':
+                                             self.policy.idx})
             raise Exception("Did not quarantine object")
         except ClientException as err:
             self.assertEquals(err.http_status, 404)

@@ -157,7 +157,8 @@ class TestObjectFailures(TestCase):
         try:
             direct_client.direct_head_object(
                 onode, opart, self.account, container, obj, conn_timeout=1,
-                response_timeout=1, headers={POLICY_INDEX: self.policy.idx})
+                response_timeout=1, headers={'X-Backend-Storage-Policy-Index':
+                                             self.policy.idx})
             raise Exception("Did not quarantine object")
         except ClientException as err:
             self.assertEquals(err.http_status, 404)

@@ -173,7 +174,7 @@ class TestObjectFailures(TestCase):
         write_metadata(fpointer, metadata)
         try:
             headers = {'X-Object-Meta-1': 'One', 'X-Object-Meta-Two': 'Two',
-                       POLICY_INDEX: self.policy.idx}
+                       'X-Backend-Storage-Policy-Index': self.policy.idx}
             direct_client.direct_post_object(
                 onode, opart, self.account,
                 container, obj,

@@ -20,7 +20,6 @@ from uuid import uuid4
 from swiftclient import client

 from swift.common import direct_client
-from swift.common.storage_policy import POLICY_INDEX
 from swift.common.exceptions import ClientException
 from swift.common.manager import Manager
 from test.probe.common import kill_server, kill_servers, reset_environment, \

@@ -92,7 +91,7 @@ class TestObjectHandoff(TestCase):
         another_onode = self.object_ring.get_more_nodes(opart).next()
         odata = direct_client.direct_get_object(
             another_onode, opart, self.account, container, obj, headers={
-                POLICY_INDEX: self.policy.idx})[-1]
+                'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
         if odata != 'VERIFY':
             raise Exception('Direct object GET did not return VERIFY, instead '
                             'it returned: %s' % repr(odata))

@@ -113,7 +112,7 @@ class TestObjectHandoff(TestCase):
         try:
             direct_client.direct_get_object(
                 onode, opart, self.account, container, obj, headers={
-                    POLICY_INDEX: self.policy.idx})
+                    'X-Backend-Storage-Policy-Index': self.policy.idx})
         except ClientException as err:
             exc = err
         self.assertEquals(exc.http_status, 404)

@@ -133,7 +132,7 @@ class TestObjectHandoff(TestCase):
         Manager(['object-replicator']).once(number=another_num)
         odata = direct_client.direct_get_object(
             onode, opart, self.account, container, obj, headers={
-                POLICY_INDEX: self.policy.idx})[-1]
+                'X-Backend-Storage-Policy-Index': self.policy.idx})[-1]
         if odata != 'VERIFY':
             raise Exception('Direct object GET did not return VERIFY, instead '
                             'it returned: %s' % repr(odata))

@@ -141,7 +140,7 @@ class TestObjectHandoff(TestCase):
         try:
             direct_client.direct_get_object(
                 another_onode, opart, self.account, container, obj, headers={
-                    POLICY_INDEX: self.policy.idx})
+                    'X-Backend-Storage-Policy-Index': self.policy.idx})
         except ClientException as err:
             exc = err
         self.assertEquals(exc.http_status, 404)

@@ -177,7 +176,7 @@ class TestObjectHandoff(TestCase):
         start_server(onode['port'], self.port2server, self.pids)
         direct_client.direct_get_object(
             onode, opart, self.account, container, obj, headers={
-                POLICY_INDEX: self.policy.idx})
+                'X-Backend-Storage-Policy-Index': self.policy.idx})
         # Run the extra server last so it'll remove its extra partition
         for node in onodes:
             try:

@@ -192,7 +191,7 @@ class TestObjectHandoff(TestCase):
         try:
             direct_client.direct_get_object(
                 another_onode, opart, self.account, container, obj, headers={
-                    POLICY_INDEX: self.policy.idx})
+                    'X-Backend-Storage-Policy-Index': self.policy.idx})
         except ClientException as err:
             exc = err
         self.assertEquals(exc.http_status, 404)

@@ -41,16 +41,18 @@ import cPickle as pickle
 from gzip import GzipFile
 import mock as mocklib

-DEFAULT_PATCH_POLICIES = [storage_policy.StoragePolicy(0, 'nulo', True),
-                          storage_policy.StoragePolicy(1, 'unu')]
-LEGACY_PATCH_POLICIES = [storage_policy.StoragePolicy(0, 'legacy', True)]


 def patch_policies(thing_or_policies=None, legacy_only=False):
     if legacy_only:
-        default_policies = LEGACY_PATCH_POLICIES
+        default_policies = [storage_policy.StoragePolicy(
+            0, 'legacy', True, object_ring=FakeRing())]
     else:
-        default_policies = DEFAULT_PATCH_POLICIES
+        default_policies = [
+            storage_policy.StoragePolicy(
+                0, 'nulo', True, object_ring=FakeRing()),
+            storage_policy.StoragePolicy(
+                1, 'unu', object_ring=FakeRing()),
+        ]

     thing_or_policies = thing_or_policies or default_policies

@@ -138,6 +140,7 @@ class FakeRing(Ring):
         self._reload()

     def get_part(self, *args, **kwargs):
+        # always call the real method, even if the fake ignores the result
         real_part = super(FakeRing, self).get_part(*args, **kwargs)
         if self._part_shift == 32:
             return 1

@@ -604,12 +607,27 @@ def fake_http_connect(*code_iter, **kwargs):
     class FakeConn(object):

         def __init__(self, status, etag=None, body='', timestamp='1',
-                     expect_status=None, headers=None):
-            self.status = status
-            if expect_status is None:
-                self.expect_status = self.status
+                     headers=None):
+            # connect exception
+            if isinstance(status, (Exception, Timeout)):
+                raise status
+            if isinstance(status, tuple):
+                self.expect_status, self.status = status
             else:
-                self.expect_status = expect_status
+                self.expect_status, self.status = (None, status)
+            if not self.expect_status:
+                # when a swift backend service returns a status before reading
+                # from the body (mostly an error response) eventlet.wsgi will
+                # respond with that status line immediately instead of 100
+                # Continue, even if the client sent the Expect 100 header.
+                # BufferedHttp and the proxy both see these error statuses
+                # when they call getexpect, so our FakeConn tries to act like
+                # our backend services and return certain types of responses
+                # as expect statuses just like a real backend server would do.
+                if self.status in (507, 412, 409):
+                    self.expect_status = status
+                else:
+                    self.expect_status = 100
             self.reason = 'Fake'
             self.host = '1.2.3.4'
             self.port = '1234'

@@ -626,9 +644,11 @@ def fake_http_connect(*code_iter, **kwargs):
             self._next_sleep = None

         def getresponse(self):
+            if isinstance(self.status, (Exception, Timeout)):
+                raise self.status
             exc = kwargs.get('raise_exc')
             if exc:
-                if isinstance(exc, Exception):
+                if isinstance(exc, (Exception, Timeout)):
                     raise exc
                 raise Exception('test')
             if kwargs.get('raise_timeout_exc'):

@@ -636,15 +656,9 @@ def fake_http_connect(*code_iter, **kwargs):
             return self

         def getexpect(self):
-            if self.expect_status == -2:
-                raise HTTPException()
-            if self.expect_status == -3:
-                return FakeConn(507)
-            if self.expect_status == -4:
-                return FakeConn(201)
-            if self.expect_status == 412:
-                return FakeConn(412)
-            return FakeConn(100)
+            if isinstance(self.expect_status, (Exception, Timeout)):
+                raise self.expect_status
+            return FakeConn(self.expect_status)

         def getheaders(self):
             etag = self.etag

@@ -737,10 +751,6 @@ def fake_http_connect(*code_iter, **kwargs):
         if 'give_connect' in kwargs:
             kwargs['give_connect'](*args, **ckwargs)
         status = code_iter.next()
-        if isinstance(status, tuple):
-            status, expect_status = status
-        else:
-            expect_status = status
         etag = etag_iter.next()
         headers = headers_iter.next()
         timestamp = timestamps_iter.next()

@@ -752,7 +762,7 @@ def fake_http_connect(*code_iter, **kwargs):
         else:
             body = body_iter.next()
         return FakeConn(status, etag, body=body, timestamp=timestamp,
-                        expect_status=expect_status, headers=headers)
+                        headers=headers)

     connect.code_iter = code_iter

@@ -30,7 +30,7 @@ from swift.common.exceptions import ClientException
 from swift.common.utils import normalize_timestamp

 from test import unit
-from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
+from swift.common.storage_policy import StoragePolicy, POLICIES


 class FakeLogger(object):

@@ -291,7 +291,7 @@ class TestReaper(unittest.TestCase):
                 'X-Container-Host': host,
                 'X-Container-Partition': 'partition',
                 'X-Container-Device': device,
-                POLICY_INDEX: policy.idx
+                'X-Backend-Storage-Policy-Index': policy.idx
             }
             ring = r.get_object_ring(policy.idx)
             expected = call(ring.devs[i], 1, 'a', 'c', 'o',

@@ -322,7 +322,7 @@ class TestReaper(unittest.TestCase):
                 direct_get_container=DEFAULT,
                 direct_delete_object=DEFAULT,
                 direct_delete_container=DEFAULT) as mocks:
-            headers = {POLICY_INDEX: policy.idx}
+            headers = {'X-Backend-Storage-Policy-Index': policy.idx}
             obj_listing = [{'name': 'o'}]

             def fake_get_container(*args, **kwargs):

@@ -340,7 +340,8 @@ class TestReaper(unittest.TestCase):
         self.assertEqual(3, len(mock_calls))
         for call_args in mock_calls:
             _args, kwargs = call_args
-            self.assertEqual(kwargs['headers'][POLICY_INDEX],
+            self.assertEqual(kwargs['headers']
+                             ['X-Backend-Storage-Policy-Index'],
                              policy.idx)

         self.assertEquals(mocks['direct_delete_container'].call_count, 3)

@@ -542,7 +543,7 @@ class TestReaper(unittest.TestCase):
         with patch('swift.account.reaper.random.random', fake_random):
             try:
                 r.run_forever()
-            except Exception, err:
+            except Exception as err:
                 pass
         self.assertEqual(self.val, 1)
         self.assertEqual(str(err), 'exit')

@@ -34,7 +34,7 @@ from swift.account.server import AccountController
 from swift.common.utils import normalize_timestamp, replication, public
 from swift.common.request_helpers import get_sys_meta_prefix
 from test.unit import patch_policies
-from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
+from swift.common.storage_policy import StoragePolicy, POLICIES


 @patch_policies

@@ -1723,7 +1723,7 @@ class TestAccountController(unittest.TestCase):
             'X-Delete-Timestamp': '0',
             'X-Object-Count': '2',
             'X-Bytes-Used': '4',
-            POLICY_INDEX: policy.idx,
+            'X-Backend-Storage-Policy-Index': policy.idx,
         })
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 201)

@@ -1780,7 +1780,7 @@ class TestAccountController(unittest.TestCase):
             'X-Delete-Timestamp': '0',
             'X-Object-Count': '2',
             'X-Bytes-Used': '4',
-            POLICY_INDEX: policy.idx,
+            'X-Backend-Storage-Policy-Index': policy.idx,
         })
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 201)

@@ -1812,7 +1812,7 @@ class TestAccountController(unittest.TestCase):
             'X-Delete-Timestamp': '0',
             'X-Object-Count': count,
             'X-Bytes-Used': count,
-            POLICY_INDEX: policy.idx,
+            'X-Backend-Storage-Policy-Index': policy.idx,
         })
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 201)

@@ -79,7 +79,7 @@ class TestCliInfoBase(unittest.TestCase):
     def assertRaisesMessage(self, exc, msg, func, *args, **kwargs):
         try:
             func(*args, **kwargs)
-        except Exception, e:
+        except Exception as e:
             self.assertTrue(msg in str(e),
                             "Expected %r in %r" % (msg, str(e)))
             self.assertTrue(isinstance(e, exc),

@@ -258,3 +258,48 @@ class TestReconCommands(unittest.TestCase):
         self.assertTrue("1/2 hosts matched" in output)
         self.assertTrue("http://10.2.2.2:10000/recon/swiftconfmd5 (bogus) "
                        "doesn't match on disk md5sum" in output)
+
+    def test_object_auditor_check(self):
+        # Recon middleware response from an object server
+        def dummy_request(*args, **kwargs):
+            values = {
+                'passes': 0, 'errors': 0, 'audit_time': 0,
+                'start_time': 0, 'quarantined': 0, 'bytes_processed': 0}
+
+            return [('http://127.0.0.1:6010/recon/auditor/object', {
+                'object_auditor_stats_ALL': values,
+                'object_auditor_stats_ZBF': values,
+            }, 200)]
+
+        response = {}
+
+        def catch_print(computed):
+            response[computed.get('name')] = computed
+
+        cli = recon.SwiftRecon()
+        cli.pool.imap = dummy_request
+        cli._print_stats = catch_print
+
+        cli.object_auditor_check([('127.0.0.1', 6010)])
+
+        # Now check that output contains all keys and names
+        keys = ['average', 'number_none', 'high',
+                'reported', 'low', 'total', 'perc_none']
+
+        names = [
+            'ALL_audit_time_last_path',
+            'ALL_quarantined_last_path',
+            'ALL_errors_last_path',
+            'ALL_passes_last_path',
+            'ALL_bytes_processed_last_path',
+            'ZBF_audit_time_last_path',
+            'ZBF_quarantined_last_path',
+            'ZBF_errors_last_path',
+            'ZBF_bytes_processed_last_path'
+        ]
+
+        for name in names:
+            computed = response.get(name)
+            self.assertTrue(computed)
+            for key in keys:
+                self.assertTrue(key in computed)

@@ -662,15 +662,23 @@ class TestReconSuccess(TestCase):
                           "files_processed": 2310,
                           "quarantined": 0}})

-    def test_get_auditor_info_object_once(self):
+    def test_get_auditor_info_object_parallel_once(self):
         from_cache_response = {
-            "object_auditor_stats_ALL": {'disk1disk2': {
-                "audit_time": 115.14418768882751,
-                "bytes_processed": 234660,
-                "completed": 115.4512460231781,
-                "errors": 0,
-                "files_processed": 2310,
-                "quarantined": 0}},
+            "object_auditor_stats_ALL": {
+                'disk1': {
+                    "audit_time": 115.14418768882751,
+                    "bytes_processed": 234660,
+                    "completed": 115.4512460231781,
+                    "errors": 0,
+                    "files_processed": 2310,
+                    "quarantined": 0},
+                'disk2': {
+                    "audit_time": 115,
+                    "bytes_processed": 234660,
+                    "completed": 115,
+                    "errors": 0,
+                    "files_processed": 2310,
+                    "quarantined": 0}},
             "object_auditor_stats_ZBF": {'disk1disk2': {
                 "audit_time": 45.877294063568115,
                 "bytes_processed": 0,

@@ -686,13 +694,21 @@ class TestReconSuccess(TestCase):
                                        'object_auditor_stats_ZBF'],
                                       '/var/cache/swift/object.recon'), {})])
         self.assertEquals(rv, {
-            "object_auditor_stats_ALL": {'disk1disk2': {
-                "audit_time": 115.14418768882751,
-                "bytes_processed": 234660,
-                "completed": 115.4512460231781,
-                "errors": 0,
-                "files_processed": 2310,
-                "quarantined": 0}},
+            "object_auditor_stats_ALL": {
+                'disk1': {
+                    "audit_time": 115.14418768882751,
+                    "bytes_processed": 234660,
+                    "completed": 115.4512460231781,
+                    "errors": 0,
+                    "files_processed": 2310,
+                    "quarantined": 0},
+                'disk2': {
+                    "audit_time": 115,
+                    "bytes_processed": 234660,
+                    "completed": 115,
+                    "errors": 0,
+                    "files_processed": 2310,
+                    "quarantined": 0}},
             "object_auditor_stats_ZBF": {'disk1disk2': {
                 "audit_time": 45.877294063568115,
                 "bytes_processed": 0,

@@ -821,8 +821,8 @@ class TestSloGetManifest(SloTestCase):
                          headers)
         try:
             resp_data = json.loads(body)
-        except json.JSONDecodeError:
-            resp_data = None
+        except ValueError:
+            self.fail("Invalid JSON in manifest GET: %r" % body)

         self.assertEqual(
             resp_data,

@@ -27,7 +27,7 @@ from swift.common import direct_client
 from swift.common.exceptions import ClientException
 from swift.common.utils import json, Timestamp
 from swift.common.swob import HeaderKeyDict, RESPONSE_REASONS
-from swift.common.storage_policy import POLICY_INDEX, POLICIES
+from swift.common.storage_policy import POLICIES

 from test.unit import patch_policies

@@ -134,9 +134,11 @@ class TestDirectClient(unittest.TestCase):
         for add_ts in (True, False):
             now = time.time()
             headers = direct_client.gen_headers(
-                {POLICY_INDEX: policy.idx}, add_ts=add_ts)
+                {'X-Backend-Storage-Policy-Index': policy.idx},
+                add_ts=add_ts)
             self.assertEqual(headers['user-agent'], stub_user_agent)
-            self.assertEqual(headers[POLICY_INDEX], str(policy.idx))
+            self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
+                             str(policy.idx))
             expected_header_count = 2
             if add_ts:
                 expected_header_count += 1

@@ -23,9 +23,11 @@ from textwrap import dedent
 import os

 from test.unit import FakeLogger
+import eventlet
 from eventlet.green import urllib2
 from swift.common import internal_client
 from swift.common import swob
+from swift.common.storage_policy import StoragePolicy

 from test.unit import with_tempdir, write_fake_ring, patch_policies
 from test.unit.common.middleware.helpers import FakeSwift

@@ -202,7 +204,6 @@ class TestCompressingfileReader(unittest.TestCase):

 class TestInternalClient(unittest.TestCase):

-    @patch_policies(legacy_only=True)
     @mock.patch('swift.common.utils.HASH_PATH_SUFFIX', new='endcap')
     @with_tempdir
     def test_load_from_config(self, tempdir):

@@ -232,7 +233,8 @@ class TestInternalClient(unittest.TestCase):
         write_fake_ring(container_ring_path)
         object_ring_path = os.path.join(tempdir, 'object.ring.gz')
         write_fake_ring(object_ring_path)
-        client = internal_client.InternalClient(conf_path, 'test', 1)
+        with patch_policies([StoragePolicy(0, 'legacy', True)]):
+            client = internal_client.InternalClient(conf_path, 'test', 1)
         self.assertEqual(client.account_ring, client.app.app.app.account_ring)
         self.assertEqual(client.account_ring.serialized_path,
                          account_ring_path)

@@ -1130,6 +1132,7 @@ class TestSimpleClient(unittest.TestCase):
         logger = FakeLogger()
         retval = sc.retry_request(
             'GET', headers={'content-length': '123'}, logger=logger)
+        self.assertEqual(urlopen.call_count, 1)
         request.assert_called_with('http://127.0.0.1?format=json',
                                    headers={'content-length': '123'},
                                    data=None)

@@ -1181,29 +1184,107 @@ class TestSimpleClient(unittest.TestCase):
     def test_get_with_retries_all_failed(self, request, urlopen):
         # Simulate a failing request, ensure retries done
         request.return_value.get_type.return_value = "http"
         request.side_effect = urllib2.URLError('')
         urlopen.return_value.read.return_value = ''
         urlopen.side_effect = urllib2.URLError('')
         sc = internal_client.SimpleClient(url='http://127.0.0.1', retries=1)
-        self.assertRaises(urllib2.URLError, sc.retry_request, 'GET')
+        with mock.patch('swift.common.internal_client.sleep') as mock_sleep:
+            self.assertRaises(urllib2.URLError, sc.retry_request, 'GET')
+        self.assertEqual(mock_sleep.call_count, 1)
         self.assertEqual(request.call_count, 2)
         self.assertEqual(urlopen.call_count, 2)

     @mock.patch('eventlet.green.urllib2.urlopen')
     @mock.patch('eventlet.green.urllib2.Request')
     def test_get_with_retries(self, request, urlopen):
         # First request fails, retry successful
         request.return_value.get_type.return_value = "http"
-        urlopen.return_value.read.return_value = ''
         req = urllib2.Request('http://127.0.0.1', method='GET')
         request.side_effect = [urllib2.URLError(''), req]
+        mock_resp = mock.MagicMock()
+        mock_resp.read.return_value = ''
+        urlopen.side_effect = [urllib2.URLError(''), mock_resp]
         sc = internal_client.SimpleClient(url='http://127.0.0.1', retries=1,
                                           token='token')

-        retval = sc.retry_request('GET')
-        self.assertEqual(request.call_count, 3)
+        with mock.patch('swift.common.internal_client.sleep') as mock_sleep:
+            retval = sc.retry_request('GET')
+        self.assertEqual(mock_sleep.call_count, 1)
+        self.assertEqual(request.call_count, 2)
+        self.assertEqual(urlopen.call_count, 2)
         request.assert_called_with('http://127.0.0.1?format=json', data=None,
                                    headers={'X-Auth-Token': 'token'})
         self.assertEqual([None, None], retval)

+    @mock.patch('eventlet.green.urllib2.urlopen')
+    def test_get_with_retries_param(self, mock_urlopen):
+        mock_response = mock.MagicMock()
+        mock_response.read.return_value = ''
+        mock_urlopen.side_effect = internal_client.httplib.BadStatusLine('')
+        c = internal_client.SimpleClient(url='http://127.0.0.1', token='token')
+        self.assertEqual(c.retries, 5)
+
+        # first without retries param
+        with mock.patch('swift.common.internal_client.sleep') as mock_sleep:
+            self.assertRaises(internal_client.httplib.BadStatusLine,
+                              c.retry_request, 'GET')
+        self.assertEqual(mock_sleep.call_count, 5)
+        self.assertEqual(mock_urlopen.call_count, 6)
+        # then with retries param
+        mock_urlopen.reset_mock()
+        with mock.patch('swift.common.internal_client.sleep') as mock_sleep:
+            self.assertRaises(internal_client.httplib.BadStatusLine,
+                              c.retry_request, 'GET', retries=2)
+        self.assertEqual(mock_sleep.call_count, 2)
+        self.assertEqual(mock_urlopen.call_count, 3)
+        # and this time with a real response
+        mock_urlopen.reset_mock()
+        mock_urlopen.side_effect = [internal_client.httplib.BadStatusLine(''),
+                                    mock_response]
+        with mock.patch('swift.common.internal_client.sleep') as mock_sleep:
+            retval = c.retry_request('GET', retries=1)
+        self.assertEqual(mock_sleep.call_count, 1)
+        self.assertEqual(mock_urlopen.call_count, 2)
+        self.assertEqual([None, None], retval)
+
+    def test_proxy(self):
+        running = True
+
+        def handle(sock):
+            while running:
+                try:
+                    with eventlet.Timeout(0.1):
+                        (conn, addr) = sock.accept()
+                except eventlet.Timeout:
+                    continue
+                else:
+                    conn.send('HTTP/1.1 503 Server Error')
+                    conn.close()
+            sock.close()
+
+        sock = eventlet.listen(('', 0))
+        port = sock.getsockname()[1]
+        proxy = 'http://127.0.0.1:%s' % port
+        url = 'https://127.0.0.1:1/a'
+        server = eventlet.spawn(handle, sock)
+        try:
+            headers = {'Content-Length': '0'}
+            with mock.patch('swift.common.internal_client.sleep'):
+                try:
+                    internal_client.put_object(
+                        url, container='c', name='o1', headers=headers,
+                        contents='', proxy=proxy, timeout=0.1, retries=0)
+                except urllib2.HTTPError as e:
+                    self.assertEqual(e.code, 503)
+                except urllib2.URLError as e:
+                    if 'ECONNREFUSED' in str(e):
+                        self.fail(
+                            "Got %s which probably means the http proxy "
+                            "settings were not used" % e)
+                    else:
+                        raise e
+                else:
+                    self.fail('Unexpected successful response')
+        finally:
+            running = False
+            server.wait()


 if __name__ == '__main__':
     unittest.main()

@@ -3050,6 +3050,20 @@ class TestStatsdLogging(unittest.TestCase):
         self.assertEquals(mock_controller.args[0], 'METHOD.timing')
         self.assert_(mock_controller.args[1] > 0)

+        mock_controller = MockController(412)
+        METHOD(mock_controller)
+        self.assertEquals(len(mock_controller.args), 2)
+        self.assertEquals(mock_controller.called, 'timing')
+        self.assertEquals(mock_controller.args[0], 'METHOD.timing')
+        self.assert_(mock_controller.args[1] > 0)
+
+        mock_controller = MockController(416)
+        METHOD(mock_controller)
+        self.assertEquals(len(mock_controller.args), 2)
+        self.assertEquals(mock_controller.called, 'timing')
+        self.assertEquals(mock_controller.args[0], 'METHOD.timing')
+        self.assert_(mock_controller.args[1] > 0)
+
         mock_controller = MockController(401)
         METHOD(mock_controller)
         self.assertEquals(len(mock_controller.args), 2)

@@ -3683,8 +3697,112 @@ class TestThreadpool(unittest.TestCase):


 class TestAuditLocationGenerator(unittest.TestCase):

+    def test_drive_tree_access(self):
+        orig_listdir = utils.listdir
+
+        def _mock_utils_listdir(path):
+            if 'bad_part' in path:
+                raise OSError(errno.EACCES)
+            elif 'bad_suffix' in path:
+                raise OSError(errno.EACCES)
+            elif 'bad_hash' in path:
+                raise OSError(errno.EACCES)
+            else:
+                return orig_listdir(path)
+
+        #Check Raise on Bad partition
+        tmpdir = mkdtemp()
+        data = os.path.join(tmpdir, "drive", "data")
+        os.makedirs(data)
+        obj_path = os.path.join(data, "bad_part")
+        with open(obj_path, "w"):
+            pass
+        part1 = os.path.join(data, "partition1")
+        os.makedirs(part1)
+        part2 = os.path.join(data, "partition2")
+        os.makedirs(part2)
+        with patch('swift.common.utils.listdir', _mock_utils_listdir):
+            audit = lambda: list(utils.audit_location_generator(
+                tmpdir, "data", mount_check=False))
+            self.assertRaises(OSError, audit)
+
+        #Check Raise on Bad Suffix
+        tmpdir = mkdtemp()
+        data = os.path.join(tmpdir, "drive", "data")
+        os.makedirs(data)
+        part1 = os.path.join(data, "partition1")
+        os.makedirs(part1)
+        part2 = os.path.join(data, "partition2")
+        os.makedirs(part2)
+        obj_path = os.path.join(part1, "bad_suffix")
+        with open(obj_path, 'w'):
+            pass
+        suffix = os.path.join(part2, "suffix")
+        os.makedirs(suffix)
+        with patch('swift.common.utils.listdir', _mock_utils_listdir):
+            audit = lambda: list(utils.audit_location_generator(
+                tmpdir, "data", mount_check=False))
+            self.assertRaises(OSError, audit)
+
+        #Check Raise on Bad Hash
+        tmpdir = mkdtemp()
+        data = os.path.join(tmpdir, "drive", "data")
+        os.makedirs(data)
+        part1 = os.path.join(data, "partition1")
+        os.makedirs(part1)
+        suffix = os.path.join(part1, "suffix")
+        os.makedirs(suffix)
+        hash1 = os.path.join(suffix, "hash1")
+        os.makedirs(hash1)
+        obj_path = os.path.join(suffix, "bad_hash")
+        with open(obj_path, 'w'):
+            pass
+        with patch('swift.common.utils.listdir', _mock_utils_listdir):
+            audit = lambda: list(utils.audit_location_generator(
+                tmpdir, "data", mount_check=False))
+            self.assertRaises(OSError, audit)
+
+    def test_non_dir_drive(self):
+        with temptree([]) as tmpdir:
+            logger = FakeLogger()
+            data = os.path.join(tmpdir, "drive", "data")
+            os.makedirs(data)
+            #Create a file, that represents a non-dir drive
+            open(os.path.join(tmpdir, 'asdf'), 'w')
+            locations = utils.audit_location_generator(
+                tmpdir, "data", mount_check=False, logger=logger
+            )
+            self.assertEqual(list(locations), [])
+            self.assertEqual(1, len(logger.get_lines_for_level('warning')))
+            #Test without the logger
+            locations = utils.audit_location_generator(
+                tmpdir, "data", mount_check=False
+            )
+            self.assertEqual(list(locations), [])
+
+    def test_mount_check_drive(self):
+        with temptree([]) as tmpdir:
+            logger = FakeLogger()
+            data = os.path.join(tmpdir, "drive", "data")
+            os.makedirs(data)
+            #Create a file, that represents a non-dir drive
+            open(os.path.join(tmpdir, 'asdf'), 'w')
+            locations = utils.audit_location_generator(
+                tmpdir, "data", mount_check=True, logger=logger
+            )
+            self.assertEqual(list(locations), [])
+            self.assertEqual(2, len(logger.get_lines_for_level('warning')))
+
+            #Test without the logger
+            locations = utils.audit_location_generator(
+                tmpdir, "data", mount_check=True
+            )
+            self.assertEqual(list(locations), [])
+
     def test_non_dir_contents(self):
         with temptree([]) as tmpdir:
+            logger = FakeLogger()
             data = os.path.join(tmpdir, "drive", "data")
             os.makedirs(data)
             with open(os.path.join(data, "partition1"), "w"):

@@ -3698,31 +3816,49 @@ class TestAuditLocationGenerator(unittest.TestCase):
             with open(os.path.join(suffix, "hash1"), "w"):
                 pass
             locations = utils.audit_location_generator(
-                tmpdir, "data", mount_check=False
+                tmpdir, "data", mount_check=False, logger=logger
             )
             self.assertEqual(list(locations), [])

     def test_find_objects(self):
         with temptree([]) as tmpdir:
+            expected_objs = list()
+            logger = FakeLogger()
             data = os.path.join(tmpdir, "drive", "data")
             os.makedirs(data)
+            #Create a file, that represents a non-dir drive
+            open(os.path.join(tmpdir, 'asdf'), 'w')
             partition = os.path.join(data, "partition1")
             os.makedirs(partition)
             suffix = os.path.join(partition, "suffix")
             os.makedirs(suffix)
             hash_path = os.path.join(suffix, "hash")
             os.makedirs(hash_path)
             obj_path = os.path.join(hash_path, "obj1.db")
             with open(obj_path, "w"):
                 pass
+            expected_objs.append((obj_path, 'drive', 'partition1'))
             partition = os.path.join(data, "partition2")
             os.makedirs(partition)
             suffix = os.path.join(partition, "suffix2")
             os.makedirs(suffix)
             hash_path = os.path.join(suffix, "hash2")
             os.makedirs(hash_path)
-            obj_path = os.path.join(hash_path, "obj1.dat")
+            obj_path = os.path.join(hash_path, "obj2.db")
             with open(obj_path, "w"):
                 pass
+            expected_objs.append((obj_path, 'drive', 'partition2'))
             locations = utils.audit_location_generator(
-                tmpdir, "data", ".dat", mount_check=False
+                tmpdir, "data", mount_check=False, logger=logger
             )
-            self.assertEqual(list(locations),
-                             [(obj_path, "drive", "partition2")])
+            got_objs = list(locations)
+            self.assertEqual(len(got_objs), len(expected_objs))
+            self.assertEqual(sorted(got_objs), sorted(expected_objs))
+            self.assertEqual(1, len(logger.get_lines_for_level('warning')))

     def test_ignore_metadata(self):
         with temptree([]) as tmpdir:
+            logger = FakeLogger()
             data = os.path.join(tmpdir, "drive", "data")
             os.makedirs(data)
             partition = os.path.join(data, "partition2")

@@ -3738,7 +3874,7 @@ class TestAuditLocationGenerator(unittest.TestCase):
             with open(meta_path, "w"):
                 pass
             locations = utils.audit_location_generator(
-                tmpdir, "data", ".dat", mount_check=False
+                tmpdir, "data", ".dat", mount_check=False, logger=logger
             )
             self.assertEqual(list(locations),
                              [(obj_path, "drive", "partition2")])

@@ -40,8 +40,7 @@ import swift.container.server as container_server
 import swift.account.server as account_server
 from swift.common.swob import Request
 from swift.common import wsgi, utils
-from swift.common.storage_policy import StoragePolicy, \
-    StoragePolicyCollection
+from swift.common.storage_policy import POLICIES

 from test.unit import temptree, with_tempdir, write_fake_ring, patch_policies

@@ -51,16 +50,15 @@ from paste.deploy import loadwsgi
 def _fake_rings(tmpdir):
     write_fake_ring(os.path.join(tmpdir, 'account.ring.gz'))
     write_fake_ring(os.path.join(tmpdir, 'container.ring.gz'))
-    # Some storage-policy-specific fake rings.
-    policy = [StoragePolicy(0, 'zero'),
-              StoragePolicy(1, 'one', is_default=True)]
-    policies = StoragePolicyCollection(policy)
-    for pol in policies:
+    for policy in POLICIES:
         obj_ring_path = \
-            os.path.join(tmpdir, pol.ring_name + '.ring.gz')
+            os.path.join(tmpdir, policy.ring_name + '.ring.gz')
         write_fake_ring(obj_ring_path)
+        # make sure there's no other ring cached on this policy
+        policy.object_ring = None


+@patch_policies
 class TestWSGI(unittest.TestCase):
     """Tests for swift.common.wsgi"""

@@ -757,6 +755,7 @@ class TestPipelineWrapper(unittest.TestCase):
             "<unknown> catch_errors tempurl proxy-server")


+@patch_policies
 @mock.patch('swift.common.utils.HASH_PATH_SUFFIX', new='endcap')
 class TestPipelineModification(unittest.TestCase):
     def pipeline_modules(self, app):

@@ -1012,7 +1011,6 @@ class TestPipelineModification(unittest.TestCase):
                          'swift.common.middleware.dlo',
                          'swift.proxy.server'])

-    @patch_policies
     @with_tempdir
     def test_loadapp_proxy(self, tempdir):
         conf_path = os.path.join(tempdir, 'proxy-server.conf')

@@ -1034,24 +1032,23 @@ class TestPipelineModification(unittest.TestCase):
         """ % tempdir
         with open(conf_path, 'w') as f:
             f.write(dedent(conf_body))
+        _fake_rings(tempdir)
         account_ring_path = os.path.join(tempdir, 'account.ring.gz')
-        write_fake_ring(account_ring_path)
         container_ring_path = os.path.join(tempdir, 'container.ring.gz')
-        write_fake_ring(container_ring_path)
-        object_ring_path = os.path.join(tempdir, 'object.ring.gz')
-        write_fake_ring(object_ring_path)
-        object_1_ring_path = os.path.join(tempdir, 'object-1.ring.gz')
-        write_fake_ring(object_1_ring_path)
+        object_ring_paths = {}
+        for policy in POLICIES:
+            object_ring_paths[int(policy)] = os.path.join(
+                tempdir, policy.ring_name + '.ring.gz')

         app = wsgi.loadapp(conf_path)
         proxy_app = app.app.app.app.app
         self.assertEqual(proxy_app.account_ring.serialized_path,
                          account_ring_path)
         self.assertEqual(proxy_app.container_ring.serialized_path,
                          container_ring_path)
-        self.assertEqual(proxy_app.get_object_ring(0).serialized_path,
-                         object_ring_path)
-        self.assertEqual(proxy_app.get_object_ring(1).serialized_path,
-                         object_1_ring_path)
+        for policy_index, expected_path in object_ring_paths.items():
+            object_ring = proxy_app.get_object_ring(policy_index)
+            self.assertEqual(expected_path, object_ring.serialized_path)

     @with_tempdir
     def test_loadapp_storage(self, tempdir):

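The `_fake_rings` change above swaps a hard-coded pair of object ring files for a loop over the policy collection, deriving each ring filename from the policy's `ring_name`. A minimal self-contained sketch of that naming scheme (the `SimplePolicy` class below is a stand-in for Swift's `StoragePolicy`, not the real implementation):

```python
import os

class SimplePolicy:
    """Stand-in for swift's StoragePolicy, just enough for ring naming."""
    def __init__(self, idx):
        self.idx = idx
        # Policy 0 keeps the legacy ring name; others get an index suffix.
        self.ring_name = 'object' if idx == 0 else 'object-%d' % idx

def ring_paths(tmpdir, policies):
    # One ring file per policy, named from the policy's ring_name.
    return [os.path.join(tmpdir, p.ring_name + '.ring.gz') for p in policies]

paths = ring_paths('/tmp/rings', [SimplePolicy(0), SimplePolicy(1)])
# -> ['/tmp/rings/object.ring.gz', '/tmp/rings/object-1.ring.gz']
```

Looping over the collection rather than enumerating `object.ring.gz` and `object-1.ring.gz` by hand is what lets the test keep working as policies are added under `@patch_policies`.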
@@ -38,8 +38,7 @@ from swift.common import constraints
 from swift.common.utils import (Timestamp, mkdirs, public, replication,
                                 lock_parent_directory, json)
 from test.unit import fake_http_connect
-from swift.common.storage_policy import (POLICY_INDEX, POLICIES,
-                                         StoragePolicy)
+from swift.common.storage_policy import (POLICIES, StoragePolicy)
 from swift.common.request_helpers import get_sys_meta_prefix

 from test.unit import patch_policies

@@ -87,7 +86,8 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank(req.path, method='HEAD')
         resp = req.get_response(self.controller)
         self.assertEqual(204, resp.status_int)
-        self.assertEqual(str(policy_index), resp.headers[POLICY_INDEX])
+        self.assertEqual(str(policy_index),
+                         resp.headers['X-Backend-Storage-Policy-Index'])

     def test_get_and_validate_policy_index(self):
         # no policy is OK

@@ -100,7 +100,7 @@ class TestContainerController(unittest.TestCase):
             req = Request.blank('/sda1/p/a/c_%s' % policy, method='PUT',
                                 headers={
                                     'X-Timestamp': '0',
-                                    POLICY_INDEX: policy
+                                    'X-Backend-Storage-Policy-Index': policy
                                 })
             resp = req.get_response(self.controller)
             self.assertEqual(400, resp.status_int)

@@ -111,7 +111,8 @@ class TestContainerController(unittest.TestCase):
             req = Request.blank('/sda1/p/a/c_%s' % policy.name, method='PUT',
                                 headers={
                                     'X-Timestamp': '0',
-                                    POLICY_INDEX: policy.idx,
+                                    'X-Backend-Storage-Policy-Index':
+                                        policy.idx,
                                 })
             self._check_put_container_storage_policy(req, policy.idx)

@@ -204,7 +205,8 @@ class TestContainerController(unittest.TestCase):
                          Timestamp(start).normal)

         # backend headers
-        self.assertEqual(int(response.headers[POLICY_INDEX]),
+        self.assertEqual(int(response.headers
+                             ['X-Backend-Storage-Policy-Index']),
                          int(POLICIES.default))
         self.assert_(
             Timestamp(response.headers['x-backend-timestamp']) >= start)

@@ -219,7 +221,8 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', method='HEAD')
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 404)
-        self.assertEqual(int(resp.headers[POLICY_INDEX]), 0)
+        self.assertEqual(int(resp.headers['X-Backend-Storage-Policy-Index']),
+                         0)
         self.assertEqual(resp.headers['x-backend-timestamp'],
                          Timestamp(0).internal)
         self.assertEqual(resp.headers['x-backend-put-timestamp'],

@@ -252,7 +255,8 @@ class TestContainerController(unittest.TestCase):
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 404)
         # backend headers
-        self.assertEqual(int(resp.headers[POLICY_INDEX]),
+        self.assertEqual(int(resp.headers[
+                             'X-Backend-Storage-Policy-Index']),
                          int(POLICIES.default))
         self.assert_(Timestamp(resp.headers['x-backend-timestamp']) >=
                      Timestamp(request_method_times['PUT']))

@@ -363,14 +367,16 @@ class TestContainerController(unittest.TestCase):
         # Set metadata header
         req = Request.blank('/sda1/p/a/c', method='PUT',
                             headers={'X-Timestamp': Timestamp(1).internal,
-                                     POLICY_INDEX: policy.idx})
+                                     'X-Backend-Storage-Policy-Index':
+                                         policy.idx})
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 201)

         # now make sure we read it back
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
         resp = req.get_response(self.controller)
-        self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+        self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
+                          str(policy.idx))

     def test_PUT_no_policy_specified(self):
         # Set metadata header

@@ -382,14 +388,14 @@ class TestContainerController(unittest.TestCase):
         # now make sure the default was used (pol 1)
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
         resp = req.get_response(self.controller)
-        self.assertEquals(resp.headers.get(POLICY_INDEX),
+        self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
                           str(POLICIES.default.idx))

     def test_PUT_bad_policy_specified(self):
         # Set metadata header
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
                             headers={'X-Timestamp': Timestamp(1).internal,
-                                     POLICY_INDEX: 'nada'})
+                                     'X-Backend-Storage-Policy-Index': 'nada'})
         resp = req.get_response(self.controller)
         # make sure we get bad response
         self.assertEquals(resp.status_int, 400)

@@ -400,20 +406,21 @@ class TestContainerController(unittest.TestCase):
         # Set metadata header
         req = Request.blank('/sda1/p/a/c', method='PUT', headers={
             'X-Timestamp': ts.next(),
-            POLICY_INDEX: policy.idx})
+            'X-Backend-Storage-Policy-Index': policy.idx})
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 201)
         req = Request.blank('/sda1/p/a/c')
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         # make sure we get the right index back
-        self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+        self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
+                          str(policy.idx))

         # now try to update w/o changing the policy
         for method in ('POST', 'PUT'):
             req = Request.blank('/sda1/p/a/c', method=method, headers={
                 'X-Timestamp': ts.next(),
-                POLICY_INDEX: policy.idx
+                'X-Backend-Storage-Policy-Index': policy.idx
             })
             resp = req.get_response(self.controller)
             self.assertEquals(resp.status_int // 100, 2)

@@ -421,7 +428,8 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'})
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
-        self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+        self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
+                          str(policy.idx))

     def test_PUT_bad_policy_change(self):
         ts = (Timestamp(t).internal for t in itertools.count(time.time()))

@@ -429,21 +437,22 @@ class TestContainerController(unittest.TestCase):
         # Set metadata header
         req = Request.blank('/sda1/p/a/c', method='PUT', headers={
             'X-Timestamp': ts.next(),
-            POLICY_INDEX: policy.idx})
+            'X-Backend-Storage-Policy-Index': policy.idx})
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 201)
         req = Request.blank('/sda1/p/a/c')
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         # make sure we get the right index back
-        self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+        self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
+                          str(policy.idx))

         other_policies = [p for p in POLICIES if p != policy]
         for other_policy in other_policies:
             # now try to change it and make sure we get a conflict
             req = Request.blank('/sda1/p/a/c', method='PUT', headers={
                 'X-Timestamp': ts.next(),
-                POLICY_INDEX: other_policy.idx
+                'X-Backend-Storage-Policy-Index': other_policy.idx
             })
             resp = req.get_response(self.controller)
             self.assertEquals(resp.status_int, 409)

@@ -453,28 +462,30 @@ class TestContainerController(unittest.TestCase):
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         # make sure we get the right index back
-        self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+        self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
+                          str(policy.idx))

     def test_POST_ignores_policy_change(self):
         ts = (Timestamp(t).internal for t in itertools.count(time.time()))
         policy = random.choice(list(POLICIES))
         req = Request.blank('/sda1/p/a/c', method='PUT', headers={
             'X-Timestamp': ts.next(),
-            POLICY_INDEX: policy.idx})
+            'X-Backend-Storage-Policy-Index': policy.idx})
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 201)
         req = Request.blank('/sda1/p/a/c')
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         # make sure we get the right index back
-        self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+        self.assertEquals(resp.headers.get('X-Backend-Storage-Policy-Index'),
+                          str(policy.idx))

         other_policies = [p for p in POLICIES if p != policy]
         for other_policy in other_policies:
             # now try to change it and make sure we get a conflict
             req = Request.blank('/sda1/p/a/c', method='POST', headers={
                 'X-Timestamp': ts.next(),
-                POLICY_INDEX: other_policy.idx
+                'X-Backend-Storage-Policy-Index': other_policy.idx
             })
             resp = req.get_response(self.controller)
             # valid request

@@ -485,7 +496,9 @@ class TestContainerController(unittest.TestCase):
         resp = req.get_response(self.controller)
         self.assertEquals(resp.status_int, 204)
         # make sure we get the right index back
-        self.assertEquals(resp.headers.get(POLICY_INDEX), str(policy.idx))
+        self.assertEquals(resp.headers.get
+                          ('X-Backend-Storage-Policy-Index'),
+                          str(policy.idx))

     def test_PUT_no_policy_for_existing_default(self):
         ts = (Timestamp(t).internal for t in

@@ -501,7 +514,7 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', method='HEAD')
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 204)
-        self.assertEqual(resp.headers[POLICY_INDEX],
+        self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
                          str(POLICIES.default.idx))

         # put again without specifying the storage policy

@@ -515,7 +528,7 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', method='HEAD')
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 204)
-        self.assertEqual(resp.headers[POLICY_INDEX],
+        self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
                          str(POLICIES.default.idx))

     def test_PUT_proxy_default_no_policy_for_existing_default(self):

@@ -537,7 +550,7 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', method='HEAD')
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 204)
-        self.assertEqual(int(resp.headers[POLICY_INDEX]),
+        self.assertEqual(int(resp.headers['X-Backend-Storage-Policy-Index']),
                          int(proxy_default))

         # put again without proxy specifying the different default

@@ -552,7 +565,7 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', method='HEAD')
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 204)
-        self.assertEqual(int(resp.headers[POLICY_INDEX]),
+        self.assertEqual(int(resp.headers['X-Backend-Storage-Policy-Index']),
                          int(proxy_default))

     def test_PUT_no_policy_for_existing_non_default(self):

@@ -561,7 +574,7 @@ class TestContainerController(unittest.TestCase):
         # create a container with the non-default storage policy
         req = Request.blank('/sda1/p/a/c', method='PUT', headers={
             'X-Timestamp': ts.next(),
-            POLICY_INDEX: non_default_policy.idx,
+            'X-Backend-Storage-Policy-Index': non_default_policy.idx,
         })
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 201)  # sanity check

@@ -570,7 +583,7 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', method='HEAD')
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 204)
-        self.assertEqual(resp.headers[POLICY_INDEX],
+        self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
                          str(non_default_policy.idx))

         # put again without specifiying the storage policy

@@ -584,7 +597,7 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', method='HEAD')
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 204)
-        self.assertEqual(resp.headers[POLICY_INDEX],
+        self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
                          str(non_default_policy.idx))

     def test_PUT_GET_metadata(self):

@@ -1244,7 +1257,7 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank(
             '/sda1/p/a/c', method='PUT',
             headers={'X-Timestamp': ts.next(),
-                     POLICY_INDEX: policy.idx})
+                     'X-Backend-Storage-Policy-Index': policy.idx})
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 201)  # sanity check

@@ -1261,14 +1274,14 @@ class TestContainerController(unittest.TestCase):
         # state, so changing the policy index is perfectly acceptable
         req = Request.blank('/sda1/p/a/c', method='PUT', headers={
             'X-Timestamp': ts.next(),
-            POLICY_INDEX: other_policy.idx})
+            'X-Backend-Storage-Policy-Index': other_policy.idx})
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 201)  # sanity check

         req = Request.blank(
             '/sda1/p/a/c', method='HEAD')
         resp = req.get_response(self.controller)
-        self.assertEqual(resp.headers[POLICY_INDEX],
+        self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
                          str(other_policy.idx))

     def test_change_to_default_storage_policy_via_DELETE_then_PUT(self):

@@ -1278,7 +1291,7 @@ class TestContainerController(unittest.TestCase):
                                            if not p.is_default])
         req = Request.blank('/sda1/p/a/c', method='PUT', headers={
             'X-Timestamp': ts.next(),
-            POLICY_INDEX: non_default_policy.idx,
+            'X-Backend-Storage-Policy-Index': non_default_policy.idx,
         })
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 201)  # sanity check

@@ -1299,7 +1312,7 @@ class TestContainerController(unittest.TestCase):

         req = Request.blank('/sda1/p/a/c', method='HEAD')
         resp = req.get_response(self.controller)
-        self.assertEqual(resp.headers[POLICY_INDEX],
+        self.assertEqual(resp.headers['X-Backend-Storage-Policy-Index'],
                          str(POLICIES.default.idx))

     def test_DELETE_object(self):

@@ -1347,7 +1360,7 @@ class TestContainerController(unittest.TestCase):
         req = Request.blank('/sda1/p/a/c', method='HEAD')
         resp = req.get_response(self.controller)
         self.assertEqual(resp.status_int, 204)
-        self.assertEqual(int(resp.headers[POLICY_INDEX]),
+        self.assertEqual(int(resp.headers['X-Backend-Storage-Policy-Index']),
                          int(POLICIES.default))
         # create object
         obj_timestamp = ts.next()

@@ -2381,7 +2394,7 @@ class TestContainerController(unittest.TestCase):
                      'x-delete-timestamp': '0',
                      'x-object-count': 0,
                      'x-put-timestamp': Timestamp(12345).internal,
-                     POLICY_INDEX: '%s' % POLICIES.default.idx,
+                     'X-Backend-Storage-Policy-Index': '%s' % POLICIES.default.idx,
                      'referer': 'PUT http://localhost/sda1/p/a/c',
                      'user-agent': 'container-server %d' % os.getpid(),
                      'x-trans-id': '-'})})

@@ -2399,7 +2412,7 @@ class TestContainerController(unittest.TestCase):
                      'x-delete-timestamp': '0',
                      'x-object-count': 0,
                      'x-put-timestamp': Timestamp(12345).internal,
-                     POLICY_INDEX: '%s' % POLICIES.default.idx,
+                     'X-Backend-Storage-Policy-Index': '%s' % POLICIES.default.idx,
                      'referer': 'PUT http://localhost/sda1/p/a/c',
                      'user-agent': 'container-server %d' % os.getpid(),
                      'x-trans-id': '-'})})

@@ -2551,10 +2564,11 @@ class TestNonLegacyDefaultStoragePolicy(TestContainerController):

     def _update_object_put_headers(self, req):
         """
-        Add policy index headers for containers created with default POLICY
+        Add policy index headers for containers created with default policy
         - which in this TestCase is 1.
         """
-        req.headers[POLICY_INDEX] = str(POLICIES.default.idx)
+        req.headers['X-Backend-Storage-Policy-Index'] = \
+            str(POLICIES.default.idx)


 if __name__ == '__main__':

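Nearly every hunk in the container-server tests above performs the same substitution: the imported `POLICY_INDEX` constant is replaced by the literal backend header name it resolved to. A minimal self-contained sketch of the before/after pattern (the `make_backend_headers` helper is illustrative only, not part of Swift):

```python
# The constant being inlined throughout this diff resolved to this string:
POLICY_INDEX = 'X-Backend-Storage-Policy-Index'

def make_backend_headers(timestamp, policy_idx):
    # Before: {'X-Timestamp': timestamp, POLICY_INDEX: policy_idx}
    # After:  the header name is written out literally at each call site,
    # so the tests no longer import the constant at all.
    return {
        'X-Timestamp': timestamp,
        'X-Backend-Storage-Policy-Index': str(policy_idx),
    }

headers = make_backend_headers('0000000001.00000', 1)
```

Since both spellings produce the identical wire header, the change is purely mechanical; it just decouples the tests from `swift.common.storage_policy`'s exported names.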
@@ -23,7 +23,7 @@ from test.unit import FakeLogger
 from swift.container import sync
 from swift.common import utils
 from swift.common.exceptions import ClientException
-from swift.common.storage_policy import StoragePolicy, POLICY_INDEX
+from swift.common.storage_policy import StoragePolicy
 from test.unit import patch_policies

 utils.HASH_PATH_SUFFIX = 'endcap'

@@ -789,7 +789,8 @@ class TestContainerSync(unittest.TestCase):

         def fake_direct_get_object(node, part, account, container, obj,
                                    headers, resp_chunk_size=1):
-            self.assertEquals(headers[POLICY_INDEX], '0')
+            self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
+                              '0')
             return ({'other-header': 'other header value',
                      'etag': '"etagvalue"', 'x-timestamp': '1.2',
                      'content-type': 'text/plain; swift_bytes=123'},

@@ -807,7 +808,8 @@ class TestContainerSync(unittest.TestCase):

         def fake_direct_get_object(node, part, account, container, obj,
                                    headers, resp_chunk_size=1):
-            self.assertEquals(headers[POLICY_INDEX], '0')
+            self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
+                              '0')
             return ({'date': 'date value',
                      'last-modified': 'last modified value',
                      'x-timestamp': '1.2',

@@ -833,7 +835,8 @@ class TestContainerSync(unittest.TestCase):

         def fake_direct_get_object(node, part, account, container, obj,
                                    headers, resp_chunk_size=1):
-            self.assertEquals(headers[POLICY_INDEX], '0')
+            self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
+                              '0')
             exc.append(Exception('test exception'))
             raise exc[-1]

@@ -854,7 +857,8 @@ class TestContainerSync(unittest.TestCase):

         def fake_direct_get_object(node, part, account, container, obj,
                                    headers, resp_chunk_size=1):
-            self.assertEquals(headers[POLICY_INDEX], '0')
+            self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
+                              '0')
             if len(exc) == 0:
                 exc.append(Exception('test other exception'))
             else:

@@ -878,7 +882,8 @@ class TestContainerSync(unittest.TestCase):

         def fake_direct_get_object(node, part, account, container, obj,
                                    headers, resp_chunk_size=1):
-            self.assertEquals(headers[POLICY_INDEX], '0')
+            self.assertEquals(headers['X-Backend-Storage-Policy-Index'],
+                              '0')
             return ({'other-header': 'other header value',
                      'x-timestamp': '1.2', 'etag': '"etagvalue"'},
                     iter('contents'))

@@ -278,6 +278,24 @@ class TestAuditor(unittest.TestCase):
         self.assertEquals(auditor_worker.stats_buckets[10240], 0)
         self.assertEquals(auditor_worker.stats_buckets['OVER'], 1)

+    def test_object_run_logging(self):
+        logger = FakeLogger()
+        auditor_worker = auditor.AuditorWorker(self.conf, logger,
+                                               self.rcache, self.devices)
+        auditor_worker.audit_all_objects(device_dirs=['sda'])
+        log_lines = logger.get_lines_for_level('info')
+        self.assertTrue(len(log_lines) > 0)
+        self.assertTrue(log_lines[0].index('ALL - parallel, sda'))
+
+        logger = FakeLogger()
+        auditor_worker = auditor.AuditorWorker(self.conf, logger,
+                                               self.rcache, self.devices,
+                                               zero_byte_only_at_fps=50)
+        auditor_worker.audit_all_objects(device_dirs=['sda'])
+        log_lines = logger.get_lines_for_level('info')
+        self.assertTrue(len(log_lines) > 0)
+        self.assertTrue(log_lines[0].index('ZBF - sda'))
+
     def test_object_run_once_no_sda(self):
         auditor_worker = auditor.AuditorWorker(self.conf, self.logger,
                                                self.rcache, self.devices)

@@ -444,15 +462,14 @@ class TestAuditor(unittest.TestCase):
         self.assertTrue(os.path.exists(ts_file_path))

     def test_sleeper(self):
-        auditor.SLEEP_BETWEEN_AUDITS = 0.10
-        my_auditor = auditor.ObjectAuditor(self.conf)
-        start = time.time()
-        my_auditor._sleep()
-        delta_t = time.time() - start
-        self.assert_(delta_t > 0.08)
-        self.assert_(delta_t < 0.12)
+        with mock.patch(
+                'time.sleep', mock.MagicMock()) as mock_sleep:
+            auditor.SLEEP_BETWEEN_AUDITS = 0.10
+            my_auditor = auditor.ObjectAuditor(self.conf)
+            my_auditor._sleep()
+            mock_sleep.assert_called_with(auditor.SLEEP_BETWEEN_AUDITS)

-    def test_run_audit(self):
+    def test_run_parallel_audit(self):

         class StopForever(Exception):
             pass

@@ -500,7 +517,9 @@ class TestAuditor(unittest.TestCase):

             my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
                                                     mount_check='false',
-                                                    zero_byte_files_per_second=89))
+                                                    zero_byte_files_per_second=89,
+                                                    concurrency=1))

             mocker = ObjectAuditorMock()
             my_auditor.logger.exception = mock.MagicMock()
             real_audit_loop = my_auditor.audit_loop

@@ -555,12 +574,17 @@ class TestAuditor(unittest.TestCase):

             my_auditor._sleep = mocker.mock_sleep_continue

+            my_auditor.concurrency = 2
             mocker.fork_called = 0
             mocker.wait_called = 0
             my_auditor.run_once()
-            # Fork is called 3 times since the zbf process is forked twice
-            self.assertEquals(mocker.fork_called, 3)
-            self.assertEquals(mocker.wait_called, 3)
+            # Fork is called no. of devices + (no. of devices)/2 + 1 times
+            # since zbf process is forked (no.of devices)/2 + 1 times
+            no_devices = len(os.listdir(self.devices))
+            self.assertEquals(mocker.fork_called, no_devices + no_devices / 2
+                              + 1)
+            self.assertEquals(mocker.wait_called, no_devices + no_devices / 2
+                              + 1)

         finally:
             os.fork = was_fork

@@ -31,7 +31,7 @@ from swift.common import utils
 from swift.common.utils import hash_path, mkdirs, normalize_timestamp
 from swift.common import ring
 from swift.obj import diskfile, replicator as object_replicator
-from swift.common.storage_policy import StoragePolicy, POLICY_INDEX, POLICIES
+from swift.common.storage_policy import StoragePolicy, POLICIES


 def _ips():

@@ -706,7 +706,7 @@ class TestObjectReplicator(unittest.TestCase):
         for job in jobs:
             set_default(self)
             ring = self.replicator.get_object_ring(job['policy_idx'])
-            self.headers[POLICY_INDEX] = job['policy_idx']
+            self.headers['X-Backend-Storage-Policy-Index'] = job['policy_idx']
             self.replicator.update(job)
             self.assertTrue(error in mock_logger.error.call_args[0][0])
             self.assertTrue(expect in mock_logger.exception.call_args[0][0])

@@ -792,7 +792,7 @@ class TestObjectReplicator(unittest.TestCase):
         set_default(self)
         # with only one set of headers make sure we speicy index 0 here
         # as otherwise it may be different from earlier tests
-        self.headers[POLICY_INDEX] = 0
+        self.headers['X-Backend-Storage-Policy-Index'] = 0
         self.replicator.update(repl_job)
         reqs = []
         for node in repl_job['nodes']:

@@ -44,7 +44,7 @@ from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
     NullLogger, storage_directory, public, replication
 from swift.common import constraints
 from swift.common.swob import Request, HeaderKeyDict
-from swift.common.storage_policy import POLICY_INDEX, POLICIES
+from swift.common.storage_policy import POLICIES
 from swift.common.exceptions import DiskFileDeviceUnavailable

@@ -2311,7 +2311,8 @@ class TestObjectController(unittest.TestCase):
             self.object_controller.async_update(
                 'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
                 {'x-timestamp': '1', 'x-out': 'set',
-                 POLICY_INDEX: policy.idx}, 'sda1', policy.idx)
+                 'X-Backend-Storage-Policy-Index': policy.idx}, 'sda1',
+                policy.idx)
         finally:
             object_server.http_connect = orig_http_connect
         self.assertEquals(

@@ -2319,7 +2320,7 @@ class TestObjectController(unittest.TestCase):
             ['127.0.0.1', '1234', 'sdc1', 1, 'PUT', '/a/c/o', {
                 'x-timestamp': '1', 'x-out': 'set',
                 'user-agent': 'obj-server %s' % os.getpid(),
-                POLICY_INDEX: policy.idx}])
+                'X-Backend-Storage-Policy-Index': policy.idx}])

     @patch_policies([storage_policy.StoragePolicy(0, 'zero', True),
                      storage_policy.StoragePolicy(1, 'one'),

@@ -2363,7 +2364,7 @@ class TestObjectController(unittest.TestCase):
             headers={'X-Timestamp': '12345',
                      'Content-Type': 'application/burrito',
                      'Content-Length': '0',
-                     POLICY_INDEX: policy.idx,
+                     'X-Backend-Storage-Policy-Index': policy.idx,
                      'X-Container-Partition': '20',
                      'X-Container-Host': '1.2.3.4:5',
                      'X-Container-Device': 'sdb1',

@@ -2396,10 +2397,10 @@ class TestObjectController(unittest.TestCase):
                 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
                 'x-size': '0',
                 'x-timestamp': utils.Timestamp('12345').internal,
-                POLICY_INDEX: '37',
+                'X-Backend-Storage-Policy-Index': '37',
                 'referer': 'PUT http://localhost/sda1/p/a/c/o',
                 'user-agent': 'obj-server %d' % os.getpid(),
-                POLICY_INDEX: policy.idx,
+                'X-Backend-Storage-Policy-Index': policy.idx,
                 'x-trans-id': '-'})})
         self.assertEquals(
             http_connect_args[1],

@@ -2417,7 +2418,8 @@ class TestObjectController(unittest.TestCase):
                 'x-timestamp': utils.Timestamp('12345').internal,
                 'referer': 'PUT http://localhost/sda1/p/a/c/o',
                 'user-agent': 'obj-server %d' % os.getpid(),
-                POLICY_INDEX: 0,  # system account storage policy is 0
+                # system account storage policy is 0
+                'X-Backend-Storage-Policy-Index': 0,
                 'x-trans-id': '-'})})
         self.assertEquals(
             http_connect_args[2],

@@ -2435,7 +2437,8 @@ class TestObjectController(unittest.TestCase):
                 'x-timestamp': utils.Timestamp('12345').internal,
                 'referer': 'PUT http://localhost/sda1/p/a/c/o',
                 'user-agent': 'obj-server %d' % os.getpid(),
-                POLICY_INDEX: 0,  # system account storage policy is 0
+                # system account storage policy is 0
+                'X-Backend-Storage-Policy-Index': 0,
                 'x-trans-id': '-'})})

     @patch_policies([storage_policy.StoragePolicy(0, 'zero', True),

@@ -2476,7 +2479,7 @@ class TestObjectController(unittest.TestCase):
             headers={'X-Timestamp': '12345',
                      'Content-Type': 'application/burrito',
                      'Content-Length': '0',
-                     POLICY_INDEX: '26',
+                     'X-Backend-Storage-Policy-Index': '26',
                      'X-Container-Partition': '20',
                      'X-Container-Host': '1.2.3.4:5, 6.7.8.9:10',
                      'X-Container-Device': 'sdb1, sdf1'})

@@ -2502,7 +2505,7 @@ class TestObjectController(unittest.TestCase):
                 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
                 'x-size': '0',
                 'x-timestamp': utils.Timestamp('12345').internal,
-                POLICY_INDEX: '26',
+                'X-Backend-Storage-Policy-Index': '26',
                 'referer': 'PUT http://localhost/sda1/p/a/c/o',
                 'user-agent': 'obj-server %d' % os.getpid(),
                 'x-trans-id': '-'})})

@@ -2520,7 +2523,7 @@ class TestObjectController(unittest.TestCase):
                 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
                 'x-size': '0',
                 'x-timestamp': utils.Timestamp('12345').internal,
-                POLICY_INDEX: '26',
+                'X-Backend-Storage-Policy-Index': '26',
                 'referer': 'PUT http://localhost/sda1/p/a/c/o',
                 'user-agent': 'obj-server %d' % os.getpid(),
                 'x-trans-id': '-'})})

@@ -2553,7 +2556,7 @@ class TestObjectController(unittest.TestCase):
             'X-Delete-At-Partition': 'p',
             'X-Delete-At-Host': '10.0.0.2:6002',
             'X-Delete-At-Device': 'sda1',
-            POLICY_INDEX: int(policy),
+            'X-Backend-Storage-Policy-Index': int(policy),
         })
         with mocked_http_conn(
                 500, 500, give_connect=capture_updates) as fake_conn:

@@ -2571,7 +2574,8 @@ class TestObjectController(unittest.TestCase):
                          (delete_at_container, delete_at_timestamp))
         expected = {
             'X-Timestamp': put_timestamp,
-            POLICY_INDEX: 0,  # system account is always 0
+            # system account storage policy is 0
+            'X-Backend-Storage-Policy-Index': 0,
         }
         for key, value in expected.items():
             self.assertEqual(headers[key], str(value))

@@ -2583,7 +2587,7 @@ class TestObjectController(unittest.TestCase):
         self.assertEqual(path, '/sda1/p/a/c/o')
         expected = {
             'X-Timestamp': put_timestamp,
-            POLICY_INDEX: int(policy),
+            'X-Backend-Storage-Policy-Index': int(policy),
         }
         for key, value in expected.items():
             self.assertEqual(headers[key], str(value))

@@ -2598,10 +2602,12 @@ class TestObjectController(unittest.TestCase):
             data = pickle.load(open(async_file))
             if data['account'] == 'a':
                 self.assertEquals(
-                    int(data['headers'][POLICY_INDEX]), policy.idx)
+                    int(data['headers']
+                        ['X-Backend-Storage-Policy-Index']), policy.idx)
             elif data['account'] == '.expiring_objects':
                 self.assertEquals(
-                    int(data['headers'][POLICY_INDEX]), 0)
+                    int(data['headers']
+                        ['X-Backend-Storage-Policy-Index']), 0)
             else:
                 self.fail('unexpected async pending data')
         self.assertEqual(2, len(found_files))

@@ -2621,7 +2627,8 @@ class TestObjectController(unittest.TestCase):
             self.object_controller.async_update(
                 'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
                 {'x-timestamp': '1', 'x-out': 'set',
-                 POLICY_INDEX: policy.idx}, 'sda1', policy.idx)
+                 'X-Backend-Storage-Policy-Index': policy.idx}, 'sda1',
+                policy.idx)
         finally:
             object_server.http_connect = orig_http_connect
             utils.HASH_PATH_PREFIX = _prefix

@@ -2633,7 +2640,7 @@ class TestObjectController(unittest.TestCase):
                           utils.Timestamp(1).internal))),
             {'headers': {'x-timestamp': '1', 'x-out': 'set',
                          'user-agent': 'obj-server %s' % os.getpid(),
-                         POLICY_INDEX: policy.idx},
+                         'X-Backend-Storage-Policy-Index': policy.idx},
              'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})

     def test_async_update_saves_on_non_2xx(self):

@@ -2664,7 +2671,8 @@ class TestObjectController(unittest.TestCase):
             self.object_controller.async_update(
|
||||
'PUT', 'a', 'c', 'o', '127.0.0.1:1234', 1, 'sdc1',
|
||||
{'x-timestamp': '1', 'x-out': str(status),
|
||||
POLICY_INDEX: policy.idx}, 'sda1', policy.idx)
|
||||
'X-Backend-Storage-Policy-Index': policy.idx}, 'sda1',
|
||||
policy.idx)
|
||||
async_dir = diskfile.get_async_dir(policy.idx)
|
||||
self.assertEquals(
|
||||
pickle.load(open(os.path.join(
|
||||
|
@ -2673,7 +2681,8 @@ class TestObjectController(unittest.TestCase):
|
|||
utils.Timestamp(1).internal))),
|
||||
{'headers': {'x-timestamp': '1', 'x-out': str(status),
|
||||
'user-agent': 'obj-server %s' % os.getpid(),
|
||||
POLICY_INDEX: policy.idx},
|
||||
'X-Backend-Storage-Policy-Index':
|
||||
policy.idx},
|
||||
'account': 'a', 'container': 'c', 'obj': 'o',
|
||||
'op': 'PUT'})
|
||||
finally:
|
||||
|
@ -2799,7 +2808,7 @@ class TestObjectController(unittest.TestCase):
|
|||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-content-type': 'text/plain',
|
||||
'x-timestamp': utils.Timestamp(1).internal,
|
||||
POLICY_INDEX: '0', # default when not given
|
||||
'X-Backend-Storage-Policy-Index': '0', # default when not given
|
||||
'x-trans-id': '123',
|
||||
'referer': 'PUT http://localhost/sda1/0/a/c/o'}))
|
||||
|
||||
|
@ -2900,7 +2909,7 @@ class TestObjectController(unittest.TestCase):
|
|||
'DELETE', '.expiring_objects', '0000000000',
|
||||
'0000000002-a/c/o', None, None, None,
|
||||
HeaderKeyDict({
|
||||
POLICY_INDEX: 0,
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'x-timestamp': utils.Timestamp('1').internal,
|
||||
'x-trans-id': '123',
|
||||
'referer': 'PUT http://localhost/v1/a/c/o'}),
|
||||
|
@ -2927,7 +2936,7 @@ class TestObjectController(unittest.TestCase):
|
|||
'DELETE', '.expiring_objects', '0000000000', '0000000000-a/c/o',
|
||||
None, None, None,
|
||||
HeaderKeyDict({
|
||||
POLICY_INDEX: 0,
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'x-timestamp': utils.Timestamp('1').internal,
|
||||
'x-trans-id': '1234',
|
||||
'referer': 'PUT http://localhost/v1/a/c/o'}),
|
||||
|
@ -2954,7 +2963,7 @@ class TestObjectController(unittest.TestCase):
|
|||
'DELETE', '.expiring_objects', '9999936000', '9999999999-a/c/o',
|
||||
None, None, None,
|
||||
HeaderKeyDict({
|
||||
POLICY_INDEX: 0,
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'x-timestamp': utils.Timestamp('1').internal,
|
||||
'x-trans-id': '1234',
|
||||
'referer': 'PUT http://localhost/v1/a/c/o'}),
|
||||
|
@ -2986,7 +2995,7 @@ class TestObjectController(unittest.TestCase):
|
|||
'PUT', '.expiring_objects', '0000000000', '0000000002-a/c/o',
|
||||
'127.0.0.1:1234',
|
||||
'3', 'sdc1', HeaderKeyDict({
|
||||
POLICY_INDEX: 0,
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'x-size': '0',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-content-type': 'text/plain',
|
||||
|
@ -3040,7 +3049,7 @@ class TestObjectController(unittest.TestCase):
|
|||
'DELETE', '.expiring_objects', '0000000000',
|
||||
'0000000002-a/c/o', None, None,
|
||||
None, HeaderKeyDict({
|
||||
POLICY_INDEX: 0,
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'x-timestamp': utils.Timestamp('1').internal,
|
||||
'x-trans-id': '1234',
|
||||
'referer': 'DELETE http://localhost/v1/a/c/o'}),
|
||||
|
@ -4065,7 +4074,7 @@ class TestObjectController(unittest.TestCase):
|
|||
'Content-Type': 'application/x-test',
|
||||
'Foo': 'fooheader',
|
||||
'Baz': 'bazheader',
|
||||
POLICY_INDEX: 1,
|
||||
'X-Backend-Storage-Policy-Index': 1,
|
||||
'X-Object-Meta-1': 'One',
|
||||
'X-Object-Meta-Two': 'Two'})
|
||||
req.body = 'VERIFY'
|
||||
|
|
|
@@ -27,7 +27,6 @@ from swift.common import constraints
 from swift.common import exceptions
 from swift.common import swob
 from swift.common import utils
-from swift.common.storage_policy import POLICY_INDEX
 from swift.obj import diskfile
 from swift.obj import server
 from swift.obj import ssync_receiver

@@ -1066,7 +1065,7 @@ class TestReceiver(unittest.TestCase):
 'Content-Encoding': 'gzip',
 'Specialty-Header': 'value',
 'Host': 'localhost:80',
-POLICY_INDEX: '0',
+'X-Backend-Storage-Policy-Index': '0',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': (
 'content-length x-timestamp x-object-meta-test1 '

@@ -1117,7 +1116,7 @@ class TestReceiver(unittest.TestCase):
 'Content-Encoding': 'gzip',
 'Specialty-Header': 'value',
 'Host': 'localhost:80',
-POLICY_INDEX: '1',
+'X-Backend-Storage-Policy-Index': '1',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': (
 'content-length x-timestamp x-object-meta-test1 '

@@ -1155,7 +1154,7 @@ class TestReceiver(unittest.TestCase):
 self.assertEqual(req.headers, {
 'X-Timestamp': '1364456113.76334',
 'Host': 'localhost:80',
-POLICY_INDEX: '0',
+'X-Backend-Storage-Policy-Index': '0',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': 'x-timestamp'})

@@ -1256,7 +1255,7 @@ class TestReceiver(unittest.TestCase):
 'Content-Encoding': 'gzip',
 'Specialty-Header': 'value',
 'Host': 'localhost:80',
-POLICY_INDEX: '0',
+'X-Backend-Storage-Policy-Index': '0',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': (
 'content-length x-timestamp x-object-meta-test1 '

@@ -1268,7 +1267,7 @@ class TestReceiver(unittest.TestCase):
 self.assertEqual(req.headers, {
 'X-Timestamp': '1364456113.00002',
 'Host': 'localhost:80',
-POLICY_INDEX: '0',
+'X-Backend-Storage-Policy-Index': '0',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': 'x-timestamp'})
 req = _requests.pop(0)

@@ -1279,7 +1278,7 @@ class TestReceiver(unittest.TestCase):
 'Content-Length': '3',
 'X-Timestamp': '1364456113.00003',
 'Host': 'localhost:80',
-POLICY_INDEX: '0',
+'X-Backend-Storage-Policy-Index': '0',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': (
 'content-length x-timestamp')})

@@ -1292,7 +1291,7 @@ class TestReceiver(unittest.TestCase):
 'Content-Length': '4',
 'X-Timestamp': '1364456113.00004',
 'Host': 'localhost:80',
-POLICY_INDEX: '0',
+'X-Backend-Storage-Policy-Index': '0',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': (
 'content-length x-timestamp')})

@@ -1303,7 +1302,7 @@ class TestReceiver(unittest.TestCase):
 self.assertEqual(req.headers, {
 'X-Timestamp': '1364456113.00005',
 'Host': 'localhost:80',
-POLICY_INDEX: '0',
+'X-Backend-Storage-Policy-Index': '0',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': 'x-timestamp'})
 req = _requests.pop(0)

@@ -1312,7 +1311,7 @@ class TestReceiver(unittest.TestCase):
 self.assertEqual(req.headers, {
 'X-Timestamp': '1364456113.00006',
 'Host': 'localhost:80',
-POLICY_INDEX: '0',
+'X-Backend-Storage-Policy-Index': '0',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': 'x-timestamp'})
 self.assertEqual(_requests, [])

@@ -1376,7 +1375,7 @@ class TestReceiver(unittest.TestCase):
 'Content-Length': '3',
 'X-Timestamp': '1364456113.00001',
 'Host': 'localhost:80',
-POLICY_INDEX: '0',
+'X-Backend-Storage-Policy-Index': '0',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': (
 'content-length x-timestamp')})

@@ -1388,7 +1387,7 @@ class TestReceiver(unittest.TestCase):
 'Content-Length': '1',
 'X-Timestamp': '1364456113.00002',
 'Host': 'localhost:80',
-POLICY_INDEX: '0',
+'X-Backend-Storage-Policy-Index': '0',
 'X-Backend-Replication': 'True',
 'X-Backend-Replication-Headers': (
 'content-length x-timestamp')})

@@ -26,7 +26,6 @@ import mock

 from swift.common import exceptions, utils
 from swift.obj import ssync_sender, diskfile
-from swift.common.storage_policy import POLICY_INDEX

 from test.unit import DebugLogger, patch_policies

@@ -228,7 +227,7 @@ class TestSender(unittest.TestCase):
 ],
 'putheader': [
 mock.call('Transfer-Encoding', 'chunked'),
-mock.call(POLICY_INDEX, 1),
+mock.call('X-Backend-Storage-Policy-Index', 1),
 ],
 'endheaders': [mock.call()],
 }

@@ -37,7 +37,7 @@ from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
 write_pickle
 from swift.common import swob
 from test.unit import debug_logger, patch_policies, mocked_http_conn
-from swift.common.storage_policy import StoragePolicy, POLICIES, POLICY_INDEX
+from swift.common.storage_policy import StoragePolicy, POLICIES


 _mocked_policies = [StoragePolicy(0, 'zero', False),

@@ -288,7 +288,8 @@ class TestObjectUpdater(unittest.TestCase):
 line.split(':')[1].strip()
 line = inc.readline()
 self.assertTrue('x-container-timestamp' in headers)
-self.assertTrue(POLICY_INDEX in headers)
+self.assertTrue('X-Backend-Storage-Policy-Index' in
+headers)
 except BaseException as err:
 return err
 return None

@@ -392,7 +393,8 @@ class TestObjectUpdater(unittest.TestCase):
 for request_args, request_kwargs in request_log:
 ip, part, method, path, headers, qs, ssl = request_args
 self.assertEqual(method, op)
-self.assertEqual(headers[POLICY_INDEX], str(policy.idx))
+self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
+str(policy.idx))
 self.assertEqual(daemon.logger.get_increment_counts(),
 {'successes': 1, 'unlinks': 1,
 'async_pendings': 1})

@@ -420,7 +422,7 @@ class TestObjectUpdater(unittest.TestCase):
 'x-content-type': 'text/plain',
 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
 'x-timestamp': ts.next(),
-POLICY_INDEX: policy.idx,
+'X-Backend-Storage-Policy-Index': policy.idx,
 })
 data = {'op': op, 'account': account, 'container': container,
 'obj': obj, 'headers': headers_out}

@@ -444,7 +446,8 @@ class TestObjectUpdater(unittest.TestCase):
 for request_args, request_kwargs in request_log:
 ip, part, method, path, headers, qs, ssl = request_args
 self.assertEqual(method, 'PUT')
-self.assertEqual(headers[POLICY_INDEX], str(policy.idx))
+self.assertEqual(headers['X-Backend-Storage-Policy-Index'],
+str(policy.idx))
 self.assertEqual(daemon.logger.get_increment_counts(),
 {'successes': 1, 'unlinks': 1, 'async_pendings': 1})

@@ -315,6 +315,7 @@ class TestFuncs(unittest.TestCase):
 req = Request.blank("/v1/AUTH_account/cont",
 environ={'swift.cache': FakeCache({})})
 resp = get_container_info(req.environ, FakeApp())
+self.assertEquals(resp['storage_policy'], '0')
 self.assertEquals(resp['bytes'], 6666)
 self.assertEquals(resp['object_count'], 1000)

@@ -342,6 +343,7 @@ class TestFuncs(unittest.TestCase):
 req = Request.blank("/v1/account/cont",
 environ={'swift.cache': FakeCache(cache_stub)})
 resp = get_container_info(req.environ, FakeApp())
+self.assertEquals(resp['storage_policy'], '0')
 self.assertEquals(resp['bytes'], 3333)
 self.assertEquals(resp['object_count'], 10)
 self.assertEquals(resp['status'], 404)

@@ -14,33 +14,50 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import itertools
+import time
 import unittest
+from contextlib import contextmanager

 import mock

 import swift
 from swift.common import utils, swob
 from swift.proxy import server as proxy_server
-from swift.common.swob import HTTPException
-from test.unit import FakeRing, FakeMemcache, fake_http_connect, debug_logger
-from swift.common.storage_policy import StoragePolicy
+from swift.common.storage_policy import StoragePolicy, POLICIES

-from test.unit import patch_policies
+from test.unit import FakeRing, FakeMemcache, fake_http_connect, \
+debug_logger, patch_policies


 @contextmanager
 def set_http_connect(*args, **kwargs):
 old_connect = swift.proxy.controllers.base.http_connect
 new_connect = fake_http_connect(*args, **kwargs)
-swift.proxy.controllers.base.http_connect = new_connect
-swift.proxy.controllers.obj.http_connect = new_connect
-swift.proxy.controllers.account.http_connect = new_connect
-swift.proxy.controllers.container.http_connect = new_connect
-yield new_connect
-swift.proxy.controllers.base.http_connect = old_connect
-swift.proxy.controllers.obj.http_connect = old_connect
-swift.proxy.controllers.account.http_connect = old_connect
-swift.proxy.controllers.container.http_connect = old_connect
+try:
+swift.proxy.controllers.base.http_connect = new_connect
+swift.proxy.controllers.obj.http_connect = new_connect
+swift.proxy.controllers.account.http_connect = new_connect
+swift.proxy.controllers.container.http_connect = new_connect
+yield new_connect
+left_over_status = list(new_connect.code_iter)
+if left_over_status:
+raise AssertionError('left over status %r' % left_over_status)
+finally:
+swift.proxy.controllers.base.http_connect = old_connect
+swift.proxy.controllers.obj.http_connect = old_connect
+swift.proxy.controllers.account.http_connect = old_connect
+swift.proxy.controllers.container.http_connect = old_connect


+class PatchedObjControllerApp(proxy_server.Application):
+
+object_controller = proxy_server.ObjectController
+
+def handle_request(self, req):
+with mock.patch('swift.proxy.server.ObjectController',
+new=self.object_controller):
+return super(PatchedObjControllerApp, self).handle_request(req)


 @patch_policies([StoragePolicy(0, 'zero', True,

@@ -90,129 +107,314 @@ class TestObjControllerWriteAffinity(unittest.TestCase):
 def test_connect_put_node_timeout(self):
 controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
 self.app.conn_timeout = 0.05
-with set_http_connect(200, slow_connect=True):
+with set_http_connect(slow_connect=True):
 nodes = [dict(ip='', port='', device='')]
 res = controller._connect_put_node(nodes, '', '', {}, ('', ''))
 self.assertTrue(res is None)


-@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
+@patch_policies([
+StoragePolicy(0, 'zero', True),
+StoragePolicy(1, 'one'),
+StoragePolicy(2, 'two'),
+])
 class TestObjController(unittest.TestCase):
+container_info = {
+'partition': 1,
+'nodes': [
+{'ip': '127.0.0.1', 'port': '1', 'device': 'sda'},
+{'ip': '127.0.0.1', 'port': '2', 'device': 'sda'},
+{'ip': '127.0.0.1', 'port': '3', 'device': 'sda'},
+],
+'write_acl': None,
+'read_acl': None,
+'storage_policy': None,
+'sync_key': None,
+'versions': None,
+}
+
 def setUp(self):
+# setup fake rings with handoffs
+self.obj_ring = FakeRing(max_more_nodes=3)
+for policy in POLICIES:
+policy.object_ring = self.obj_ring
+
 logger = debug_logger('proxy-server')
 logger.thread_locals = ('txn1', '127.0.0.2')
-self.app = proxy_server.Application(
+self.app = PatchedObjControllerApp(
 None, FakeMemcache(), account_ring=FakeRing(),
 container_ring=FakeRing(), logger=logger)
-self.controller = proxy_server.ObjectController(self.app,
-'a', 'c', 'o')
-self.controller.container_info = mock.MagicMock(return_value={
-'partition': 1,
-'nodes': [
-{'ip': '127.0.0.1', 'port': '1', 'device': 'sda'},
-{'ip': '127.0.0.1', 'port': '2', 'device': 'sda'},
-{'ip': '127.0.0.1', 'port': '3', 'device': 'sda'},
-],
-'write_acl': None,
-'read_acl': None,
-'storage_policy': None,
-'sync_key': None,
-'versions': None})

+class FakeContainerInfoObjController(proxy_server.ObjectController):
+
+def container_info(controller, *args, **kwargs):
+patch_path = 'swift.proxy.controllers.base.get_info'
+with mock.patch(patch_path) as mock_get_info:
+mock_get_info.return_value = dict(self.container_info)
+return super(FakeContainerInfoObjController,
+controller).container_info(*args, **kwargs)
+
+self.app.object_controller = FakeContainerInfoObjController

 def test_PUT_simple(self):
-req = swift.common.swob.Request.blank('/v1/a/c/o')
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
 req.headers['content-length'] = '0'
 with set_http_connect(201, 201, 201):
-resp = self.controller.PUT(req)
+resp = req.get_response(self.app)
 self.assertEquals(resp.status_int, 201)

 def test_PUT_if_none_match(self):
-req = swift.common.swob.Request.blank('/v1/a/c/o')
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
 req.headers['if-none-match'] = '*'
 req.headers['content-length'] = '0'
 with set_http_connect(201, 201, 201):
-resp = self.controller.PUT(req)
+resp = req.get_response(self.app)
 self.assertEquals(resp.status_int, 201)

 def test_PUT_if_none_match_denied(self):
-req = swift.common.swob.Request.blank('/v1/a/c/o')
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
 req.headers['if-none-match'] = '*'
 req.headers['content-length'] = '0'
-with set_http_connect(201, (412, 412), 201):
-resp = self.controller.PUT(req)
+with set_http_connect(201, 412, 201):
+resp = req.get_response(self.app)
 self.assertEquals(resp.status_int, 412)

 def test_PUT_if_none_match_not_star(self):
-req = swift.common.swob.Request.blank('/v1/a/c/o')
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
 req.headers['if-none-match'] = 'somethingelse'
 req.headers['content-length'] = '0'
-with set_http_connect(201, 201, 201):
-resp = self.controller.PUT(req)
+with set_http_connect():
+resp = req.get_response(self.app)
 self.assertEquals(resp.status_int, 400)

 def test_GET_simple(self):
 req = swift.common.swob.Request.blank('/v1/a/c/o')
 with set_http_connect(200):
-resp = self.controller.GET(req)
+resp = req.get_response(self.app)
 self.assertEquals(resp.status_int, 200)

-def test_DELETE_simple(self):
+def test_GET_error(self):
 req = swift.common.swob.Request.blank('/v1/a/c/o')
+with set_http_connect(503, 200):
+resp = req.get_response(self.app)
+self.assertEquals(resp.status_int, 200)
+
+def test_GET_handoff(self):
+req = swift.common.swob.Request.blank('/v1/a/c/o')
+codes = [503] * self.obj_ring.replicas + [200]
+with set_http_connect(*codes):
+resp = req.get_response(self.app)
+self.assertEquals(resp.status_int, 200)
+
+def test_GET_not_found(self):
+req = swift.common.swob.Request.blank('/v1/a/c/o')
+codes = [404] * (self.obj_ring.replicas +
+self.obj_ring.max_more_nodes)
+with set_http_connect(*codes):
+resp = req.get_response(self.app)
+self.assertEquals(resp.status_int, 404)
+
+def test_DELETE_simple(self):
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
 with set_http_connect(204, 204, 204):
-resp = self.controller.DELETE(req)
+resp = req.get_response(self.app)
 self.assertEquals(resp.status_int, 204)

+def test_DELETE_missing_one(self):
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+with set_http_connect(404, 204, 204):
+resp = req.get_response(self.app)
+self.assertEquals(resp.status_int, 204)
+
+def test_DELETE_not_found(self):
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+with set_http_connect(404, 404, 204):
+resp = req.get_response(self.app)
+self.assertEquals(resp.status_int, 404)
+
+def test_DELETE_handoff(self):
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
+codes = [204] * self.obj_ring.replicas
+with set_http_connect(507, *codes):
+resp = req.get_response(self.app)
+self.assertEquals(resp.status_int, 204)
+
 def test_POST_as_COPY_simple(self):
-req = swift.common.swob.Request.blank('/v1/a/c/o')
-with set_http_connect(200, 200, 200, 201, 201, 201) as fake_conn:
-resp = self.controller.POST(req)
-self.assertRaises(StopIteration, fake_conn.code_iter.next)
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='POST')
+head_resp = [200] * self.obj_ring.replicas + \
+[404] * self.obj_ring.max_more_nodes
+put_resp = [201] * self.obj_ring.replicas
+codes = head_resp + put_resp
+with set_http_connect(*codes):
+resp = req.get_response(self.app)
 self.assertEquals(resp.status_int, 202)

+def test_container_sync_put_x_timestamp_not_found(self):
+test_indexes = [None] + [int(p) for p in POLICIES]
+for policy_index in test_indexes:
+self.container_info['storage_policy'] = policy_index
+put_timestamp = utils.Timestamp(time.time()).normal
+req = swob.Request.blank(
+'/v1/a/c/o', method='PUT', headers={
+'Content-Length': 0,
+'X-Timestamp': put_timestamp})
+ts_iter = itertools.repeat(put_timestamp)
+head_resp = [404] * self.obj_ring.replicas + \
+[404] * self.obj_ring.max_more_nodes
+put_resp = [201] * self.obj_ring.replicas
+codes = head_resp + put_resp
+with set_http_connect(*codes, timestamps=ts_iter):
+resp = req.get_response(self.app)
+self.assertEqual(resp.status_int, 201)
+
+def test_container_sync_put_x_timestamp_match(self):
+test_indexes = [None] + [int(p) for p in POLICIES]
+for policy_index in test_indexes:
+self.container_info['storage_policy'] = policy_index
+put_timestamp = utils.Timestamp(time.time()).normal
+req = swob.Request.blank(
+'/v1/a/c/o', method='PUT', headers={
+'Content-Length': 0,
+'X-Timestamp': put_timestamp})
+ts_iter = itertools.repeat(put_timestamp)
+head_resp = [200] * self.obj_ring.replicas + \
+[404] * self.obj_ring.max_more_nodes
+codes = head_resp
+with set_http_connect(*codes, timestamps=ts_iter):
+resp = req.get_response(self.app)
+self.assertEqual(resp.status_int, 202)
+
+def test_container_sync_put_x_timestamp_older(self):
+ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+test_indexes = [None] + [int(p) for p in POLICIES]
+for policy_index in test_indexes:
+self.container_info['storage_policy'] = policy_index
+req = swob.Request.blank(
+'/v1/a/c/o', method='PUT', headers={
+'Content-Length': 0,
+'X-Timestamp': ts.next().internal})
+ts_iter = itertools.repeat(ts.next().internal)
+head_resp = [200] * self.obj_ring.replicas + \
+[404] * self.obj_ring.max_more_nodes
+codes = head_resp
+with set_http_connect(*codes, timestamps=ts_iter):
+resp = req.get_response(self.app)
+self.assertEqual(resp.status_int, 202)
+
+def test_container_sync_put_x_timestamp_newer(self):
+ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+test_indexes = [None] + [int(p) for p in POLICIES]
+for policy_index in test_indexes:
+orig_timestamp = ts.next().internal
+req = swob.Request.blank(
+'/v1/a/c/o', method='PUT', headers={
+'Content-Length': 0,
+'X-Timestamp': ts.next().internal})
+ts_iter = itertools.repeat(orig_timestamp)
+head_resp = [200] * self.obj_ring.replicas + \
+[404] * self.obj_ring.max_more_nodes
+put_resp = [201] * self.obj_ring.replicas
+codes = head_resp + put_resp
+with set_http_connect(*codes, timestamps=ts_iter):
+resp = req.get_response(self.app)
+self.assertEqual(resp.status_int, 201)
+
+def test_container_sync_delete(self):
+ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+test_indexes = [None] + [int(p) for p in POLICIES]
+for policy_index in test_indexes:
+req = swob.Request.blank(
+'/v1/a/c/o', method='DELETE', headers={
+'X-Timestamp': ts.next().internal})
+codes = [409] * self.obj_ring.replicas
+ts_iter = itertools.repeat(ts.next().internal)
+with set_http_connect(*codes, timestamps=ts_iter):
+resp = req.get_response(self.app)
+self.assertEqual(resp.status_int, 409)
+
+def test_put_x_timestamp_conflict(self):
+ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
+req = swob.Request.blank(
+'/v1/a/c/o', method='PUT', headers={
+'Content-Length': 0,
+'X-Timestamp': ts.next().internal})
+head_resp = [404] * self.obj_ring.replicas + \
+[404] * self.obj_ring.max_more_nodes
+put_resp = [409] + [201] * self.obj_ring.replicas
+codes = head_resp + put_resp
+with set_http_connect(*codes):
+resp = req.get_response(self.app)
+self.assertEqual(resp.status_int, 201)
+
 def test_COPY_simple(self):
 req = swift.common.swob.Request.blank(
-'/v1/a/c/o', headers={'Content-Length': 0,
-'Destination': 'c/o-copy'})
-with set_http_connect(200, 200, 200, 201, 201, 201) as fake_conn:
-resp = self.controller.COPY(req)
-self.assertRaises(StopIteration, fake_conn.code_iter.next)
+'/v1/a/c/o', method='COPY',
+headers={'Content-Length': 0,
+'Destination': 'c/o-copy'})
+head_resp = [200] * self.obj_ring.replicas + \
+[404] * self.obj_ring.max_more_nodes
+put_resp = [201] * self.obj_ring.replicas
+codes = head_resp + put_resp
+with set_http_connect(*codes):
+resp = req.get_response(self.app)
 self.assertEquals(resp.status_int, 201)

 def test_HEAD_simple(self):
-req = swift.common.swob.Request.blank('/v1/a/c/o')
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD')
 with set_http_connect(200):
-resp = self.controller.HEAD(req)
+resp = req.get_response(self.app)
 self.assertEquals(resp.status_int, 200)

 def test_HEAD_x_newest(self):
-req = swift.common.swob.Request.blank('/v1/a/c/o',
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='HEAD',
 headers={'X-Newest': 'true'})
-with set_http_connect(200, 200, 200) as fake_conn:
-resp = self.controller.HEAD(req)
-self.assertRaises(StopIteration, fake_conn.code_iter.next)
+with set_http_connect(200, 200, 200):
+resp = req.get_response(self.app)
 self.assertEquals(resp.status_int, 200)

 def test_PUT_log_info(self):
-# mock out enough to get to the area of the code we want to test
-with mock.patch('swift.proxy.controllers.obj.check_object_creation',
-mock.MagicMock(return_value=None)):
-req = swift.common.swob.Request.blank('/v1/a/c/o')
-req.headers['x-copy-from'] = 'somewhere'
-try:
-self.controller.PUT(req)
-except HTTPException:
-pass
-self.assertEquals(
-req.environ.get('swift.log_info'), ['x-copy-from:somewhere'])
-# and then check that we don't do that for originating POSTs
-req = swift.common.swob.Request.blank('/v1/a/c/o')
-req.method = 'POST'
-req.headers['x-copy-from'] = 'elsewhere'
-try:
-self.controller.PUT(req)
-except HTTPException:
-pass
-self.assertEquals(req.environ.get('swift.log_info'), None)
+req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
+req.headers['x-copy-from'] = 'some/where'
+req.headers['Content-Length'] = 0
+# override FakeConn default resp headers to keep log_info clean
+resp_headers = {'x-delete-at': None}
+head_resp = [200] * self.obj_ring.replicas + \
+[404] * self.obj_ring.max_more_nodes
+put_resp = [201] * self.obj_ring.replicas
+codes = head_resp + put_resp
+with set_http_connect(*codes, headers=resp_headers):
+resp = req.get_response(self.app)
+self.assertEqual(resp.status_int, 201)
+self.assertEquals(
+req.environ.get('swift.log_info'), ['x-copy-from:some/where'])
+# and then check that we don't do that for originating POSTs
+req = swift.common.swob.Request.blank('/v1/a/c/o')
+req.method = 'POST'
+req.headers['x-copy-from'] = 'else/where'
+with set_http_connect(*codes, headers=resp_headers):
+resp = req.get_response(self.app)
+self.assertEqual(resp.status_int, 202)
+self.assertEquals(req.environ.get('swift.log_info'), None)


+@patch_policies([
+StoragePolicy(0, 'zero', True),
+StoragePolicy(1, 'one'),
+StoragePolicy(2, 'two'),
+])
+class TestObjControllerLegacyCache(TestObjController):
+"""
+This test pretends like memcache returned a stored value that should
+resemble whatever "old" format. It catches KeyErrors you'd get if your
+code was expecting some new format during a rolling upgrade.
+"""
+
+container_info = {
+'read_acl': None,
+'write_acl': None,
+'sync_key': None,
+'versions': None,
+}


 if __name__ == '__main__':

@@ -34,7 +34,7 @@ import re
import random

import mock
from eventlet import sleep, spawn, wsgi, listen
from eventlet import sleep, spawn, wsgi, listen, Timeout
from swift.common.utils import json

from test.unit import (
@@ -59,7 +59,7 @@ from swift.common.swob import Request, Response, HTTPUnauthorized, \
    HTTPException
from swift.common import storage_policy
from swift.common.storage_policy import StoragePolicy, \
    StoragePolicyCollection, POLICIES, POLICY, POLICY_INDEX
    StoragePolicyCollection, POLICIES
from swift.common.request_helpers import get_sys_meta_prefix

# mocks
@@ -1138,11 +1138,20 @@ class TestObjectController(unittest.TestCase):
        with save_globals():
            controller = proxy_server.ObjectController(self.app, 'account',
                                                       'container', 'object')
            # The (201, -4) tuples in there have the effect of letting the
            # initial connect succeed, after which getexpect() gets called and
            # then the -4 makes the response of that actually be 201 instead of
            # 100. Perfectly straightforward.
            set_http_connect(200, 200, (201, -4), (201, -4), (201, -4),
            # The (201, Exception('test')) tuples in there have the effect of
            # changing the status of the initial expect response. The default
            # expect response from FakeConn for 201 is 100.
            # But the object server won't send a 100 continue line if the
            # client doesn't send a expect 100 header (as is the case with
            # zero byte PUTs as validated by this test), nevertheless the
            # object controller calls getexpect without prejudice. In this
            # case the status from the response shows up early in getexpect
            # instead of having to wait until getresponse. The Exception is
            # in there to ensure that the object controller also *uses* the
            # result of getexpect instead of calling getresponse in which case
            # our FakeConn will blow up.
            success_codes = [(201, Exception('test'))] * 3
            set_http_connect(200, 200, *success_codes,
                             give_connect=test_connect)
            req = Request.blank('/v1/a/c/o.jpg', {})
            req.content_length = 0
@@ -1165,7 +1174,12 @@ class TestObjectController(unittest.TestCase):
        with save_globals():
            controller = \
                proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg')
            set_http_connect(200, 200, 201, 201, 201,
            # the (100, 201) tuples in there are just being extra explicit
            # about the FakeConn returning the 100 Continue status when the
            # object controller calls getexpect. Which is FakeConn's default
            # for 201 if no expect_status is specified.
            success_codes = [(100, 201)] * 3
            set_http_connect(200, 200, *success_codes,
                             give_connect=test_connect)
            req = Request.blank('/v1/a/c/o.jpg', {})
            req.content_length = 1
@@ -1585,11 +1599,19 @@ class TestObjectController(unittest.TestCase):
                res = controller.PUT(req)
                expected = str(expected)
                self.assertEquals(res.status[:len(expected)], expected)
            test_status_map((200, 200, 201, 201, -1), 201)
            test_status_map((200, 200, 201, 201, -2), 201)  # expect timeout
            test_status_map((200, 200, 201, 201, -3), 201)  # error limited
            test_status_map((200, 200, 201, -1, -1), 503)
            test_status_map((200, 200, 503, 503, -1), 503)
            test_status_map((200, 200, 201, 201, -1), 201)  # connect exc
            # connect errors
            test_status_map((200, 200, Timeout(), 201, 201, ), 201)
            test_status_map((200, 200, 201, 201, Exception()), 201)
            # expect errors
            test_status_map((200, 200, (Timeout(), None), 201, 201), 201)
            test_status_map((200, 200, (Exception(), None), 201, 201), 201)
            # response errors
            test_status_map((200, 200, (100, Timeout()), 201, 201), 201)
            test_status_map((200, 200, (100, Exception()), 201, 201), 201)
            test_status_map((200, 200, 507, 201, 201), 201)  # error limited
            test_status_map((200, 200, -1, 201, -1), 503)
            test_status_map((200, 200, 503, -1, 503), 503)

    def test_PUT_send_exceptions(self):
        with save_globals():
@@ -1715,31 +1737,39 @@ class TestObjectController(unittest.TestCase):
        check_request(account_request, method='HEAD', path='/sda/1/a')
        container_request = backend_requests.pop(0)
        check_request(container_request, method='HEAD', path='/sda/1/a/c')
        for i, (device, request) in enumerate(zip(('sda', 'sdb', 'sdc'),
                                                  backend_requests)):
        # make sure backend requests included expected container headers
        container_headers = {}
        for request in backend_requests:
            req_headers = request[2]
            device = req_headers['x-container-device']
            host = req_headers['x-container-host']
            container_headers[device] = host
            expectations = {
                'method': 'POST',
                'path': '/%s/1/a/c/o' % device,
                'path': '/1/a/c/o',
                'headers': {
                    'X-Container-Host': '10.0.0.%d:100%d' % (i, i),
                    'X-Container-Partition': '1',
                    'Connection': 'close',
                    'User-Agent': 'proxy-server %s' % os.getpid(),
                    'Host': 'localhost:80',
                    'X-Container-Device': device,
                    'Referer': 'POST http://localhost/v1/a/c/o',
                    'X-Object-Meta-Color': 'Blue',
                    POLICY_INDEX: '1'
                    'X-Backend-Storage-Policy-Index': '1'
                },
            }
            check_request(request, **expectations)

        expected = {}
        for i, device in enumerate(['sda', 'sdb', 'sdc']):
            expected[device] = '10.0.0.%d:100%d' % (i, i)
        self.assertEqual(container_headers, expected)

        # and again with policy override
        self.app.memcache.store = {}
        backend_requests = []
        req = Request.blank('/v1/a/c/o', {}, method='POST',
                            headers={'X-Object-Meta-Color': 'Blue',
                                     POLICY_INDEX: 0})
                                     'X-Backend-Storage-Policy-Index': 0})
        with mocked_http_conn(
                200, 200, 202, 202, 202,
                headers=resp_headers, give_connect=capture_requests
@@ -1754,7 +1784,7 @@ class TestObjectController(unittest.TestCase):
                'path': '/1/a/c/o',  # ignore device bit
                'headers': {
                    'X-Object-Meta-Color': 'Blue',
                    POLICY_INDEX: '0',
                    'X-Backend-Storage-Policy-Index': '0',
                }
            }
            check_request(request, **expectations)
@@ -1765,7 +1795,7 @@ class TestObjectController(unittest.TestCase):
        backend_requests = []
        req = Request.blank('/v1/a/c/o', {}, method='POST',
                            headers={'X-Object-Meta-Color': 'Blue',
                                     POLICY_INDEX: 0})
                                     'X-Backend-Storage-Policy-Index': 0})
        with mocked_http_conn(
                200, 200, 200, 200, 200, 201, 201, 201,
                headers=resp_headers, give_connect=capture_requests
@@ -1774,8 +1804,8 @@ class TestObjectController(unittest.TestCase):
        self.assertRaises(StopIteration, fake_conn.code_iter.next)
        self.assertEqual(resp.status_int, 202)
        self.assertEqual(len(backend_requests), 8)
        policy0 = {POLICY_INDEX: '0'}
        policy1 = {POLICY_INDEX: '1'}
        policy0 = {'X-Backend-Storage-Policy-Index': '0'}
        policy1 = {'X-Backend-Storage-Policy-Index': '1'}
        expected = [
            # account info
            {'method': 'HEAD', 'path': '/1/a'},
@@ -4234,8 +4264,10 @@ class TestObjectController(unittest.TestCase):
                     headers=None, query_string=None):
            if method == "HEAD":
                self.assertEquals(path, '/a/c/o.jpg')
                self.assertNotEquals(None, headers[POLICY_INDEX])
                self.assertEquals(1, int(headers[POLICY_INDEX]))
                self.assertNotEquals(None,
                                     headers['X-Backend-Storage-Policy-Index'])
                self.assertEquals(1, int(headers
                                         ['X-Backend-Storage-Policy-Index']))

        def fake_container_info(account, container, req):
            return {'status': 200, 'sync_key': None, 'storage_policy': '1',
@@ -4333,7 +4365,8 @@ class TestObjectController(unittest.TestCase):
                expected_storage_policy = 0
            else:
                continue
            storage_policy_index = int(headers[POLICY_INDEX])
            storage_policy_index = \
                int(headers['X-Backend-Storage-Policy-Index'])
            self.assertEqual(
                expected_storage_policy, storage_policy_index,
                'Unexpected %s request for %s '
@@ -4829,7 +4862,7 @@ class TestContainerController(unittest.TestCase):
        for name, index in expected.items():
            req = Request.blank('/a/c', headers={'Content-Length': '0',
                                                 'Content-Type': 'text/plain',
                                                 POLICY: name})
                                                 'X-Storage-Policy': name})
            self.assertEqual(controller._convert_policy_to_index(req), index)
        # default test
        req = Request.blank('/a/c', headers={'Content-Length': '0',
@@ -4837,13 +4870,14 @@ class TestContainerController(unittest.TestCase):
        self.assertEqual(controller._convert_policy_to_index(req), None)
        # negative test
        req = Request.blank('/a/c', headers={'Content-Length': '0',
            'Content-Type': 'text/plain', POLICY: 'nada'})
                                             'Content-Type': 'text/plain',
                                             'X-Storage-Policy': 'nada'})
        self.assertRaises(HTTPException, controller._convert_policy_to_index,
                          req)
        # storage policy two is deprecated
        req = Request.blank('/a/c', headers={'Content-Length': '0',
                                             'Content-Type': 'text/plain',
                                             POLICY: 'two'})
                                             'X-Storage-Policy': 'two'})
        self.assertRaises(HTTPException, controller._convert_policy_to_index,
                          req)
@@ -4852,33 +4886,34 @@ class TestContainerController(unittest.TestCase):
        req = Request.blank('/v1/a/c')
        with mocked_http_conn(
                200, 200,
                headers={POLICY_INDEX: int(policy)},
                headers={'X-Backend-Storage-Policy-Index': int(policy)},
        ) as fake_conn:
            resp = req.get_response(self.app)
        self.assertRaises(StopIteration, fake_conn.code_iter.next)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(resp.headers[POLICY], policy.name)
        self.assertEqual(resp.headers['X-Storage-Policy'], policy.name)

    def test_no_convert_index_to_name_when_container_not_found(self):
        policy = random.choice(list(POLICIES))
        req = Request.blank('/v1/a/c')
        with mocked_http_conn(
                200, 404, 404, 404,
                headers={POLICY_INDEX: int(policy)}) as fake_conn:
                headers={'X-Backend-Storage-Policy-Index':
                         int(policy)}) as fake_conn:
            resp = req.get_response(self.app)
        self.assertRaises(StopIteration, fake_conn.code_iter.next)
        self.assertEqual(resp.status_int, 404)
        self.assertEqual(resp.headers[POLICY], None)
        self.assertEqual(resp.headers['X-Storage-Policy'], None)

    def test_error_convert_index_to_name(self):
        req = Request.blank('/v1/a/c')
        with mocked_http_conn(
                200, 200,
                headers={POLICY_INDEX: '-1'}) as fake_conn:
                headers={'X-Backend-Storage-Policy-Index': '-1'}) as fake_conn:
            resp = req.get_response(self.app)
        self.assertRaises(StopIteration, fake_conn.code_iter.next)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(resp.headers[POLICY], None)
        self.assertEqual(resp.headers['X-Storage-Policy'], None)
        error_lines = self.app.logger.get_lines_for_level('error')
        self.assertEqual(2, len(error_lines))
        for msg in error_lines:
@@ -5010,7 +5045,7 @@ class TestContainerController(unittest.TestCase):
                            headers={'Content-Length': 0})
        if requested_policy:
            expected_policy = requested_policy
            req.headers[POLICY] = policy.name
            req.headers['X-Storage-Policy'] = policy.name
        else:
            expected_policy = POLICIES.default
        res = req.get_response(self.app)
@@ -5028,15 +5063,18 @@ class TestContainerController(unittest.TestCase):
                         len(backend_requests))
        for headers in backend_requests:
            if not requested_policy:
                self.assertFalse(POLICY_INDEX in headers)
                self.assertFalse('X-Backend-Storage-Policy-Index' in
                                 headers)
                self.assertTrue(
                    'X-Backend-Storage-Policy-Default' in headers)
                self.assertEqual(
                    int(expected_policy),
                    int(headers['X-Backend-Storage-Policy-Default']))
            else:
                self.assertTrue(POLICY_INDEX in headers)
                self.assertEqual(int(headers[POLICY_INDEX]),
                self.assertTrue('X-Backend-Storage-Policy-Index' in
                                headers)
                self.assertEqual(int(headers
                                     ['X-Backend-Storage-Policy-Index']),
                                 policy.idx)
        # make sure all mocked responses are consumed
        self.assertRaises(StopIteration, mock_conn.code_iter.next)