Merge remote-tracking branch 'origin/master' into feature/deep
Needed to add back the func-post-as-copy tox env (for some reason). Change-Id: I28acef130372f06a0528a1981044761b994eee61
This commit is contained in:
commit
3122895118
|
@ -3,4 +3,5 @@ branch = True
|
|||
omit = /usr*,setup.py,*egg*,.venv/*,.tox/*,test/*
|
||||
|
||||
[report]
|
||||
show_missing = True
|
||||
ignore_errors = True
|
||||
|
|
|
@ -17,5 +17,6 @@ pycscope.*
|
|||
MANIFEST
|
||||
|
||||
.testrepository/*
|
||||
.stestr/*
|
||||
subunit.log
|
||||
test/probe/.noseids
|
||||
|
|
20
.zuul.yaml
20
.zuul.yaml
|
@ -58,23 +58,6 @@
|
|||
parent: swift-tox-func
|
||||
nodeset: centos-7
|
||||
|
||||
- job:
|
||||
name: swift-tox-func-post-as-copy
|
||||
parent: swift-tox-base
|
||||
description: |
|
||||
Run functional tests for swift under cPython version 2.7.
|
||||
|
||||
Uses tox with the ``func-post-as-copy`` environment.
|
||||
It sets TMPDIR to an XFS mount point created via
|
||||
tools/test-setup.sh.
|
||||
vars:
|
||||
tox_envlist: func-post-as-copy
|
||||
|
||||
- job:
|
||||
name: swift-tox-func-post-as-copy-centos-7
|
||||
parent: swift-tox-func-post-as-copy
|
||||
nodeset: centos-7
|
||||
|
||||
- job:
|
||||
name: swift-tox-func-encryption
|
||||
parent: swift-tox-base
|
||||
|
@ -117,7 +100,6 @@
|
|||
- swift-tox-py27
|
||||
- swift-tox-py35
|
||||
- swift-tox-func
|
||||
- swift-tox-func-post-as-copy
|
||||
- swift-tox-func-encryption
|
||||
- swift-tox-func-ec
|
||||
gate:
|
||||
|
@ -125,13 +107,11 @@
|
|||
- swift-tox-py27
|
||||
- swift-tox-py35
|
||||
- swift-tox-func
|
||||
- swift-tox-func-post-as-copy
|
||||
- swift-tox-func-encryption
|
||||
- swift-tox-func-ec
|
||||
experimental:
|
||||
jobs:
|
||||
- swift-tox-py27-centos-7
|
||||
- swift-tox-func-centos-7
|
||||
- swift-tox-func-post-as-copy-centos-7
|
||||
- swift-tox-func-encryption-centos-7
|
||||
- swift-tox-func-ec-centos-7
|
||||
|
|
|
@ -16,7 +16,7 @@ and high concurrency. Swift is ideal for backups, web and mobile
|
|||
content, and any other unstructured data that can grow without bound.
|
||||
|
||||
Swift provides a simple, REST-based API fully documented at
|
||||
http://docs.openstack.org/.
|
||||
https://docs.openstack.org/.
|
||||
|
||||
Swift was originally developed as the basis for Rackspace's Cloud Files
|
||||
and was open-sourced in 2010 as part of the OpenStack project. It has
|
||||
|
@ -118,7 +118,7 @@ Swift is a WSGI application and uses eventlet's WSGI server. After the
|
|||
processes are running, the entry point for new requests is the
|
||||
``Application`` class in ``swift/proxy/server.py``. From there, a
|
||||
controller is chosen, and the request is processed. The proxy may choose
|
||||
to forward the request to a back- end server. For example, the entry
|
||||
to forward the request to a back-end server. For example, the entry
|
||||
point for requests to the object server is the ``ObjectController``
|
||||
class in ``swift/obj/server.py``.
|
||||
|
||||
|
@ -141,7 +141,7 @@ For Client Apps
|
|||
---------------
|
||||
|
||||
For client applications, official Python language bindings are provided
|
||||
at http://github.com/openstack/python-swiftclient.
|
||||
at https://github.com/openstack/python-swiftclient.
|
||||
|
||||
Complete API documentation at
|
||||
https://developer.openstack.org/api-ref/object-store/
|
||||
|
|
|
@ -692,18 +692,31 @@ X-Copy-From-Account:
|
|||
type: string
|
||||
X-Delete-After:
|
||||
description: |
|
||||
The number of seconds after which the system
|
||||
removes the object. Internally, the Object Storage system stores
|
||||
this value in the ``X-Delete-At`` metadata item.
|
||||
The number of seconds after which the system removes the object. The value
|
||||
should be a positive integer. Internally, the Object Storage system uses
|
||||
this value to generate an ``X-Delete-At`` metadata item. If both
|
||||
``X-Delete-After`` and ``X-Delete-At`` are set then ``X-Delete-After``
|
||||
takes precedence.
|
||||
in: header
|
||||
required: false
|
||||
type: integer
|
||||
X-Delete-At:
|
||||
description: |
|
||||
The date and time in `UNIX Epoch time stamp
|
||||
format <https://en.wikipedia.org/wiki/Unix_time>`_ when the system
|
||||
removes the object. For example, ``1440619048`` is equivalent to
|
||||
``Mon, Wed, 26 Aug 2015 19:57:28 GMT``.
|
||||
The date and time in `UNIX Epoch time stamp format
|
||||
<https://en.wikipedia.org/wiki/Unix_time>`_ when the system removes the
|
||||
object. For example, ``1440619048`` is equivalent to ``Mon, Wed, 26 Aug
|
||||
2015 19:57:28 GMT``. The value should be a positive integer corresponding
|
||||
to a time in the future. If both ``X-Delete-After`` and ``X-Delete-At`` are
|
||||
set then ``X-Delete-After`` takes precedence.
|
||||
in: header
|
||||
required: false
|
||||
type: integer
|
||||
X-Delete-At_resp:
|
||||
description: |
|
||||
If present, specifies date and time in `UNIX Epoch time stamp format
|
||||
<https://en.wikipedia.org/wiki/Unix_time>`_ when the system removes the
|
||||
object. For example, ``1440619048`` is equivalent to ``Mon, Wed, 26 Aug
|
||||
2015 19:57:28 GMT``.
|
||||
in: header
|
||||
required: false
|
||||
type: integer
|
||||
|
|
|
@ -130,7 +130,7 @@ Response Parameters
|
|||
- X-Object-Meta-name: X-Object-Meta-name_resp
|
||||
- Content-Disposition: Content-Disposition_resp
|
||||
- Content-Encoding: Content-Encoding_resp
|
||||
- X-Delete-At: X-Delete-At
|
||||
- X-Delete-At: X-Delete-At_resp
|
||||
- Accept-Ranges: Accept-Ranges
|
||||
- X-Object-Manifest: X-Object-Manifest_resp
|
||||
- Last-Modified: Last-Modified
|
||||
|
@ -602,7 +602,7 @@ Response Parameters
|
|||
- X-Object-Meta-name: X-Object-Meta-name
|
||||
- Content-Disposition: Content-Disposition_resp
|
||||
- Content-Encoding: Content-Encoding_resp
|
||||
- X-Delete-At: X-Delete-At
|
||||
- X-Delete-At: X-Delete-At_resp
|
||||
- X-Object-Manifest: X-Object-Manifest_resp
|
||||
- Last-Modified: Last-Modified
|
||||
- ETag: ETag_obj_resp
|
||||
|
@ -650,6 +650,12 @@ metadata on the request to copy the object, either PUT or COPY ,
|
|||
the metadata overwrites any conflicting keys on the target (new)
|
||||
object.
|
||||
|
||||
.. note::
|
||||
|
||||
While using COPY instead of POST allows sending only a subset of
|
||||
the metadata, it carries the cost of reading and rewriting the entire
|
||||
contents of the object.
|
||||
|
||||
A POST request deletes any existing custom metadata that you added
|
||||
with a previous PUT or POST request. Consequently, you must specify
|
||||
all custom metadata in the request. However, system metadata is
|
||||
|
@ -752,9 +758,9 @@ Request
|
|||
- X-Service-Token: X-Service-Token
|
||||
- X-Object-Meta-name: X-Object-Meta-name
|
||||
- X-Delete-At: X-Delete-At
|
||||
- X-Delete-After: X-Delete-After
|
||||
- Content-Disposition: Content-Disposition
|
||||
- Content-Encoding: Content-Encoding
|
||||
- X-Delete-After: X-Delete-After
|
||||
- Content-Type: Content-Type_obj_cu_req
|
||||
- X-Trans-Id-Extra: X-Trans-Id-Extra
|
||||
|
||||
|
|
|
@ -20,9 +20,8 @@ import os
|
|||
import sys
|
||||
|
||||
from gettext import gettext as _
|
||||
from six.moves.configparser import ConfigParser
|
||||
|
||||
from swift.common.utils import get_logger, dump_recon_cache
|
||||
from swift.common.utils import get_logger, dump_recon_cache, readconf
|
||||
from swift.obj.diskfile import ASYNCDIR_BASE
|
||||
|
||||
|
||||
|
@ -46,17 +45,13 @@ def get_async_count(device_dir, logger):
|
|||
|
||||
|
||||
def main():
|
||||
c = ConfigParser()
|
||||
try:
|
||||
conf_path = sys.argv[1]
|
||||
except Exception:
|
||||
print("Usage: %s CONF_FILE" % sys.argv[0].split('/')[-1])
|
||||
print("ex: swift-recon-cron /etc/swift/object-server.conf")
|
||||
sys.exit(1)
|
||||
if not c.read(conf_path):
|
||||
print("Unable to read config file %s" % conf_path)
|
||||
sys.exit(1)
|
||||
conf = dict(c.items('filter:recon'))
|
||||
conf = readconf(conf_path, 'filter:recon')
|
||||
device_dir = conf.get('devices', '/srv/node')
|
||||
recon_cache_path = conf.get('recon_cache_path', '/var/cache/swift')
|
||||
recon_lock_path = conf.get('recon_lock_path', '/var/lock')
|
||||
|
|
|
@ -194,12 +194,10 @@ This is normally \fBegg:swift#proxy_logging\fR. See proxy-server.conf-sample for
|
|||
.PD
|
||||
|
||||
|
||||
.SH ADDITIONAL SECTIONS
|
||||
.SH OBJECT EXPIRER SECTION
|
||||
.PD 1
|
||||
.RS 0
|
||||
The following sections are used by other swift-account services, such as replicator,
|
||||
auditor and reaper.
|
||||
.IP "\fB[account-replicator]\fR"
|
||||
.IP "\fB[object-expirer]\fR"
|
||||
.RE
|
||||
.RS 3
|
||||
.IP \fBinterval\fR
|
||||
|
@ -210,6 +208,9 @@ The default is ".".
|
|||
The default is 'expiring_objects'.
|
||||
.IP \fBreport_interval\fR
|
||||
The default is 300 seconds.
|
||||
.IP \fBrequest_tries\fR
|
||||
The number of times the expirer's internal client will
|
||||
attempt any given request in the event of failure. The default is 3.
|
||||
.IP \fBconcurrency\fR
|
||||
Number of expirer workers to spawn. The default is 1.
|
||||
.IP \fBprocesses\fR
|
||||
|
@ -227,6 +228,21 @@ up to reclaim_age seconds before it gives up and deletes the entry in the
|
|||
queue. The default is 604800 seconds.
|
||||
.IP \fBrecon_cache_path\fR
|
||||
Path to recon cache directory. The default is /var/cache/swift.
|
||||
.IP \fBnice_priority\fR
|
||||
Modify scheduling priority of server processes. Niceness values range from -20
|
||||
(most favorable to the process) to 19 (least favorable to the process).
|
||||
The default does not modify priority.
|
||||
.IP \fBionice_class\fR
|
||||
Modify I/O scheduling class of server processes. I/O niceness class values
|
||||
are IOPRIO_CLASS_RT (realtime), IOPRIO_CLASS_BE (best-effort) and IOPRIO_CLASS_IDLE (idle).
|
||||
The default does not modify class and priority.
|
||||
Work only with ionice_priority.
|
||||
.IP \fBionice_priority\fR
|
||||
Modify I/O scheduling priority of server processes. I/O niceness priority
|
||||
is a number which goes from 0 to 7. The higher the value, the lower
|
||||
the I/O priority of the process. Work only with ionice_class.
|
||||
Ignored if IOPRIO_CLASS_IDLE is set.
|
||||
|
||||
.RE
|
||||
.PD
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@ Object Storage monitoring
|
|||
.. note::
|
||||
|
||||
This section was excerpted from a `blog post by Darrell
|
||||
Bishop <http://swiftstack.com/blog/2012/04/11/swift-monitoring-with-statsd>`_ and
|
||||
Bishop <https://swiftstack.com/blog/2012/04/11/swift-monitoring-with-statsd>`_ and
|
||||
has since been edited.
|
||||
|
||||
An OpenStack Object Storage cluster is a collection of many daemons that
|
||||
|
@ -123,10 +123,8 @@ actual number when flushing meters upstream.
|
|||
|
||||
To avoid the problems inherent with middleware-based monitoring and
|
||||
after-the-fact log processing, the sending of StatsD meters is
|
||||
integrated into Object Storage itself. The submitted change set (see
|
||||
`<https://review.openstack.org/#change,6058>`_) currently reports 124 meters
|
||||
across 15 Object Storage daemons and the tempauth middleware. Details of
|
||||
the meters tracked are in the :doc:`/admin_guide`.
|
||||
integrated into Object Storage itself. Details of the meters tracked
|
||||
are in the :doc:`/admin_guide`.
|
||||
|
||||
The sending of meters is integrated with the logging framework. To
|
||||
enable, configure ``log_statsd_host`` in the relevant config file. You
|
||||
|
|
|
@ -120,25 +120,24 @@ out were you need to add capacity or to help tune an :ref:`ring_overload` value.
|
|||
Now let's take an example with 1 region, 3 zones and 4 devices. Each device has
|
||||
the same weight, and the ``dispersion --verbose`` might show the following::
|
||||
|
||||
Dispersion is 50.000000, Balance is 0.000000, Overload is 0.00%
|
||||
Dispersion is 16.666667, Balance is 0.000000, Overload is 0.00%
|
||||
Required overload is 33.333333%
|
||||
Worst tier is 50.000000 (r1z3)
|
||||
Worst tier is 33.333333 (r1z3)
|
||||
--------------------------------------------------------------------------
|
||||
Tier Parts % Max 0 1 2 3
|
||||
--------------------------------------------------------------------------
|
||||
r1 256 0.00 3 0 0 0 256
|
||||
r1 768 0.00 3 0 0 0 256
|
||||
r1z1 192 0.00 1 64 192 0 0
|
||||
r1z1-127.0.0.1 192 0.00 1 64 192 0 0
|
||||
r1z1-127.0.0.1/sda 192 0.00 1 64 192 0 0
|
||||
r1z2 192 0.00 1 64 192 0 0
|
||||
r1z2-127.0.0.2 192 0.00 1 64 192 0 0
|
||||
r1z2-127.0.0.2/sda 192 0.00 1 64 192 0 0
|
||||
r1z3 256 50.00 1 0 128 128 0
|
||||
r1z3-127.0.0.3 256 50.00 1 0 128 128 0
|
||||
r1z3 384 33.33 1 0 128 128 0
|
||||
r1z3-127.0.0.3 384 33.33 1 0 128 128 0
|
||||
r1z3-127.0.0.3/sda 192 0.00 1 64 192 0 0
|
||||
r1z3-127.0.0.3/sdb 192 0.00 1 64 192 0 0
|
||||
|
||||
|
||||
The first line reports that there are 256 partitions with 3 copies in region 1;
|
||||
and this is an expected output in this case (single region with 3 replicas) as
|
||||
reported by the "Max" value.
|
||||
|
@ -818,7 +817,7 @@ resolves to an IPv4 address, an IPv4 socket will be used to send StatsD UDP
|
|||
packets, even if the hostname would also resolve to an IPv6 address.
|
||||
|
||||
.. _StatsD: http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/
|
||||
.. _Graphite: http://graphite.wikidot.com/
|
||||
.. _Graphite: http://graphiteapp.org/
|
||||
.. _Ganglia: http://ganglia.sourceforge.net/
|
||||
|
||||
The sample rate is a real number between 0 and 1 which defines the
|
||||
|
|
|
@ -171,14 +171,14 @@ The API Reference describes the operations that you can perform with the
|
|||
Object Storage API:
|
||||
|
||||
- `Storage
|
||||
accounts <https://developer.openstack.org/api-ref/object-storage/index.html#accounts>`__:
|
||||
accounts <https://developer.openstack.org/api-ref/object-store/index.html#accounts>`__:
|
||||
Use to perform account-level tasks.
|
||||
|
||||
Lists containers for a specified account. Creates, updates, and
|
||||
deletes account metadata. Shows account metadata.
|
||||
|
||||
- `Storage
|
||||
containers <https://developer.openstack.org/api-ref/object-storage/index.html#containers>`__:
|
||||
containers <https://developer.openstack.org/api-ref/object-store/index.html#containers>`__:
|
||||
Use to perform container-level tasks.
|
||||
|
||||
Lists objects in a specified container. Creates, shows details for,
|
||||
|
@ -186,7 +186,7 @@ Object Storage API:
|
|||
container metadata.
|
||||
|
||||
- `Storage
|
||||
objects <https://developer.openstack.org/api-ref/object-storage/index.html#objects>`__:
|
||||
objects <https://developer.openstack.org/api-ref/object-store/index.html#objects>`__:
|
||||
Use to perform object-level tasks.
|
||||
|
||||
Creates, replaces, shows details for, and deletes objects. Copies
|
||||
|
|
|
@ -57,7 +57,7 @@ or ISO 8601 UTC timestamp. For example, ``1390852007`` or
|
|||
``Mon, 27 Jan 2014 19:46:47 GMT``.
|
||||
|
||||
For more information, see `Epoch & Unix Timestamp Conversion
|
||||
Tools <http://www.epochconverter.com/>`__.
|
||||
Tools <https://www.epochconverter.com/>`__.
|
||||
|
||||
**filename**: Optional. Overrides the default file name. Object Storage
|
||||
generates a default file name for **GET** temporary URLs that is based on the
|
||||
|
|
|
@ -10,20 +10,20 @@ Application Bindings
|
|||
|
||||
* OpenStack supported binding:
|
||||
|
||||
* `Python-SwiftClient <http://pypi.python.org/pypi/python-swiftclient>`_
|
||||
* `Python-SwiftClient <https://pypi.python.org/pypi/python-swiftclient>`_
|
||||
|
||||
* Unofficial libraries and bindings:
|
||||
|
||||
* `PHP-opencloud <http://php-opencloud.com>`_ - Official Rackspace PHP bindings that should work for other Swift deployments too.
|
||||
* `PyRAX <https://github.com/rackspace/pyrax>`_ - Official Rackspace Python bindings for CloudFiles that should work for other Swift deployments too.
|
||||
* `PyRAX <https://github.com/pycontribs/pyrax>`_ - Official Rackspace Python bindings for CloudFiles that should work for other Swift deployments too.
|
||||
* `openstack.net <https://github.com/rackspace/openstack.net/>`_ - Official Rackspace .NET bindings that should work for other Swift deployments too.
|
||||
* `RSwift <https://github.com/pandemicsyn/RSwift>`_ - R API bindings.
|
||||
* `Go language bindings <https://github.com/ncw/swift>`_
|
||||
* `supload <https://github.com/selectel/supload>`_ - Bash script to upload file to cloud storage based on OpenStack Swift API.
|
||||
* `libcloud <http://libcloud.apache.org>`_ - Apache Libcloud - a unified interface in Python for different clouds with OpenStack Swift support.
|
||||
* `SwiftBox <https://github.com/suniln/SwiftBox>`_ - C# library using RestSharp
|
||||
* `jclouds <http://jclouds.incubator.apache.org/documentation/quickstart/openstack/>`_ - Java library offering bindings for all OpenStack projects
|
||||
* `java-openstack-swift <https://github.com/dkocher/java-openstack-swift>`_ - Java bindings for OpenStack Swift
|
||||
* `jclouds <http://jclouds.apache.org/guides/openstack/>`_ - Java library offering bindings for all OpenStack projects
|
||||
* `java-openstack-swift <https://github.com/iterate-ch/java-openstack-swift>`_ - Java bindings for OpenStack Swift
|
||||
* `swift_client <https://github.com/mrkamel/swift_client>`_ - Small but powerful Ruby client to interact with OpenStack Swift
|
||||
* `nightcrawler_swift <https://github.com/tulios/nightcrawler_swift>`_ - This Ruby gem teleports your assets to a OpenStack Swift bucket/container
|
||||
* `swift storage <https://rubygems.org/gems/swift-storage>`_ - Simple OpenStack Swift storage client.
|
||||
|
@ -94,13 +94,13 @@ Storage Backends (DiskFile API implementations)
|
|||
|
||||
Developer Tools
|
||||
---------------
|
||||
* `SAIO bash scripts <https://github.com/ntata/swift-setup-scripts.git>`_ -
|
||||
* `SAIO bash scripts <https://github.com/ntata/swift-setup-scripts>`_ -
|
||||
Well commented simple bash scripts for Swift all in one setup.
|
||||
* `vagrant-swift-all-in-one
|
||||
<https://github.com/swiftstack/vagrant-swift-all-in-one>`_ - Quickly setup a
|
||||
standard development environment using Vagrant and Chef cookbooks in an
|
||||
Ubuntu virtual machine.
|
||||
* `SAIO Ansible playbook <https://github.com/thiagodasilva/swift-aio>`_ -
|
||||
* `SAIO Ansible playbook <https://github.com/thiagodasilva/ansible-saio>`_ -
|
||||
Quickly setup a standard development environment using Vagrant and Ansible in
|
||||
a Fedora virtual machine (with built-in `Swift-on-File
|
||||
<https://github.com/openstack/swiftonfile>`_ support).
|
||||
|
@ -111,11 +111,11 @@ Other
|
|||
* `Glance <https://github.com/openstack/glance>`_ - Provides services for discovering, registering, and retrieving virtual machine images (for OpenStack Compute [Nova], for example).
|
||||
* `Better Staticweb <https://github.com/CloudVPS/better-staticweb>`_ - Makes swift containers accessible by default.
|
||||
* `Django Swiftbrowser <https://github.com/cschwede/django-swiftbrowser>`_ - Simple Django web app to access OpenStack Swift.
|
||||
* `Swift-account-stats <https://github.com/enovance/swift-account-stats>`_ - Swift-account-stats is a tool to report statistics on Swift usage at tenant and global levels.
|
||||
* `Swift-account-stats <https://github.com/redhat-cip/swift-account-stats>`_ - Swift-account-stats is a tool to report statistics on Swift usage at tenant and global levels.
|
||||
* `PyECLib <https://github.com/openstack/pyeclib>`_ - High Level Erasure Code library used by Swift
|
||||
* `liberasurecode <https://github.com/openstack/liberasurecode>`_ - Low Level Erasure Code library used by PyECLib
|
||||
* `Swift Browser <https://github.com/zerovm/swift-browser>`_ - JavaScript interface for Swift
|
||||
* `Swift Browser <https://github.com/mgeisler/swift-browser>`_ - JavaScript interface for Swift
|
||||
* `swift-ui <https://github.com/fanatic/swift-ui>`_ - OpenStack Swift web browser
|
||||
* `Swift Durability Calculator <https://github.com/enovance/swift-durability-calculator>`_ - Data Durability Calculation Tool for Swift
|
||||
* `Swift Durability Calculator <https://github.com/redhat-cip/swift-durability-calculator>`_ - Data Durability Calculation Tool for Swift
|
||||
* `swiftbackmeup <https://github.com/redhat-cip/swiftbackmeup>`_ - Utility that allows one to create backups and upload them to OpenStack Swift
|
||||
* `Multi Swift <https://github.com/ntata/multi-swift-POC>`_ - Bash scripts to spin up multiple Swift clusters sharing the same hardware
|
||||
|
|
|
@ -909,6 +909,8 @@ objects_per_second 50 Maximum objects updated per second.
|
|||
system specs. 0 is unlimited.
|
||||
slowdown 0.01 Time in seconds to wait between objects.
|
||||
Deprecated in favor of objects_per_second.
|
||||
report_interval 300 Interval in seconds between logging
|
||||
statistics about the current update pass.
|
||||
recon_cache_path /var/cache/swift Path to recon cache
|
||||
nice_priority None Scheduling priority of server processes.
|
||||
Niceness values range from -20 (most
|
||||
|
|
|
@ -217,7 +217,7 @@ do the following::
|
|||
|
||||
To persist this, edit and add the following to ``/etc/fstab``::
|
||||
|
||||
/home/swift/xfs_file /tmp xfs rw,noatime,nodiratime,attr2,inode64,noquota 0 0
|
||||
/home/<your-user-name>/xfs_file /tmp xfs rw,noatime,nodiratime,attr2,inode64,noquota 0 0
|
||||
|
||||
----------------
|
||||
Getting the code
|
||||
|
|
|
@ -11,9 +11,12 @@ object from the system.
|
|||
The ``X-Delete-At`` header takes a Unix Epoch timestamp, in integer form; for
|
||||
example: ``1317070737`` represents ``Mon Sep 26 20:58:57 2011 UTC``.
|
||||
|
||||
The ``X-Delete-After`` header takes an integer number of seconds. The proxy
|
||||
server that receives the request will convert this header into an
|
||||
``X-Delete-At`` header using its current time plus the value given.
|
||||
The ``X-Delete-After`` header takes a positive integer number of seconds. The
|
||||
proxy server that receives the request will convert this header into an
|
||||
``X-Delete-At`` header using the request timestamp plus the value given.
|
||||
|
||||
If both the ``X-Delete-At`` and ``X-Delete-After`` headers are sent with a
|
||||
request then the ``X-Delete-After`` header will take precedence.
|
||||
|
||||
As expiring objects are added to the system, the object servers will record the
|
||||
expirations in a hidden ``.expiring_objects`` account for the
|
||||
|
|
|
@ -51,11 +51,10 @@
|
|||
# Instead of the project name, the project id may also be used.
|
||||
# project_id = changeme
|
||||
|
||||
# The Keystone URL to authenticate to. The value of auth_url may be
|
||||
# The Keystone URL to authenticate to. The value of auth_endpoint may be
|
||||
# set according to the value of auth_uri in [filter:authtoken] in
|
||||
# proxy-server.conf. Currently, the only supported version of the Identity API
|
||||
# is v3, which requires that the url end in "/v3".
|
||||
# auth_endpoint = http://keystonehost:5000/v3
|
||||
# proxy-server.conf.
|
||||
# auth_endpoint = http://keystonehost/identity
|
||||
|
||||
# The project and user domain names may optionally be specified. If they are
|
||||
# not specified, the default values of 'Default' (for *_domain_name) and
|
||||
|
|
|
@ -42,25 +42,34 @@
|
|||
# auto_create_account_prefix = .
|
||||
# expiring_objects_account_name = expiring_objects
|
||||
# report_interval = 300
|
||||
# concurrency is the level of concurrency o use to do the work, this value
|
||||
#
|
||||
# request_tries is the number of times the expirer's internal client will
|
||||
# attempt any given request in the event of failure. The default is 3.
|
||||
# request_tries = 3
|
||||
|
||||
# concurrency is the level of concurrency to use to do the work, this value
|
||||
# must be set to at least 1
|
||||
# concurrency = 1
|
||||
#
|
||||
# processes is how many parts to divide the work into, one part per process
|
||||
# that will be doing the work
|
||||
# that will be doing the work
|
||||
# processes set 0 means that a single process will be doing all the work
|
||||
# processes can also be specified on the command line and will override the
|
||||
# config value
|
||||
# config value
|
||||
# processes = 0
|
||||
#
|
||||
# process is which of the parts a particular process will work on
|
||||
# process can also be specified on the command line and will override the config
|
||||
# value
|
||||
# value
|
||||
# process is "zero based", if you want to use 3 processes, you should run
|
||||
# processes with process set to 0, 1, and 2
|
||||
# processes with process set to 0, 1, and 2
|
||||
# process = 0
|
||||
#
|
||||
# The expirer will re-attempt expiring if the source object is not available
|
||||
# up to reclaim_age seconds before it gives up and deletes the entry in the
|
||||
# queue.
|
||||
# reclaim_age = 604800
|
||||
#
|
||||
# recon_cache_path = /var/cache/swift
|
||||
#
|
||||
# You can set scheduling priority of processes. Niceness values range from -20
|
||||
|
@ -74,6 +83,11 @@
|
|||
# ionice_class =
|
||||
# ionice_priority =
|
||||
|
||||
#
|
||||
# The following sections define the configuration of the expirer's internal
|
||||
# client pipeline
|
||||
#
|
||||
|
||||
[pipeline:main]
|
||||
pipeline = catch_errors proxy-logging cache proxy-server
|
||||
|
||||
|
|
|
@ -372,6 +372,12 @@ use = egg:swift#recon
|
|||
# objects_per_second instead.
|
||||
# slowdown = 0.01
|
||||
#
|
||||
# Log stats (at INFO level) every report_interval seconds. This
|
||||
# logging is per-process, so with concurrency > 1, the logs will
|
||||
# contain one stats log per worker process every report_interval
|
||||
# seconds.
|
||||
# report_interval = 300
|
||||
#
|
||||
# recon_cache_path = /var/cache/swift
|
||||
#
|
||||
# You can set scheduling priority of processes. Niceness values range from -20
|
||||
|
|
|
@ -66,7 +66,7 @@ scripts =
|
|||
[extras]
|
||||
kms_keymaster =
|
||||
oslo.config>=4.0.0,!=4.3.0,!=4.4.0 # Apache-2.0
|
||||
castellan>=0.7.0 # Apache-2.0
|
||||
castellan>=0.13.0 # Apache-2.0
|
||||
|
||||
[entry_points]
|
||||
paste.app_factory =
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
from __future__ import print_function
|
||||
import logging
|
||||
|
||||
from collections import defaultdict
|
||||
from errno import EEXIST
|
||||
from itertools import islice
|
||||
from operator import itemgetter
|
||||
|
@ -471,18 +472,18 @@ swift-ring-builder <builder_file>
|
|||
builder_id = "(not assigned)"
|
||||
print('%s, build version %d, id %s' %
|
||||
(builder_file, builder.version, builder_id))
|
||||
regions = 0
|
||||
zones = 0
|
||||
balance = 0
|
||||
dev_count = 0
|
||||
if builder.devs:
|
||||
regions = len(set(d['region'] for d in builder.devs
|
||||
if d is not None))
|
||||
zones = len(set((d['region'], d['zone']) for d in builder.devs
|
||||
if d is not None))
|
||||
dev_count = len([dev for dev in builder.devs
|
||||
if dev is not None])
|
||||
ring_empty_error = None
|
||||
regions = len(set(d['region'] for d in builder.devs
|
||||
if d is not None))
|
||||
zones = len(set((d['region'], d['zone']) for d in builder.devs
|
||||
if d is not None))
|
||||
dev_count = len([dev for dev in builder.devs
|
||||
if dev is not None])
|
||||
try:
|
||||
balance = builder.get_balance()
|
||||
except exceptions.EmptyRingError as e:
|
||||
ring_empty_error = str(e)
|
||||
dispersion_trailer = '' if builder.dispersion is None else (
|
||||
', %.02f dispersion' % (builder.dispersion))
|
||||
print('%d partitions, %.6f replicas, %d regions, %d zones, '
|
||||
|
@ -514,16 +515,18 @@ swift-ring-builder <builder_file>
|
|||
else:
|
||||
print('Ring file %s is obsolete' % ring_file)
|
||||
|
||||
if builder.devs:
|
||||
if ring_empty_error:
|
||||
balance_per_dev = defaultdict(int)
|
||||
else:
|
||||
balance_per_dev = builder._build_balance_per_dev()
|
||||
header_line, print_dev_f = _make_display_device_table(builder)
|
||||
print(header_line)
|
||||
for dev in sorted(
|
||||
builder._iter_devs(),
|
||||
key=lambda x: (x['region'], x['zone'], x['ip'], x['device'])
|
||||
):
|
||||
flags = 'DEL' if dev in builder._remove_devs else ''
|
||||
print_dev_f(dev, balance_per_dev[dev['id']], flags)
|
||||
header_line, print_dev_f = _make_display_device_table(builder)
|
||||
print(header_line)
|
||||
for dev in sorted(
|
||||
builder._iter_devs(),
|
||||
key=lambda x: (x['region'], x['zone'], x['ip'], x['device'])
|
||||
):
|
||||
flags = 'DEL' if dev in builder._remove_devs else ''
|
||||
print_dev_f(dev, balance_per_dev[dev['id']], flags)
|
||||
|
||||
# Print some helpful info if partition power increase in progress
|
||||
if (builder.next_part_power and
|
||||
|
@ -542,6 +545,8 @@ swift-ring-builder <builder_file>
|
|||
print('Run "swift-object-relinker cleanup" on all nodes before '
|
||||
'moving on to finish_increase_partition_power.')
|
||||
|
||||
if ring_empty_error:
|
||||
print(ring_empty_error)
|
||||
exit(EXIT_SUCCESS)
|
||||
|
||||
@staticmethod
|
||||
|
@ -938,7 +943,8 @@ swift-ring-builder <builder_file> rebalance [options]
|
|||
balance_changed = (
|
||||
abs(last_balance - balance) >= 1 or
|
||||
(last_balance == MAX_BALANCE and balance == MAX_BALANCE))
|
||||
dispersion_changed = abs(last_dispersion - dispersion) >= 1
|
||||
dispersion_changed = last_dispersion is None or (
|
||||
abs(last_dispersion - dispersion) >= 1)
|
||||
if balance_changed or dispersion_changed:
|
||||
be_cowardly = False
|
||||
|
||||
|
@ -997,6 +1003,7 @@ swift-ring-builder <builder_file> dispersion <search_filter> [options]
|
|||
|
||||
Output report on dispersion.
|
||||
|
||||
--recalculate option will rebuild cached dispersion info and save builder
|
||||
--verbose option will display dispersion graph broken down by tier
|
||||
|
||||
You can filter which tiers are evaluated to drill down using a regex
|
||||
|
@ -1035,6 +1042,8 @@ swift-ring-builder <builder_file> dispersion <search_filter> [options]
|
|||
exit(EXIT_ERROR)
|
||||
usage = Commands.dispersion.__doc__.strip()
|
||||
parser = optparse.OptionParser(usage)
|
||||
parser.add_option('--recalculate', action='store_true',
|
||||
help='Rebuild cached dispersion info and save')
|
||||
parser.add_option('-v', '--verbose', action='store_true',
|
||||
help='Display dispersion report for tiers')
|
||||
options, args = parser.parse_args(argv)
|
||||
|
@ -1042,8 +1051,13 @@ swift-ring-builder <builder_file> dispersion <search_filter> [options]
|
|||
search_filter = args[3]
|
||||
else:
|
||||
search_filter = None
|
||||
orig_version = builder.version
|
||||
report = dispersion_report(builder, search_filter=search_filter,
|
||||
verbose=options.verbose)
|
||||
verbose=options.verbose,
|
||||
recalculate=options.recalculate)
|
||||
if builder.version != orig_version:
|
||||
# we've already done the work, better go ahead and save it!
|
||||
builder.save(builder_file)
|
||||
print('Dispersion is %.06f, Balance is %.06f, Overload is %0.2f%%' % (
|
||||
builder.dispersion, builder.get_balance(), builder.overload * 100))
|
||||
print('Required overload is %.6f%%' % (
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
import functools
|
||||
import os
|
||||
from os.path import isdir # tighter scoped import for mocking
|
||||
import time
|
||||
|
||||
import six
|
||||
from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
|
||||
|
@ -303,15 +302,19 @@ def valid_timestamp(request):
|
|||
|
||||
def check_delete_headers(request):
|
||||
"""
|
||||
Validate if 'x-delete' headers are have correct values
|
||||
values should be positive integers and correspond to
|
||||
a time in the future.
|
||||
Check that 'x-delete-after' and 'x-delete-at' headers have valid values.
|
||||
Values should be positive integers and correspond to a time greater than
|
||||
the request timestamp.
|
||||
|
||||
If the 'x-delete-after' header is found then its value is used to compute
|
||||
an 'x-delete-at' value which takes precedence over any existing
|
||||
'x-delete-at' header.
|
||||
|
||||
:param request: the swob request object
|
||||
|
||||
:returns: HTTPBadRequest in case of invalid values
|
||||
or None if values are ok
|
||||
:raises: HTTPBadRequest in case of invalid values
|
||||
:returns: the swob request object
|
||||
"""
|
||||
now = float(valid_timestamp(request))
|
||||
if 'x-delete-after' in request.headers:
|
||||
try:
|
||||
x_delete_after = int(request.headers['x-delete-after'])
|
||||
|
@ -319,13 +322,14 @@ def check_delete_headers(request):
|
|||
raise HTTPBadRequest(request=request,
|
||||
content_type='text/plain',
|
||||
body='Non-integer X-Delete-After')
|
||||
actual_del_time = time.time() + x_delete_after
|
||||
if actual_del_time < time.time():
|
||||
actual_del_time = utils.normalize_delete_at_timestamp(
|
||||
now + x_delete_after)
|
||||
if int(actual_del_time) <= now:
|
||||
raise HTTPBadRequest(request=request,
|
||||
content_type='text/plain',
|
||||
body='X-Delete-After in past')
|
||||
request.headers['x-delete-at'] = utils.normalize_delete_at_timestamp(
|
||||
actual_del_time)
|
||||
request.headers['x-delete-at'] = actual_del_time
|
||||
del request.headers['x-delete-after']
|
||||
|
||||
if 'x-delete-at' in request.headers:
|
||||
try:
|
||||
|
@ -335,7 +339,7 @@ def check_delete_headers(request):
|
|||
raise HTTPBadRequest(request=request, content_type='text/plain',
|
||||
body='Non-integer X-Delete-At')
|
||||
|
||||
if x_delete_at < time.time() and not utils.config_true_value(
|
||||
if x_delete_at <= now and not utils.config_true_value(
|
||||
request.headers.get('x-backend-replication', 'f')):
|
||||
raise HTTPBadRequest(request=request, content_type='text/plain',
|
||||
body='X-Delete-At in past')
|
||||
|
|
|
@ -22,14 +22,15 @@ from six.moves import urllib
|
|||
import struct
|
||||
from sys import exc_info, exit
|
||||
import zlib
|
||||
from swift import gettext_ as _
|
||||
from time import gmtime, strftime, time
|
||||
from zlib import compressobj
|
||||
|
||||
from swift.common.exceptions import ClientException
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES, \
|
||||
HTTP_CONFLICT
|
||||
from swift.common.swob import Request
|
||||
from swift.common.utils import quote
|
||||
from swift.common.utils import quote, closing_if_possible, \
|
||||
server_handled_successfully
|
||||
from swift.common.wsgi import loadapp, pipeline_property
|
||||
|
||||
if six.PY3:
|
||||
|
@ -192,17 +193,38 @@ class InternalClient(object):
|
|||
req.params = params
|
||||
try:
|
||||
resp = req.get_response(self.app)
|
||||
except (Exception, Timeout):
|
||||
exc_type, exc_value, exc_traceback = exc_info()
|
||||
else:
|
||||
if resp.status_int in acceptable_statuses or \
|
||||
resp.status_int // 100 in acceptable_statuses:
|
||||
return resp
|
||||
except (Exception, Timeout):
|
||||
exc_type, exc_value, exc_traceback = exc_info()
|
||||
elif server_handled_successfully(resp.status_int):
|
||||
# No sense retrying when we expect the same result
|
||||
break
|
||||
elif resp.status_int == HTTP_CONFLICT and 'x-timestamp' in [
|
||||
header.lower() for header in headers]:
|
||||
# Since the caller provided the timestamp, retrying won't
|
||||
# change the result
|
||||
break
|
||||
# sleep only between tries, not after each one
|
||||
if attempt < self.request_tries - 1:
|
||||
if resp:
|
||||
# always close any resp.app_iter before we discard it
|
||||
with closing_if_possible(resp.app_iter):
|
||||
# for non 2XX requests it's safe and useful to drain
|
||||
# the response body so we log the correct status code
|
||||
if resp.status_int // 100 != 2:
|
||||
for iter_body in resp.app_iter:
|
||||
pass
|
||||
sleep(2 ** (attempt + 1))
|
||||
if resp:
|
||||
raise UnexpectedResponse(
|
||||
_('Unexpected response: %s') % resp.status, resp)
|
||||
msg = 'Unexpected response: %s' % resp.status
|
||||
if resp.status_int // 100 != 2 and resp.body:
|
||||
# provide additional context (and drain the response body) for
|
||||
# non 2XX responses
|
||||
msg += ' (%s)' % resp.body
|
||||
raise UnexpectedResponse(msg, resp)
|
||||
if exc_type:
|
||||
# To make pep8 tool happy, in place of raise t, v, tb:
|
||||
six.reraise(exc_type(*exc_value.args), None, exc_traceback)
|
||||
|
@ -241,7 +263,7 @@ class InternalClient(object):
|
|||
return metadata
|
||||
|
||||
def _iter_items(
|
||||
self, path, marker='', end_marker='',
|
||||
self, path, marker='', end_marker='', prefix='',
|
||||
acceptable_statuses=(2, HTTP_NOT_FOUND)):
|
||||
"""
|
||||
Returns an iterator of items from a json listing. Assumes listing has
|
||||
|
@ -251,6 +273,7 @@ class InternalClient(object):
|
|||
:param marker: Prefix of first desired item, defaults to ''.
|
||||
:param end_marker: Last item returned will be 'less' than this,
|
||||
defaults to ''.
|
||||
:param prefix: Prefix of items
|
||||
:param acceptable_statuses: List of status for valid responses,
|
||||
defaults to (2, HTTP_NOT_FOUND).
|
||||
|
||||
|
@ -262,8 +285,8 @@ class InternalClient(object):
|
|||
|
||||
while True:
|
||||
resp = self.make_request(
|
||||
'GET', '%s?format=json&marker=%s&end_marker=%s' %
|
||||
(path, quote(marker), quote(end_marker)),
|
||||
'GET', '%s?format=json&marker=%s&end_marker=%s&prefix=%s' %
|
||||
(path, quote(marker), quote(end_marker), quote(prefix)),
|
||||
{}, acceptable_statuses)
|
||||
if not resp.status_int == 200:
|
||||
if resp.status_int >= HTTP_MULTIPLE_CHOICES:
|
||||
|
@ -331,7 +354,7 @@ class InternalClient(object):
|
|||
# account methods
|
||||
|
||||
def iter_containers(
|
||||
self, account, marker='', end_marker='',
|
||||
self, account, marker='', end_marker='', prefix='',
|
||||
acceptable_statuses=(2, HTTP_NOT_FOUND)):
|
||||
"""
|
||||
Returns an iterator of containers dicts from an account.
|
||||
|
@ -340,6 +363,7 @@ class InternalClient(object):
|
|||
:param marker: Prefix of first desired item, defaults to ''.
|
||||
:param end_marker: Last item returned will be 'less' than this,
|
||||
defaults to ''.
|
||||
:param prefix: Prefix of containers
|
||||
:param acceptable_statuses: List of status for valid responses,
|
||||
defaults to (2, HTTP_NOT_FOUND).
|
||||
|
||||
|
@ -350,7 +374,8 @@ class InternalClient(object):
|
|||
"""
|
||||
|
||||
path = self.make_path(account)
|
||||
return self._iter_items(path, marker, end_marker, acceptable_statuses)
|
||||
return self._iter_items(path, marker, end_marker, prefix,
|
||||
acceptable_statuses)
|
||||
|
||||
def get_account_info(
|
||||
self, account, acceptable_statuses=(2, HTTP_NOT_FOUND)):
|
||||
|
@ -508,7 +533,7 @@ class InternalClient(object):
|
|||
return self._get_metadata(path, metadata_prefix, acceptable_statuses)
|
||||
|
||||
def iter_objects(
|
||||
self, account, container, marker='', end_marker='',
|
||||
self, account, container, marker='', end_marker='', prefix='',
|
||||
acceptable_statuses=(2, HTTP_NOT_FOUND)):
|
||||
"""
|
||||
Returns an iterator of object dicts from a container.
|
||||
|
@ -518,6 +543,7 @@ class InternalClient(object):
|
|||
:param marker: Prefix of first desired item, defaults to ''.
|
||||
:param end_marker: Last item returned will be 'less' than this,
|
||||
defaults to ''.
|
||||
:param prefix: Prefix of objects
|
||||
:param acceptable_statuses: List of status for valid responses,
|
||||
defaults to (2, HTTP_NOT_FOUND).
|
||||
|
||||
|
@ -528,7 +554,8 @@ class InternalClient(object):
|
|||
"""
|
||||
|
||||
path = self.make_path(account, container)
|
||||
return self._iter_items(path, marker, end_marker, acceptable_statuses)
|
||||
return self._iter_items(path, marker, end_marker, prefix,
|
||||
acceptable_statuses)
|
||||
|
||||
def set_container_metadata(
|
||||
self, account, container, metadata, metadata_prefix='',
|
||||
|
|
|
@ -59,6 +59,7 @@ class KmsKeyMaster(KeyMaster):
|
|||
set(keymaster_opts).intersection(conf))))
|
||||
conf = readconf(self.keymaster_config_path, 'kms_keymaster')
|
||||
ctxt = keystone_password.KeystonePassword(
|
||||
auth_url=conf.get('auth_endpoint'),
|
||||
username=conf.get('username'),
|
||||
password=conf.get('password'),
|
||||
project_name=conf.get('project_name'),
|
||||
|
|
|
@ -876,8 +876,6 @@ class SloGetContext(WSGIContext):
|
|||
headers=response_headers,
|
||||
conditional_response=True,
|
||||
app_iter=segmented_iter)
|
||||
if req.range:
|
||||
response.headers.pop('Etag')
|
||||
return response
|
||||
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ from time import time
|
|||
from swift.common import exceptions
|
||||
from swift.common.ring import RingData
|
||||
from swift.common.ring.utils import tiers_for_dev, build_tier_tree, \
|
||||
validate_and_normalize_address, pretty_dev
|
||||
validate_and_normalize_address, validate_replicas_by_tier, pretty_dev
|
||||
|
||||
# we can't store None's in the replica2part2dev array, so we high-jack
|
||||
# the max value for magic to represent the part is not currently
|
||||
|
@ -247,7 +247,11 @@ class RingBuilder(object):
|
|||
self.version = builder.version
|
||||
self._replica2part2dev = builder._replica2part2dev
|
||||
self._last_part_moves_epoch = builder._last_part_moves_epoch
|
||||
self._last_part_moves = builder._last_part_moves
|
||||
if builder._last_part_moves is None:
|
||||
self._last_part_moves = array(
|
||||
'B', itertools.repeat(0, self.parts))
|
||||
else:
|
||||
self._last_part_moves = builder._last_part_moves
|
||||
self._last_part_gather_start = builder._last_part_gather_start
|
||||
self._remove_devs = builder._remove_devs
|
||||
self._id = getattr(builder, '_id', None)
|
||||
|
@ -263,7 +267,11 @@ class RingBuilder(object):
|
|||
self.version = builder['version']
|
||||
self._replica2part2dev = builder['_replica2part2dev']
|
||||
self._last_part_moves_epoch = builder['_last_part_moves_epoch']
|
||||
self._last_part_moves = builder['_last_part_moves']
|
||||
if builder['_last_part_moves'] is None:
|
||||
self._last_part_moves = array(
|
||||
'B', itertools.repeat(0, self.parts))
|
||||
else:
|
||||
self._last_part_moves = builder['_last_part_moves']
|
||||
self._last_part_gather_start = builder['_last_part_gather_start']
|
||||
self._dispersion_graph = builder.get('_dispersion_graph', {})
|
||||
self.dispersion = builder.get('dispersion')
|
||||
|
@ -555,7 +563,6 @@ class RingBuilder(object):
|
|||
{'status': finish_status, 'count': gather_count + 1})
|
||||
|
||||
self.devs_changed = False
|
||||
self.version += 1
|
||||
changed_parts = self._build_dispersion_graph(old_replica2part2dev)
|
||||
|
||||
# clean up the cache
|
||||
|
@ -623,22 +630,23 @@ class RingBuilder(object):
|
|||
|
||||
if old_device != dev['id']:
|
||||
changed_parts += 1
|
||||
part_at_risk = False
|
||||
# update running totals for each tiers' number of parts with a
|
||||
# given replica count
|
||||
part_risk_depth = defaultdict(int)
|
||||
part_risk_depth[0] = 0
|
||||
for tier, replicas in replicas_at_tier.items():
|
||||
if tier not in dispersion_graph:
|
||||
dispersion_graph[tier] = [self.parts] + [0] * int_replicas
|
||||
dispersion_graph[tier][0] -= 1
|
||||
dispersion_graph[tier][replicas] += 1
|
||||
if replicas > max_allowed_replicas[tier]:
|
||||
part_at_risk = True
|
||||
# this part may be at risk in multiple tiers, but we only count it
|
||||
# as at_risk once
|
||||
if part_at_risk:
|
||||
parts_at_risk += 1
|
||||
part_risk_depth[len(tier)] += (
|
||||
replicas - max_allowed_replicas[tier])
|
||||
# count each part-replica once at tier where dispersion is worst
|
||||
parts_at_risk += max(part_risk_depth.values())
|
||||
self._dispersion_graph = dispersion_graph
|
||||
self.dispersion = 100.0 * parts_at_risk / self.parts
|
||||
self.dispersion = 100.0 * parts_at_risk / (self.parts * self.replicas)
|
||||
self.version += 1
|
||||
return changed_parts
|
||||
|
||||
def validate(self, stats=False):
|
||||
|
@ -1475,14 +1483,7 @@ class RingBuilder(object):
|
|||
# belts & suspenders/paranoia - at every level, the sum of
|
||||
# weighted_replicas should be very close to the total number of
|
||||
# replicas for the ring
|
||||
tiers = ['cluster', 'regions', 'zones', 'servers', 'devices']
|
||||
for i, tier_name in enumerate(tiers):
|
||||
replicas_at_tier = sum(weighted_replicas_by_tier[t] for t in
|
||||
weighted_replicas_by_tier if len(t) == i)
|
||||
if abs(self.replicas - replicas_at_tier) > 1e-10:
|
||||
raise exceptions.RingValidationError(
|
||||
'%s != %s at tier %s' % (
|
||||
replicas_at_tier, self.replicas, tier_name))
|
||||
validate_replicas_by_tier(self.replicas, weighted_replicas_by_tier)
|
||||
|
||||
return weighted_replicas_by_tier
|
||||
|
||||
|
@ -1585,14 +1586,7 @@ class RingBuilder(object):
|
|||
# belts & suspenders/paranoia - at every level, the sum of
|
||||
# wanted_replicas should be very close to the total number of
|
||||
# replicas for the ring
|
||||
tiers = ['cluster', 'regions', 'zones', 'servers', 'devices']
|
||||
for i, tier_name in enumerate(tiers):
|
||||
replicas_at_tier = sum(wanted_replicas[t] for t in
|
||||
wanted_replicas if len(t) == i)
|
||||
if abs(self.replicas - replicas_at_tier) > 1e-10:
|
||||
raise exceptions.RingValidationError(
|
||||
'%s != %s at tier %s' % (
|
||||
replicas_at_tier, self.replicas, tier_name))
|
||||
validate_replicas_by_tier(self.replicas, wanted_replicas)
|
||||
|
||||
return wanted_replicas
|
||||
|
||||
|
@ -1621,14 +1615,7 @@ class RingBuilder(object):
|
|||
# belts & suspenders/paranoia - at every level, the sum of
|
||||
# target_replicas should be very close to the total number
|
||||
# of replicas for the ring
|
||||
tiers = ['cluster', 'regions', 'zones', 'servers', 'devices']
|
||||
for i, tier_name in enumerate(tiers):
|
||||
replicas_at_tier = sum(target_replicas[t] for t in
|
||||
target_replicas if len(t) == i)
|
||||
if abs(self.replicas - replicas_at_tier) > 1e-10:
|
||||
raise exceptions.RingValidationError(
|
||||
'%s != %s at tier %s' % (
|
||||
replicas_at_tier, self.replicas, tier_name))
|
||||
validate_replicas_by_tier(self.replicas, target_replicas)
|
||||
|
||||
return target_replicas
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import optparse
|
|||
import re
|
||||
import socket
|
||||
|
||||
from swift.common import exceptions
|
||||
from swift.common.utils import expand_ipv6, is_valid_ip, is_valid_ipv4, \
|
||||
is_valid_ipv6
|
||||
|
||||
|
@ -606,8 +607,9 @@ def build_dev_from_opts(opts):
|
|||
'replication_port': replication_port, 'weight': opts.weight}
|
||||
|
||||
|
||||
def dispersion_report(builder, search_filter=None, verbose=False):
|
||||
if not builder._dispersion_graph:
|
||||
def dispersion_report(builder, search_filter=None,
|
||||
verbose=False, recalculate=False):
|
||||
if recalculate or not builder._dispersion_graph:
|
||||
builder._build_dispersion_graph()
|
||||
max_allowed_replicas = builder._build_max_replicas_by_tier()
|
||||
worst_tier = None
|
||||
|
@ -618,8 +620,11 @@ def dispersion_report(builder, search_filter=None, verbose=False):
|
|||
if search_filter and not re.match(search_filter, tier_name):
|
||||
continue
|
||||
max_replicas = int(max_allowed_replicas[tier])
|
||||
at_risk_parts = sum(replica_counts[max_replicas + 1:])
|
||||
placed_parts = sum(replica_counts[1:])
|
||||
at_risk_parts = sum(replica_counts[i] * (i - max_replicas)
|
||||
for i in range(max_replicas + 1,
|
||||
len(replica_counts)))
|
||||
placed_parts = sum(replica_counts[i] * i for i in range(
|
||||
1, len(replica_counts)))
|
||||
tier_dispersion = 100.0 * at_risk_parts / placed_parts
|
||||
if tier_dispersion > max_dispersion:
|
||||
max_dispersion = tier_dispersion
|
||||
|
@ -642,6 +647,25 @@ def dispersion_report(builder, search_filter=None, verbose=False):
|
|||
}
|
||||
|
||||
|
||||
def validate_replicas_by_tier(replicas, replicas_by_tier):
|
||||
"""
|
||||
Validate the sum of the replicas at each tier.
|
||||
The sum of the replicas at each tier should be less than or very close to
|
||||
the upper limit indicated by replicas
|
||||
|
||||
:param replicas: float,the upper limit of replicas
|
||||
:param replicas_by_tier: defaultdict,the replicas by tier
|
||||
"""
|
||||
tiers = ['cluster', 'regions', 'zones', 'servers', 'devices']
|
||||
for i, tier_name in enumerate(tiers):
|
||||
replicas_at_tier = sum(replicas_by_tier[t] for t in
|
||||
replicas_by_tier if len(t) == i)
|
||||
if abs(replicas - replicas_at_tier) > 1e-10:
|
||||
raise exceptions.RingValidationError(
|
||||
'%s != %s at tier %s' % (
|
||||
replicas_at_tier, replicas, tier_name))
|
||||
|
||||
|
||||
def format_device(region=None, zone=None, ip=None, device=None, **kwargs):
|
||||
"""
|
||||
Convert device dict or tier attributes to a representative string.
|
||||
|
|
|
@ -272,12 +272,18 @@ class ObjectExpirer(Daemon):
|
|||
self.pop_queue(container, obj)
|
||||
self.report_objects += 1
|
||||
self.logger.increment('objects')
|
||||
except UnexpectedResponse as err:
|
||||
self.logger.increment('errors')
|
||||
self.logger.error(
|
||||
'Unexpected response while deleting object %(container)s '
|
||||
'%(obj)s: %(err)s' % {'container': container, 'obj': obj,
|
||||
'err': str(err.resp.status_int)})
|
||||
except (Exception, Timeout) as err:
|
||||
self.logger.increment('errors')
|
||||
self.logger.exception(
|
||||
_('Exception while deleting object %(container)s %(obj)s'
|
||||
' %(err)s') % {'container': container,
|
||||
'obj': obj, 'err': str(err)})
|
||||
'Exception while deleting object %(container)s %(obj)s'
|
||||
' %(err)s' % {'container': container,
|
||||
'obj': obj, 'err': str(err)})
|
||||
self.logger.timing_since('timing', start_time)
|
||||
self.report()
|
||||
|
||||
|
@ -302,7 +308,8 @@ class ObjectExpirer(Daemon):
|
|||
perform the actual delete.
|
||||
"""
|
||||
path = '/v1/' + urllib.parse.quote(actual_obj.lstrip('/'))
|
||||
self.swift.make_request('DELETE', path,
|
||||
{'X-If-Delete-At': str(timestamp),
|
||||
'X-Timestamp': str(timestamp)},
|
||||
(2,))
|
||||
self.swift.make_request(
|
||||
'DELETE', path,
|
||||
{'X-If-Delete-At': str(timestamp), 'X-Timestamp': str(timestamp),
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'no'},
|
||||
(2,))
|
||||
|
|
|
@ -418,19 +418,26 @@ class ObjectController(BaseStorageServer):
|
|||
'x-trans-id': headers_in.get('x-trans-id', '-'),
|
||||
'referer': request.as_referer()})
|
||||
if op != 'DELETE':
|
||||
hosts = headers_in.get('X-Delete-At-Host', None)
|
||||
if hosts is None:
|
||||
# If header is missing, no update needed as sufficient other
|
||||
# object servers should perform the required update.
|
||||
return
|
||||
delete_at_container = headers_in.get('X-Delete-At-Container', None)
|
||||
if not delete_at_container:
|
||||
# older proxy servers did not send X-Delete-At-Container so for
|
||||
# backwards compatibility calculate the value here, but also
|
||||
# log a warning because this is prone to inconsistent
|
||||
# expiring_objects_container_divisor configurations.
|
||||
# See https://bugs.launchpad.net/swift/+bug/1187200
|
||||
self.logger.warning(
|
||||
'X-Delete-At-Container header must be specified for '
|
||||
'expiring objects background %s to work properly. Making '
|
||||
'best guess as to the container name for now.' % op)
|
||||
# TODO(gholt): In a future release, change the above warning to
|
||||
# a raised exception and remove the guess code below.
|
||||
delete_at_container = get_expirer_container(
|
||||
delete_at, self.expiring_objects_container_divisor,
|
||||
account, container, obj)
|
||||
partition = headers_in.get('X-Delete-At-Partition', None)
|
||||
hosts = headers_in.get('X-Delete-At-Host', '')
|
||||
contdevices = headers_in.get('X-Delete-At-Device', '')
|
||||
updates = [upd for upd in
|
||||
zip((h.strip() for h in hosts.split(',')),
|
||||
|
@ -442,6 +449,11 @@ class ObjectController(BaseStorageServer):
|
|||
headers_out['x-content-type'] = 'text/plain'
|
||||
headers_out['x-etag'] = 'd41d8cd98f00b204e9800998ecf8427e'
|
||||
else:
|
||||
if not config_true_value(
|
||||
request.headers.get(
|
||||
'X-Backend-Clean-Expiring-Object-Queue', 't')):
|
||||
return
|
||||
|
||||
# DELETEs of old expiration data have no way of knowing what the
|
||||
# old X-Delete-At-Container was at the time of the initial setting
|
||||
# of the data, so a best guess is made here.
|
||||
|
|
|
@ -38,6 +38,47 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
|
|||
HTTP_MOVED_PERMANENTLY
|
||||
|
||||
|
||||
class SweepStats(object):
|
||||
"""
|
||||
Stats bucket for an update sweep
|
||||
"""
|
||||
def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
|
||||
unlinks=0):
|
||||
self.errors = errors
|
||||
self.failures = failures
|
||||
self.quarantines = quarantines
|
||||
self.successes = successes
|
||||
self.unlinks = unlinks
|
||||
|
||||
def copy(self):
|
||||
return type(self)(self.errors, self.failures, self.quarantines,
|
||||
self.successes, self.unlinks)
|
||||
|
||||
def since(self, other):
|
||||
return type(self)(self.errors - other.errors,
|
||||
self.failures - other.failures,
|
||||
self.quarantines - other.quarantines,
|
||||
self.successes - other.successes,
|
||||
self.unlinks - other.unlinks)
|
||||
|
||||
def reset(self):
|
||||
self.errors = 0
|
||||
self.failures = 0
|
||||
self.quarantines = 0
|
||||
self.successes = 0
|
||||
self.unlinks = 0
|
||||
|
||||
def __str__(self):
|
||||
keys = (
|
||||
(self.successes, 'successes'),
|
||||
(self.failures, 'failures'),
|
||||
(self.quarantines, 'quarantines'),
|
||||
(self.unlinks, 'unlinks'),
|
||||
(self.errors, 'errors'),
|
||||
)
|
||||
return ', '.join('%d %s' % pair for pair in keys)
|
||||
|
||||
|
||||
class ObjectUpdater(Daemon):
|
||||
"""Update object information in container listings."""
|
||||
|
||||
|
@ -65,16 +106,18 @@ class ObjectUpdater(Daemon):
|
|||
objects_per_second))
|
||||
self.node_timeout = float(conf.get('node_timeout', 10))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.successes = 0
|
||||
self.failures = 0
|
||||
self.report_interval = float(conf.get('report_interval', 300))
|
||||
self.recon_cache_path = conf.get('recon_cache_path',
|
||||
'/var/cache/swift')
|
||||
self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
|
||||
self.stats = SweepStats()
|
||||
|
||||
def _listdir(self, path):
|
||||
try:
|
||||
return os.listdir(path)
|
||||
except OSError as e:
|
||||
self.stats.errors += 1
|
||||
self.logger.increment('errors')
|
||||
self.logger.error(_('ERROR: Unable to access %(path)s: '
|
||||
'%(error)s') %
|
||||
{'path': path, 'error': e})
|
||||
|
@ -97,7 +140,9 @@ class ObjectUpdater(Daemon):
|
|||
self.get_container_ring().get_nodes('')
|
||||
for device in self._listdir(self.devices):
|
||||
if not check_drive(self.devices, device, self.mount_check):
|
||||
self.logger.increment('errors')
|
||||
# We don't count this as an error. The occasional
|
||||
# unmounted drive is part of normal cluster operations,
|
||||
# so a simple warning is sufficient.
|
||||
self.logger.warning(
|
||||
_('Skipping %s as it is not mounted'), device)
|
||||
continue
|
||||
|
@ -109,17 +154,15 @@ class ObjectUpdater(Daemon):
|
|||
else:
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
eventlet_monkey_patch()
|
||||
self.successes = 0
|
||||
self.failures = 0
|
||||
self.stats.reset()
|
||||
forkbegin = time.time()
|
||||
self.object_sweep(os.path.join(self.devices, device))
|
||||
elapsed = time.time() - forkbegin
|
||||
self.logger.info(
|
||||
_('Object update sweep of %(device)s'
|
||||
' completed: %(elapsed).02fs, %(success)s successes'
|
||||
', %(fail)s failures'),
|
||||
('Object update sweep of %(device)s '
|
||||
'completed: %(elapsed).02fs, %(stats)s'),
|
||||
{'device': device, 'elapsed': elapsed,
|
||||
'success': self.successes, 'fail': self.failures})
|
||||
'stats': self.stats})
|
||||
sys.exit()
|
||||
while pids:
|
||||
pids.remove(os.wait()[0])
|
||||
|
@ -135,21 +178,21 @@ class ObjectUpdater(Daemon):
|
|||
"""Run the updater once."""
|
||||
self.logger.info(_('Begin object update single threaded sweep'))
|
||||
begin = time.time()
|
||||
self.successes = 0
|
||||
self.failures = 0
|
||||
self.stats.reset()
|
||||
for device in self._listdir(self.devices):
|
||||
if not check_drive(self.devices, device, self.mount_check):
|
||||
self.logger.increment('errors')
|
||||
# We don't count this as an error. The occasional unmounted
|
||||
# drive is part of normal cluster operations, so a simple
|
||||
# warning is sufficient.
|
||||
self.logger.warning(
|
||||
_('Skipping %s as it is not mounted'), device)
|
||||
continue
|
||||
self.object_sweep(os.path.join(self.devices, device))
|
||||
elapsed = time.time() - begin
|
||||
self.logger.info(
|
||||
_('Object update single threaded sweep completed: '
|
||||
'%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
|
||||
{'elapsed': elapsed, 'success': self.successes,
|
||||
'fail': self.failures})
|
||||
('Object update single-threaded sweep completed: '
|
||||
'%(elapsed).02fs, %(stats)s'),
|
||||
{'elapsed': elapsed, 'stats': self.stats})
|
||||
dump_recon_cache({'object_updater_sweep': elapsed},
|
||||
self.rcache, self.logger)
|
||||
|
||||
|
@ -160,6 +203,12 @@ class ObjectUpdater(Daemon):
|
|||
:param device: path to device
|
||||
"""
|
||||
start_time = time.time()
|
||||
last_status_update = start_time
|
||||
start_stats = self.stats.copy()
|
||||
my_pid = os.getpid()
|
||||
self.logger.info("Object update sweep starting on %s (pid: %d)",
|
||||
device, my_pid)
|
||||
|
||||
# loop through async pending dirs for all policies
|
||||
for asyncdir in self._listdir(device):
|
||||
# we only care about directories
|
||||
|
@ -172,6 +221,8 @@ class ObjectUpdater(Daemon):
|
|||
try:
|
||||
base, policy = split_policy_string(asyncdir)
|
||||
except PolicyError as e:
|
||||
# This isn't an error, but a misconfiguration. Logging a
|
||||
# warning should be sufficient.
|
||||
self.logger.warning(_('Directory %(directory)r does not map '
|
||||
'to a valid policy (%(error)s)') % {
|
||||
'directory': asyncdir, 'error': e})
|
||||
|
@ -188,6 +239,7 @@ class ObjectUpdater(Daemon):
|
|||
try:
|
||||
obj_hash, timestamp = update.split('-')
|
||||
except ValueError:
|
||||
self.stats.errors += 1
|
||||
self.logger.increment('errors')
|
||||
self.logger.error(
|
||||
_('ERROR async pending file with unexpected '
|
||||
|
@ -195,7 +247,8 @@ class ObjectUpdater(Daemon):
|
|||
% (update_path))
|
||||
continue
|
||||
if obj_hash == last_obj_hash:
|
||||
self.logger.increment("unlinks")
|
||||
self.stats.unlinks += 1
|
||||
self.logger.increment('unlinks')
|
||||
os.unlink(update_path)
|
||||
else:
|
||||
self.process_object_update(update_path, device,
|
||||
|
@ -205,11 +258,39 @@ class ObjectUpdater(Daemon):
|
|||
self.objects_running_time = ratelimit_sleep(
|
||||
self.objects_running_time,
|
||||
self.max_objects_per_second)
|
||||
|
||||
now = time.time()
|
||||
if now - last_status_update >= self.report_interval:
|
||||
this_sweep = self.stats.since(start_stats)
|
||||
self.logger.info(
|
||||
('Object update sweep progress on %(device)s: '
|
||||
'%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
|
||||
{'device': device,
|
||||
'elapsed': now - start_time,
|
||||
'pid': my_pid,
|
||||
'stats': this_sweep})
|
||||
last_status_update = now
|
||||
try:
|
||||
os.rmdir(prefix_path)
|
||||
except OSError:
|
||||
pass
|
||||
self.logger.timing_since('timing', start_time)
|
||||
sweep_totals = self.stats.since(start_stats)
|
||||
self.logger.info(
|
||||
('Object update sweep completed on %(device)s '
|
||||
'in %(elapsed).02fs seconds:, '
|
||||
'%(successes)d successes, %(failures)d failures, '
|
||||
'%(quarantines)d quarantines, '
|
||||
'%(unlinks)d unlinks, %(errors)d errors '
|
||||
'(pid: %(pid)d)'),
|
||||
{'device': device,
|
||||
'elapsed': time.time() - start_time,
|
||||
'pid': my_pid,
|
||||
'successes': sweep_totals.successes,
|
||||
'failures': sweep_totals.failures,
|
||||
'quarantines': sweep_totals.quarantines,
|
||||
'unlinks': sweep_totals.unlinks,
|
||||
'errors': sweep_totals.errors})
|
||||
|
||||
def process_object_update(self, update_path, device, policy):
|
||||
"""
|
||||
|
@ -224,6 +305,7 @@ class ObjectUpdater(Daemon):
|
|||
except Exception:
|
||||
self.logger.exception(
|
||||
_('ERROR Pickle problem, quarantining %s'), update_path)
|
||||
self.stats.quarantines += 1
|
||||
self.logger.increment('quarantines')
|
||||
target_path = os.path.join(device, 'quarantined', 'objects',
|
||||
os.path.basename(update_path))
|
||||
|
@ -284,14 +366,15 @@ class ObjectUpdater(Daemon):
|
|||
break
|
||||
|
||||
if success:
|
||||
self.successes += 1
|
||||
self.stats.successes += 1
|
||||
self.logger.increment('successes')
|
||||
self.logger.debug('Update sent for %(obj)s %(path)s',
|
||||
{'obj': obj, 'path': update_path})
|
||||
self.logger.increment("unlinks")
|
||||
self.stats.unlinks += 1
|
||||
self.logger.increment('unlinks')
|
||||
os.unlink(update_path)
|
||||
else:
|
||||
self.failures += 1
|
||||
self.stats.failures += 1
|
||||
self.logger.increment('failures')
|
||||
self.logger.debug('Update failed for %(obj)s %(path)s',
|
||||
{'obj': obj, 'path': update_path})
|
||||
|
|
|
@ -1845,9 +1845,11 @@ class Controller(object):
|
|||
if is_success(resp.status_int):
|
||||
self.app.logger.info(_('autocreate account %r'), path)
|
||||
clear_info_cache(self.app, req.environ, account)
|
||||
return True
|
||||
else:
|
||||
self.app.logger.warning(_('Could not autocreate account %r'),
|
||||
path)
|
||||
return False
|
||||
|
||||
def GETorHEAD_base(self, req, server_type, node_iter, partition, path,
|
||||
concurrency=1, client_chunk_size=None):
|
||||
|
|
|
@ -27,7 +27,7 @@ from swift.proxy.controllers.base import Controller, delay_denial, \
|
|||
cors_validation, set_info_cache, clear_info_cache
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
|
||||
HTTPNotFound
|
||||
HTTPNotFound, HTTPServerError
|
||||
from swift.container.backend import DB_STATE_SHARDING, DB_STATE_UNSHARDED, \
|
||||
DB_STATE_SHARDED
|
||||
|
||||
|
@ -254,7 +254,8 @@ class ContainerController(Controller):
|
|||
account_partition, accounts, container_count = \
|
||||
self.account_info(self.account_name, req)
|
||||
if not accounts and self.app.account_autocreate:
|
||||
self.autocreate_account(req, self.account_name)
|
||||
if not self.autocreate_account(req, self.account_name):
|
||||
return HTTPServerError(request=req)
|
||||
account_partition, accounts, container_count = \
|
||||
self.account_info(self.account_name, req)
|
||||
if not accounts:
|
||||
|
|
|
@ -86,6 +86,46 @@ def check_content_type(req):
|
|||
return None
|
||||
|
||||
|
||||
def num_container_updates(container_replicas, container_quorum,
|
||||
object_replicas, object_quorum):
|
||||
"""
|
||||
We need to send container updates via enough object servers such
|
||||
that, if the object PUT succeeds, then the container update is
|
||||
durable (either it's synchronously updated or written to async
|
||||
pendings).
|
||||
|
||||
Define:
|
||||
Qc = the quorum size for the container ring
|
||||
Qo = the quorum size for the object ring
|
||||
Rc = the replica count for the container ring
|
||||
Ro = the replica count (or EC N+K) for the object ring
|
||||
|
||||
A durable container update is one that's made it to at least Qc
|
||||
nodes. To always be durable, we have to send enough container
|
||||
updates so that, if only Qo object PUTs succeed, and all the
|
||||
failed object PUTs had container updates, at least Qc updates
|
||||
remain. Since (Ro - Qo) object PUTs may fail, we must have at
|
||||
least Qc + Ro - Qo container updates to ensure that Qc of them
|
||||
remain.
|
||||
|
||||
Also, each container replica is named in at least one object PUT
|
||||
request so that, when all requests succeed, no work is generated
|
||||
for the container replicator. Thus, at least Rc updates are
|
||||
necessary.
|
||||
|
||||
:param container_replicas: replica count for the container ring (Rc)
|
||||
:param container_quorum: quorum size for the container ring (Qc)
|
||||
:param object_replicas: replica count for the object ring (Ro)
|
||||
:param object_quorum: quorum size for the object ring (Qo)
|
||||
|
||||
"""
|
||||
return max(
|
||||
# Qc + Ro - Qo
|
||||
container_quorum + object_replicas - object_quorum,
|
||||
# Rc
|
||||
container_replicas)
|
||||
|
||||
|
||||
class ObjectControllerRouter(object):
|
||||
|
||||
policy_type_to_controller_map = {}
|
||||
|
@ -267,6 +307,8 @@ class BaseObjectController(Controller):
|
|||
if error_response:
|
||||
return error_response
|
||||
|
||||
req.headers['X-Timestamp'] = Timestamp.now().internal
|
||||
|
||||
req, delete_at_container, delete_at_part, \
|
||||
delete_at_nodes = self._config_obj_expiration(req)
|
||||
|
||||
|
@ -281,8 +323,6 @@ class BaseObjectController(Controller):
|
|||
partition, nodes = obj_ring.get_nodes(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
|
||||
req.headers['X-Timestamp'] = Timestamp.now().internal
|
||||
|
||||
headers = self._backend_requests(
|
||||
req, len(nodes), container_partition, container_nodes,
|
||||
delete_at_container, delete_at_part, delete_at_nodes,
|
||||
|
@ -309,31 +349,55 @@ class BaseObjectController(Controller):
|
|||
if container_path:
|
||||
headers[index]['X-Backend-Container-Path'] = container_path
|
||||
|
||||
for i, container in enumerate(containers):
|
||||
i = i % len(headers)
|
||||
set_container_update(i, container)
|
||||
def set_delete_at_headers(index, delete_at_node):
|
||||
headers[index]['X-Delete-At-Container'] = delete_at_container
|
||||
headers[index]['X-Delete-At-Partition'] = delete_at_partition
|
||||
headers[index]['X-Delete-At-Host'] = csv_append(
|
||||
headers[index].get('X-Delete-At-Host'),
|
||||
'%(ip)s:%(port)s' % delete_at_node)
|
||||
headers[index]['X-Delete-At-Device'] = csv_append(
|
||||
headers[index].get('X-Delete-At-Device'),
|
||||
delete_at_node['device'])
|
||||
|
||||
n_updates_needed = num_container_updates(
|
||||
len(containers), quorum_size(len(containers)),
|
||||
n_outgoing, policy.quorum)
|
||||
|
||||
# if # of container_updates is not enough against # of replicas
|
||||
# (or fragments). Fill them like as pigeon hole problem.
|
||||
# TODO?: apply these to X-Delete-At-Container?
|
||||
n_updates_needed = min(policy.quorum + 1, n_outgoing)
|
||||
container_iter = itertools.cycle(containers)
|
||||
existing_updates = len(containers)
|
||||
dan_iter = itertools.cycle(delete_at_nodes or [])
|
||||
existing_updates = 0
|
||||
while existing_updates < n_updates_needed:
|
||||
set_container_update(existing_updates, next(container_iter))
|
||||
index = existing_updates % n_outgoing
|
||||
set_container_update(index, next(container_iter))
|
||||
if delete_at_nodes:
|
||||
# We reverse the index in order to distribute the updates
|
||||
# across all nodes.
|
||||
set_delete_at_headers(n_outgoing - 1 - index, next(dan_iter))
|
||||
existing_updates += 1
|
||||
|
||||
for i, node in enumerate(delete_at_nodes or []):
|
||||
i = i % len(headers)
|
||||
|
||||
headers[i]['X-Delete-At-Container'] = delete_at_container
|
||||
headers[i]['X-Delete-At-Partition'] = delete_at_partition
|
||||
headers[i]['X-Delete-At-Host'] = csv_append(
|
||||
headers[i].get('X-Delete-At-Host'),
|
||||
'%(ip)s:%(port)s' % node)
|
||||
headers[i]['X-Delete-At-Device'] = csv_append(
|
||||
headers[i].get('X-Delete-At-Device'),
|
||||
node['device'])
|
||||
# Keep the number of expirer-queue deletes to a reasonable number.
|
||||
#
|
||||
# In the best case, at least one object server writes out an
|
||||
# async_pending for an expirer-queue update. In the worst case, no
|
||||
# object server does so, and an expirer-queue row remains that
|
||||
# refers to an already-deleted object. In this case, upon attempting
|
||||
# to delete the object, the object expirer will notice that the
|
||||
# object does not exist and then remove the row from the expirer
|
||||
# queue.
|
||||
#
|
||||
# In other words: expirer-queue updates on object DELETE are nice to
|
||||
# have, but not strictly necessary for correct operation.
|
||||
#
|
||||
# Also, each queue update results in an async_pending record, which
|
||||
# causes the object updater to talk to all container servers. If we
|
||||
# have N async_pendings and Rc container replicas, we cause N * Rc
|
||||
# requests from object updaters to container servers (possibly more,
|
||||
# depending on retries). Thus, it is helpful to keep this number
|
||||
# small.
|
||||
n_desired_queue_updates = 2
|
||||
for i in range(len(headers)):
|
||||
headers[i].setdefault('X-Backend-Clean-Expiring-Object-Queue',
|
||||
't' if i < n_desired_queue_updates else 'f')
|
||||
|
||||
return headers
|
||||
|
||||
|
@ -480,7 +544,9 @@ class BaseObjectController(Controller):
|
|||
req.headers.pop('x-detect-content-type')
|
||||
|
||||
def _update_x_timestamp(self, req):
|
||||
# Used by container sync feature
|
||||
# The container sync feature includes an x-timestamp header with
|
||||
# requests. If present this is checked and preserved, otherwise a fresh
|
||||
# timestamp is added.
|
||||
if 'x-timestamp' in req.headers:
|
||||
try:
|
||||
req_timestamp = Timestamp(req.headers['X-Timestamp'])
|
||||
|
@ -735,14 +801,14 @@ class BaseObjectController(Controller):
|
|||
# update content type in case it is missing
|
||||
self._update_content_type(req)
|
||||
|
||||
self._update_x_timestamp(req)
|
||||
|
||||
# check constraints on object name and request headers
|
||||
error_response = check_object_creation(req, self.object_name) or \
|
||||
check_content_type(req)
|
||||
if error_response:
|
||||
return error_response
|
||||
|
||||
self._update_x_timestamp(req)
|
||||
|
||||
def reader():
|
||||
try:
|
||||
return req.environ['wsgi.input'].read(
|
||||
|
@ -794,18 +860,8 @@ class BaseObjectController(Controller):
|
|||
return HTTPNotFound(request=req)
|
||||
partition, nodes = obj_ring.get_nodes(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
# Used by container sync feature
|
||||
if 'x-timestamp' in req.headers:
|
||||
try:
|
||||
req_timestamp = Timestamp(req.headers['X-Timestamp'])
|
||||
except ValueError:
|
||||
return HTTPBadRequest(
|
||||
request=req, content_type='text/plain',
|
||||
body='X-Timestamp should be a UNIX timestamp float value; '
|
||||
'was %r' % req.headers['x-timestamp'])
|
||||
req.headers['X-Timestamp'] = req_timestamp.internal
|
||||
else:
|
||||
req.headers['X-Timestamp'] = Timestamp.now().internal
|
||||
|
||||
self._update_x_timestamp(req)
|
||||
|
||||
# Include local handoff nodes if write-affinity is enabled.
|
||||
node_count = len(nodes)
|
||||
|
@ -1049,7 +1105,11 @@ class ECAppIter(object):
|
|||
"""
|
||||
self.mime_boundary = resp.boundary
|
||||
|
||||
self.stashed_iter = reiterate(self._real_iter(req, resp.headers))
|
||||
try:
|
||||
self.stashed_iter = reiterate(self._real_iter(req, resp.headers))
|
||||
except Exception:
|
||||
self.close()
|
||||
raise
|
||||
|
||||
if self.learned_content_type is not None:
|
||||
resp.content_type = self.learned_content_type
|
||||
|
@ -2109,7 +2169,7 @@ class ECGetResponseCollection(object):
|
|||
Return the best bucket in the collection.
|
||||
|
||||
The "best" bucket is the newest timestamp with sufficient getters, or
|
||||
the closest to having a sufficient getters, unless it is bettered by a
|
||||
the closest to having sufficient getters, unless it is bettered by a
|
||||
bucket with potential alternate nodes.
|
||||
|
||||
:return: An instance of :class:`~ECGetResponseBucket` or None if there
|
||||
|
|
|
@ -219,9 +219,21 @@ class TestSlo(Base):
|
|||
"Expected slo_enabled to be True/False, got %r" %
|
||||
(self.env.slo_enabled,))
|
||||
|
||||
manifest_abcde_hash = hashlib.md5()
|
||||
manifest_abcde_hash.update(hashlib.md5('a' * 1024 * 1024).hexdigest())
|
||||
manifest_abcde_hash.update(hashlib.md5('b' * 1024 * 1024).hexdigest())
|
||||
manifest_abcde_hash.update(hashlib.md5('c' * 1024 * 1024).hexdigest())
|
||||
manifest_abcde_hash.update(hashlib.md5('d' * 1024 * 1024).hexdigest())
|
||||
manifest_abcde_hash.update(hashlib.md5('e').hexdigest())
|
||||
self.manifest_abcde_etag = manifest_abcde_hash.hexdigest()
|
||||
|
||||
def test_slo_get_simple_manifest(self):
|
||||
file_item = self.env.container.file('manifest-abcde')
|
||||
file_contents = file_item.read()
|
||||
self.assertEqual(file_item.conn.response.status, 200)
|
||||
headers = dict(file_item.conn.response.getheaders())
|
||||
self.assertIn('etag', headers)
|
||||
self.assertEqual(headers['etag'], '"%s"' % self.manifest_abcde_etag)
|
||||
self.assertEqual(4 * 1024 * 1024 + 1, len(file_contents))
|
||||
self.assertEqual('a', file_contents[0])
|
||||
self.assertEqual('a', file_contents[1024 * 1024 - 1])
|
||||
|
@ -348,6 +360,10 @@ class TestSlo(Base):
|
|||
file_item = self.env.container.file('manifest-abcde')
|
||||
file_contents = file_item.read(size=1024 * 1024 + 2,
|
||||
offset=1024 * 1024 - 1)
|
||||
self.assertEqual(file_item.conn.response.status, 206)
|
||||
headers = dict(file_item.conn.response.getheaders())
|
||||
self.assertIn('etag', headers)
|
||||
self.assertEqual(headers['etag'], '"%s"' % self.manifest_abcde_etag)
|
||||
self.assertEqual('a', file_contents[0])
|
||||
self.assertEqual('b', file_contents[1])
|
||||
self.assertEqual('b', file_contents[-2])
|
||||
|
@ -418,16 +434,10 @@ class TestSlo(Base):
|
|||
self.assertEqual('d', file_contents[-1])
|
||||
|
||||
def test_slo_etag_is_hash_of_etags(self):
|
||||
expected_hash = hashlib.md5()
|
||||
expected_hash.update(hashlib.md5('a' * 1024 * 1024).hexdigest())
|
||||
expected_hash.update(hashlib.md5('b' * 1024 * 1024).hexdigest())
|
||||
expected_hash.update(hashlib.md5('c' * 1024 * 1024).hexdigest())
|
||||
expected_hash.update(hashlib.md5('d' * 1024 * 1024).hexdigest())
|
||||
expected_hash.update(hashlib.md5('e').hexdigest())
|
||||
expected_etag = expected_hash.hexdigest()
|
||||
|
||||
# we have this check in test_slo_get_simple_manifest, too,
|
||||
# but verify that it holds for HEAD requests
|
||||
file_item = self.env.container.file('manifest-abcde')
|
||||
self.assertEqual(expected_etag, file_item.info()['etag'])
|
||||
self.assertEqual(self.manifest_abcde_etag, file_item.info()['etag'])
|
||||
|
||||
def test_slo_etag_is_hash_of_etags_submanifests(self):
|
||||
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import errno
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
import uuid
|
||||
|
@ -103,7 +105,7 @@ class TestObjectExpirer(ReplProbeTest):
|
|||
|
||||
# clear proxy cache
|
||||
client.post_container(self.url, self.token, self.container_name, {})
|
||||
# run the expirier again after replication
|
||||
# run the expirer again after replication
|
||||
self.expirer.once()
|
||||
|
||||
# object is not in the listing
|
||||
|
@ -126,6 +128,80 @@ class TestObjectExpirer(ReplProbeTest):
|
|||
self.assertGreater(Timestamp(metadata['x-backend-timestamp']),
|
||||
create_timestamp)
|
||||
|
||||
def test_expirer_doesnt_make_async_pendings(self):
|
||||
# The object expirer cleans up its own queue. The inner loop
|
||||
# basically looks like this:
|
||||
#
|
||||
# for obj in stuff_to_delete:
|
||||
# delete_the_object(obj)
|
||||
# remove_the_queue_entry(obj)
|
||||
#
|
||||
# By default, upon receipt of a DELETE request for an expiring
|
||||
# object, the object servers will create async_pending records to
|
||||
# clean the expirer queue. Since the expirer cleans its own queue,
|
||||
# this is unnecessary. The expirer can make requests in such a way
|
||||
# tha the object server does not write out any async pendings; this
|
||||
# test asserts that this is the case.
|
||||
|
||||
def gather_async_pendings(onodes):
|
||||
async_pendings = []
|
||||
for onode in onodes:
|
||||
device_dir = self.device_dir('', onode)
|
||||
for ap_pol_dir in os.listdir(device_dir):
|
||||
if not ap_pol_dir.startswith('async_pending'):
|
||||
# skip 'objects', 'containers', etc.
|
||||
continue
|
||||
async_pending_dir = os.path.join(device_dir, ap_pol_dir)
|
||||
try:
|
||||
ap_dirs = os.listdir(async_pending_dir)
|
||||
except OSError as err:
|
||||
if err.errno == errno.ENOENT:
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
for ap_dir in ap_dirs:
|
||||
ap_dir_fullpath = os.path.join(
|
||||
async_pending_dir, ap_dir)
|
||||
async_pendings.extend([
|
||||
os.path.join(ap_dir_fullpath, ent)
|
||||
for ent in os.listdir(ap_dir_fullpath)])
|
||||
return async_pendings
|
||||
|
||||
# Make an expiring object in each policy
|
||||
for policy in ENABLED_POLICIES:
|
||||
container_name = "expirer-test-%d" % policy.idx
|
||||
container_headers = {'X-Storage-Policy': policy.name}
|
||||
client.put_container(self.url, self.token, container_name,
|
||||
headers=container_headers)
|
||||
|
||||
now = time.time()
|
||||
delete_at = int(now + 2.0)
|
||||
client.put_object(
|
||||
self.url, self.token, container_name, "some-object",
|
||||
headers={'X-Delete-At': str(delete_at),
|
||||
'X-Timestamp': Timestamp(now).normal},
|
||||
contents='dontcare')
|
||||
|
||||
time.sleep(2.0)
|
||||
# make sure auto-created expirer-queue containers get in the account
|
||||
# listing so the expirer can find them
|
||||
Manager(['container-updater']).once()
|
||||
|
||||
# Make sure there's no async_pendings anywhere. Probe tests only run
|
||||
# on single-node installs anyway, so this set should be small enough
|
||||
# that an exhaustive check doesn't take too long.
|
||||
all_obj_nodes = {}
|
||||
for policy in ENABLED_POLICIES:
|
||||
for dev in policy.object_ring.devs:
|
||||
all_obj_nodes[dev['device']] = dev
|
||||
pendings_before = gather_async_pendings(all_obj_nodes.values())
|
||||
|
||||
# expire the objects
|
||||
Manager(['object-expirer']).once()
|
||||
pendings_after = gather_async_pendings(all_obj_nodes.values())
|
||||
self.assertEqual(pendings_after, pendings_before)
|
||||
|
||||
def test_expirer_object_should_not_be_expired(self):
|
||||
|
||||
# Current object-expirer checks the correctness via x-if-delete-at
|
||||
|
@ -282,9 +358,6 @@ class TestObjectExpirer(ReplProbeTest):
|
|||
# run expirer again, delete should now succeed
|
||||
self.expirer.once()
|
||||
|
||||
# this is mainly to paper over lp bug #1652323
|
||||
self.get_to_final_state()
|
||||
|
||||
# verify the deletion by checking the container listing
|
||||
self.assertFalse(self._check_obj_in_container_listing(),
|
||||
msg='Found listing for %s' % self.object_name)
|
||||
|
|
|
@ -133,7 +133,7 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
|
|||
msg += '%3d: %s\n' % (i, line)
|
||||
self.fail(msg)
|
||||
|
||||
def create_sample_ring(self, part_power=6, overload=None):
|
||||
def create_sample_ring(self, part_power=6, overload=None, empty=False):
|
||||
"""
|
||||
Create a sample ring with four devices
|
||||
|
||||
|
@ -152,35 +152,36 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
|
|||
ring = RingBuilder(part_power, 3, 1)
|
||||
if overload is not None:
|
||||
ring.set_overload(overload)
|
||||
ring.add_dev({'weight': 100.0,
|
||||
'region': 0,
|
||||
'zone': 0,
|
||||
'ip': '127.0.0.1',
|
||||
'port': 6200,
|
||||
'device': 'sda1',
|
||||
'meta': 'some meta data',
|
||||
})
|
||||
ring.add_dev({'weight': 100.0,
|
||||
'region': 1,
|
||||
'zone': 1,
|
||||
'ip': '127.0.0.2',
|
||||
'port': 6201,
|
||||
'device': 'sda2'
|
||||
})
|
||||
ring.add_dev({'weight': 100.0,
|
||||
'region': 2,
|
||||
'zone': 2,
|
||||
'ip': '127.0.0.3',
|
||||
'port': 6202,
|
||||
'device': 'sdc3'
|
||||
})
|
||||
ring.add_dev({'weight': 100.0,
|
||||
'region': 3,
|
||||
'zone': 3,
|
||||
'ip': '127.0.0.4',
|
||||
'port': 6203,
|
||||
'device': 'sdd4'
|
||||
})
|
||||
if not empty:
|
||||
ring.add_dev({'weight': 100.0,
|
||||
'region': 0,
|
||||
'zone': 0,
|
||||
'ip': '127.0.0.1',
|
||||
'port': 6200,
|
||||
'device': 'sda1',
|
||||
'meta': 'some meta data',
|
||||
})
|
||||
ring.add_dev({'weight': 100.0,
|
||||
'region': 1,
|
||||
'zone': 1,
|
||||
'ip': '127.0.0.2',
|
||||
'port': 6201,
|
||||
'device': 'sda2'
|
||||
})
|
||||
ring.add_dev({'weight': 100.0,
|
||||
'region': 2,
|
||||
'zone': 2,
|
||||
'ip': '127.0.0.3',
|
||||
'port': 6202,
|
||||
'device': 'sdc3'
|
||||
})
|
||||
ring.add_dev({'weight': 100.0,
|
||||
'region': 3,
|
||||
'zone': 3,
|
||||
'ip': '127.0.0.4',
|
||||
'port': 6203,
|
||||
'device': 'sdd4'
|
||||
})
|
||||
ring.save(self.tmpfile)
|
||||
return ring
|
||||
|
||||
|
@ -1885,6 +1886,55 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
|
|||
ring_invalid_re = re.compile("Ring file .*\.ring\.gz is invalid")
|
||||
self.assertTrue(ring_invalid_re.findall(mock_stdout.getvalue()))
|
||||
|
||||
def test_default_no_device_ring_without_exception(self):
|
||||
self.create_sample_ring()
|
||||
|
||||
# remove devices from ring file
|
||||
mock_stdout = six.StringIO()
|
||||
mock_stderr = six.StringIO()
|
||||
for device in ["d0", "d1", "d2", "d3"]:
|
||||
argv = ["", self.tmpfile, "remove", device]
|
||||
with mock.patch("sys.stdout", mock_stdout):
|
||||
with mock.patch("sys.stderr", mock_stderr):
|
||||
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
|
||||
# default ring file without exception
|
||||
mock_stdout = six.StringIO()
|
||||
mock_stderr = six.StringIO()
|
||||
argv = ["", self.tmpfile, "default"]
|
||||
with mock.patch("sys.stdout", mock_stdout):
|
||||
with mock.patch("sys.stderr", mock_stderr):
|
||||
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
|
||||
deleted_dev_list = (
|
||||
" 0 0 0 127.0.0.1:6200 127.0.0.1:6200 "
|
||||
"sda1 0.00 0 0.00 DEL some meta data\n"
|
||||
" 1 1 1 127.0.0.2:6201 127.0.0.2:6201 "
|
||||
"sda2 0.00 0 0.00 DEL \n"
|
||||
" 2 2 2 127.0.0.3:6202 127.0.0.3:6202 "
|
||||
"sdc3 0.00 0 0.00 DEL \n"
|
||||
" 3 3 3 127.0.0.4:6203 127.0.0.4:6203 "
|
||||
"sdd4 0.00 0 0.00 DEL \n")
|
||||
|
||||
output = mock_stdout.getvalue()
|
||||
self.assertIn("64 partitions", output)
|
||||
self.assertIn("all devices have been deleted", output)
|
||||
self.assertIn("all devices have been deleted", output)
|
||||
self.assertIn(deleted_dev_list, output)
|
||||
|
||||
def test_empty_ring(self):
|
||||
self.create_sample_ring(empty=True)
|
||||
|
||||
# default ring file without exception
|
||||
mock_stdout = six.StringIO()
|
||||
mock_stderr = six.StringIO()
|
||||
argv = ["", self.tmpfile, "default"]
|
||||
with mock.patch("sys.stdout", mock_stdout):
|
||||
with mock.patch("sys.stderr", mock_stderr):
|
||||
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
|
||||
|
||||
output = mock_stdout.getvalue()
|
||||
self.assertIn("64 partitions", output)
|
||||
self.assertIn("There are no devices in this ring", output)
|
||||
|
||||
def test_pretend_min_part_hours_passed(self):
|
||||
self.run_srb("create", 8, 3, 1)
|
||||
argv_pretend = ["", self.tmpfile, "pretend_min_part_hours_passed"]
|
||||
|
@ -1927,74 +1977,41 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
|
|||
# device won't acquire any partitions, so the ring's balance won't
|
||||
# change. However, dispersion will improve.
|
||||
|
||||
ring = RingBuilder(6, 5, 1)
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 1,
|
||||
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sda'})
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 1,
|
||||
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sdb'})
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 1,
|
||||
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sdc'})
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 1,
|
||||
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sdd'})
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 1,
|
||||
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sde'})
|
||||
ring = RingBuilder(6, 6, 1)
|
||||
devs = ('d%s' % i for i in itertools.count())
|
||||
for i in range(6):
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 1,
|
||||
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
|
||||
'device': next(devs)})
|
||||
ring.rebalance()
|
||||
|
||||
# The last guy in zone 1
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 1,
|
||||
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sdf'})
|
||||
'device': next(devs)})
|
||||
|
||||
# Add zone 2 (same total weight as zone 1)
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 2,
|
||||
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sda'})
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 2,
|
||||
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sdb'})
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 2,
|
||||
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sdc'})
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 2,
|
||||
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sdd'})
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 2,
|
||||
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sde'})
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 2,
|
||||
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
|
||||
'device': 'sdf'})
|
||||
for i in range(7):
|
||||
ring.add_dev({
|
||||
'region': 1, 'zone': 2,
|
||||
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
|
||||
'device': next(devs)})
|
||||
ring.pretend_min_part_hours_passed()
|
||||
ring.save(self.tmpfile)
|
||||
del ring
|
||||
|
||||
# Rebalance once: this gets 1/5 replica into zone 2; the ring is
|
||||
# Rebalance once: this gets 1/6th replica into zone 2; the ring is
|
||||
# saved because devices changed.
|
||||
argv = ["", self.tmpfile, "rebalance", "5759339"]
|
||||
self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
|
||||
rb = RingBuilder.load(self.tmpfile)
|
||||
self.assertEqual(rb.dispersion, 100)
|
||||
self.assertEqual(rb.dispersion, 33.333333333333336)
|
||||
self.assertEqual(rb.get_balance(), 100)
|
||||
self.run_srb('pretend_min_part_hours_passed')
|
||||
|
||||
# Rebalance again: this gets 2/5 replica into zone 2, but no devices
|
||||
# Rebalance again: this gets 2/6th replica into zone 2, but no devices
|
||||
# changed and the balance stays the same. The only improvement is
|
||||
# dispersion.
|
||||
|
||||
|
@ -2009,7 +2026,7 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
|
|||
with mock.patch('swift.common.ring.RingBuilder.save', capture_save):
|
||||
self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
|
||||
self.assertEqual(captured, {
|
||||
'dispersion': 0,
|
||||
'dispersion': 16.666666666666668,
|
||||
'balance': 100,
|
||||
})
|
||||
|
||||
|
@ -2212,6 +2229,14 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
|
|||
argv = ["", backup_file, "write_builder", "24"]
|
||||
self.assertIsNone(ringbuilder.main(argv))
|
||||
|
||||
rb = RingBuilder.load(self.tmpfile + '.builder')
|
||||
self.assertIsNotNone(rb._last_part_moves)
|
||||
rb._last_part_moves = None
|
||||
rb.save(self.tmpfile)
|
||||
|
||||
argv = ["", self.tmpfile + '.builder', "rebalance"]
|
||||
self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
|
||||
|
||||
def test_warn_at_risk(self):
|
||||
# check that warning is generated when rebalance does not achieve
|
||||
# satisfactory balance
|
||||
|
@ -2288,6 +2313,28 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
|
|||
self.assertIn('dispersion', out.lower())
|
||||
self.assertFalse(err)
|
||||
|
||||
def test_dispersion_command_recalculate(self):
|
||||
rb = RingBuilder(8, 3, 0)
|
||||
for i in range(3):
|
||||
i += 1
|
||||
rb.add_dev({'region': 1, 'zone': i, 'weight': 1.0,
|
||||
'ip': '127.0.0.%d' % i, 'port': 6000, 'device': 'sda'})
|
||||
# extra device in z1
|
||||
rb.add_dev({'region': 1, 'zone': 1, 'weight': 1.0,
|
||||
'ip': '127.0.0.1', 'port': 6000, 'device': 'sdb'})
|
||||
rb.rebalance()
|
||||
self.assertEqual(rb.dispersion, 16.666666666666668)
|
||||
# simulate an out-of-date dispersion calculation
|
||||
rb.dispersion = 50
|
||||
rb.save(self.tempfile)
|
||||
old_version = rb.version
|
||||
out, err = self.run_srb('dispersion')
|
||||
self.assertIn('Dispersion is 50.000000', out)
|
||||
out, err = self.run_srb('dispersion --recalculate')
|
||||
self.assertIn('Dispersion is 16.666667', out)
|
||||
rb = RingBuilder.load(self.tempfile)
|
||||
self.assertEqual(rb.version, old_version + 1)
|
||||
|
||||
def test_use_ringfile_as_builderfile(self):
|
||||
mock_stdout = six.StringIO()
|
||||
mock_stderr = six.StringIO()
|
||||
|
|
|
@ -78,15 +78,16 @@ def mock_config_opts_side_effect(*args, **kwargs):
|
|||
return dict()
|
||||
|
||||
|
||||
def mock_keystone_password_side_effect(username, password, project_name,
|
||||
user_domain_name, project_domain_name,
|
||||
user_id, user_domain_id, trust_id,
|
||||
def mock_keystone_password_side_effect(auth_url, username, password,
|
||||
project_name, user_domain_name,
|
||||
project_domain_name, user_id,
|
||||
user_domain_id, trust_id,
|
||||
domain_id, domain_name, project_id,
|
||||
project_domain_id, reauthenticate):
|
||||
return MockPassword(username, password, project_name, user_domain_name,
|
||||
project_domain_name, user_id, user_domain_id, trust_id,
|
||||
domain_id, domain_name, project_id, project_domain_id,
|
||||
reauthenticate)
|
||||
return MockPassword(auth_url, username, password, project_name,
|
||||
user_domain_name, project_domain_name, user_id,
|
||||
user_domain_id, trust_id, domain_id, domain_name,
|
||||
project_id, project_domain_id, reauthenticate)
|
||||
|
||||
ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED = 'Secret incorrectly specified.'
|
||||
ERR_MESSAGE_KEY_UUID_NOT_FOUND = 'Key not found, uuid: '
|
||||
|
@ -154,10 +155,11 @@ class MockBarbicanKey(object):
|
|||
|
||||
|
||||
class MockPassword(object):
|
||||
def __init__(self, username, password, project_name, user_domain_name,
|
||||
project_domain_name, user_id, user_domain_id, trust_id,
|
||||
domain_id, domain_name, project_id, project_domain_id,
|
||||
reauthenticate):
|
||||
def __init__(self, auth_url, username, password, project_name,
|
||||
user_domain_name, project_domain_name, user_id,
|
||||
user_domain_id, trust_id, domain_id, domain_name, project_id,
|
||||
project_domain_id, reauthenticate):
|
||||
self.auth_url = auth_url
|
||||
self.password = password
|
||||
self.username = username
|
||||
self.user_domain_name = user_domain_name
|
||||
|
|
|
@ -28,9 +28,9 @@ from test.unit import FakeLogger, FakeRing
|
|||
|
||||
|
||||
class LeakTrackingIter(object):
|
||||
def __init__(self, inner_iter, fake_swift, path):
|
||||
def __init__(self, inner_iter, mark_closed, path):
|
||||
self.inner_iter = inner_iter
|
||||
self.fake_swift = fake_swift
|
||||
self.mark_closed = mark_closed
|
||||
self.path = path
|
||||
|
||||
def __iter__(self):
|
||||
|
@ -38,7 +38,7 @@ class LeakTrackingIter(object):
|
|||
yield x
|
||||
|
||||
def close(self):
|
||||
self.fake_swift.mark_closed(self.path)
|
||||
self.mark_closed(self.path)
|
||||
|
||||
|
||||
FakeSwiftCall = namedtuple('FakeSwiftCall', ['method', 'path', 'headers'])
|
||||
|
@ -173,7 +173,7 @@ class FakeSwift(object):
|
|||
conditional_etag=conditional_etag)
|
||||
wsgi_iter = resp(env, start_response)
|
||||
self.mark_opened(path)
|
||||
return LeakTrackingIter(wsgi_iter, self, path)
|
||||
return LeakTrackingIter(wsgi_iter, self.mark_closed, path)
|
||||
|
||||
def mark_opened(self, path):
|
||||
self._unclosed_req_paths[path] += 1
|
||||
|
|
|
@ -59,6 +59,9 @@ class SloTestCase(unittest.TestCase):
|
|||
slo_conf = {'rate_limit_under_size': '0'}
|
||||
self.slo = slo.filter_factory(slo_conf)(self.app)
|
||||
self.slo.logger = self.app.logger
|
||||
self.manifest_abcd_etag = md5hex(
|
||||
md5hex("a" * 5) + md5hex(md5hex("b" * 10) + md5hex("c" * 15)) +
|
||||
md5hex("d" * 20))
|
||||
|
||||
def call_app(self, req, app=None):
|
||||
if app is None:
|
||||
|
@ -1683,10 +1686,6 @@ class TestSloGetManifest(SloTestCase):
|
|||
'Etag': md5hex(_abcdefghijkl_manifest_json)},
|
||||
_abcdefghijkl_manifest_json)
|
||||
|
||||
self.manifest_abcd_etag = md5hex(
|
||||
md5hex("a" * 5) + md5hex(md5hex("b" * 10) + md5hex("c" * 15)) +
|
||||
md5hex("d" * 20))
|
||||
|
||||
_bc_ranges_manifest_json = json.dumps(
|
||||
[{'name': '/gettest/b_10', 'hash': md5hex('b' * 10),
|
||||
'content_type': 'text/plain', 'bytes': '10',
|
||||
|
@ -1986,7 +1985,7 @@ class TestSloGetManifest(SloTestCase):
|
|||
|
||||
self.assertEqual(status, '206 Partial Content')
|
||||
self.assertEqual(headers['Content-Length'], '15')
|
||||
self.assertNotIn('Etag', headers)
|
||||
self.assertEqual(headers['Etag'], '"%s"' % self.manifest_abcd_etag)
|
||||
self.assertEqual(body, 'aabbbbbbbbbbccc')
|
||||
|
||||
self.assertEqual(
|
||||
|
@ -2231,7 +2230,7 @@ class TestSloGetManifest(SloTestCase):
|
|||
|
||||
self.assertEqual(status, '206 Partial Content')
|
||||
self.assertEqual(headers['Content-Length'], '25')
|
||||
self.assertNotIn('Etag', headers)
|
||||
self.assertEqual(headers['Etag'], '"%s"' % self.manifest_abcd_etag)
|
||||
self.assertEqual(body, 'bbbbbbbbbbccccccccccccccc')
|
||||
|
||||
self.assertEqual(
|
||||
|
@ -2434,7 +2433,7 @@ class TestSloGetManifest(SloTestCase):
|
|||
self.assertEqual(status, '206 Partial Content')
|
||||
self.assertEqual(headers['Content-Length'], '20')
|
||||
self.assertEqual(headers['Content-Type'], 'application/json')
|
||||
self.assertNotIn('Etag', headers)
|
||||
self.assertIn('Etag', headers)
|
||||
self.assertEqual(body, 'accccccccbbbbbbbbddd')
|
||||
|
||||
self.assertEqual(
|
||||
|
@ -3235,9 +3234,7 @@ class TestSloConditionalGetOldManifest(SloTestCase):
|
|||
|
||||
self.assertEqual(status, '206 Partial Content')
|
||||
self.assertIn(('Content-Length', '4'), headers)
|
||||
# We intentionally drop Etag for ranged requests.
|
||||
# Presumably because of broken clients?
|
||||
self.assertNotIn('etag', [h.lower() for h, v in headers])
|
||||
self.assertIn(('Etag', '"%s"' % self.manifest_abcd_etag), headers)
|
||||
self.assertEqual(body, 'aabb')
|
||||
|
||||
expected_app_calls = [
|
||||
|
|
|
@ -27,6 +27,7 @@ from tempfile import mkdtemp
|
|||
from shutil import rmtree
|
||||
import random
|
||||
import uuid
|
||||
import itertools
|
||||
|
||||
from six.moves import range
|
||||
|
||||
|
@ -36,6 +37,16 @@ from swift.common.ring import utils
|
|||
from swift.common.ring.builder import MAX_BALANCE
|
||||
|
||||
|
||||
def _partition_counts(builder, key='id'):
|
||||
"""
|
||||
Returns a dictionary mapping the given device key to (number of
|
||||
partitions assigned to that key).
|
||||
"""
|
||||
return Counter(builder.devs[dev_id][key]
|
||||
for part2dev_id in builder._replica2part2dev
|
||||
for dev_id in part2dev_id)
|
||||
|
||||
|
||||
class TestRingBuilder(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -44,21 +55,12 @@ class TestRingBuilder(unittest.TestCase):
|
|||
def tearDown(self):
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
|
||||
def _partition_counts(self, builder, key='id'):
|
||||
"""
|
||||
Returns a dictionary mapping the given device key to (number of
|
||||
partitions assigned to that key).
|
||||
"""
|
||||
return Counter(builder.devs[dev_id][key]
|
||||
for part2dev_id in builder._replica2part2dev
|
||||
for dev_id in part2dev_id)
|
||||
|
||||
def _get_population_by_region(self, builder):
|
||||
"""
|
||||
Returns a dictionary mapping region to number of partitions in that
|
||||
region.
|
||||
"""
|
||||
return self._partition_counts(builder, key='region')
|
||||
return _partition_counts(builder, key='region')
|
||||
|
||||
def test_init(self):
|
||||
rb = ring.RingBuilder(8, 3, 1)
|
||||
|
@ -320,7 +322,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
'port': 10000 + region * 100 + zone,
|
||||
'device': 'sda%d' % dev_id})
|
||||
rb.rebalance()
|
||||
self.assertEqual(self._partition_counts(rb, 'zone'),
|
||||
self.assertEqual(_partition_counts(rb, 'zone'),
|
||||
{0: 256, 10: 256, 11: 256})
|
||||
wanted_by_zone = defaultdict(lambda: defaultdict(int))
|
||||
for dev in rb._iter_devs():
|
||||
|
@ -777,7 +779,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
# replica should have been moved, therefore we expect 256 parts in zone
|
||||
# 0 and 1, and a total of 256 in zone 2,3, and 4
|
||||
expected = defaultdict(int, {0: 256, 1: 256, 2: 86, 3: 85, 4: 85})
|
||||
self.assertEqual(expected, self._partition_counts(rb, key='zone'))
|
||||
self.assertEqual(expected, _partition_counts(rb, key='zone'))
|
||||
|
||||
zone_histogram = defaultdict(int)
|
||||
for part in range(rb.parts):
|
||||
|
@ -804,7 +806,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
self.assertAlmostEqual(rb.get_balance(), 0, delta=0.5)
|
||||
|
||||
# every zone has either 153 or 154 parts
|
||||
for zone, count in self._partition_counts(
|
||||
for zone, count in _partition_counts(
|
||||
rb, key='zone').items():
|
||||
self.assertAlmostEqual(153.5, count, delta=1)
|
||||
|
||||
|
@ -872,18 +874,18 @@ class TestRingBuilder(unittest.TestCase):
|
|||
self.assertFalse(rb.ever_rebalanced)
|
||||
rb.rebalance()
|
||||
self.assertTrue(rb.ever_rebalanced)
|
||||
counts = self._partition_counts(rb)
|
||||
counts = _partition_counts(rb)
|
||||
self.assertEqual(counts, {0: 256, 1: 256, 2: 256})
|
||||
rb.add_dev({'id': 3, 'region': 0, 'zone': 3, 'weight': 1,
|
||||
'ip': '127.0.0.1', 'port': 10003, 'device': 'sda1'})
|
||||
rb.pretend_min_part_hours_passed()
|
||||
rb.rebalance()
|
||||
self.assertTrue(rb.ever_rebalanced)
|
||||
counts = self._partition_counts(rb)
|
||||
counts = _partition_counts(rb)
|
||||
self.assertEqual(counts, {0: 192, 1: 192, 2: 192, 3: 192})
|
||||
rb.set_dev_weight(3, 100)
|
||||
rb.rebalance()
|
||||
counts = self._partition_counts(rb)
|
||||
counts = _partition_counts(rb)
|
||||
self.assertEqual(counts[3], 256)
|
||||
|
||||
def test_add_rebalance_add_rebalance_delete_rebalance(self):
|
||||
|
@ -1637,7 +1639,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
rb.validate()
|
||||
|
||||
# sanity check: balance respects weights, so default
|
||||
part_counts = self._partition_counts(rb, key='zone')
|
||||
part_counts = _partition_counts(rb, key='zone')
|
||||
self.assertEqual(part_counts[0], 192)
|
||||
self.assertEqual(part_counts[1], 192)
|
||||
self.assertEqual(part_counts[2], 384)
|
||||
|
@ -1648,7 +1650,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
rb.pretend_min_part_hours_passed()
|
||||
rb.rebalance(seed=12345)
|
||||
|
||||
part_counts = self._partition_counts(rb, key='zone')
|
||||
part_counts = _partition_counts(rb, key='zone')
|
||||
self.assertEqual({0: 212, 1: 211, 2: 345}, part_counts)
|
||||
|
||||
# Now, devices 0 and 1 take 50% more than their fair shares by
|
||||
|
@ -1658,7 +1660,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
rb.pretend_min_part_hours_passed()
|
||||
rb.rebalance(seed=12345)
|
||||
|
||||
part_counts = self._partition_counts(rb, key='zone')
|
||||
part_counts = _partition_counts(rb, key='zone')
|
||||
self.assertEqual({0: 256, 1: 256, 2: 256}, part_counts)
|
||||
|
||||
# Devices 0 and 1 may take up to 75% over their fair share, but the
|
||||
|
@ -1669,7 +1671,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
rb.pretend_min_part_hours_passed()
|
||||
rb.rebalance(seed=12345)
|
||||
|
||||
part_counts = self._partition_counts(rb, key='zone')
|
||||
part_counts = _partition_counts(rb, key='zone')
|
||||
self.assertEqual(part_counts[0], 256)
|
||||
self.assertEqual(part_counts[1], 256)
|
||||
self.assertEqual(part_counts[2], 256)
|
||||
|
@ -1709,7 +1711,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
rb.rebalance(seed=12345)
|
||||
|
||||
# sanity check: our overload is big enough to balance things
|
||||
part_counts = self._partition_counts(rb, key='ip')
|
||||
part_counts = _partition_counts(rb, key='ip')
|
||||
self.assertEqual(part_counts['127.0.0.1'], 216)
|
||||
self.assertEqual(part_counts['127.0.0.2'], 216)
|
||||
self.assertEqual(part_counts['127.0.0.3'], 336)
|
||||
|
@ -1721,7 +1723,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
rb.pretend_min_part_hours_passed()
|
||||
rb.rebalance(seed=12345)
|
||||
|
||||
part_counts = self._partition_counts(rb, key='ip')
|
||||
part_counts = _partition_counts(rb, key='ip')
|
||||
|
||||
self.assertEqual({
|
||||
'127.0.0.1': 237,
|
||||
|
@ -1737,7 +1739,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
rb.pretend_min_part_hours_passed()
|
||||
rb.rebalance(seed=12345)
|
||||
|
||||
part_counts = self._partition_counts(rb, key='ip')
|
||||
part_counts = _partition_counts(rb, key='ip')
|
||||
self.assertEqual(part_counts['127.0.0.1'], 256)
|
||||
self.assertEqual(part_counts['127.0.0.2'], 256)
|
||||
self.assertEqual(part_counts['127.0.0.3'], 256)
|
||||
|
@ -1770,7 +1772,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
'127.0.0.4': 192,
|
||||
}
|
||||
|
||||
part_counts = self._partition_counts(rb, key='ip')
|
||||
part_counts = _partition_counts(rb, key='ip')
|
||||
self.assertEqual(part_counts, expected)
|
||||
|
||||
def test_overload_keeps_balanceable_things_balanced_initially(self):
|
||||
|
@ -1803,7 +1805,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
rb.set_overload(99999)
|
||||
rb.rebalance(seed=12345)
|
||||
|
||||
part_counts = self._partition_counts(rb)
|
||||
part_counts = _partition_counts(rb)
|
||||
self.assertEqual(part_counts, {
|
||||
0: 128,
|
||||
1: 128,
|
||||
|
@ -1847,7 +1849,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
rb.set_overload(99999)
|
||||
|
||||
rb.rebalance(seed=123)
|
||||
part_counts = self._partition_counts(rb)
|
||||
part_counts = _partition_counts(rb)
|
||||
self.assertEqual(part_counts, {
|
||||
0: 128,
|
||||
1: 128,
|
||||
|
@ -1868,7 +1870,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
rb.set_dev_weight(1, 8)
|
||||
|
||||
rb.rebalance(seed=456)
|
||||
part_counts = self._partition_counts(rb)
|
||||
part_counts = _partition_counts(rb)
|
||||
self.assertEqual(part_counts, {
|
||||
0: 128,
|
||||
1: 128,
|
||||
|
@ -2273,7 +2275,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
self.assertRaises(exceptions.RingValidationError, rb.validate)
|
||||
|
||||
rb.rebalance()
|
||||
counts = self._partition_counts(rb, key='zone')
|
||||
counts = _partition_counts(rb, key='zone')
|
||||
self.assertEqual(counts, {0: 128, 1: 128, 2: 256, 3: 256})
|
||||
|
||||
dev_usage, worst = rb.validate()
|
||||
|
@ -2455,7 +2457,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
# we'll rebalance but can't move any parts
|
||||
rb.rebalance(seed=1)
|
||||
# zero weight tier has one copy of 1/4 part-replica
|
||||
self.assertEqual(rb.dispersion, 75.0)
|
||||
self.assertEqual(rb.dispersion, 25.0)
|
||||
self.assertEqual(rb._dispersion_graph, {
|
||||
(0,): [0, 0, 0, 256],
|
||||
(0, 0): [0, 0, 0, 256],
|
||||
|
@ -2514,7 +2516,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
# so the first time, rings are still unbalanced becase we'll only move
|
||||
# one replica of each part.
|
||||
self.assertEqual(rb.get_balance(), 50.1953125)
|
||||
self.assertEqual(rb.dispersion, 99.609375)
|
||||
self.assertEqual(rb.dispersion, 16.6015625)
|
||||
|
||||
# N.B. since we mostly end up grabbing parts by "weight forced" some
|
||||
# seeds given some specific ring state will randomly pick bad
|
||||
|
@ -2524,14 +2526,14 @@ class TestRingBuilder(unittest.TestCase):
|
|||
# ... this isn't a really "desirable" behavior, but even with bad luck,
|
||||
# things do get better
|
||||
self.assertEqual(rb.get_balance(), 47.265625)
|
||||
self.assertEqual(rb.dispersion, 99.609375)
|
||||
self.assertEqual(rb.dispersion, 16.6015625)
|
||||
|
||||
# but if you stick with it, eventually the next rebalance, will get to
|
||||
# move "the right" part-replicas, resulting in near optimal balance
|
||||
changed_part, _, _ = rb.rebalance(seed=7)
|
||||
self.assertEqual(changed_part, 240)
|
||||
self.assertEqual(rb.get_balance(), 0.390625)
|
||||
self.assertEqual(rb.dispersion, 99.609375)
|
||||
self.assertEqual(rb.dispersion, 16.6015625)
|
||||
|
||||
def test_undispersable_server_converge_on_balance(self):
|
||||
rb = ring.RingBuilder(8, 6, 0)
|
||||
|
@ -2567,7 +2569,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
# but the first time, those are still unbalance becase ring builder
|
||||
# can move only one replica for each part
|
||||
self.assertEqual(rb.get_balance(), 16.9921875)
|
||||
self.assertEqual(rb.dispersion, 59.765625)
|
||||
self.assertEqual(rb.dispersion, 9.9609375)
|
||||
|
||||
rb.rebalance(seed=7)
|
||||
|
||||
|
@ -2575,7 +2577,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||
self.assertGreaterEqual(rb.get_balance(), 0)
|
||||
self.assertLess(rb.get_balance(), 1)
|
||||
# dispersion doesn't get any worse
|
||||
self.assertEqual(rb.dispersion, 59.765625)
|
||||
self.assertEqual(rb.dispersion, 9.9609375)
|
||||
|
||||
def test_effective_overload(self):
|
||||
rb = ring.RingBuilder(8, 3, 1)
|
||||
|
@ -3628,7 +3630,7 @@ class TestGetRequiredOverload(unittest.TestCase):
|
|||
# when overload can not change the outcome none is required
|
||||
self.assertEqual(0.0, rb.get_required_overload())
|
||||
# even though dispersion is terrible (in z1 particularly)
|
||||
self.assertEqual(100.0, rb.dispersion)
|
||||
self.assertEqual(20.0, rb.dispersion)
|
||||
|
||||
def test_one_big_guy_does_not_spoil_his_buddy(self):
|
||||
rb = ring.RingBuilder(8, 3, 0)
|
||||
|
@ -4363,5 +4365,158 @@ class TestGetRequiredOverload(unittest.TestCase):
|
|||
wr.items() if len(t) == tier_len})
|
||||
|
||||
|
||||
class TestRingBuilderDispersion(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.devs = ('d%s' % i for i in itertools.count())
|
||||
|
||||
def assertAlmostPartCount(self, counts, expected, delta=3):
|
||||
msgs = []
|
||||
failed = False
|
||||
for k, p in sorted(expected.items()):
|
||||
try:
|
||||
self.assertAlmostEqual(counts[k], p, delta=delta)
|
||||
except KeyError:
|
||||
self.fail('%r is missing the key %r' % (counts, k))
|
||||
except AssertionError:
|
||||
failed = True
|
||||
state = '!='
|
||||
else:
|
||||
state = 'ok'
|
||||
msgs.append('parts in %s was %s expected %s (%s)' % (
|
||||
k, counts[k], p, state))
|
||||
if failed:
|
||||
self.fail('some part counts not close enough '
|
||||
'to expected:\n' + '\n'.join(msgs))
|
||||
|
||||
def test_rebalance_dispersion(self):
|
||||
rb = ring.RingBuilder(8, 6, 0)
|
||||
|
||||
for i in range(6):
|
||||
rb.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1',
|
||||
'port': 6000, 'weight': 1.0,
|
||||
'device': next(self.devs)})
|
||||
rb.rebalance()
|
||||
self.assertEqual(0, rb.dispersion)
|
||||
|
||||
for z in range(2):
|
||||
for i in range(6):
|
||||
rb.add_dev({'region': 0, 'zone': z + 1, 'ip': '127.0.1.1',
|
||||
'port': 6000, 'weight': 1.0,
|
||||
'device': next(self.devs)})
|
||||
|
||||
self.assertAlmostPartCount(_partition_counts(rb, 'zone'),
|
||||
{0: 1536, 1: 0, 2: 0})
|
||||
rb.rebalance()
|
||||
self.assertEqual(rb.dispersion, 50.0)
|
||||
expected = {0: 1280, 1: 128, 2: 128}
|
||||
self.assertAlmostPartCount(_partition_counts(rb, 'zone'),
|
||||
expected)
|
||||
report = dict(utils.dispersion_report(
|
||||
rb, r'r\d+z\d+$', verbose=True)['graph'])
|
||||
counts = {int(k.split('z')[1]): d['placed_parts']
|
||||
for k, d in report.items()}
|
||||
self.assertAlmostPartCount(counts, expected)
|
||||
rb.rebalance()
|
||||
self.assertEqual(rb.dispersion, 33.333333333333336)
|
||||
expected = {0: 1024, 1: 256, 2: 256}
|
||||
self.assertAlmostPartCount(_partition_counts(rb, 'zone'),
|
||||
expected)
|
||||
report = dict(utils.dispersion_report(
|
||||
rb, r'r\d+z\d+$', verbose=True)['graph'])
|
||||
counts = {int(k.split('z')[1]): d['placed_parts']
|
||||
for k, d in report.items()}
|
||||
self.assertAlmostPartCount(counts, expected)
|
||||
rb.rebalance()
|
||||
self.assertEqual(rb.dispersion, 16.666666666666668)
|
||||
expected = {0: 768, 1: 384, 2: 384}
|
||||
self.assertAlmostPartCount(_partition_counts(rb, 'zone'),
|
||||
expected)
|
||||
report = dict(utils.dispersion_report(
|
||||
rb, r'r\d+z\d+$', verbose=True)['graph'])
|
||||
counts = {int(k.split('z')[1]): d['placed_parts']
|
||||
for k, d in report.items()}
|
||||
self.assertAlmostPartCount(counts, expected)
|
||||
rb.rebalance()
|
||||
self.assertEqual(0, rb.dispersion)
|
||||
expected = {0: 512, 1: 512, 2: 512}
|
||||
self.assertAlmostPartCount(_partition_counts(rb, 'zone'), expected)
|
||||
report = dict(utils.dispersion_report(
|
||||
rb, r'r\d+z\d+$', verbose=True)['graph'])
|
||||
counts = {int(k.split('z')[1]): d['placed_parts']
|
||||
for k, d in report.items()}
|
||||
self.assertAlmostPartCount(counts, expected)
|
||||
|
||||
def test_weight_dispersion(self):
|
||||
rb = ring.RingBuilder(8, 3, 0)
|
||||
|
||||
for i in range(2):
|
||||
for d in range(3):
|
||||
rb.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.%s.1' % i,
|
||||
'port': 6000, 'weight': 1.0,
|
||||
'device': next(self.devs)})
|
||||
for d in range(3):
|
||||
rb.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.2.1',
|
||||
'port': 6000, 'weight': 10.0,
|
||||
'device': next(self.devs)})
|
||||
|
||||
rb.rebalance()
|
||||
# each tier should only have 1 replicanth, but the big server has 2
|
||||
# replicas of every part and 3 replicas another 1/2 - so our total
|
||||
# dispersion is greater than one replicanth, it's 1.5
|
||||
self.assertEqual(50.0, rb.dispersion)
|
||||
expected = {
|
||||
'127.0.0.1': 64,
|
||||
'127.0.1.1': 64,
|
||||
'127.0.2.1': 640,
|
||||
}
|
||||
self.assertAlmostPartCount(_partition_counts(rb, 'ip'),
|
||||
expected)
|
||||
report = dict(utils.dispersion_report(
|
||||
rb, r'r\d+z\d+-[^/]*$', verbose=True)['graph'])
|
||||
counts = {k.split('-')[1]: d['placed_parts']
|
||||
for k, d in report.items()}
|
||||
self.assertAlmostPartCount(counts, expected)
|
||||
|
||||
def test_multiple_tier_dispersion(self):
|
||||
rb = ring.RingBuilder(10, 8, 0)
|
||||
r_z_to_ip_count = {
|
||||
(0, 0): 2,
|
||||
(1, 1): 1,
|
||||
(1, 2): 2,
|
||||
}
|
||||
ip_index = 0
|
||||
for (r, z), ip_count in sorted(r_z_to_ip_count.items()):
|
||||
for i in range(ip_count):
|
||||
ip_index += 1
|
||||
for d in range(3):
|
||||
rb.add_dev({'region': r, 'zone': z,
|
||||
'ip': '127.%s.%s.%s' % (r, z, ip_index),
|
||||
'port': 6000, 'weight': 1.0,
|
||||
'device': next(self.devs)})
|
||||
|
||||
for i in range(3):
|
||||
# it might take a few rebalances for all the right part replicas to
|
||||
# balance from r1z2 into r1z1
|
||||
rb.rebalance()
|
||||
self.assertAlmostEqual(15.52734375, rb.dispersion, delta=5.0)
|
||||
self.assertAlmostEqual(0.0, rb.get_balance(), delta=0.5)
|
||||
expected = {
|
||||
'127.0.0.1': 1638,
|
||||
'127.0.0.2': 1638,
|
||||
'127.1.1.3': 1638,
|
||||
'127.1.2.4': 1638,
|
||||
'127.1.2.5': 1638,
|
||||
}
|
||||
delta = 10
|
||||
self.assertAlmostPartCount(_partition_counts(rb, 'ip'), expected,
|
||||
delta=delta)
|
||||
report = dict(utils.dispersion_report(
|
||||
rb, r'r\d+z\d+-[^/]*$', verbose=True)['graph'])
|
||||
counts = {k.split('-')[1]: d['placed_parts']
|
||||
for k, d in report.items()}
|
||||
self.assertAlmostPartCount(counts, expected, delta=delta)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -14,7 +14,9 @@
|
|||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
from collections import defaultdict
|
||||
|
||||
from swift.common import exceptions
|
||||
from swift.common import ring
|
||||
from swift.common.ring.utils import (tiers_for_dev, build_tier_tree,
|
||||
validate_and_normalize_ip,
|
||||
|
@ -26,7 +28,8 @@ from swift.common.ring.utils import (tiers_for_dev, build_tier_tree,
|
|||
validate_args, parse_args,
|
||||
parse_builder_ring_filename_args,
|
||||
build_dev_from_opts, dispersion_report,
|
||||
parse_address, get_tier_name, pretty_dev)
|
||||
parse_address, get_tier_name, pretty_dev,
|
||||
validate_replicas_by_tier)
|
||||
|
||||
|
||||
class TestUtils(unittest.TestCase):
|
||||
|
@ -172,6 +175,81 @@ class TestUtils(unittest.TestCase):
|
|||
self.assertRaises(ValueError,
|
||||
validate_and_normalize_address, hostname)
|
||||
|
||||
def test_validate_replicas_by_tier_close(self):
|
||||
one_ip_six_devices = \
|
||||
defaultdict(float,
|
||||
{(): 4.0,
|
||||
(0,): 4.0,
|
||||
(0, 0): 4.0,
|
||||
(0, 0, '127.0.0.1'): 4.0,
|
||||
(0, 0, '127.0.0.1', 0): 0.6666666670,
|
||||
(0, 0, '127.0.0.1', 1): 0.6666666668,
|
||||
(0, 0, '127.0.0.1', 2): 0.6666666667,
|
||||
(0, 0, '127.0.0.1', 3): 0.6666666666,
|
||||
(0, 0, '127.0.0.1', 4): 0.6666666665,
|
||||
(0, 0, '127.0.0.1', 5): 0.6666666664,
|
||||
})
|
||||
try:
|
||||
validate_replicas_by_tier(4, one_ip_six_devices)
|
||||
except Exception as e:
|
||||
self.fail('one_ip_six_devices is invalid for %s' % e)
|
||||
|
||||
def test_validate_replicas_by_tier_exact(self):
|
||||
three_regions_three_devices = \
|
||||
defaultdict(float,
|
||||
{(): 3.0,
|
||||
(0,): 1.0,
|
||||
(0, 0): 1.0,
|
||||
(0, 0, '127.0.0.1'): 1.0,
|
||||
(0, 0, '127.0.0.1', 0): 1.0,
|
||||
(1,): 1.0,
|
||||
(1, 1): 1.0,
|
||||
(1, 1, '127.0.0.1'): 1.0,
|
||||
(1, 1, '127.0.0.1', 1): 1.0,
|
||||
(2,): 1.0,
|
||||
(2, 2): 1.0,
|
||||
(2, 2, '127.0.0.1'): 1.0,
|
||||
(2, 2, '127.0.0.1', 2): 1.0,
|
||||
})
|
||||
try:
|
||||
validate_replicas_by_tier(3, three_regions_three_devices)
|
||||
except Exception as e:
|
||||
self.fail('three_regions_three_devices is invalid for %s' % e)
|
||||
|
||||
def test_validate_replicas_by_tier_errors(self):
|
||||
pseudo_replicas = \
|
||||
defaultdict(float,
|
||||
{(): 3.0,
|
||||
(0,): 1.0,
|
||||
(0, 0): 1.0,
|
||||
(0, 0, '127.0.0.1'): 1.0,
|
||||
(0, 0, '127.0.0.1', 0): 1.0,
|
||||
(1,): 1.0,
|
||||
(1, 1): 1.0,
|
||||
(1, 1, '127.0.0.1'): 1.0,
|
||||
(1, 1, '127.0.0.1', 1): 1.0,
|
||||
(2,): 1.0,
|
||||
(2, 2): 1.0,
|
||||
(2, 2, '127.0.0.1'): 1.0,
|
||||
(2, 2, '127.0.0.1', 2): 1.0,
|
||||
})
|
||||
|
||||
def do_test(bad_tier_key, bad_tier_name):
|
||||
# invalidate a copy of pseudo_replicas at given key and check for
|
||||
# an exception to be raised
|
||||
test_replicas = dict(pseudo_replicas)
|
||||
test_replicas[bad_tier_key] += 0.1 # <- this is not fair!
|
||||
with self.assertRaises(exceptions.RingValidationError) as ctx:
|
||||
validate_replicas_by_tier(3, test_replicas)
|
||||
self.assertEqual(
|
||||
'3.1 != 3 at tier %s' % bad_tier_name, str(ctx.exception))
|
||||
|
||||
do_test((), 'cluster')
|
||||
do_test((1,), 'regions')
|
||||
do_test((0, 0), 'zones')
|
||||
do_test((2, 2, '127.0.0.1'), 'servers')
|
||||
do_test((1, 1, '127.0.0.1', 1), 'devices')
|
||||
|
||||
def test_parse_search_value(self):
|
||||
res = parse_search_value('r0')
|
||||
self.assertEqual(res, {'region': 0})
|
||||
|
@ -619,10 +697,10 @@ class TestUtils(unittest.TestCase):
|
|||
rb.rebalance(seed=100)
|
||||
rb.validate()
|
||||
|
||||
self.assertEqual(rb.dispersion, 55.46875)
|
||||
self.assertEqual(rb.dispersion, 18.489583333333332)
|
||||
report = dispersion_report(rb)
|
||||
self.assertEqual(report['worst_tier'], 'r1z1')
|
||||
self.assertEqual(report['max_dispersion'], 44.921875)
|
||||
self.assertEqual(report['worst_tier'], 'r1z1-127.0.0.1')
|
||||
self.assertEqual(report['max_dispersion'], 22.68370607028754)
|
||||
|
||||
def build_tier_report(max_replicas, placed_parts, dispersion,
|
||||
replicas):
|
||||
|
@ -633,17 +711,15 @@ class TestUtils(unittest.TestCase):
|
|||
'replicas': replicas,
|
||||
}
|
||||
|
||||
# Each node should store less than or equal to 256 partitions to
|
||||
# avoid multiple replicas.
|
||||
# 2/5 of total weight * 768 ~= 307 -> 51 partitions on each node in
|
||||
# zone 1 are stored at least twice on the nodes
|
||||
# every partition has at least two replicas in this zone, unfortunately
|
||||
# sometimes they're both on the same server.
|
||||
expected = [
|
||||
['r1z1', build_tier_report(
|
||||
2, 256, 44.921875, [0, 0, 141, 115])],
|
||||
2, 627, 18.341307814992025, [0, 0, 141, 115])],
|
||||
['r1z1-127.0.0.1', build_tier_report(
|
||||
1, 242, 29.33884297520661, [14, 171, 71, 0])],
|
||||
1, 313, 22.68370607028754, [14, 171, 71, 0])],
|
||||
['r1z1-127.0.0.2', build_tier_report(
|
||||
1, 243, 29.218106995884774, [13, 172, 71, 0])],
|
||||
1, 314, 22.611464968152866, [13, 172, 71, 0])],
|
||||
]
|
||||
report = dispersion_report(rb, 'r1z1[^/]*$', verbose=True)
|
||||
graph = report['graph']
|
||||
|
@ -668,15 +744,15 @@ class TestUtils(unittest.TestCase):
|
|||
# can't move all the part-replicas in one rebalance
|
||||
rb.rebalance(seed=100)
|
||||
report = dispersion_report(rb, verbose=True)
|
||||
self.assertEqual(rb.dispersion, 11.71875)
|
||||
self.assertEqual(rb.dispersion, 3.90625)
|
||||
self.assertEqual(report['worst_tier'], 'r1z1-127.0.0.2')
|
||||
self.assertEqual(report['max_dispersion'], 8.875739644970414)
|
||||
self.assertEqual(report['max_dispersion'], 8.152173913043478)
|
||||
# do a sencond rebalance
|
||||
rb.rebalance(seed=100)
|
||||
report = dispersion_report(rb, verbose=True)
|
||||
self.assertEqual(rb.dispersion, 50.0)
|
||||
self.assertEqual(rb.dispersion, 16.666666666666668)
|
||||
self.assertEqual(report['worst_tier'], 'r1z0-127.0.0.3')
|
||||
self.assertEqual(report['max_dispersion'], 50.0)
|
||||
self.assertEqual(report['max_dispersion'], 33.333333333333336)
|
||||
|
||||
# ... but overload can square it
|
||||
rb.set_overload(rb.get_required_overload())
|
||||
|
|
|
@ -19,7 +19,6 @@ import tempfile
|
|||
import time
|
||||
|
||||
from six.moves import range
|
||||
from test import safe_repr
|
||||
from test.unit import mock_check_drive
|
||||
|
||||
from swift.common.swob import Request, HTTPException
|
||||
|
@ -30,14 +29,6 @@ from swift.common.constraints import MAX_OBJECT_NAME_LENGTH
|
|||
|
||||
|
||||
class TestConstraints(unittest.TestCase):
|
||||
|
||||
def assertIn(self, member, container, msg=None):
|
||||
"""Copied from 2.7"""
|
||||
if member not in container:
|
||||
standardMsg = '%s not found in %s' % (safe_repr(member),
|
||||
safe_repr(container))
|
||||
self.fail(self._formatMessage(msg, standardMsg))
|
||||
|
||||
def test_check_metadata_empty(self):
|
||||
headers = {}
|
||||
self.assertIsNone(constraints.check_metadata(Request.blank(
|
||||
|
@ -145,49 +136,57 @@ class TestConstraints(unittest.TestCase):
|
|||
|
||||
def test_check_object_creation_content_length(self):
|
||||
headers = {'Content-Length': str(constraints.MAX_FILE_SIZE),
|
||||
'Content-Type': 'text/plain'}
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': str(time.time())}
|
||||
self.assertIsNone(constraints.check_object_creation(Request.blank(
|
||||
'/', headers=headers), 'object_name'))
|
||||
|
||||
headers = {'Content-Length': str(constraints.MAX_FILE_SIZE + 1),
|
||||
'Content-Type': 'text/plain'}
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': str(time.time())}
|
||||
resp = constraints.check_object_creation(
|
||||
Request.blank('/', headers=headers), 'object_name')
|
||||
self.assertEqual(resp.status_int, HTTP_REQUEST_ENTITY_TOO_LARGE)
|
||||
|
||||
headers = {'Transfer-Encoding': 'chunked',
|
||||
'Content-Type': 'text/plain'}
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': str(time.time())}
|
||||
self.assertIsNone(constraints.check_object_creation(Request.blank(
|
||||
'/', headers=headers), 'object_name'))
|
||||
|
||||
headers = {'Transfer-Encoding': 'gzip',
|
||||
'Content-Type': 'text/plain'}
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': str(time.time())}
|
||||
resp = constraints.check_object_creation(Request.blank(
|
||||
'/', headers=headers), 'object_name')
|
||||
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('Invalid Transfer-Encoding header value', resp.body)
|
||||
|
||||
headers = {'Content-Type': 'text/plain'}
|
||||
headers = {'Content-Type': 'text/plain',
|
||||
'X-Timestamp': str(time.time())}
|
||||
resp = constraints.check_object_creation(
|
||||
Request.blank('/', headers=headers), 'object_name')
|
||||
self.assertEqual(resp.status_int, HTTP_LENGTH_REQUIRED)
|
||||
|
||||
headers = {'Content-Length': 'abc',
|
||||
'Content-Type': 'text/plain'}
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': str(time.time())}
|
||||
resp = constraints.check_object_creation(Request.blank(
|
||||
'/', headers=headers), 'object_name')
|
||||
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('Invalid Content-Length header value', resp.body)
|
||||
|
||||
headers = {'Transfer-Encoding': 'gzip,chunked',
|
||||
'Content-Type': 'text/plain'}
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': str(time.time())}
|
||||
resp = constraints.check_object_creation(Request.blank(
|
||||
'/', headers=headers), 'object_name')
|
||||
self.assertEqual(resp.status_int, HTTP_NOT_IMPLEMENTED)
|
||||
|
||||
def test_check_object_creation_name_length(self):
|
||||
headers = {'Transfer-Encoding': 'chunked',
|
||||
'Content-Type': 'text/plain'}
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': str(time.time())}
|
||||
name = 'o' * constraints.MAX_OBJECT_NAME_LENGTH
|
||||
self.assertIsNone(constraints.check_object_creation(Request.blank(
|
||||
'/', headers=headers), name))
|
||||
|
@ -202,11 +201,13 @@ class TestConstraints(unittest.TestCase):
|
|||
|
||||
def test_check_object_creation_content_type(self):
|
||||
headers = {'Transfer-Encoding': 'chunked',
|
||||
'Content-Type': 'text/plain'}
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': str(time.time())}
|
||||
self.assertIsNone(constraints.check_object_creation(Request.blank(
|
||||
'/', headers=headers), 'object_name'))
|
||||
|
||||
headers = {'Transfer-Encoding': 'chunked'}
|
||||
headers = {'Transfer-Encoding': 'chunked',
|
||||
'X-Timestamp': str(time.time())}
|
||||
resp = constraints.check_object_creation(
|
||||
Request.blank('/', headers=headers), 'object_name')
|
||||
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
||||
|
@ -214,140 +215,171 @@ class TestConstraints(unittest.TestCase):
|
|||
|
||||
def test_check_object_creation_bad_content_type(self):
|
||||
headers = {'Transfer-Encoding': 'chunked',
|
||||
'Content-Type': '\xff\xff'}
|
||||
'Content-Type': '\xff\xff',
|
||||
'X-Timestamp': str(time.time())}
|
||||
resp = constraints.check_object_creation(
|
||||
Request.blank('/', headers=headers), 'object_name')
|
||||
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertTrue('Content-Type' in resp.body)
|
||||
self.assertIn('Content-Type', resp.body)
|
||||
|
||||
def test_check_object_creation_bad_delete_headers(self):
|
||||
headers = {'Transfer-Encoding': 'chunked',
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Delete-After': 'abc'}
|
||||
'X-Delete-After': 'abc',
|
||||
'X-Timestamp': str(time.time())}
|
||||
resp = constraints.check_object_creation(
|
||||
Request.blank('/', headers=headers), 'object_name')
|
||||
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertTrue('Non-integer X-Delete-After' in resp.body)
|
||||
self.assertIn('Non-integer X-Delete-After', resp.body)
|
||||
|
||||
t = str(int(time.time() - 60))
|
||||
headers = {'Transfer-Encoding': 'chunked',
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Delete-At': t}
|
||||
'X-Delete-At': t,
|
||||
'X-Timestamp': str(time.time())}
|
||||
resp = constraints.check_object_creation(
|
||||
Request.blank('/', headers=headers), 'object_name')
|
||||
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertTrue('X-Delete-At in past' in resp.body)
|
||||
self.assertIn('X-Delete-At in past', resp.body)
|
||||
|
||||
def test_check_delete_headers(self):
|
||||
# x-delete-at value should be relative to the request timestamp rather
|
||||
# than time.time() so separate the two to ensure the checks are robust
|
||||
ts = utils.Timestamp(time.time() + 100)
|
||||
|
||||
# X-Delete-After
|
||||
headers = {'X-Delete-After': '60'}
|
||||
resp = constraints.check_delete_headers(
|
||||
headers = {'X-Delete-After': '600',
|
||||
'X-Timestamp': ts.internal}
|
||||
req = constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
self.assertTrue(isinstance(resp, Request))
|
||||
self.assertTrue('x-delete-at' in resp.headers)
|
||||
self.assertIsInstance(req, Request)
|
||||
self.assertIn('x-delete-at', req.headers)
|
||||
self.assertNotIn('x-delete-after', req.headers)
|
||||
expected_delete_at = str(int(ts) + 600)
|
||||
self.assertEqual(req.headers.get('X-Delete-At'), expected_delete_at)
|
||||
|
||||
headers = {'X-Delete-After': 'abc'}
|
||||
try:
|
||||
resp = constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
except HTTPException as e:
|
||||
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertTrue('Non-integer X-Delete-After' in e.body)
|
||||
else:
|
||||
self.fail("Should have failed with HTTPBadRequest")
|
||||
headers = {'X-Delete-After': 'abc',
|
||||
'X-Timestamp': ts.internal}
|
||||
|
||||
headers = {'X-Delete-After': '60.1'}
|
||||
try:
|
||||
resp = constraints.check_delete_headers(
|
||||
with self.assertRaises(HTTPException) as cm:
|
||||
constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
except HTTPException as e:
|
||||
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertTrue('Non-integer X-Delete-After' in e.body)
|
||||
else:
|
||||
self.fail("Should have failed with HTTPBadRequest")
|
||||
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('Non-integer X-Delete-After', cm.exception.body)
|
||||
|
||||
headers = {'X-Delete-After': '-1'}
|
||||
try:
|
||||
resp = constraints.check_delete_headers(
|
||||
headers = {'X-Delete-After': '60.1',
|
||||
'X-Timestamp': ts.internal}
|
||||
with self.assertRaises(HTTPException) as cm:
|
||||
constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
except HTTPException as e:
|
||||
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertTrue('X-Delete-After in past' in e.body)
|
||||
else:
|
||||
self.fail("Should have failed with HTTPBadRequest")
|
||||
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('Non-integer X-Delete-After', cm.exception.body)
|
||||
|
||||
headers = {'X-Delete-After': '-1',
|
||||
'X-Timestamp': ts.internal}
|
||||
with self.assertRaises(HTTPException) as cm:
|
||||
constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('X-Delete-After in past', cm.exception.body)
|
||||
|
||||
headers = {'X-Delete-After': '0',
|
||||
'X-Timestamp': ts.internal}
|
||||
with self.assertRaises(HTTPException) as cm:
|
||||
constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('X-Delete-After in past', cm.exception.body)
|
||||
|
||||
# x-delete-after = 0 disallowed when it results in x-delete-at equal to
|
||||
# the timestamp
|
||||
headers = {'X-Delete-After': '0',
|
||||
'X-Timestamp': utils.Timestamp(int(ts)).internal}
|
||||
with self.assertRaises(HTTPException) as cm:
|
||||
constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('X-Delete-After in past', cm.exception.body)
|
||||
|
||||
# X-Delete-At
|
||||
t = str(int(time.time() + 100))
|
||||
headers = {'X-Delete-At': t}
|
||||
resp = constraints.check_delete_headers(
|
||||
delete_at = str(int(ts) + 100)
|
||||
headers = {'X-Delete-At': delete_at,
|
||||
'X-Timestamp': ts.internal}
|
||||
req = constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
self.assertTrue(isinstance(resp, Request))
|
||||
self.assertTrue('x-delete-at' in resp.headers)
|
||||
self.assertEqual(resp.headers.get('X-Delete-At'), t)
|
||||
self.assertIsInstance(req, Request)
|
||||
self.assertIn('x-delete-at', req.headers)
|
||||
self.assertEqual(req.headers.get('X-Delete-At'), delete_at)
|
||||
|
||||
headers = {'X-Delete-At': 'abc'}
|
||||
try:
|
||||
resp = constraints.check_delete_headers(
|
||||
headers = {'X-Delete-At': 'abc',
|
||||
'X-Timestamp': ts.internal}
|
||||
with self.assertRaises(HTTPException) as cm:
|
||||
constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
except HTTPException as e:
|
||||
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertTrue('Non-integer X-Delete-At' in e.body)
|
||||
else:
|
||||
self.fail("Should have failed with HTTPBadRequest")
|
||||
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('Non-integer X-Delete-At', cm.exception.body)
|
||||
|
||||
t = str(int(time.time() + 100)) + '.1'
|
||||
headers = {'X-Delete-At': t}
|
||||
try:
|
||||
resp = constraints.check_delete_headers(
|
||||
delete_at = str(int(ts) + 100) + '.1'
|
||||
headers = {'X-Delete-At': delete_at,
|
||||
'X-Timestamp': ts.internal}
|
||||
with self.assertRaises(HTTPException) as cm:
|
||||
constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
except HTTPException as e:
|
||||
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertTrue('Non-integer X-Delete-At' in e.body)
|
||||
else:
|
||||
self.fail("Should have failed with HTTPBadRequest")
|
||||
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('Non-integer X-Delete-At', cm.exception.body)
|
||||
|
||||
t = str(int(time.time()))
|
||||
headers = {'X-Delete-At': t}
|
||||
try:
|
||||
resp = constraints.check_delete_headers(
|
||||
delete_at = str(int(ts) - 1)
|
||||
headers = {'X-Delete-At': delete_at,
|
||||
'X-Timestamp': ts.internal}
|
||||
with self.assertRaises(HTTPException) as cm:
|
||||
constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
except HTTPException as e:
|
||||
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertTrue('X-Delete-At in past' in e.body)
|
||||
else:
|
||||
self.fail("Should have failed with HTTPBadRequest")
|
||||
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('X-Delete-At in past', cm.exception.body)
|
||||
|
||||
t = str(int(time.time() - 1))
|
||||
headers = {'X-Delete-At': t}
|
||||
try:
|
||||
resp = constraints.check_delete_headers(
|
||||
# x-delete-at disallowed when exactly equal to timestamp
|
||||
delete_at = str(int(ts))
|
||||
headers = {'X-Delete-At': delete_at,
|
||||
'X-Timestamp': utils.Timestamp(int(ts)).internal}
|
||||
with self.assertRaises(HTTPException) as cm:
|
||||
constraints.check_delete_headers(
|
||||
Request.blank('/', headers=headers))
|
||||
except HTTPException as e:
|
||||
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertTrue('X-Delete-At in past' in e.body)
|
||||
else:
|
||||
self.fail("Should have failed with HTTPBadRequest")
|
||||
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
||||
self.assertIn('X-Delete-At in past', cm.exception.body)
|
||||
|
||||
def test_check_delete_headers_removes_delete_after(self):
|
||||
ts = utils.Timestamp.now()
|
||||
headers = {'Content-Length': '0',
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Delete-After': '42',
|
||||
'X-Delete-At': str(int(ts) + 40),
|
||||
'X-Timestamp': ts.internal}
|
||||
req = Request.blank('/', headers=headers)
|
||||
constraints.check_delete_headers(req)
|
||||
self.assertNotIn('X-Delete-After', req.headers)
|
||||
self.assertEqual(req.headers['X-Delete-At'], str(int(ts) + 42))
|
||||
|
||||
def test_check_delete_headers_sets_delete_at(self):
|
||||
t = time.time() + 1000
|
||||
t = time.time()
|
||||
expected = str(int(t) + 1000)
|
||||
# check delete-at is passed through
|
||||
headers = {'Content-Length': '0',
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Delete-At': str(int(t))}
|
||||
'X-Delete-At': expected,
|
||||
'X-Timestamp': str(t)}
|
||||
req = Request.blank('/', headers=headers)
|
||||
constraints.check_delete_headers(req)
|
||||
self.assertTrue('X-Delete-At' in req.headers)
|
||||
self.assertEqual(req.headers['X-Delete-At'], str(int(t)))
|
||||
self.assertIn('X-Delete-At', req.headers)
|
||||
self.assertEqual(req.headers['X-Delete-At'], expected)
|
||||
|
||||
# check delete-after is converted to delete-at
|
||||
headers = {'Content-Length': '0',
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Delete-After': '42'}
|
||||
'X-Delete-After': '42',
|
||||
'X-Timestamp': str(t)}
|
||||
req = Request.blank('/', headers=headers)
|
||||
with mock.patch('time.time', lambda: t):
|
||||
constraints.check_delete_headers(req)
|
||||
self.assertTrue('X-Delete-At' in req.headers)
|
||||
constraints.check_delete_headers(req)
|
||||
self.assertIn('X-Delete-At', req.headers)
|
||||
expected = str(int(t) + 42)
|
||||
self.assertEqual(req.headers['X-Delete-At'], expected)
|
||||
|
||||
|
@ -355,21 +387,21 @@ class TestConstraints(unittest.TestCase):
|
|||
headers = {'Content-Length': '0',
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Delete-After': '42',
|
||||
'X-Delete-At': str(int(t) + 40)}
|
||||
'X-Delete-At': str(int(t) + 40),
|
||||
'X-Timestamp': str(t)}
|
||||
req = Request.blank('/', headers=headers)
|
||||
with mock.patch('time.time', lambda: t):
|
||||
constraints.check_delete_headers(req)
|
||||
self.assertTrue('X-Delete-At' in req.headers)
|
||||
constraints.check_delete_headers(req)
|
||||
self.assertIn('X-Delete-At', req.headers)
|
||||
self.assertEqual(req.headers['X-Delete-At'], expected)
|
||||
|
||||
headers = {'Content-Length': '0',
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Delete-After': '42',
|
||||
'X-Delete-At': str(int(t) + 44)}
|
||||
'X-Delete-At': str(int(t) + 44),
|
||||
'X-Timestamp': str(t)}
|
||||
req = Request.blank('/', headers=headers)
|
||||
with mock.patch('time.time', lambda: t):
|
||||
constraints.check_delete_headers(req)
|
||||
self.assertTrue('X-Delete-At' in req.headers)
|
||||
constraints.check_delete_headers(req)
|
||||
self.assertIn('X-Delete-At', req.headers)
|
||||
self.assertEqual(req.headers['X-Delete-At'], expected)
|
||||
|
||||
def test_check_drive_invalid_path(self):
|
||||
|
@ -473,10 +505,10 @@ class TestConstraints(unittest.TestCase):
|
|||
|
||||
def test_validate_constraints(self):
|
||||
c = constraints
|
||||
self.assertTrue(c.MAX_META_OVERALL_SIZE > c.MAX_META_NAME_LENGTH)
|
||||
self.assertTrue(c.MAX_META_OVERALL_SIZE > c.MAX_META_VALUE_LENGTH)
|
||||
self.assertTrue(c.MAX_HEADER_SIZE > c.MAX_META_NAME_LENGTH)
|
||||
self.assertTrue(c.MAX_HEADER_SIZE > c.MAX_META_VALUE_LENGTH)
|
||||
self.assertGreater(c.MAX_META_OVERALL_SIZE, c.MAX_META_NAME_LENGTH)
|
||||
self.assertGreater(c.MAX_META_OVERALL_SIZE, c.MAX_META_VALUE_LENGTH)
|
||||
self.assertGreater(c.MAX_HEADER_SIZE, c.MAX_META_NAME_LENGTH)
|
||||
self.assertGreater(c.MAX_HEADER_SIZE, c.MAX_META_VALUE_LENGTH)
|
||||
|
||||
def test_check_account_format(self):
|
||||
req = Request.blank(
|
||||
|
@ -501,14 +533,11 @@ class TestConstraints(unittest.TestCase):
|
|||
req = Request.blank(
|
||||
'/v/a/c/o', headers={
|
||||
'X-Versions-Location': versions_location})
|
||||
try:
|
||||
with self.assertRaises(HTTPException) as cm:
|
||||
constraints.check_container_format(
|
||||
req, req.headers['X-Versions-Location'])
|
||||
except HTTPException as e:
|
||||
self.assertTrue(e.body.startswith('Container name cannot'))
|
||||
else:
|
||||
self.fail('check_container_format did not raise error for %r' %
|
||||
req.headers['X-Versions-Location'])
|
||||
self.assertTrue(cm.exception.body.startswith(
|
||||
'Container name cannot'))
|
||||
|
||||
def test_valid_api_version(self):
|
||||
version = 'v1'
|
||||
|
|
|
@ -19,6 +19,7 @@ import unittest
|
|||
import zlib
|
||||
from textwrap import dedent
|
||||
import os
|
||||
from itertools import izip_longest
|
||||
|
||||
import six
|
||||
from six import StringIO
|
||||
|
@ -28,9 +29,11 @@ from test.unit import FakeLogger
|
|||
from swift.common import exceptions, internal_client, swob
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.storage_policy import StoragePolicy
|
||||
from swift.common.middleware.proxy_logging import ProxyLoggingMiddleware
|
||||
|
||||
from test.unit import with_tempdir, write_fake_ring, patch_policies
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
from test.unit import with_tempdir, write_fake_ring, patch_policies, \
|
||||
DebugLogger
|
||||
from test.unit.common.middleware.helpers import FakeSwift, LeakTrackingIter
|
||||
|
||||
if six.PY3:
|
||||
from eventlet.green.urllib import request as urllib2
|
||||
|
@ -136,19 +139,23 @@ class SetMetadataInternalClient(internal_client.InternalClient):
|
|||
|
||||
class IterInternalClient(internal_client.InternalClient):
|
||||
def __init__(
|
||||
self, test, path, marker, end_marker, acceptable_statuses, items):
|
||||
self, test, path, marker, end_marker, prefix, acceptable_statuses,
|
||||
items):
|
||||
self.test = test
|
||||
self.path = path
|
||||
self.marker = marker
|
||||
self.end_marker = end_marker
|
||||
self.prefix = prefix
|
||||
self.acceptable_statuses = acceptable_statuses
|
||||
self.items = items
|
||||
|
||||
def _iter_items(
|
||||
self, path, marker='', end_marker='', acceptable_statuses=None):
|
||||
self, path, marker='', end_marker='', prefix='',
|
||||
acceptable_statuses=None):
|
||||
self.test.assertEqual(self.path, path)
|
||||
self.test.assertEqual(self.marker, marker)
|
||||
self.test.assertEqual(self.end_marker, end_marker)
|
||||
self.test.assertEqual(self.prefix, prefix)
|
||||
self.test.assertEqual(self.acceptable_statuses, acceptable_statuses)
|
||||
for item in self.items:
|
||||
yield item
|
||||
|
@ -434,6 +441,91 @@ class TestInternalClient(unittest.TestCase):
|
|||
self.assertEqual(client.env['PATH_INFO'], path)
|
||||
self.assertEqual(client.env['HTTP_X_TEST'], path)
|
||||
|
||||
def test_make_request_error_case(self):
|
||||
class InternalClient(internal_client.InternalClient):
|
||||
def __init__(self):
|
||||
self.logger = DebugLogger()
|
||||
# wrap the fake app with ProxyLoggingMiddleware
|
||||
self.app = ProxyLoggingMiddleware(
|
||||
self.fake_app, {}, self.logger)
|
||||
self.user_agent = 'some_agent'
|
||||
self.request_tries = 3
|
||||
|
||||
def fake_app(self, env, start_response):
|
||||
body = 'fake error response'
|
||||
start_response('409 Conflict',
|
||||
[('Content-Length', str(len(body)))])
|
||||
return [body]
|
||||
|
||||
client = InternalClient()
|
||||
with self.assertRaises(internal_client.UnexpectedResponse), \
|
||||
mock.patch('swift.common.internal_client.sleep'):
|
||||
client.make_request('DELETE', '/container', {}, (200,))
|
||||
|
||||
# Since we didn't provide an X-Timestamp, retrying gives us a chance to
|
||||
# succeed (assuming the failure was due to clock skew between servers)
|
||||
expected = (' HTTP/1.0 409 ', ' HTTP/1.0 409 ', ' HTTP/1.0 409 ', )
|
||||
loglines = client.logger.get_lines_for_level('info')
|
||||
for expected, logline in izip_longest(expected, loglines):
|
||||
if not expected:
|
||||
self.fail('Unexpected extra log line: %r' % logline)
|
||||
self.assertIn(expected, logline)
|
||||
|
||||
def test_make_request_acceptable_status_not_2xx(self):
|
||||
class InternalClient(internal_client.InternalClient):
|
||||
def __init__(self, resp_status):
|
||||
self.logger = DebugLogger()
|
||||
# wrap the fake app with ProxyLoggingMiddleware
|
||||
self.app = ProxyLoggingMiddleware(
|
||||
self.fake_app, {}, self.logger)
|
||||
self.user_agent = 'some_agent'
|
||||
self.resp_status = resp_status
|
||||
self.request_tries = 3
|
||||
self.closed_paths = []
|
||||
|
||||
def fake_app(self, env, start_response):
|
||||
body = 'fake error response'
|
||||
start_response(self.resp_status,
|
||||
[('Content-Length', str(len(body)))])
|
||||
return LeakTrackingIter(body, self.closed_paths.append,
|
||||
env['PATH_INFO'])
|
||||
|
||||
def do_test(resp_status):
|
||||
client = InternalClient(resp_status)
|
||||
with self.assertRaises(internal_client.UnexpectedResponse) as ctx, \
|
||||
mock.patch('swift.common.internal_client.sleep'):
|
||||
# This is obvious strange tests to expect only 400 Bad Request
|
||||
# but this test intended to avoid extra body drain if it's
|
||||
# correct object body with 2xx.
|
||||
client.make_request('GET', '/cont/obj', {}, (400,))
|
||||
loglines = client.logger.get_lines_for_level('info')
|
||||
return client.closed_paths, ctx.exception.resp, loglines
|
||||
|
||||
closed_paths, resp, loglines = do_test('200 OK')
|
||||
# Since the 200 is considered "properly handled", it won't be retried
|
||||
self.assertEqual(closed_paths, [])
|
||||
# ...and it'll be on us (the caller) to close (for example, by using
|
||||
# swob.Response's body property)
|
||||
self.assertEqual(resp.body, 'fake error response')
|
||||
self.assertEqual(closed_paths, ['/cont/obj'])
|
||||
|
||||
expected = (' HTTP/1.0 200 ', )
|
||||
for expected, logline in izip_longest(expected, loglines):
|
||||
if not expected:
|
||||
self.fail('Unexpected extra log line: %r' % logline)
|
||||
self.assertIn(expected, logline)
|
||||
|
||||
closed_paths, resp, loglines = do_test('503 Service Unavailable')
|
||||
# But since 5xx is neither "properly handled" not likely to include
|
||||
# a large body, it will be retried and responses will already be closed
|
||||
self.assertEqual(closed_paths, ['/cont/obj'] * 3)
|
||||
|
||||
expected = (' HTTP/1.0 503 ', ' HTTP/1.0 503 ', ' HTTP/1.0 503 ', )
|
||||
for expected, logline in izip_longest(expected, loglines):
|
||||
if not expected:
|
||||
self.fail('Unexpected extra log line: %r' % logline)
|
||||
self.assertIn(expected, logline)
|
||||
|
||||
def test_make_request_codes(self):
|
||||
class InternalClient(internal_client.InternalClient):
|
||||
def __init__(self):
|
||||
|
@ -487,30 +579,33 @@ class TestInternalClient(unittest.TestCase):
|
|||
self.test.assertEqual(0, whence)
|
||||
|
||||
class InternalClient(internal_client.InternalClient):
|
||||
def __init__(self):
|
||||
def __init__(self, status):
|
||||
self.app = self.fake_app
|
||||
self.user_agent = 'some_agent'
|
||||
self.request_tries = 3
|
||||
self.status = status
|
||||
self.call_count = 0
|
||||
|
||||
def fake_app(self, env, start_response):
|
||||
start_response('404 Not Found', [('Content-Length', '0')])
|
||||
self.call_count += 1
|
||||
start_response(self.status, [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
fobj = FileObject(self)
|
||||
client = InternalClient()
|
||||
def do_test(status, expected_calls):
|
||||
fobj = FileObject(self)
|
||||
client = InternalClient(status)
|
||||
|
||||
try:
|
||||
old_sleep = internal_client.sleep
|
||||
internal_client.sleep = not_sleep
|
||||
try:
|
||||
client.make_request('PUT', '/', {}, (2,), fobj)
|
||||
except Exception as err:
|
||||
pass
|
||||
self.assertEqual(404, err.resp.status_int)
|
||||
finally:
|
||||
internal_client.sleep = old_sleep
|
||||
with mock.patch.object(internal_client, 'sleep', not_sleep):
|
||||
with self.assertRaises(Exception) as exc_mgr:
|
||||
client.make_request('PUT', '/', {}, (2,), fobj)
|
||||
self.assertEqual(int(status[:3]),
|
||||
exc_mgr.exception.resp.status_int)
|
||||
|
||||
self.assertEqual(client.request_tries, fobj.seek_called)
|
||||
self.assertEqual(client.call_count, fobj.seek_called)
|
||||
self.assertEqual(client.call_count, expected_calls)
|
||||
|
||||
do_test('404 Not Found', 1)
|
||||
do_test('503 Service Unavailable', 3)
|
||||
|
||||
def test_make_request_request_exception(self):
|
||||
class InternalClient(internal_client.InternalClient):
|
||||
|
@ -575,17 +670,15 @@ class TestInternalClient(unittest.TestCase):
|
|||
self.assertEqual(1, client.make_request_called)
|
||||
|
||||
def test_get_metadata_invalid_status(self):
|
||||
class FakeApp(object):
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
start_response('404 Not Found', [('x-foo', 'bar')])
|
||||
return ['nope']
|
||||
|
||||
class InternalClient(internal_client.InternalClient):
|
||||
def __init__(self):
|
||||
self.user_agent = 'test'
|
||||
self.request_tries = 1
|
||||
self.app = FakeApp()
|
||||
self.app = self.fake_app
|
||||
|
||||
def fake_app(self, environ, start_response):
|
||||
start_response('404 Not Found', [('x-foo', 'bar')])
|
||||
return ['nope']
|
||||
|
||||
client = InternalClient()
|
||||
self.assertRaises(internal_client.UnexpectedResponse,
|
||||
|
@ -667,9 +760,9 @@ class TestInternalClient(unittest.TestCase):
|
|||
return self.responses.pop(0)
|
||||
|
||||
paths = [
|
||||
'/?format=json&marker=start&end_marker=end',
|
||||
'/?format=json&marker=one%C3%A9&end_marker=end',
|
||||
'/?format=json&marker=two&end_marker=end',
|
||||
'/?format=json&marker=start&end_marker=end&prefix=',
|
||||
'/?format=json&marker=one%C3%A9&end_marker=end&prefix=',
|
||||
'/?format=json&marker=two&end_marker=end&prefix=',
|
||||
]
|
||||
|
||||
responses = [
|
||||
|
@ -685,6 +778,49 @@ class TestInternalClient(unittest.TestCase):
|
|||
|
||||
self.assertEqual('one\xc3\xa9 two'.split(), items)
|
||||
|
||||
def test_iter_items_with_markers_and_prefix(self):
|
||||
class Response(object):
|
||||
def __init__(self, status_int, body):
|
||||
self.status_int = status_int
|
||||
self.body = body
|
||||
|
||||
class InternalClient(internal_client.InternalClient):
|
||||
def __init__(self, test, paths, responses):
|
||||
self.test = test
|
||||
self.paths = paths
|
||||
self.responses = responses
|
||||
|
||||
def make_request(
|
||||
self, method, path, headers, acceptable_statuses,
|
||||
body_file=None):
|
||||
exp_path = self.paths.pop(0)
|
||||
self.test.assertEqual(exp_path, path)
|
||||
return self.responses.pop(0)
|
||||
|
||||
paths = [
|
||||
'/?format=json&marker=prefixed_start&end_marker=prefixed_end'
|
||||
'&prefix=prefixed_',
|
||||
'/?format=json&marker=prefixed_one%C3%A9&end_marker=prefixed_end'
|
||||
'&prefix=prefixed_',
|
||||
'/?format=json&marker=prefixed_two&end_marker=prefixed_end'
|
||||
'&prefix=prefixed_',
|
||||
]
|
||||
|
||||
responses = [
|
||||
Response(200, json.dumps([{'name': 'prefixed_one\xc3\xa9'}, ])),
|
||||
Response(200, json.dumps([{'name': 'prefixed_two'}, ])),
|
||||
Response(204, ''),
|
||||
]
|
||||
|
||||
items = []
|
||||
client = InternalClient(self, paths, responses)
|
||||
for item in client._iter_items('/', marker='prefixed_start',
|
||||
end_marker='prefixed_end',
|
||||
prefix='prefixed_'):
|
||||
items.append(item['name'].encode('utf8'))
|
||||
|
||||
self.assertEqual('prefixed_one\xc3\xa9 prefixed_two'.split(), items)
|
||||
|
||||
def test_iter_item_read_response_if_status_is_acceptable(self):
|
||||
class Response(object):
|
||||
def __init__(self, status_int, body, app_iter):
|
||||
|
@ -779,12 +915,13 @@ class TestInternalClient(unittest.TestCase):
|
|||
items = '0 1 2'.split()
|
||||
marker = 'some_marker'
|
||||
end_marker = 'some_end_marker'
|
||||
prefix = 'some_prefix'
|
||||
acceptable_statuses = 'some_status_list'
|
||||
client = IterInternalClient(
|
||||
self, path, marker, end_marker, acceptable_statuses, items)
|
||||
self, path, marker, end_marker, prefix, acceptable_statuses, items)
|
||||
ret_items = []
|
||||
for container in client.iter_containers(
|
||||
account, marker, end_marker,
|
||||
account, marker, end_marker, prefix,
|
||||
acceptable_statuses=acceptable_statuses):
|
||||
ret_items.append(container)
|
||||
self.assertEqual(items, ret_items)
|
||||
|
@ -987,13 +1124,15 @@ class TestInternalClient(unittest.TestCase):
|
|||
path = make_path(account, container)
|
||||
marker = 'some_maker'
|
||||
end_marker = 'some_end_marker'
|
||||
prefix = 'some_prefix'
|
||||
acceptable_statuses = 'some_status_list'
|
||||
items = '0 1 2'.split()
|
||||
client = IterInternalClient(
|
||||
self, path, marker, end_marker, acceptable_statuses, items)
|
||||
self, path, marker, end_marker, prefix, acceptable_statuses, items)
|
||||
ret_items = []
|
||||
for obj in client.iter_objects(
|
||||
account, container, marker, end_marker, acceptable_statuses):
|
||||
account, container, marker, end_marker, prefix,
|
||||
acceptable_statuses):
|
||||
ret_items.append(obj)
|
||||
self.assertEqual(items, ret_items)
|
||||
|
||||
|
|
|
@ -151,15 +151,17 @@ class FakeInternalClient(reconciler.InternalClient):
|
|||
container_listing_data.sort(key=operator.itemgetter('name'))
|
||||
# register container listing response
|
||||
container_headers = {}
|
||||
container_qry_string = '?format=json&marker=&end_marker='
|
||||
container_qry_string = \
|
||||
'?format=json&marker=&end_marker=&prefix='
|
||||
self.app.register('GET', container_path + container_qry_string,
|
||||
swob.HTTPOk, container_headers,
|
||||
json.dumps(container_listing_data))
|
||||
if container_listing_data:
|
||||
obj_name = container_listing_data[-1]['name']
|
||||
# client should quote and encode marker
|
||||
end_qry_string = '?format=json&marker=%s&end_marker=' % (
|
||||
urllib.parse.quote(obj_name.encode('utf-8')))
|
||||
end_qry_string = \
|
||||
'?format=json&marker=%s&end_marker=&prefix=' % (
|
||||
urllib.parse.quote(obj_name.encode('utf-8')))
|
||||
self.app.register('GET', container_path + end_qry_string,
|
||||
swob.HTTPOk, container_headers,
|
||||
json.dumps([]))
|
||||
|
@ -171,11 +173,11 @@ class FakeInternalClient(reconciler.InternalClient):
|
|||
# register account response
|
||||
account_listing_data.sort(key=operator.itemgetter('name'))
|
||||
account_headers = {}
|
||||
account_qry_string = '?format=json&marker=&end_marker='
|
||||
account_qry_string = '?format=json&marker=&end_marker=&prefix='
|
||||
self.app.register('GET', account_path + account_qry_string,
|
||||
swob.HTTPOk, account_headers,
|
||||
json.dumps(account_listing_data))
|
||||
end_qry_string = '?format=json&marker=%s&end_marker=' % (
|
||||
end_qry_string = '?format=json&marker=%s&end_marker=&prefix=' % (
|
||||
urllib.parse.quote(account_listing_data[-1]['name']))
|
||||
self.app.register('GET', account_path + end_qry_string,
|
||||
swob.HTTPOk, account_headers,
|
||||
|
@ -704,7 +706,7 @@ class TestReconcilerUtils(unittest.TestCase):
|
|||
|
||||
|
||||
def listing_qs(marker):
|
||||
return "?format=json&marker=%s&end_marker=" % \
|
||||
return "?format=json&marker=%s&end_marker=&prefix=" % \
|
||||
urllib.parse.quote(marker.encode('utf-8'))
|
||||
|
||||
|
||||
|
|
|
@ -252,8 +252,14 @@ class TestObjectExpirer(TestCase):
|
|||
self.assertFalse(pop_queue.called)
|
||||
self.assertEqual(start_reports, x.report_objects)
|
||||
self.assertEqual(1, len(log_lines))
|
||||
self.assertIn('Exception while deleting object container obj',
|
||||
log_lines[0])
|
||||
if isinstance(exc, internal_client.UnexpectedResponse):
|
||||
self.assertEqual(
|
||||
log_lines[0],
|
||||
'Unexpected response while deleting object container '
|
||||
'obj: %s' % exc.resp.status_int)
|
||||
else:
|
||||
self.assertTrue(log_lines[0].startswith(
|
||||
'Exception while deleting object container obj'))
|
||||
|
||||
# verify pop_queue logic on exceptions
|
||||
for exc, ts, should_pop in [(None, timestamp, True),
|
||||
|
@ -735,30 +741,30 @@ class TestObjectExpirer(TestCase):
|
|||
got_env[0]['HTTP_X_IF_DELETE_AT'])
|
||||
self.assertEqual(got_env[0]['PATH_INFO'], '/v1/path/to/object name')
|
||||
|
||||
def test_delete_actual_object_raises_404(self):
|
||||
def test_delete_actual_object_returns_expected_error(self):
|
||||
def do_test(test_status):
|
||||
calls = [0]
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('404 Not Found', [('Content-Length', '0')])
|
||||
return []
|
||||
def fake_app(env, start_response):
|
||||
calls[0] += 1
|
||||
start_response(test_status, [('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
internal_client.loadapp = lambda *a, **kw: fake_app
|
||||
internal_client.loadapp = lambda *a, **kw: fake_app
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
self.assertRaises(internal_client.UnexpectedResponse,
|
||||
x.delete_actual_object, '/path/to/object', '1234')
|
||||
x = expirer.ObjectExpirer({})
|
||||
self.assertRaises(internal_client.UnexpectedResponse,
|
||||
x.delete_actual_object, '/path/to/object',
|
||||
'1234')
|
||||
self.assertEqual(calls[0], 1)
|
||||
|
||||
def test_delete_actual_object_raises_412(self):
|
||||
|
||||
def fake_app(env, start_response):
|
||||
start_response('412 Precondition Failed',
|
||||
[('Content-Length', '0')])
|
||||
return []
|
||||
|
||||
internal_client.loadapp = lambda *a, **kw: fake_app
|
||||
|
||||
x = expirer.ObjectExpirer({})
|
||||
self.assertRaises(internal_client.UnexpectedResponse,
|
||||
x.delete_actual_object, '/path/to/object', '1234')
|
||||
# object was deleted and tombstone reaped
|
||||
do_test('404 Not Found')
|
||||
# object was overwritten *after* the original expiration, or
|
||||
# object was deleted but tombstone still exists, or
|
||||
# object was overwritten ahead of the original expiration, or
|
||||
# object was POSTed to with a new (or no) expiration, or ...
|
||||
do_test('412 Precondition Failed')
|
||||
|
||||
def test_delete_actual_object_does_not_handle_odd_stuff(self):
|
||||
|
||||
|
@ -784,12 +790,25 @@ class TestObjectExpirer(TestCase):
|
|||
name = 'this name should get quoted'
|
||||
timestamp = '1366063156.863045'
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.swift.make_request = mock.MagicMock()
|
||||
x.swift.make_request = mock.Mock()
|
||||
x.swift.make_request.return_value.status_int = 204
|
||||
x.delete_actual_object(name, timestamp)
|
||||
self.assertEqual(x.swift.make_request.call_count, 1)
|
||||
self.assertEqual(x.swift.make_request.call_args[0][1],
|
||||
'/v1/' + urllib.parse.quote(name))
|
||||
|
||||
def test_delete_actual_object_queue_cleaning(self):
|
||||
name = 'something'
|
||||
timestamp = '1515544858.80602'
|
||||
x = expirer.ObjectExpirer({})
|
||||
x.swift.make_request = mock.MagicMock()
|
||||
x.delete_actual_object(name, timestamp)
|
||||
self.assertEqual(x.swift.make_request.call_count, 1)
|
||||
header = 'X-Backend-Clean-Expiring-Object-Queue'
|
||||
self.assertEqual(
|
||||
x.swift.make_request.call_args[0][2].get(header),
|
||||
'no')
|
||||
|
||||
def test_pop_queue(self):
|
||||
class InternalClient(object):
|
||||
container_ring = FakeRing()
|
||||
|
|
|
@ -5515,35 +5515,50 @@ class TestObjectController(unittest.TestCase):
|
|||
self.assertTrue('chost,badhost' in msg)
|
||||
self.assertTrue('cdevice' in msg)
|
||||
|
||||
def test_delete_at_update_on_put(self):
|
||||
# Test how delete_at_update works when issued a delete for old
|
||||
# expiration info after a new put with no new expiration info.
|
||||
def test_delete_at_update_cleans_old_entries(self):
|
||||
# Test how delete_at_update works with a request to overwrite an object
|
||||
# with delete-at metadata
|
||||
policy = random.choice(list(POLICIES))
|
||||
given_args = []
|
||||
|
||||
def fake_async_update(*args):
|
||||
given_args.extend(args)
|
||||
def do_test(method, headers, expected_args):
|
||||
given_args = []
|
||||
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': 1,
|
||||
'X-Trans-Id': '123',
|
||||
'X-Backend-Storage-Policy-Index': int(policy)})
|
||||
with mock.patch.object(self.object_controller, 'async_update',
|
||||
fake_async_update):
|
||||
self.object_controller.delete_at_update(
|
||||
'DELETE', 2, 'a', 'c', 'o', req, 'sda1', policy)
|
||||
self.assertEqual(
|
||||
given_args, [
|
||||
def fake_async_update(*args):
|
||||
given_args.extend(args)
|
||||
|
||||
headers.update({'X-Timestamp': 1,
|
||||
'X-Trans-Id': '123',
|
||||
'X-Backend-Storage-Policy-Index': int(policy)})
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': method},
|
||||
headers=headers)
|
||||
with mock.patch.object(self.object_controller, 'async_update',
|
||||
fake_async_update):
|
||||
self.object_controller.delete_at_update(
|
||||
'DELETE', 2, 'a', 'c', 'o', req, 'sda1', policy)
|
||||
self.assertEqual(expected_args, given_args)
|
||||
|
||||
for method in ('PUT', 'POST', 'DELETE'):
|
||||
expected_args = [
|
||||
'DELETE', '.expiring_objects', '0000000000',
|
||||
'0000000002-a/c/o', None, None, None,
|
||||
HeaderKeyDict({
|
||||
'0000000002-a/c/o', None, None,
|
||||
None, HeaderKeyDict({
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'x-timestamp': utils.Timestamp('1').internal,
|
||||
'x-trans-id': '123',
|
||||
'referer': 'PUT http://localhost/v1/a/c/o'}),
|
||||
'sda1', policy])
|
||||
'referer': '%s http://localhost/v1/a/c/o' % method}),
|
||||
'sda1', policy]
|
||||
# async_update should be called by default...
|
||||
do_test(method, {}, expected_args)
|
||||
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 'true'},
|
||||
expected_args)
|
||||
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 't'},
|
||||
expected_args)
|
||||
# ...unless header has a false value
|
||||
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 'false'},
|
||||
[])
|
||||
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 'f'}, [])
|
||||
|
||||
def test_delete_at_negative(self):
|
||||
# Test how delete_at_update works when issued a delete for old
|
||||
|
@ -5675,6 +5690,83 @@ class TestObjectController(unittest.TestCase):
|
|||
['X-Delete-At-Container header must be specified for expiring '
|
||||
'objects background PUT to work properly. Making best guess as '
|
||||
'to the container name for now.'])
|
||||
self.assertEqual(
|
||||
given_args, [
|
||||
'PUT', '.expiring_objects', '0000000000', '0000000002-a/c/o',
|
||||
'127.0.0.1:1234',
|
||||
'3', 'sdc1', HeaderKeyDict({
|
||||
# the .expiring_objects account is always policy-0
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'x-size': '0',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-content-type': 'text/plain',
|
||||
'x-timestamp': utils.Timestamp('1').internal,
|
||||
'x-trans-id': '1234',
|
||||
'referer': 'PUT http://localhost/v1/a/c/o'}),
|
||||
'sda1', policy])
|
||||
|
||||
def test_delete_at_update_put_with_info_but_missing_host(self):
|
||||
# Same as test_delete_at_update_put_with_info, but just
|
||||
# missing the X-Delete-At-Host header.
|
||||
policy = random.choice(list(POLICIES))
|
||||
given_args = []
|
||||
|
||||
def fake_async_update(*args):
|
||||
given_args.extend(args)
|
||||
|
||||
self.object_controller.async_update = fake_async_update
|
||||
self.object_controller.logger = self.logger
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': 1,
|
||||
'X-Trans-Id': '1234',
|
||||
'X-Delete-At-Container': '0',
|
||||
'X-Delete-At-Partition': '3',
|
||||
'X-Delete-At-Device': 'sdc1',
|
||||
'X-Backend-Storage-Policy-Index': int(policy)})
|
||||
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
|
||||
req, 'sda1', policy)
|
||||
self.assertFalse(self.logger.get_lines_for_level('warning'))
|
||||
self.assertEqual(given_args, [])
|
||||
|
||||
def test_delete_at_update_put_with_info_but_empty_host(self):
|
||||
# Same as test_delete_at_update_put_with_info, but empty
|
||||
# X-Delete-At-Host header and no X-Delete-At-Partition nor
|
||||
# X-Delete-At-Device.
|
||||
policy = random.choice(list(POLICIES))
|
||||
given_args = []
|
||||
|
||||
def fake_async_update(*args):
|
||||
given_args.extend(args)
|
||||
|
||||
self.object_controller.async_update = fake_async_update
|
||||
self.object_controller.logger = self.logger
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': 1,
|
||||
'X-Trans-Id': '1234',
|
||||
'X-Delete-At-Container': '0',
|
||||
'X-Delete-At-Host': '',
|
||||
'X-Backend-Storage-Policy-Index': int(policy)})
|
||||
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
|
||||
req, 'sda1', policy)
|
||||
self.assertFalse(self.logger.get_lines_for_level('warning'))
|
||||
self.assertEqual(
|
||||
given_args, [
|
||||
'PUT', '.expiring_objects', '0000000000', '0000000002-a/c/o',
|
||||
None,
|
||||
None, None, HeaderKeyDict({
|
||||
# the .expiring_objects account is always policy-0
|
||||
'X-Backend-Storage-Policy-Index': 0,
|
||||
'x-size': '0',
|
||||
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
|
||||
'x-content-type': 'text/plain',
|
||||
'x-timestamp': utils.Timestamp('1').internal,
|
||||
'x-trans-id': '1234',
|
||||
'referer': 'PUT http://localhost/v1/a/c/o'}),
|
||||
'sda1', policy])
|
||||
|
||||
def test_delete_at_update_delete(self):
|
||||
policy = random.choice(list(POLICIES))
|
||||
|
@ -5861,16 +5953,16 @@ class TestObjectController(unittest.TestCase):
|
|||
given_args[5], 'sda1', policy])
|
||||
|
||||
def test_GET_but_expired(self):
|
||||
# Start off with an existing object that will expire
|
||||
now = time()
|
||||
test_time = now + 10000
|
||||
delete_at_timestamp = int(test_time + 100)
|
||||
delete_at_timestamp = int(now + 100)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 2000),
|
||||
headers={'X-Timestamp': normalize_timestamp(now),
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
|
@ -5879,50 +5971,29 @@ class TestObjectController(unittest.TestCase):
|
|||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# It expires in the future, so it's accessible via GET
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time)})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
delete_at_timestamp = int(now + 1)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
put_timestamp = normalize_timestamp(test_time - 1000)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': put_timestamp,
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
|
||||
# fix server time to now: delete-at is in future, verify GET is ok
|
||||
headers={'X-Timestamp': normalize_timestamp(now)})
|
||||
with mock.patch('swift.obj.server.time.time', return_value=now):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time)})
|
||||
# It expires in the past, so it's not accessible via GET...
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': normalize_timestamp(
|
||||
delete_at_timestamp + 1)})
|
||||
with mock.patch('swift.obj.server.time.time',
|
||||
return_value=delete_at_timestamp + 1):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertEqual(resp.headers['X-Backend-Timestamp'],
|
||||
utils.Timestamp(now))
|
||||
|
||||
# fix server time to now + 2: delete-at is in past, verify GET fails...
|
||||
with mock.patch('swift.obj.server.time.time', return_value=now + 2):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': normalize_timestamp(now + 2)})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertEqual(resp.headers['X-Backend-Timestamp'],
|
||||
utils.Timestamp(put_timestamp))
|
||||
with mock.patch('swift.obj.server.time.time',
|
||||
return_value=delete_at_timestamp + 1):
|
||||
# ...unless X-Backend-Replication is sent
|
||||
expected = {
|
||||
'GET': 'TEST',
|
||||
|
@ -5931,22 +6002,24 @@ class TestObjectController(unittest.TestCase):
|
|||
for meth, expected_body in expected.items():
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', method=meth,
|
||||
headers={'X-Timestamp': normalize_timestamp(now + 2),
|
||||
headers={'X-Timestamp':
|
||||
normalize_timestamp(delete_at_timestamp + 1),
|
||||
'X-Backend-Replication': 'True'})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(expected_body, resp.body)
|
||||
|
||||
def test_HEAD_but_expired(self):
|
||||
test_time = time() + 10000
|
||||
delete_at_timestamp = int(test_time + 100)
|
||||
# We have an object that expires in the future
|
||||
now = time()
|
||||
delete_at_timestamp = int(now + 100)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 2000),
|
||||
headers={'X-Timestamp': normalize_timestamp(now),
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
|
@ -5955,26 +6028,43 @@ class TestObjectController(unittest.TestCase):
|
|||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# It's accessible since it expires in the future
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'HEAD'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time)})
|
||||
resp = req.get_response(self.object_controller)
|
||||
headers={'X-Timestamp': normalize_timestamp(now)})
|
||||
with mock.patch('swift.obj.server.time.time', return_value=now):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
# fix server time to now: delete-at is in future, verify GET is ok
|
||||
# It's not accessible now since it expires in the past
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'HEAD'},
|
||||
headers={'X-Timestamp': normalize_timestamp(
|
||||
delete_at_timestamp + 1)})
|
||||
with mock.patch('swift.obj.server.time.time',
|
||||
return_value=delete_at_timestamp + 1):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertEqual(resp.headers['X-Backend-Timestamp'],
|
||||
utils.Timestamp(now))
|
||||
|
||||
def test_POST_but_expired(self):
|
||||
now = time()
|
||||
with mock.patch('swift.obj.server.time.time', return_value=now):
|
||||
delete_at_timestamp = int(now + 1)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
put_timestamp = normalize_timestamp(test_time - 1000)
|
||||
delete_at_timestamp = int(now + 100)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
|
||||
# We recreate the test object every time to ensure a clean test; a
|
||||
# POST may change attributes of the object, so it's not safe to
|
||||
# re-use.
|
||||
def recreate_test_object(when):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': put_timestamp,
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(when),
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
|
@ -5982,76 +6072,69 @@ class TestObjectController(unittest.TestCase):
|
|||
req.body = 'TEST'
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'HEAD'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time)})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
with mock.patch('swift.obj.server.time.time', return_value=now + 2):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'HEAD'},
|
||||
headers={'X-Timestamp': normalize_timestamp(now + 2)})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertEqual(resp.headers['X-Backend-Timestamp'],
|
||||
utils.Timestamp(put_timestamp))
|
||||
|
||||
def test_POST_but_expired(self):
|
||||
test_time = time() + 10000
|
||||
delete_at_timestamp = int(test_time + 100)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 2000),
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# You can POST to a not-yet-expired object
|
||||
recreate_test_object(now)
|
||||
the_time = now + 1
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 1500)})
|
||||
resp = req.get_response(self.object_controller)
|
||||
headers={'X-Timestamp': normalize_timestamp(the_time)})
|
||||
with mock.patch('swift.obj.server.time.time', return_value=the_time):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
|
||||
delete_at_timestamp = int(time() + 2)
|
||||
# You cannot POST to an expired object
|
||||
now += 2
|
||||
recreate_test_object(now)
|
||||
the_time = delete_at_timestamp + 1
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(the_time)})
|
||||
with mock.patch('swift.obj.server.time.time', return_value=the_time):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
|
||||
def test_DELETE_can_skip_updating_expirer_queue(self):
|
||||
policy = POLICIES.get_by_index(0)
|
||||
test_time = time()
|
||||
put_time = test_time
|
||||
delete_time = test_time + 1
|
||||
delete_at_timestamp = int(test_time + 10000)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(test_time - 1000),
|
||||
headers={'X-Timestamp': normalize_timestamp(put_time),
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
# Mock out async_update so we don't get any async_pending files.
|
||||
with mock.patch.object(self.object_controller, 'async_update'):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
orig_time = object_server.time.time
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'X-Timestamp': normalize_timestamp(delete_time),
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'false',
|
||||
'X-If-Delete-At': str(delete_at_timestamp)})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
|
||||
async_pending_dir = os.path.join(
|
||||
self.testdir, 'sda1', diskfile.get_async_dir(policy))
|
||||
# empty dir or absent dir, either is fine
|
||||
try:
|
||||
t = time() + 3
|
||||
object_server.time.time = lambda: t
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(time())})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
finally:
|
||||
object_server.time.time = orig_time
|
||||
self.assertEqual([], os.listdir(async_pending_dir))
|
||||
except OSError as err:
|
||||
self.assertEqual(err.errno, errno.ENOENT)
|
||||
|
||||
def test_DELETE_but_expired(self):
|
||||
test_time = time() + 10000
|
||||
|
@ -6118,7 +6201,7 @@ class TestObjectController(unittest.TestCase):
|
|||
utils.Timestamp(test_timestamp).internal + '.data')
|
||||
self.assertTrue(os.path.isfile(objfile))
|
||||
|
||||
# move time past expirery
|
||||
# move time past expiry
|
||||
with mock.patch('swift.obj.diskfile.time') as mock_time:
|
||||
mock_time.time.return_value = test_time + 100
|
||||
req = Request.blank(
|
||||
|
@ -6298,6 +6381,111 @@ class TestObjectController(unittest.TestCase):
|
|||
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
|
||||
given_args[5], 'sda1', POLICIES[0]])
|
||||
|
||||
def test_PUT_can_skip_updating_expirer_queue(self):
|
||||
policy = POLICIES.get_by_index(0)
|
||||
test_time = time()
|
||||
put_time = test_time
|
||||
overwrite_time = test_time + 1
|
||||
delete_at_timestamp = int(test_time + 10000)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(put_time),
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
|
||||
# Mock out async_update so we don't get any async_pending files.
|
||||
with mock.patch.object(self.object_controller, 'async_update'):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Overwrite with a non-expiring object
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(overwrite_time),
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'false',
|
||||
'Content-Length': '9',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'new stuff'
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
async_pending_dir = os.path.join(
|
||||
self.testdir, 'sda1', diskfile.get_async_dir(policy))
|
||||
# empty dir or absent dir, either is fine
|
||||
try:
|
||||
self.assertEqual([], os.listdir(async_pending_dir))
|
||||
except OSError as err:
|
||||
self.assertEqual(err.errno, errno.ENOENT)
|
||||
|
||||
def test_PUT_can_skip_deleting_expirer_queue_but_still_inserts(self):
|
||||
policy = POLICIES.get_by_index(0)
|
||||
test_time = time()
|
||||
put_time = test_time
|
||||
overwrite_time = test_time + 1
|
||||
delete_at_timestamp_1 = int(test_time + 10000)
|
||||
delete_at_timestamp_2 = int(test_time + 20000)
|
||||
delete_at_container_1 = str(
|
||||
delete_at_timestamp_1 /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
delete_at_container_2 = str(
|
||||
delete_at_timestamp_2 /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(put_time),
|
||||
'X-Delete-At': str(delete_at_timestamp_1),
|
||||
'X-Delete-At-Container': delete_at_container_1,
|
||||
'X-Delete-At-Host': '1.2.3.4',
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
|
||||
# Mock out async_update so we don't get any async_pending files.
|
||||
with mock.patch.object(self.object_controller, 'async_update'):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# Overwrite with an expiring object
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(overwrite_time),
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'false',
|
||||
'X-Delete-At': str(delete_at_timestamp_2),
|
||||
'X-Delete-At-Container': delete_at_container_2,
|
||||
'X-Delete-At-Host': '1.2.3.4',
|
||||
'Content-Length': '9',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'new stuff'
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
async_pendings = []
|
||||
async_pending_dir = os.path.join(
|
||||
self.testdir, 'sda1', diskfile.get_async_dir(policy))
|
||||
for dirpath, _, filenames in os.walk(async_pending_dir):
|
||||
for filename in filenames:
|
||||
async_pendings.append(os.path.join(dirpath, filename))
|
||||
|
||||
self.assertEqual(len(async_pendings), 1)
|
||||
|
||||
async_pending_ops = []
|
||||
for pending_file in async_pendings:
|
||||
with open(pending_file) as fh:
|
||||
async_pending = pickle.load(fh)
|
||||
async_pending_ops.append(async_pending['op'])
|
||||
self.assertEqual(async_pending_ops, ['PUT'])
|
||||
|
||||
def test_PUT_delete_at_in_past(self):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
|
@ -6310,6 +6498,48 @@ class TestObjectController(unittest.TestCase):
|
|||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertTrue('X-Delete-At in past' in resp.body)
|
||||
|
||||
def test_POST_can_skip_updating_expirer_queue(self):
|
||||
policy = POLICIES.get_by_index(0)
|
||||
test_time = time()
|
||||
put_time = test_time
|
||||
overwrite_time = test_time + 1
|
||||
delete_at_timestamp = int(test_time + 10000)
|
||||
delete_at_container = str(
|
||||
delete_at_timestamp /
|
||||
self.object_controller.expiring_objects_container_divisor *
|
||||
self.object_controller.expiring_objects_container_divisor)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': normalize_timestamp(put_time),
|
||||
'X-Delete-At': str(delete_at_timestamp),
|
||||
'X-Delete-At-Container': delete_at_container,
|
||||
'Content-Length': '4',
|
||||
'Content-Type': 'application/octet-stream'})
|
||||
req.body = 'TEST'
|
||||
|
||||
# Mock out async_update so we don't get any async_pending files.
|
||||
with mock.patch.object(self.object_controller, 'async_update'):
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# POST to remove X-Delete-At
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers={'X-Timestamp': normalize_timestamp(overwrite_time),
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'false',
|
||||
'X-Delete-At': ''})
|
||||
resp = req.get_response(self.object_controller)
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
|
||||
async_pending_dir = os.path.join(
|
||||
self.testdir, 'sda1', diskfile.get_async_dir(policy))
|
||||
# empty dir or absent dir, either is fine
|
||||
try:
|
||||
self.assertEqual([], os.listdir(async_pending_dir))
|
||||
except OSError as err:
|
||||
self.assertEqual(err.errno, errno.ENOENT)
|
||||
|
||||
def test_POST_delete_at_in_past(self):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o',
|
||||
|
|
|
@ -25,7 +25,8 @@ from tempfile import mkdtemp
|
|||
from shutil import rmtree
|
||||
from test import listen_zero
|
||||
from test.unit import (
|
||||
make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn)
|
||||
make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn,
|
||||
FakeLogger)
|
||||
from time import time
|
||||
from distutils.dir_util import mkpath
|
||||
|
||||
|
@ -250,6 +251,72 @@ class TestObjectUpdater(unittest.TestCase):
|
|||
# a warning indicating that the '99' policy isn't valid
|
||||
check_with_idx('99', 1, should_skip=True)
|
||||
|
||||
def test_sweep_logs(self):
|
||||
asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE)
|
||||
prefix_dir = os.path.join(asyncdir, 'abc')
|
||||
mkpath(prefix_dir)
|
||||
|
||||
for o, t in [('abc', 123), ('def', 234), ('ghi', 345),
|
||||
('jkl', 456), ('mno', 567)]:
|
||||
ohash = hash_path('account', 'container', o)
|
||||
o_path = os.path.join(prefix_dir, ohash + '-' +
|
||||
normalize_timestamp(t))
|
||||
write_pickle({}, o_path)
|
||||
|
||||
class MockObjectUpdater(object_updater.ObjectUpdater):
|
||||
def process_object_update(self, update_path, device, policy):
|
||||
os.unlink(update_path)
|
||||
self.stats.successes += 1
|
||||
self.stats.unlinks += 1
|
||||
|
||||
logger = FakeLogger()
|
||||
ou = MockObjectUpdater({
|
||||
'devices': self.devices_dir,
|
||||
'mount_check': 'false',
|
||||
'swift_dir': self.testdir,
|
||||
'interval': '1',
|
||||
'concurrency': '1',
|
||||
'report_interval': '10.0',
|
||||
'node_timeout': '5'}, logger=logger)
|
||||
|
||||
now = [time()]
|
||||
|
||||
def mock_time_function():
|
||||
rv = now[0]
|
||||
now[0] += 5
|
||||
return rv
|
||||
|
||||
# With 10s between updates, time() advancing 5s every time we look,
|
||||
# and 5 async_pendings on disk, we should get at least two progress
|
||||
# lines.
|
||||
with mock.patch('swift.obj.updater.time',
|
||||
mock.MagicMock(time=mock_time_function)):
|
||||
ou.object_sweep(self.sda1)
|
||||
|
||||
info_lines = logger.get_lines_for_level('info')
|
||||
self.assertEqual(4, len(info_lines))
|
||||
self.assertIn("sweep starting", info_lines[0])
|
||||
self.assertIn(self.sda1, info_lines[0])
|
||||
|
||||
self.assertIn("sweep progress", info_lines[1])
|
||||
# the space ensures it's a positive number
|
||||
self.assertIn(
|
||||
"2 successes, 0 failures, 0 quarantines, 2 unlinks, 0 error",
|
||||
info_lines[1])
|
||||
self.assertIn(self.sda1, info_lines[1])
|
||||
|
||||
self.assertIn("sweep progress", info_lines[2])
|
||||
self.assertIn(
|
||||
"4 successes, 0 failures, 0 quarantines, 4 unlinks, 0 error",
|
||||
info_lines[2])
|
||||
self.assertIn(self.sda1, info_lines[2])
|
||||
|
||||
self.assertIn("sweep complete", info_lines[3])
|
||||
self.assertIn(
|
||||
"5 successes, 0 failures, 0 quarantines, 5 unlinks, 0 error",
|
||||
info_lines[3])
|
||||
self.assertIn(self.sda1, info_lines[3])
|
||||
|
||||
@mock.patch.object(object_updater, 'check_drive')
|
||||
def test_run_once_with_disk_unmounted(self, mock_check_drive):
|
||||
mock_check_drive.return_value = False
|
||||
|
@ -288,7 +355,7 @@ class TestObjectUpdater(unittest.TestCase):
|
|||
self.assertEqual([
|
||||
mock.call(self.devices_dir, 'sda1', True),
|
||||
], mock_check_drive.mock_calls)
|
||||
self.assertEqual(ou.logger.get_increment_counts(), {'errors': 1})
|
||||
self.assertEqual(ou.logger.get_increment_counts(), {})
|
||||
|
||||
@mock.patch.object(object_updater, 'check_drive')
|
||||
def test_run_once(self, mock_check_drive):
|
||||
|
|
|
@ -34,7 +34,7 @@ from six.moves import range
|
|||
import swift
|
||||
from swift.common import utils, swob, exceptions
|
||||
from swift.common.exceptions import ChunkWriteTimeout
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.common.utils import Timestamp, list_from_csv
|
||||
from swift.proxy import server as proxy_server
|
||||
from swift.proxy.controllers import obj
|
||||
from swift.proxy.controllers.base import \
|
||||
|
@ -45,7 +45,7 @@ from swift.common.storage_policy import POLICIES, ECDriverError, \
|
|||
from test.unit import FakeRing, FakeMemcache, fake_http_connect, \
|
||||
debug_logger, patch_policies, SlowBody, FakeStatus, \
|
||||
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies, make_ec_object_stub, \
|
||||
fake_ec_node_response, StubResponse
|
||||
fake_ec_node_response, StubResponse, mocked_http_conn
|
||||
from test.unit.proxy.test_server import node_error_count
|
||||
|
||||
|
||||
|
@ -452,6 +452,70 @@ class BaseObjectControllerMixin(object):
|
|||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
|
||||
def test_DELETE_limits_expirer_queue_updates(self):
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
|
||||
codes = [204] * self.replicas()
|
||||
captured_headers = []
|
||||
|
||||
def capture_headers(ip, port, device, part, method, path,
|
||||
headers=None, **kwargs):
|
||||
captured_headers.append(headers)
|
||||
|
||||
with set_http_connect(*codes, give_connect=capture_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 204) # sanity check
|
||||
|
||||
counts = {True: 0, False: 0, None: 0}
|
||||
for headers in captured_headers:
|
||||
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
|
||||
norm_v = None if v is None else utils.config_true_value(v)
|
||||
counts[norm_v] += 1
|
||||
|
||||
max_queue_updates = 2
|
||||
o_replicas = self.replicas()
|
||||
self.assertEqual(counts, {
|
||||
True: min(max_queue_updates, o_replicas),
|
||||
False: max(o_replicas - max_queue_updates, 0),
|
||||
None: 0,
|
||||
})
|
||||
|
||||
def test_expirer_DELETE_suppresses_expirer_queue_updates(self):
|
||||
req = swift.common.swob.Request.blank(
|
||||
'/v1/a/c/o', method='DELETE', headers={
|
||||
'X-Backend-Clean-Expiring-Object-Queue': 'no'})
|
||||
codes = [204] * self.replicas()
|
||||
captured_headers = []
|
||||
|
||||
def capture_headers(ip, port, device, part, method, path,
|
||||
headers=None, **kwargs):
|
||||
captured_headers.append(headers)
|
||||
|
||||
with set_http_connect(*codes, give_connect=capture_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 204) # sanity check
|
||||
|
||||
counts = {True: 0, False: 0, None: 0}
|
||||
for headers in captured_headers:
|
||||
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
|
||||
norm_v = None if v is None else utils.config_true_value(v)
|
||||
counts[norm_v] += 1
|
||||
|
||||
o_replicas = self.replicas()
|
||||
self.assertEqual(counts, {
|
||||
True: 0,
|
||||
False: o_replicas,
|
||||
None: 0,
|
||||
})
|
||||
|
||||
# Make sure we're not sending any expirer-queue update headers here.
|
||||
# Since we're not updating the expirer queue, these headers would be
|
||||
# superfluous.
|
||||
for headers in captured_headers:
|
||||
self.assertNotIn('X-Delete-At-Container', headers)
|
||||
self.assertNotIn('X-Delete-At-Partition', headers)
|
||||
self.assertNotIn('X-Delete-At-Host', headers)
|
||||
self.assertNotIn('X-Delete-At-Device', headers)
|
||||
|
||||
def test_DELETE_write_affinity_before_replication(self):
|
||||
policy_conf = self.app.get_policy_options(self.policy)
|
||||
policy_conf.write_affinity_handoff_delete_count = self.replicas() / 2
|
||||
|
@ -482,6 +546,69 @@ class BaseObjectControllerMixin(object):
|
|||
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
|
||||
def test_PUT_limits_expirer_queue_deletes(self):
|
||||
req = swift.common.swob.Request.blank(
|
||||
'/v1/a/c/o', method='PUT', body='',
|
||||
headers={'Content-Type': 'application/octet-stream'})
|
||||
codes = [201] * self.replicas()
|
||||
captured_headers = []
|
||||
|
||||
def capture_headers(ip, port, device, part, method, path,
|
||||
headers=None, **kwargs):
|
||||
captured_headers.append(headers)
|
||||
|
||||
expect_headers = {
|
||||
'X-Obj-Metadata-Footer': 'yes',
|
||||
'X-Obj-Multiphase-Commit': 'yes'
|
||||
}
|
||||
with set_http_connect(*codes, give_connect=capture_headers,
|
||||
expect_headers=expect_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 201) # sanity check
|
||||
|
||||
counts = {True: 0, False: 0, None: 0}
|
||||
for headers in captured_headers:
|
||||
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
|
||||
norm_v = None if v is None else utils.config_true_value(v)
|
||||
counts[norm_v] += 1
|
||||
|
||||
max_queue_updates = 2
|
||||
o_replicas = self.replicas()
|
||||
self.assertEqual(counts, {
|
||||
True: min(max_queue_updates, o_replicas),
|
||||
False: max(o_replicas - max_queue_updates, 0),
|
||||
None: 0,
|
||||
})
|
||||
|
||||
def test_POST_limits_expirer_queue_deletes(self):
|
||||
req = swift.common.swob.Request.blank(
|
||||
'/v1/a/c/o', method='POST', body='',
|
||||
headers={'Content-Type': 'application/octet-stream'})
|
||||
codes = [201] * self.replicas()
|
||||
captured_headers = []
|
||||
|
||||
def capture_headers(ip, port, device, part, method, path,
|
||||
headers=None, **kwargs):
|
||||
captured_headers.append(headers)
|
||||
|
||||
with set_http_connect(*codes, give_connect=capture_headers):
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 201) # sanity check
|
||||
|
||||
counts = {True: 0, False: 0, None: 0}
|
||||
for headers in captured_headers:
|
||||
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
|
||||
norm_v = None if v is None else utils.config_true_value(v)
|
||||
counts[norm_v] += 1
|
||||
|
||||
max_queue_updates = 2
|
||||
o_replicas = self.replicas()
|
||||
self.assertEqual(counts, {
|
||||
True: min(max_queue_updates, o_replicas),
|
||||
False: max(o_replicas - max_queue_updates, 0),
|
||||
None: 0,
|
||||
})
|
||||
|
||||
def test_POST_non_int_delete_after(self):
|
||||
t = str(int(time.time() + 100)) + '.1'
|
||||
req = swob.Request.blank('/v1/a/c/o', method='POST',
|
||||
|
@ -676,20 +803,131 @@ class BaseObjectControllerMixin(object):
|
|||
req, self.replicas(policy), 1, containers)
|
||||
|
||||
# how many of the backend headers have a container update
|
||||
container_updates = len(
|
||||
n_container_updates = len(
|
||||
[headers for headers in backend_headers
|
||||
if 'X-Container-Partition' in headers])
|
||||
|
||||
if num_containers <= self.quorum(policy):
|
||||
# filling case
|
||||
expected = min(self.quorum(policy) + 1,
|
||||
self.replicas(policy))
|
||||
else:
|
||||
# container updates >= object replicas
|
||||
expected = min(num_containers,
|
||||
self.replicas(policy))
|
||||
# how many object-server PUTs can fail and still let the
|
||||
# client PUT succeed
|
||||
n_can_fail = self.replicas(policy) - self.quorum(policy)
|
||||
n_expected_updates = (
|
||||
n_can_fail + utils.quorum_size(num_containers))
|
||||
|
||||
self.assertEqual(container_updates, expected)
|
||||
# you get at least one update per container no matter what
|
||||
n_expected_updates = max(
|
||||
n_expected_updates, num_containers)
|
||||
|
||||
# you can't have more object requests with updates than you
|
||||
# have object requests (the container stuff gets doubled up,
|
||||
# but that's not important for purposes of durability)
|
||||
n_expected_updates = min(
|
||||
n_expected_updates, self.replicas(policy))
|
||||
self.assertEqual(n_expected_updates, n_container_updates)
|
||||
|
||||
def test_delete_at_backend_requests(self):
|
||||
t = str(int(time.time() + 100))
|
||||
for policy in POLICIES:
|
||||
req = swift.common.swob.Request.blank(
|
||||
'/v1/a/c/o', method='PUT',
|
||||
headers={'Content-Length': '0',
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Delete-At': t})
|
||||
controller = self.controller_cls(self.app, 'a', 'c', 'o')
|
||||
|
||||
for num_del_at_nodes in range(1, 16):
|
||||
containers = [
|
||||
{'ip': '2.0.0.%s' % i, 'port': '70%s' % str(i).zfill(2),
|
||||
'device': 'sdc'} for i in range(num_del_at_nodes)]
|
||||
del_at_nodes = [
|
||||
{'ip': '1.0.0.%s' % i, 'port': '60%s' % str(i).zfill(2),
|
||||
'device': 'sdb'} for i in range(num_del_at_nodes)]
|
||||
|
||||
backend_headers = controller._backend_requests(
|
||||
req, self.replicas(policy), 1, containers,
|
||||
delete_at_container='dac', delete_at_partition=2,
|
||||
delete_at_nodes=del_at_nodes)
|
||||
|
||||
devices = []
|
||||
hosts = []
|
||||
part = ctr = 0
|
||||
for given_headers in backend_headers:
|
||||
self.assertEqual(given_headers.get('X-Delete-At'), t)
|
||||
if 'X-Delete-At-Partition' in given_headers:
|
||||
self.assertEqual(
|
||||
given_headers.get('X-Delete-At-Partition'), '2')
|
||||
part += 1
|
||||
if 'X-Delete-At-Container' in given_headers:
|
||||
self.assertEqual(
|
||||
given_headers.get('X-Delete-At-Container'), 'dac')
|
||||
ctr += 1
|
||||
devices += (
|
||||
list_from_csv(given_headers.get('X-Delete-At-Device')))
|
||||
hosts += (
|
||||
list_from_csv(given_headers.get('X-Delete-At-Host')))
|
||||
|
||||
# same as in test_container_update_backend_requests
|
||||
n_can_fail = self.replicas(policy) - self.quorum(policy)
|
||||
n_expected_updates = (
|
||||
n_can_fail + utils.quorum_size(num_del_at_nodes))
|
||||
|
||||
n_expected_hosts = max(
|
||||
n_expected_updates, num_del_at_nodes)
|
||||
|
||||
self.assertEqual(len(hosts), n_expected_hosts)
|
||||
self.assertEqual(len(devices), n_expected_hosts)
|
||||
|
||||
# parts don't get doubled up, maximum is count of obj requests
|
||||
n_expected_parts = min(
|
||||
n_expected_hosts, self.replicas(policy))
|
||||
self.assertEqual(part, n_expected_parts)
|
||||
self.assertEqual(ctr, n_expected_parts)
|
||||
|
||||
# check that hosts are correct
|
||||
self.assertEqual(
|
||||
set(hosts),
|
||||
set('%s:%s' % (h['ip'], h['port']) for h in del_at_nodes))
|
||||
self.assertEqual(set(devices), set(('sdb',)))
|
||||
|
||||
def test_smooth_distributed_backend_requests(self):
|
||||
t = str(int(time.time() + 100))
|
||||
for policy in POLICIES:
|
||||
req = swift.common.swob.Request.blank(
|
||||
'/v1/a/c/o', method='PUT',
|
||||
headers={'Content-Length': '0',
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Delete-At': t})
|
||||
controller = self.controller_cls(self.app, 'a', 'c', 'o')
|
||||
|
||||
for num_containers in range(1, 16):
|
||||
containers = [
|
||||
{'ip': '2.0.0.%s' % i, 'port': '70%s' % str(i).zfill(2),
|
||||
'device': 'sdc'} for i in range(num_containers)]
|
||||
del_at_nodes = [
|
||||
{'ip': '1.0.0.%s' % i, 'port': '60%s' % str(i).zfill(2),
|
||||
'device': 'sdb'} for i in range(num_containers)]
|
||||
|
||||
backend_headers = controller._backend_requests(
|
||||
req, self.replicas(policy), 1, containers,
|
||||
delete_at_container='dac', delete_at_partition=2,
|
||||
delete_at_nodes=del_at_nodes)
|
||||
|
||||
# caculate no of expected updates, see
|
||||
# test_container_update_backend_requests for explanation
|
||||
n_expected_updates = min(max(
|
||||
self.replicas(policy) - self.quorum(policy) +
|
||||
utils.quorum_size(num_containers), num_containers),
|
||||
self.replicas(policy))
|
||||
|
||||
# the first n_expected_updates servers should have received
|
||||
# a container update
|
||||
self.assertTrue(
|
||||
all([h.get('X-Container-Partition')
|
||||
for h in backend_headers[:n_expected_updates]]))
|
||||
# the last n_expected_updates servers should have received
|
||||
# the x-delete-at* headers
|
||||
self.assertTrue(
|
||||
all([h.get('X-Delete-At-Container')
|
||||
for h in backend_headers[-n_expected_updates:]]))
|
||||
|
||||
def _check_write_affinity(
|
||||
self, conf, policy_conf, policy, affinity_regions, affinity_count):
|
||||
|
@ -1580,6 +1818,52 @@ class TestReplicatedObjController(BaseObjectControllerMixin,
|
|||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
|
||||
def test_x_timestamp_not_overridden(self):
|
||||
def do_test(method, base_headers, resp_code):
|
||||
# no given x-timestamp
|
||||
req = swob.Request.blank(
|
||||
'/v1/a/c/o', method=method, headers=base_headers)
|
||||
codes = [resp_code] * self.replicas()
|
||||
with mocked_http_conn(*codes) as fake_conn:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, resp_code)
|
||||
self.assertEqual(self.replicas(), len(fake_conn.requests))
|
||||
for req in fake_conn.requests:
|
||||
self.assertIn('X-Timestamp', req['headers'])
|
||||
# check value can be parsed as valid timestamp
|
||||
Timestamp(req['headers']['X-Timestamp'])
|
||||
|
||||
# given x-timestamp is retained
|
||||
def do_check(ts):
|
||||
headers = dict(base_headers)
|
||||
headers['X-Timestamp'] = ts.internal
|
||||
req = swob.Request.blank(
|
||||
'/v1/a/c/o', method=method, headers=headers)
|
||||
codes = [resp_code] * self.replicas()
|
||||
with mocked_http_conn(*codes) as fake_conn:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, resp_code)
|
||||
self.assertEqual(self.replicas(), len(fake_conn.requests))
|
||||
for req in fake_conn.requests:
|
||||
self.assertEqual(ts.internal,
|
||||
req['headers']['X-Timestamp'])
|
||||
|
||||
do_check(Timestamp.now())
|
||||
do_check(Timestamp.now(offset=123))
|
||||
|
||||
# given x-timestamp gets sanity checked
|
||||
headers = dict(base_headers)
|
||||
headers['X-Timestamp'] = 'bad timestamp'
|
||||
req = swob.Request.blank(
|
||||
'/v1/a/c/o', method=method, headers=headers)
|
||||
with mocked_http_conn() as fake_conn:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertIn('X-Timestamp should be a UNIX timestamp ', resp.body)
|
||||
|
||||
do_test('PUT', {'Content-Length': 0}, 200)
|
||||
do_test('DELETE', {}, 204)
|
||||
|
||||
|
||||
@patch_policies(
|
||||
[StoragePolicy(0, '1-replica', True),
|
||||
|
@ -4505,5 +4789,37 @@ class TestECDuplicationObjController(
|
|||
self._test_determine_chunk_destinations_prioritize(1, 0)
|
||||
|
||||
|
||||
class TestNumContainerUpdates(unittest.TestCase):
|
||||
def test_it(self):
|
||||
test_cases = [
|
||||
# (container replicas, object replicas, object quorum, expected)
|
||||
(3, 17, 13, 6), # EC 12+5
|
||||
(3, 9, 4, 7), # EC 3+6
|
||||
(3, 14, 11, 5), # EC 10+4
|
||||
(5, 14, 11, 6), # EC 10+4, 5 container replicas
|
||||
(7, 14, 11, 7), # EC 10+4, 7 container replicas
|
||||
(3, 19, 16, 5), # EC 15+4
|
||||
(5, 19, 16, 6), # EC 15+4, 5 container replicas
|
||||
(3, 28, 22, 8), # EC (10+4)x2
|
||||
(5, 28, 22, 9), # EC (10+4)x2, 5 container replicas
|
||||
(3, 1, 1, 3), # 1 object replica
|
||||
(3, 2, 1, 3), # 2 object replicas
|
||||
(3, 3, 2, 3), # 3 object replicas
|
||||
(3, 4, 2, 4), # 4 object replicas
|
||||
(3, 5, 3, 4), # 5 object replicas
|
||||
(3, 6, 3, 5), # 6 object replicas
|
||||
(3, 7, 4, 5), # 7 object replicas
|
||||
]
|
||||
|
||||
for c_replica, o_replica, o_quorum, exp in test_cases:
|
||||
c_quorum = utils.quorum_size(c_replica)
|
||||
got = obj.num_container_updates(c_replica, c_quorum,
|
||||
o_replica, o_quorum)
|
||||
self.assertEqual(
|
||||
exp, got,
|
||||
"Failed for c_replica=%d, o_replica=%d, o_quorum=%d" % (
|
||||
c_replica, o_replica, o_quorum))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -62,6 +62,7 @@ from test.unit.helpers import setup_servers, teardown_servers
|
|||
from swift.proxy import server as proxy_server
|
||||
from swift.proxy.controllers.obj import ReplicatedObjectController
|
||||
from swift.obj import server as object_server
|
||||
from swift.common.bufferedhttp import BufferedHTTPResponse
|
||||
from swift.common.middleware import proxy_logging, versioned_writes, \
|
||||
copy, listing_formats
|
||||
from swift.common.middleware.acl import parse_acl, format_acl
|
||||
|
@ -5429,6 +5430,33 @@ class TestReplicatedObjectController(
|
|||
|
||||
self.assertEqual(
|
||||
seen_headers, [
|
||||
{'X-Container-Host': '10.0.0.0:1000',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sda'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': None,
|
||||
'X-Container-Partition': None,
|
||||
'X-Container-Device': None}])
|
||||
|
||||
def test_PUT_x_container_headers_with_many_object_replicas(self):
|
||||
POLICIES[0].object_ring.set_replicas(11)
|
||||
|
||||
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'Content-Length': '5'}, body='12345')
|
||||
controller = ReplicatedObjectController(
|
||||
self.app, 'a', 'c', 'o')
|
||||
seen_headers = self._gather_x_container_headers(
|
||||
controller.PUT, req,
|
||||
# HEAD HEAD PUT PUT PUT PUT PUT PUT PUT PUT PUT PUT PUT
|
||||
200, 200, 201, 201, 201, 201, 201, 201, 201, 201, 201, 201, 201)
|
||||
|
||||
self.assertEqual(
|
||||
sorted(seen_headers), sorted([
|
||||
{'X-Container-Host': '10.0.0.0:1000',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sda'},
|
||||
{'X-Container-Host': '10.0.0.0:1000',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sda'},
|
||||
|
@ -5437,7 +5465,29 @@ class TestReplicatedObjectController(
|
|||
'X-Container-Device': 'sda'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sdb'}])
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': '10.0.0.1:1001',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sdb'},
|
||||
{'X-Container-Host': '10.0.0.2:1002',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sdc'},
|
||||
{'X-Container-Host': '10.0.0.2:1002',
|
||||
'X-Container-Partition': '0',
|
||||
'X-Container-Device': 'sdc'},
|
||||
{'X-Container-Host': None,
|
||||
'X-Container-Partition': None,
|
||||
'X-Container-Device': None},
|
||||
{'X-Container-Host': None,
|
||||
'X-Container-Partition': None,
|
||||
'X-Container-Device': None},
|
||||
{'X-Container-Host': None,
|
||||
'X-Container-Partition': None,
|
||||
'X-Container-Device': None},
|
||||
{'X-Container-Host': None,
|
||||
'X-Container-Partition': None,
|
||||
'X-Container-Device': None},
|
||||
]))
|
||||
|
||||
def test_PUT_x_container_headers_with_more_container_replicas(self):
|
||||
self.app.container_ring.set_replicas(4)
|
||||
|
@ -5540,9 +5590,9 @@ class TestReplicatedObjectController(
|
|||
'X-Delete-At-Partition': '0',
|
||||
'X-Delete-At-Device': 'sdb'},
|
||||
{'X-Delete-At-Host': None,
|
||||
'X-Delete-At-Container': None,
|
||||
'X-Delete-At-Partition': None,
|
||||
'X-Delete-At-Device': None}
|
||||
'X-Delete-At-Container': None,
|
||||
'X-Delete-At-Device': None},
|
||||
])
|
||||
|
||||
@mock.patch('time.time', new=lambda: STATIC_TIME)
|
||||
|
@ -7272,6 +7322,48 @@ class TestObjectECRangedGET(unittest.TestCase):
|
|||
self.assertIn('Content-Range', headers)
|
||||
self.assertEqual('bytes */%d' % obj_len, headers['Content-Range'])
|
||||
|
||||
def test_unsatisfiable_socket_leak(self):
|
||||
unclosed_http_responses = {}
|
||||
tracked_responses = [0]
|
||||
|
||||
class LeakTrackingHTTPResponse(BufferedHTTPResponse):
|
||||
def begin(self):
|
||||
# no super(); we inherit from an old-style class (it's
|
||||
# httplib's fault; don't try and fix it).
|
||||
retval = BufferedHTTPResponse.begin(self)
|
||||
if self.status != 204:
|
||||
# This mock is overly broad and catches account and
|
||||
# container HEAD requests too. We don't care about
|
||||
# those; it's the object GETs that were leaky.
|
||||
#
|
||||
# Unfortunately, we don't have access to the request
|
||||
# path here, so we use "status == 204" as a crude proxy
|
||||
# for "not an object response".
|
||||
unclosed_http_responses[id(self)] = self
|
||||
tracked_responses[0] += 1
|
||||
return retval
|
||||
|
||||
def close(self, *args, **kwargs):
|
||||
rv = BufferedHTTPResponse.close(self, *args, **kwargs)
|
||||
unclosed_http_responses.pop(id(self), None)
|
||||
return rv
|
||||
|
||||
def __repr__(self):
|
||||
swift_conn = getattr(self, 'swift_conn', None)
|
||||
method = getattr(swift_conn, '_method', '<unknown>')
|
||||
path = getattr(swift_conn, '_path', '<unknown>')
|
||||
return '%s<method=%r path=%r>' % (
|
||||
self.__class__.__name__, method, path)
|
||||
|
||||
obj_len = len(self.obj)
|
||||
with mock.patch('swift.common.bufferedhttp.BufferedHTTPConnection'
|
||||
'.response_class', LeakTrackingHTTPResponse):
|
||||
status, headers, _junk = self._get_obj(
|
||||
"bytes=%d-%d" % (obj_len, obj_len + 100))
|
||||
self.assertEqual(status, 416) # sanity check
|
||||
self.assertGreater(tracked_responses[0], 0) # ensure tracking happened
|
||||
self.assertEqual(unclosed_http_responses, {})
|
||||
|
||||
def test_off_end(self):
|
||||
# Ranged GET that's mostly off the end of the object, but overlaps
|
||||
# it in just the last byte
|
||||
|
@ -7837,7 +7929,7 @@ class TestContainerController(unittest.TestCase):
|
|||
# fail to retrieve account info
|
||||
test_status_map(
|
||||
(503, 503, 503), # account_info fails on 503
|
||||
404, missing_container=True)
|
||||
500, missing_container=True)
|
||||
# account fail after creation
|
||||
test_status_map(
|
||||
(404, 404, 404, # account_info fails on 404
|
||||
|
@ -7848,7 +7940,7 @@ class TestContainerController(unittest.TestCase):
|
|||
(503, 503, 404, # account_info fails on 404
|
||||
503, 503, 503, # PUT account
|
||||
503, 503, 404), # account_info fail
|
||||
404, missing_container=True)
|
||||
500, missing_container=True)
|
||||
# put fails
|
||||
test_status_map(
|
||||
(404, 404, 404, # account_info fails on 404
|
||||
|
|
11
tox.ini
11
tox.ini
|
@ -59,11 +59,6 @@ commands =
|
|||
basepython = python2.7
|
||||
commands = ./.functests {posargs}
|
||||
|
||||
[testenv:func-post-as-copy]
|
||||
commands = ./.functests {posargs}
|
||||
setenv = SWIFT_TEST_IN_PROCESS=1
|
||||
SWIFT_TEST_IN_PROCESS_OBJECT_POST_AS_COPY=True
|
||||
|
||||
[testenv:func-encryption]
|
||||
commands = ./.functests {posargs}
|
||||
setenv = SWIFT_TEST_IN_PROCESS=1
|
||||
|
@ -120,3 +115,9 @@ commands = bindep test
|
|||
|
||||
[testenv:releasenotes]
|
||||
commands = sphinx-build -a -W -E -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html
|
||||
|
||||
[testenv:func-post-as-copy]
|
||||
skip_install = True
|
||||
install_command = true {packages}
|
||||
commands = true
|
||||
whitelist_externals = true
|
||||
|
|
Loading…
Reference in New Issue