Merge remote-tracking branch 'origin/master' into feature/deep

Needed to add back the func-post-as-copy tox env (for some reason).

Change-Id: I28acef130372f06a0528a1981044761b994eee61
Tim Burke 2018-01-22 18:01:53 +00:00
commit 3122895118
50 changed files with 2255 additions and 747 deletions


@@ -3,4 +3,5 @@ branch = True
omit = /usr*,setup.py,*egg*,.venv/*,.tox/*,test/*
[report]
show_missing = True
ignore_errors = True

.gitignore

@@ -17,5 +17,6 @@ pycscope.*
MANIFEST
.testrepository/*
.stestr/*
subunit.log
test/probe/.noseids


@@ -58,23 +58,6 @@
parent: swift-tox-func
nodeset: centos-7
- job:
name: swift-tox-func-post-as-copy
parent: swift-tox-base
description: |
Run functional tests for swift under cPython version 2.7.
Uses tox with the ``func-post-as-copy`` environment.
It sets TMPDIR to an XFS mount point created via
tools/test-setup.sh.
vars:
tox_envlist: func-post-as-copy
- job:
name: swift-tox-func-post-as-copy-centos-7
parent: swift-tox-func-post-as-copy
nodeset: centos-7
- job:
name: swift-tox-func-encryption
parent: swift-tox-base
@@ -117,7 +100,6 @@
- swift-tox-py27
- swift-tox-py35
- swift-tox-func
- swift-tox-func-post-as-copy
- swift-tox-func-encryption
- swift-tox-func-ec
gate:
@@ -125,13 +107,11 @@
- swift-tox-py27
- swift-tox-py35
- swift-tox-func
- swift-tox-func-post-as-copy
- swift-tox-func-encryption
- swift-tox-func-ec
experimental:
jobs:
- swift-tox-py27-centos-7
- swift-tox-func-centos-7
- swift-tox-func-post-as-copy-centos-7
- swift-tox-func-encryption-centos-7
- swift-tox-func-ec-centos-7


@@ -16,7 +16,7 @@ and high concurrency. Swift is ideal for backups, web and mobile
content, and any other unstructured data that can grow without bound.
Swift provides a simple, REST-based API fully documented at
http://docs.openstack.org/.
https://docs.openstack.org/.
Swift was originally developed as the basis for Rackspace's Cloud Files
and was open-sourced in 2010 as part of the OpenStack project. It has
@@ -118,7 +118,7 @@ Swift is a WSGI application and uses eventlet's WSGI server. After the
processes are running, the entry point for new requests is the
``Application`` class in ``swift/proxy/server.py``. From there, a
controller is chosen, and the request is processed. The proxy may choose
to forward the request to a back- end server. For example, the entry
to forward the request to a back-end server. For example, the entry
point for requests to the object server is the ``ObjectController``
class in ``swift/obj/server.py``.
@@ -141,7 +141,7 @@ For Client Apps
---------------
For client applications, official Python language bindings are provided
at http://github.com/openstack/python-swiftclient.
at https://github.com/openstack/python-swiftclient.
Complete API documentation at
https://developer.openstack.org/api-ref/object-store/


@@ -692,18 +692,31 @@ X-Copy-From-Account:
type: string
X-Delete-After:
description: |
The number of seconds after which the system
removes the object. Internally, the Object Storage system stores
this value in the ``X-Delete-At`` metadata item.
The number of seconds after which the system removes the object. The value
should be a positive integer. Internally, the Object Storage system uses
this value to generate an ``X-Delete-At`` metadata item. If both
``X-Delete-After`` and ``X-Delete-At`` are set then ``X-Delete-After``
takes precedence.
in: header
required: false
type: integer
X-Delete-At:
description: |
The date and time in `UNIX Epoch time stamp
format <https://en.wikipedia.org/wiki/Unix_time>`_ when the system
removes the object. For example, ``1440619048`` is equivalent to
``Mon, Wed, 26 Aug 2015 19:57:28 GMT``.
The date and time in `UNIX Epoch time stamp format
<https://en.wikipedia.org/wiki/Unix_time>`_ when the system removes the
object. For example, ``1440619048`` is equivalent to ``Mon, Wed, 26 Aug
2015 19:57:28 GMT``. The value should be a positive integer corresponding
to a time in the future. If both ``X-Delete-After`` and ``X-Delete-At`` are
set then ``X-Delete-After`` takes precedence.
in: header
required: false
type: integer
X-Delete-At_resp:
description: |
If present, specifies date and time in `UNIX Epoch time stamp format
<https://en.wikipedia.org/wiki/Unix_time>`_ when the system removes the
object. For example, ``1440619048`` is equivalent to ``Mon, Wed, 26 Aug
2015 19:57:28 GMT``.
in: header
required: false
type: integer
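
The precedence rule described above can be exercised directly. Below is a
minimal sketch (python-swiftclient, with placeholder credentials, container
and object names) of a PUT that sends both headers; the proxy derives
``X-Delete-At`` from ``X-Delete-After`` and drops the latter::

    import swiftclient

    # All connection parameters below are placeholders.
    conn = swiftclient.Connection(authurl='http://keystonehost/identity',
                                  user='demo', key='secret')

    # With both headers set, X-Delete-After takes precedence: the proxy
    # rewrites it to X-Delete-At = request timestamp + 600.
    conn.put_object('mycontainer', 'myobject', contents=b'data',
                    headers={'X-Delete-After': '600',
                             'X-Delete-At': '1440619048'})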


@@ -130,7 +130,7 @@ Response Parameters
- X-Object-Meta-name: X-Object-Meta-name_resp
- Content-Disposition: Content-Disposition_resp
- Content-Encoding: Content-Encoding_resp
- X-Delete-At: X-Delete-At
- X-Delete-At: X-Delete-At_resp
- Accept-Ranges: Accept-Ranges
- X-Object-Manifest: X-Object-Manifest_resp
- Last-Modified: Last-Modified
@@ -602,7 +602,7 @@ Response Parameters
- X-Object-Meta-name: X-Object-Meta-name
- Content-Disposition: Content-Disposition_resp
- Content-Encoding: Content-Encoding_resp
- X-Delete-At: X-Delete-At
- X-Delete-At: X-Delete-At_resp
- X-Object-Manifest: X-Object-Manifest_resp
- Last-Modified: Last-Modified
- ETag: ETag_obj_resp
@@ -650,6 +650,12 @@ metadata on the request to copy the object, either PUT or COPY ,
the metadata overwrites any conflicting keys on the target (new)
object.
.. note::
While using COPY instead of POST allows sending only a subset of
the metadata, it carries the cost of reading and rewriting the entire
contents of the object.
A POST request deletes any existing custom metadata that you added
with a previous PUT or POST request. Consequently, you must specify
all custom metadata in the request. However, system metadata is
@@ -752,9 +758,9 @@ Request
- X-Service-Token: X-Service-Token
- X-Object-Meta-name: X-Object-Meta-name
- X-Delete-At: X-Delete-At
- X-Delete-After: X-Delete-After
- Content-Disposition: Content-Disposition
- Content-Encoding: Content-Encoding
- X-Delete-After: X-Delete-After
- Content-Type: Content-Type_obj_cu_req
- X-Trans-Id-Extra: X-Trans-Id-Extra


@@ -20,9 +20,8 @@ import os
import sys
from gettext import gettext as _
from six.moves.configparser import ConfigParser
from swift.common.utils import get_logger, dump_recon_cache
from swift.common.utils import get_logger, dump_recon_cache, readconf
from swift.obj.diskfile import ASYNCDIR_BASE
@@ -46,17 +45,13 @@ def get_async_count(device_dir, logger):
def main():
c = ConfigParser()
try:
conf_path = sys.argv[1]
except Exception:
print("Usage: %s CONF_FILE" % sys.argv[0].split('/')[-1])
print("ex: swift-recon-cron /etc/swift/object-server.conf")
sys.exit(1)
if not c.read(conf_path):
print("Unable to read config file %s" % conf_path)
sys.exit(1)
conf = dict(c.items('filter:recon'))
conf = readconf(conf_path, 'filter:recon')
device_dir = conf.get('devices', '/srv/node')
recon_cache_path = conf.get('recon_cache_path', '/var/cache/swift')
recon_lock_path = conf.get('recon_lock_path', '/var/lock')
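
For reference, ``readconf`` already lives in ``swift.common.utils``; a
minimal sketch of the equivalent call (the config path is illustrative)::

    from swift.common.utils import readconf

    # One call replaces the ConfigParser read/items two-step above;
    # readconf itself prints an error and exits if the file is unreadable.
    conf = readconf('/etc/swift/object-server.conf', 'filter:recon')
    device_dir = conf.get('devices', '/srv/node')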


@@ -194,12 +194,10 @@ This is normally \fBegg:swift#proxy_logging\fR. See proxy-server.conf-sample for
.PD
.SH ADDITIONAL SECTIONS
.SH OBJECT EXPIRER SECTION
.PD 1
.RS 0
The following sections are used by other swift-account services, such as replicator,
auditor and reaper.
.IP "\fB[account-replicator]\fR"
.IP "\fB[object-expirer]\fR"
.RE
.RS 3
.IP \fBinterval\fR
@@ -210,6 +208,9 @@ The default is ".".
The default is 'expiring_objects'.
.IP \fBreport_interval\fR
The default is 300 seconds.
.IP \fBrequest_tries\fR
The number of times the expirer's internal client will
attempt any given request in the event of failure. The default is 3.
.IP \fBconcurrency\fR
Number of expirer workers to spawn. The default is 1.
.IP \fBprocesses\fR
@@ -227,6 +228,21 @@ up to reclaim_age seconds before it gives up and deletes the entry in the
queue. The default is 604800 seconds.
.IP \fBrecon_cache_path\fR
Path to recon cache directory. The default is /var/cache/swift.
.IP \fBnice_priority\fR
Modify scheduling priority of server processes. Niceness values range from -20
(most favorable to the process) to 19 (least favorable to the process).
The default does not modify priority.
.IP \fBionice_class\fR
Modify I/O scheduling class of server processes. I/O niceness class values
are IOPRIO_CLASS_RT (realtime), IOPRIO_CLASS_BE (best-effort) and IOPRIO_CLASS_IDLE (idle).
The default does not modify class and priority.
Work only with ionice_priority.
.IP \fBionice_priority\fR
Modify I/O scheduling priority of server processes. I/O niceness priority
is a number which goes from 0 to 7. The higher the value, the lower
the I/O priority of the process. Work only with ionice_class.
Ignored if IOPRIO_CLASS_IDLE is set.
.RE
.PD


@@ -5,7 +5,7 @@ Object Storage monitoring
.. note::
This section was excerpted from a `blog post by Darrell
Bishop <http://swiftstack.com/blog/2012/04/11/swift-monitoring-with-statsd>`_ and
Bishop <https://swiftstack.com/blog/2012/04/11/swift-monitoring-with-statsd>`_ and
has since been edited.
An OpenStack Object Storage cluster is a collection of many daemons that
@@ -123,10 +123,8 @@ actual number when flushing meters upstream.
To avoid the problems inherent with middleware-based monitoring and
after-the-fact log processing, the sending of StatsD meters is
integrated into Object Storage itself. The submitted change set (see
`<https://review.openstack.org/#change,6058>`_) currently reports 124 meters
across 15 Object Storage daemons and the tempauth middleware. Details of
the meters tracked are in the :doc:`/admin_guide`.
integrated into Object Storage itself. Details of the meters tracked
are in the :doc:`/admin_guide`.
The sending of meters is integrated with the logging framework. To
enable, configure ``log_statsd_host`` in the relevant config file. You


@@ -120,25 +120,24 @@ out were you need to add capacity or to help tune an :ref:`ring_overload` value.
Now let's take an example with 1 region, 3 zones and 4 devices. Each device has
the same weight, and the ``dispersion --verbose`` might show the following::
Dispersion is 50.000000, Balance is 0.000000, Overload is 0.00%
Dispersion is 16.666667, Balance is 0.000000, Overload is 0.00%
Required overload is 33.333333%
Worst tier is 50.000000 (r1z3)
Worst tier is 33.333333 (r1z3)
--------------------------------------------------------------------------
Tier Parts % Max 0 1 2 3
--------------------------------------------------------------------------
r1 256 0.00 3 0 0 0 256
r1 768 0.00 3 0 0 0 256
r1z1 192 0.00 1 64 192 0 0
r1z1-127.0.0.1 192 0.00 1 64 192 0 0
r1z1-127.0.0.1/sda 192 0.00 1 64 192 0 0
r1z2 192 0.00 1 64 192 0 0
r1z2-127.0.0.2 192 0.00 1 64 192 0 0
r1z2-127.0.0.2/sda 192 0.00 1 64 192 0 0
r1z3 256 50.00 1 0 128 128 0
r1z3-127.0.0.3 256 50.00 1 0 128 128 0
r1z3 384 33.33 1 0 128 128 0
r1z3-127.0.0.3 384 33.33 1 0 128 128 0
r1z3-127.0.0.3/sda 192 0.00 1 64 192 0 0
r1z3-127.0.0.3/sdb 192 0.00 1 64 192 0 0
The first line reports that there are 256 partitions with 3 copies in region 1;
and this is an expected output in this case (single region with 3 replicas) as
reported by the "Max" value.
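
To see where the new 33.33 figure for ``r1z3`` comes from, the arithmetic
below mirrors the revised report: with at most one replica of a partition
allowed per zone, 128 partitions keep one extra replica in ``r1z3``, out of
the 384 replicas placed there::

    # Parts in r1z3 holding 0, 1, 2 and 3 replicas, from the table above.
    replica_counts = [0, 128, 128, 0]
    max_replicas = 1  # at most one replica per part should land in a zone

    # Every part-replica beyond the allowed maximum counts as at risk.
    at_risk = sum(count * (i - max_replicas)
                  for i, count in enumerate(replica_counts)
                  if i > max_replicas)                                 # 128
    placed = sum(count * i for i, count in enumerate(replica_counts))  # 384
    print('%.2f' % (100.0 * at_risk / placed))                         # 33.33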
@@ -818,7 +817,7 @@ resolves to an IPv4 address, an IPv4 socket will be used to send StatsD UDP
packets, even if the hostname would also resolve to an IPv6 address.
.. _StatsD: http://codeascraft.etsy.com/2011/02/15/measure-anything-measure-everything/
.. _Graphite: http://graphite.wikidot.com/
.. _Graphite: http://graphiteapp.org/
.. _Ganglia: http://ganglia.sourceforge.net/
The sample rate is a real number between 0 and 1 which defines the


@@ -171,14 +171,14 @@ The API Reference describes the operations that you can perform with the
Object Storage API:
- `Storage
accounts <https://developer.openstack.org/api-ref/object-storage/index.html#accounts>`__:
accounts <https://developer.openstack.org/api-ref/object-store/index.html#accounts>`__:
Use to perform account-level tasks.
Lists containers for a specified account. Creates, updates, and
deletes account metadata. Shows account metadata.
- `Storage
containers <https://developer.openstack.org/api-ref/object-storage/index.html#containers>`__:
containers <https://developer.openstack.org/api-ref/object-store/index.html#containers>`__:
Use to perform container-level tasks.
Lists objects in a specified container. Creates, shows details for,
@@ -186,7 +186,7 @@ Object Storage API:
container metadata.
- `Storage
objects <https://developer.openstack.org/api-ref/object-storage/index.html#objects>`__:
objects <https://developer.openstack.org/api-ref/object-store/index.html#objects>`__:
Use to perform object-level tasks.
Creates, replaces, shows details for, and deletes objects. Copies


@@ -57,7 +57,7 @@ or ISO 8601 UTC timestamp. For example, ``1390852007`` or
``Mon, 27 Jan 2014 19:46:47 GMT``.
For more information, see `Epoch & Unix Timestamp Conversion
Tools <http://www.epochconverter.com/>`__.
Tools <https://www.epochconverter.com/>`__.
**filename**: Optional. Overrides the default file name. Object Storage
generates a default file name for **GET** temporary URLs that is based on the


@@ -10,20 +10,20 @@ Application Bindings
* OpenStack supported binding:
* `Python-SwiftClient <http://pypi.python.org/pypi/python-swiftclient>`_
* `Python-SwiftClient <https://pypi.python.org/pypi/python-swiftclient>`_
* Unofficial libraries and bindings:
* `PHP-opencloud <http://php-opencloud.com>`_ - Official Rackspace PHP bindings that should work for other Swift deployments too.
* `PyRAX <https://github.com/rackspace/pyrax>`_ - Official Rackspace Python bindings for CloudFiles that should work for other Swift deployments too.
* `PyRAX <https://github.com/pycontribs/pyrax>`_ - Official Rackspace Python bindings for CloudFiles that should work for other Swift deployments too.
* `openstack.net <https://github.com/rackspace/openstack.net/>`_ - Official Rackspace .NET bindings that should work for other Swift deployments too.
* `RSwift <https://github.com/pandemicsyn/RSwift>`_ - R API bindings.
* `Go language bindings <https://github.com/ncw/swift>`_
* `supload <https://github.com/selectel/supload>`_ - Bash script to upload file to cloud storage based on OpenStack Swift API.
* `libcloud <http://libcloud.apache.org>`_ - Apache Libcloud - a unified interface in Python for different clouds with OpenStack Swift support.
* `SwiftBox <https://github.com/suniln/SwiftBox>`_ - C# library using RestSharp
* `jclouds <http://jclouds.incubator.apache.org/documentation/quickstart/openstack/>`_ - Java library offering bindings for all OpenStack projects
* `java-openstack-swift <https://github.com/dkocher/java-openstack-swift>`_ - Java bindings for OpenStack Swift
* `jclouds <http://jclouds.apache.org/guides/openstack/>`_ - Java library offering bindings for all OpenStack projects
* `java-openstack-swift <https://github.com/iterate-ch/java-openstack-swift>`_ - Java bindings for OpenStack Swift
* `swift_client <https://github.com/mrkamel/swift_client>`_ - Small but powerful Ruby client to interact with OpenStack Swift
* `nightcrawler_swift <https://github.com/tulios/nightcrawler_swift>`_ - This Ruby gem teleports your assets to a OpenStack Swift bucket/container
* `swift storage <https://rubygems.org/gems/swift-storage>`_ - Simple OpenStack Swift storage client.
@@ -94,13 +94,13 @@ Storage Backends (DiskFile API implementations)
Developer Tools
---------------
* `SAIO bash scripts <https://github.com/ntata/swift-setup-scripts.git>`_ -
* `SAIO bash scripts <https://github.com/ntata/swift-setup-scripts>`_ -
Well commented simple bash scripts for Swift all in one setup.
* `vagrant-swift-all-in-one
<https://github.com/swiftstack/vagrant-swift-all-in-one>`_ - Quickly setup a
standard development environment using Vagrant and Chef cookbooks in an
Ubuntu virtual machine.
* `SAIO Ansible playbook <https://github.com/thiagodasilva/swift-aio>`_ -
* `SAIO Ansible playbook <https://github.com/thiagodasilva/ansible-saio>`_ -
Quickly setup a standard development environment using Vagrant and Ansible in
a Fedora virtual machine (with built-in `Swift-on-File
<https://github.com/openstack/swiftonfile>`_ support).
@@ -111,11 +111,11 @@ Other
* `Glance <https://github.com/openstack/glance>`_ - Provides services for discovering, registering, and retrieving virtual machine images (for OpenStack Compute [Nova], for example).
* `Better Staticweb <https://github.com/CloudVPS/better-staticweb>`_ - Makes swift containers accessible by default.
* `Django Swiftbrowser <https://github.com/cschwede/django-swiftbrowser>`_ - Simple Django web app to access OpenStack Swift.
* `Swift-account-stats <https://github.com/enovance/swift-account-stats>`_ - Swift-account-stats is a tool to report statistics on Swift usage at tenant and global levels.
* `Swift-account-stats <https://github.com/redhat-cip/swift-account-stats>`_ - Swift-account-stats is a tool to report statistics on Swift usage at tenant and global levels.
* `PyECLib <https://github.com/openstack/pyeclib>`_ - High Level Erasure Code library used by Swift
* `liberasurecode <https://github.com/openstack/liberasurecode>`_ - Low Level Erasure Code library used by PyECLib
* `Swift Browser <https://github.com/zerovm/swift-browser>`_ - JavaScript interface for Swift
* `Swift Browser <https://github.com/mgeisler/swift-browser>`_ - JavaScript interface for Swift
* `swift-ui <https://github.com/fanatic/swift-ui>`_ - OpenStack Swift web browser
* `Swift Durability Calculator <https://github.com/enovance/swift-durability-calculator>`_ - Data Durability Calculation Tool for Swift
* `Swift Durability Calculator <https://github.com/redhat-cip/swift-durability-calculator>`_ - Data Durability Calculation Tool for Swift
* `swiftbackmeup <https://github.com/redhat-cip/swiftbackmeup>`_ - Utility that allows one to create backups and upload them to OpenStack Swift
* `Multi Swift <https://github.com/ntata/multi-swift-POC>`_ - Bash scripts to spin up multiple Swift clusters sharing the same hardware


@@ -909,6 +909,8 @@ objects_per_second 50 Maximum objects updated per second.
system specs. 0 is unlimited.
slowdown 0.01 Time in seconds to wait between objects.
Deprecated in favor of objects_per_second.
report_interval 300 Interval in seconds between logging
statistics about the current update pass.
recon_cache_path /var/cache/swift Path to recon cache
nice_priority None Scheduling priority of server processes.
Niceness values range from -20 (most


@@ -217,7 +217,7 @@ do the following::
To persist this, edit and add the following to ``/etc/fstab``::
/home/swift/xfs_file /tmp xfs rw,noatime,nodiratime,attr2,inode64,noquota 0 0
/home/<your-user-name>/xfs_file /tmp xfs rw,noatime,nodiratime,attr2,inode64,noquota 0 0
----------------
Getting the code


@@ -11,9 +11,12 @@ object from the system.
The ``X-Delete-At`` header takes a Unix Epoch timestamp, in integer form; for
example: ``1317070737`` represents ``Mon Sep 26 20:58:57 2011 UTC``.
The ``X-Delete-After`` header takes an integer number of seconds. The proxy
server that receives the request will convert this header into an
``X-Delete-At`` header using its current time plus the value given.
The ``X-Delete-After`` header takes a positive integer number of seconds. The
proxy server that receives the request will convert this header into an
``X-Delete-At`` header using the request timestamp plus the value given.
If both the ``X-Delete-At`` and ``X-Delete-After`` headers are sent with a
request then the ``X-Delete-After`` header will take precedence.
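
A small sketch of that conversion, using the example expiry timestamp from
above and an illustrative request time; ``normalize_delete_at_timestamp``
is the helper the proxy applies::

    from swift.common.utils import normalize_delete_at_timestamp

    request_timestamp = 1317070137.0  # X-Timestamp of the PUT (illustrative)
    x_delete_after = 600              # client-supplied header value

    # X-Delete-After becomes X-Delete-At = request time + the given delay.
    print(normalize_delete_at_timestamp(request_timestamp + x_delete_after))
    # '1317070737', i.e. Mon Sep 26 20:58:57 2011 UTC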
As expiring objects are added to the system, the object servers will record the
expirations in a hidden ``.expiring_objects`` account for the


@@ -51,11 +51,10 @@
# Instead of the project name, the project id may also be used.
# project_id = changeme
# The Keystone URL to authenticate to. The value of auth_url may be
# The Keystone URL to authenticate to. The value of auth_endpoint may be
# set according to the value of auth_uri in [filter:authtoken] in
# proxy-server.conf. Currently, the only supported version of the Identity API
# is v3, which requires that the url end in "/v3".
# auth_endpoint = http://keystonehost:5000/v3
# proxy-server.conf.
# auth_endpoint = http://keystonehost/identity
# The project and user domain names may optionally be specified. If they are
# not specified, the default values of 'Default' (for *_domain_name) and


@@ -42,25 +42,34 @@
# auto_create_account_prefix = .
# expiring_objects_account_name = expiring_objects
# report_interval = 300
# concurrency is the level of concurrency o use to do the work, this value
#
# request_tries is the number of times the expirer's internal client will
# attempt any given request in the event of failure. The default is 3.
# request_tries = 3
# concurrency is the level of concurrency to use to do the work, this value
# must be set to at least 1
# concurrency = 1
#
# processes is how many parts to divide the work into, one part per process
# that will be doing the work
# that will be doing the work
# processes set 0 means that a single process will be doing all the work
# processes can also be specified on the command line and will override the
# config value
# config value
# processes = 0
#
# process is which of the parts a particular process will work on
# process can also be specified on the command line and will override the config
# value
# value
# process is "zero based", if you want to use 3 processes, you should run
# processes with process set to 0, 1, and 2
# processes with process set to 0, 1, and 2
# process = 0
#
# The expirer will re-attempt expiring if the source object is not available
# up to reclaim_age seconds before it gives up and deletes the entry in the
# queue.
# reclaim_age = 604800
#
# recon_cache_path = /var/cache/swift
#
# You can set scheduling priority of processes. Niceness values range from -20
@@ -74,6 +83,11 @@
# ionice_class =
# ionice_priority =
#
# The following sections define the configuration of the expirer's internal
# client pipeline
#
[pipeline:main]
pipeline = catch_errors proxy-logging cache proxy-server


@@ -372,6 +372,12 @@ use = egg:swift#recon
# objects_per_second instead.
# slowdown = 0.01
#
# Log stats (at INFO level) every report_interval seconds. This
# logging is per-process, so with concurrency > 1, the logs will
# contain one stats log per worker process every report_interval
# seconds.
# report_interval = 300
#
# recon_cache_path = /var/cache/swift
#
# You can set scheduling priority of processes. Niceness values range from -20


@@ -66,7 +66,7 @@ scripts =
[extras]
kms_keymaster =
oslo.config>=4.0.0,!=4.3.0,!=4.4.0 # Apache-2.0
castellan>=0.7.0 # Apache-2.0
castellan>=0.13.0 # Apache-2.0
[entry_points]
paste.app_factory =


@@ -16,6 +16,7 @@
from __future__ import print_function
import logging
from collections import defaultdict
from errno import EEXIST
from itertools import islice
from operator import itemgetter
@@ -471,18 +472,18 @@ swift-ring-builder <builder_file>
builder_id = "(not assigned)"
print('%s, build version %d, id %s' %
(builder_file, builder.version, builder_id))
regions = 0
zones = 0
balance = 0
dev_count = 0
if builder.devs:
regions = len(set(d['region'] for d in builder.devs
if d is not None))
zones = len(set((d['region'], d['zone']) for d in builder.devs
if d is not None))
dev_count = len([dev for dev in builder.devs
if dev is not None])
ring_empty_error = None
regions = len(set(d['region'] for d in builder.devs
if d is not None))
zones = len(set((d['region'], d['zone']) for d in builder.devs
if d is not None))
dev_count = len([dev for dev in builder.devs
if dev is not None])
try:
balance = builder.get_balance()
except exceptions.EmptyRingError as e:
ring_empty_error = str(e)
dispersion_trailer = '' if builder.dispersion is None else (
', %.02f dispersion' % (builder.dispersion))
print('%d partitions, %.6f replicas, %d regions, %d zones, '
@@ -514,16 +515,18 @@ swift-ring-builder <builder_file>
else:
print('Ring file %s is obsolete' % ring_file)
if builder.devs:
if ring_empty_error:
balance_per_dev = defaultdict(int)
else:
balance_per_dev = builder._build_balance_per_dev()
header_line, print_dev_f = _make_display_device_table(builder)
print(header_line)
for dev in sorted(
builder._iter_devs(),
key=lambda x: (x['region'], x['zone'], x['ip'], x['device'])
):
flags = 'DEL' if dev in builder._remove_devs else ''
print_dev_f(dev, balance_per_dev[dev['id']], flags)
header_line, print_dev_f = _make_display_device_table(builder)
print(header_line)
for dev in sorted(
builder._iter_devs(),
key=lambda x: (x['region'], x['zone'], x['ip'], x['device'])
):
flags = 'DEL' if dev in builder._remove_devs else ''
print_dev_f(dev, balance_per_dev[dev['id']], flags)
# Print some helpful info if partition power increase in progress
if (builder.next_part_power and
@@ -542,6 +545,8 @@ swift-ring-builder <builder_file>
print('Run "swift-object-relinker cleanup" on all nodes before '
'moving on to finish_increase_partition_power.')
if ring_empty_error:
print(ring_empty_error)
exit(EXIT_SUCCESS)
@staticmethod
@@ -938,7 +943,8 @@ swift-ring-builder <builder_file> rebalance [options]
balance_changed = (
abs(last_balance - balance) >= 1 or
(last_balance == MAX_BALANCE and balance == MAX_BALANCE))
dispersion_changed = abs(last_dispersion - dispersion) >= 1
dispersion_changed = last_dispersion is None or (
abs(last_dispersion - dispersion) >= 1)
if balance_changed or dispersion_changed:
be_cowardly = False
@@ -997,6 +1003,7 @@ swift-ring-builder <builder_file> dispersion <search_filter> [options]
Output report on dispersion.
--recalculate option will rebuild cached dispersion info and save builder
--verbose option will display dispersion graph broken down by tier
You can filter which tiers are evaluated to drill down using a regex
@@ -1035,6 +1042,8 @@ swift-ring-builder <builder_file> dispersion <search_filter> [options]
exit(EXIT_ERROR)
usage = Commands.dispersion.__doc__.strip()
parser = optparse.OptionParser(usage)
parser.add_option('--recalculate', action='store_true',
help='Rebuild cached dispersion info and save')
parser.add_option('-v', '--verbose', action='store_true',
help='Display dispersion report for tiers')
options, args = parser.parse_args(argv)
@@ -1042,8 +1051,13 @@ swift-ring-builder <builder_file> dispersion <search_filter> [options]
search_filter = args[3]
else:
search_filter = None
orig_version = builder.version
report = dispersion_report(builder, search_filter=search_filter,
verbose=options.verbose)
verbose=options.verbose,
recalculate=options.recalculate)
if builder.version != orig_version:
# we've already done the work, better go ahead and save it!
builder.save(builder_file)
print('Dispersion is %.06f, Balance is %.06f, Overload is %0.2f%%' % (
builder.dispersion, builder.get_balance(), builder.overload * 100))
print('Required overload is %.6f%%' % (


@@ -16,7 +16,6 @@
import functools
import os
from os.path import isdir # tighter scoped import for mocking
import time
import six
from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
@@ -303,15 +302,19 @@ def valid_timestamp(request):
def check_delete_headers(request):
"""
Validate if 'x-delete' headers are have correct values
values should be positive integers and correspond to
a time in the future.
Check that 'x-delete-after' and 'x-delete-at' headers have valid values.
Values should be positive integers and correspond to a time greater than
the request timestamp.
If the 'x-delete-after' header is found then its value is used to compute
an 'x-delete-at' value which takes precedence over any existing
'x-delete-at' header.
:param request: the swob request object
:returns: HTTPBadRequest in case of invalid values
or None if values are ok
:raises: HTTPBadRequest in case of invalid values
:returns: the swob request object
"""
now = float(valid_timestamp(request))
if 'x-delete-after' in request.headers:
try:
x_delete_after = int(request.headers['x-delete-after'])
@@ -319,13 +322,14 @@ def check_delete_headers(request):
raise HTTPBadRequest(request=request,
content_type='text/plain',
body='Non-integer X-Delete-After')
actual_del_time = time.time() + x_delete_after
if actual_del_time < time.time():
actual_del_time = utils.normalize_delete_at_timestamp(
now + x_delete_after)
if int(actual_del_time) <= now:
raise HTTPBadRequest(request=request,
content_type='text/plain',
body='X-Delete-After in past')
request.headers['x-delete-at'] = utils.normalize_delete_at_timestamp(
actual_del_time)
request.headers['x-delete-at'] = actual_del_time
del request.headers['x-delete-after']
if 'x-delete-at' in request.headers:
try:
@@ -335,7 +339,7 @@ def check_delete_headers(request):
raise HTTPBadRequest(request=request, content_type='text/plain',
body='Non-integer X-Delete-At')
if x_delete_at < time.time() and not utils.config_true_value(
if x_delete_at <= now and not utils.config_true_value(
request.headers.get('x-backend-replication', 'f')):
raise HTTPBadRequest(request=request, content_type='text/plain',
body='X-Delete-At in past')


@@ -22,14 +22,15 @@ from six.moves import urllib
import struct
from sys import exc_info, exit
import zlib
from swift import gettext_ as _
from time import gmtime, strftime, time
from zlib import compressobj
from swift.common.exceptions import ClientException
from swift.common.http import HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES
from swift.common.http import HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES, \
HTTP_CONFLICT
from swift.common.swob import Request
from swift.common.utils import quote
from swift.common.utils import quote, closing_if_possible, \
server_handled_successfully
from swift.common.wsgi import loadapp, pipeline_property
if six.PY3:
@@ -192,17 +193,38 @@ class InternalClient(object):
req.params = params
try:
resp = req.get_response(self.app)
except (Exception, Timeout):
exc_type, exc_value, exc_traceback = exc_info()
else:
if resp.status_int in acceptable_statuses or \
resp.status_int // 100 in acceptable_statuses:
return resp
except (Exception, Timeout):
exc_type, exc_value, exc_traceback = exc_info()
elif server_handled_successfully(resp.status_int):
# No sense retrying when we expect the same result
break
elif resp.status_int == HTTP_CONFLICT and 'x-timestamp' in [
header.lower() for header in headers]:
# Since the caller provided the timestamp, retrying won't
# change the result
break
# sleep only between tries, not after each one
if attempt < self.request_tries - 1:
if resp:
# always close any resp.app_iter before we discard it
with closing_if_possible(resp.app_iter):
# for non 2XX requests it's safe and useful to drain
# the response body so we log the correct status code
if resp.status_int // 100 != 2:
for iter_body in resp.app_iter:
pass
sleep(2 ** (attempt + 1))
if resp:
raise UnexpectedResponse(
_('Unexpected response: %s') % resp.status, resp)
msg = 'Unexpected response: %s' % resp.status
if resp.status_int // 100 != 2 and resp.body:
# provide additional context (and drain the response body) for
# non 2XX responses
msg += ' (%s)' % resp.body
raise UnexpectedResponse(msg, resp)
if exc_type:
# To make pep8 tool happy, in place of raise t, v, tb:
six.reraise(exc_type(*exc_value.args), None, exc_traceback)
@@ -241,7 +263,7 @@ class InternalClient(object):
return metadata
def _iter_items(
self, path, marker='', end_marker='',
self, path, marker='', end_marker='', prefix='',
acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
Returns an iterator of items from a json listing. Assumes listing has
@@ -251,6 +273,7 @@ class InternalClient(object):
:param marker: Prefix of first desired item, defaults to ''.
:param end_marker: Last item returned will be 'less' than this,
defaults to ''.
:param prefix: Prefix of items
:param acceptable_statuses: List of status for valid responses,
defaults to (2, HTTP_NOT_FOUND).
@@ -262,8 +285,8 @@ class InternalClient(object):
while True:
resp = self.make_request(
'GET', '%s?format=json&marker=%s&end_marker=%s' %
(path, quote(marker), quote(end_marker)),
'GET', '%s?format=json&marker=%s&end_marker=%s&prefix=%s' %
(path, quote(marker), quote(end_marker), quote(prefix)),
{}, acceptable_statuses)
if not resp.status_int == 200:
if resp.status_int >= HTTP_MULTIPLE_CHOICES:
@@ -331,7 +354,7 @@ class InternalClient(object):
# account methods
def iter_containers(
self, account, marker='', end_marker='',
self, account, marker='', end_marker='', prefix='',
acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
Returns an iterator of containers dicts from an account.
@@ -340,6 +363,7 @@ class InternalClient(object):
:param marker: Prefix of first desired item, defaults to ''.
:param end_marker: Last item returned will be 'less' than this,
defaults to ''.
:param prefix: Prefix of containers
:param acceptable_statuses: List of status for valid responses,
defaults to (2, HTTP_NOT_FOUND).
@@ -350,7 +374,8 @@ class InternalClient(object):
"""
path = self.make_path(account)
return self._iter_items(path, marker, end_marker, acceptable_statuses)
return self._iter_items(path, marker, end_marker, prefix,
acceptable_statuses)
def get_account_info(
self, account, acceptable_statuses=(2, HTTP_NOT_FOUND)):
@@ -508,7 +533,7 @@ class InternalClient(object):
return self._get_metadata(path, metadata_prefix, acceptable_statuses)
def iter_objects(
self, account, container, marker='', end_marker='',
self, account, container, marker='', end_marker='', prefix='',
acceptable_statuses=(2, HTTP_NOT_FOUND)):
"""
Returns an iterator of object dicts from a container.
@@ -518,6 +543,7 @@ class InternalClient(object):
:param marker: Prefix of first desired item, defaults to ''.
:param end_marker: Last item returned will be 'less' than this,
defaults to ''.
:param prefix: Prefix of objects
:param acceptable_statuses: List of status for valid responses,
defaults to (2, HTTP_NOT_FOUND).
@@ -528,7 +554,8 @@ class InternalClient(object):
"""
path = self.make_path(account, container)
return self._iter_items(path, marker, end_marker, acceptable_statuses)
return self._iter_items(path, marker, end_marker, prefix,
acceptable_statuses)
def set_container_metadata(
self, account, container, metadata, metadata_prefix='',
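
A minimal usage sketch of the new ``prefix`` parameter (the config path,
user agent and account are illustrative). Note that ``make_request`` still
sleeps ``2 ** (attempt + 1)`` seconds between tries, so with the default
``request_tries=3`` a failing request backs off for 2s and then 4s::

    from swift.common.internal_client import InternalClient

    swift = InternalClient('/etc/swift/internal-client.conf',
                           'example-agent', request_tries=3)

    # New in this change: listings can be narrowed server-side by prefix.
    for container in swift.iter_containers('.expiring_objects',
                                           prefix='1516'):
        print(container['name'])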


@@ -59,6 +59,7 @@ class KmsKeyMaster(KeyMaster):
set(keymaster_opts).intersection(conf))))
conf = readconf(self.keymaster_config_path, 'kms_keymaster')
ctxt = keystone_password.KeystonePassword(
auth_url=conf.get('auth_endpoint'),
username=conf.get('username'),
password=conf.get('password'),
project_name=conf.get('project_name'),


@@ -876,8 +876,6 @@ class SloGetContext(WSGIContext):
headers=response_headers,
conditional_response=True,
app_iter=segmented_iter)
if req.range:
response.headers.pop('Etag')
return response


@@ -35,7 +35,7 @@ from time import time
from swift.common import exceptions
from swift.common.ring import RingData
from swift.common.ring.utils import tiers_for_dev, build_tier_tree, \
validate_and_normalize_address, pretty_dev
validate_and_normalize_address, validate_replicas_by_tier, pretty_dev
# we can't store None's in the replica2part2dev array, so we high-jack
# the max value for magic to represent the part is not currently
@@ -247,7 +247,11 @@ class RingBuilder(object):
self.version = builder.version
self._replica2part2dev = builder._replica2part2dev
self._last_part_moves_epoch = builder._last_part_moves_epoch
self._last_part_moves = builder._last_part_moves
if builder._last_part_moves is None:
self._last_part_moves = array(
'B', itertools.repeat(0, self.parts))
else:
self._last_part_moves = builder._last_part_moves
self._last_part_gather_start = builder._last_part_gather_start
self._remove_devs = builder._remove_devs
self._id = getattr(builder, '_id', None)
@@ -263,7 +267,11 @@ class RingBuilder(object):
self.version = builder['version']
self._replica2part2dev = builder['_replica2part2dev']
self._last_part_moves_epoch = builder['_last_part_moves_epoch']
self._last_part_moves = builder['_last_part_moves']
if builder['_last_part_moves'] is None:
self._last_part_moves = array(
'B', itertools.repeat(0, self.parts))
else:
self._last_part_moves = builder['_last_part_moves']
self._last_part_gather_start = builder['_last_part_gather_start']
self._dispersion_graph = builder.get('_dispersion_graph', {})
self.dispersion = builder.get('dispersion')
@@ -555,7 +563,6 @@ class RingBuilder(object):
{'status': finish_status, 'count': gather_count + 1})
self.devs_changed = False
self.version += 1
changed_parts = self._build_dispersion_graph(old_replica2part2dev)
# clean up the cache
@@ -623,22 +630,23 @@ class RingBuilder(object):
if old_device != dev['id']:
changed_parts += 1
part_at_risk = False
# update running totals for each tiers' number of parts with a
# given replica count
part_risk_depth = defaultdict(int)
part_risk_depth[0] = 0
for tier, replicas in replicas_at_tier.items():
if tier not in dispersion_graph:
dispersion_graph[tier] = [self.parts] + [0] * int_replicas
dispersion_graph[tier][0] -= 1
dispersion_graph[tier][replicas] += 1
if replicas > max_allowed_replicas[tier]:
part_at_risk = True
# this part may be at risk in multiple tiers, but we only count it
# as at_risk once
if part_at_risk:
parts_at_risk += 1
part_risk_depth[len(tier)] += (
replicas - max_allowed_replicas[tier])
# count each part-replica once at tier where dispersion is worst
parts_at_risk += max(part_risk_depth.values())
self._dispersion_graph = dispersion_graph
self.dispersion = 100.0 * parts_at_risk / self.parts
self.dispersion = 100.0 * parts_at_risk / (self.parts * self.replicas)
self.version += 1
return changed_parts
def validate(self, stats=False):
@@ -1475,14 +1483,7 @@ class RingBuilder(object):
# belts & suspenders/paranoia - at every level, the sum of
# weighted_replicas should be very close to the total number of
# replicas for the ring
tiers = ['cluster', 'regions', 'zones', 'servers', 'devices']
for i, tier_name in enumerate(tiers):
replicas_at_tier = sum(weighted_replicas_by_tier[t] for t in
weighted_replicas_by_tier if len(t) == i)
if abs(self.replicas - replicas_at_tier) > 1e-10:
raise exceptions.RingValidationError(
'%s != %s at tier %s' % (
replicas_at_tier, self.replicas, tier_name))
validate_replicas_by_tier(self.replicas, weighted_replicas_by_tier)
return weighted_replicas_by_tier
@@ -1585,14 +1586,7 @@ class RingBuilder(object):
# belts & suspenders/paranoia - at every level, the sum of
# wanted_replicas should be very close to the total number of
# replicas for the ring
tiers = ['cluster', 'regions', 'zones', 'servers', 'devices']
for i, tier_name in enumerate(tiers):
replicas_at_tier = sum(wanted_replicas[t] for t in
wanted_replicas if len(t) == i)
if abs(self.replicas - replicas_at_tier) > 1e-10:
raise exceptions.RingValidationError(
'%s != %s at tier %s' % (
replicas_at_tier, self.replicas, tier_name))
validate_replicas_by_tier(self.replicas, wanted_replicas)
return wanted_replicas
@@ -1621,14 +1615,7 @@ class RingBuilder(object):
# belts & suspenders/paranoia - at every level, the sum of
# target_replicas should be very close to the total number
# of replicas for the ring
tiers = ['cluster', 'regions', 'zones', 'servers', 'devices']
for i, tier_name in enumerate(tiers):
replicas_at_tier = sum(target_replicas[t] for t in
target_replicas if len(t) == i)
if abs(self.replicas - replicas_at_tier) > 1e-10:
raise exceptions.RingValidationError(
'%s != %s at tier %s' % (
replicas_at_tier, self.replicas, tier_name))
validate_replicas_by_tier(self.replicas, target_replicas)
return target_replicas


@@ -17,6 +17,7 @@ import optparse
import re
import socket
from swift.common import exceptions
from swift.common.utils import expand_ipv6, is_valid_ip, is_valid_ipv4, \
is_valid_ipv6
@@ -606,8 +607,9 @@ def build_dev_from_opts(opts):
'replication_port': replication_port, 'weight': opts.weight}
def dispersion_report(builder, search_filter=None, verbose=False):
if not builder._dispersion_graph:
def dispersion_report(builder, search_filter=None,
verbose=False, recalculate=False):
if recalculate or not builder._dispersion_graph:
builder._build_dispersion_graph()
max_allowed_replicas = builder._build_max_replicas_by_tier()
worst_tier = None
@@ -618,8 +620,11 @@ def dispersion_report(builder, search_filter=None, verbose=False):
if search_filter and not re.match(search_filter, tier_name):
continue
max_replicas = int(max_allowed_replicas[tier])
at_risk_parts = sum(replica_counts[max_replicas + 1:])
placed_parts = sum(replica_counts[1:])
at_risk_parts = sum(replica_counts[i] * (i - max_replicas)
for i in range(max_replicas + 1,
len(replica_counts)))
placed_parts = sum(replica_counts[i] * i for i in range(
1, len(replica_counts)))
tier_dispersion = 100.0 * at_risk_parts / placed_parts
if tier_dispersion > max_dispersion:
max_dispersion = tier_dispersion
@@ -642,6 +647,25 @@ def dispersion_report(builder, search_filter=None, verbose=False):
}
def validate_replicas_by_tier(replicas, replicas_by_tier):
"""
Validate the sum of the replicas at each tier.
The sum of the replicas at each tier should be less than or very close to
the upper limit indicated by replicas
:param replicas: float,the upper limit of replicas
:param replicas_by_tier: defaultdict,the replicas by tier
"""
tiers = ['cluster', 'regions', 'zones', 'servers', 'devices']
for i, tier_name in enumerate(tiers):
replicas_at_tier = sum(replicas_by_tier[t] for t in
replicas_by_tier if len(t) == i)
if abs(replicas - replicas_at_tier) > 1e-10:
raise exceptions.RingValidationError(
'%s != %s at tier %s' % (
replicas_at_tier, replicas, tier_name))
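
A toy illustration (hypothetical two-zone layout) of the invariant this
helper enforces: the replicanths at each tier depth must sum to the ring's
replica count::

    # Tier keys grow one element per level: () is the cluster, (r,) a
    # region, (r, z) a zone, and so on down to devices.
    replicas_by_tier = {
        (): 3.0,                      # cluster
        (1,): 3.0,                    # region 1
        (1, 1): 1.5, (1, 2): 1.5,     # two zones share the three replicas
    }
    for depth in range(3):
        total = sum(v for tier, v in replicas_by_tier.items()
                    if len(tier) == depth)
        assert abs(total - 3.0) < 1e-10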
def format_device(region=None, zone=None, ip=None, device=None, **kwargs):
"""
Convert device dict or tier attributes to a representative string.


@@ -272,12 +272,18 @@ class ObjectExpirer(Daemon):
self.pop_queue(container, obj)
self.report_objects += 1
self.logger.increment('objects')
except UnexpectedResponse as err:
self.logger.increment('errors')
self.logger.error(
'Unexpected response while deleting object %(container)s '
'%(obj)s: %(err)s' % {'container': container, 'obj': obj,
'err': str(err.resp.status_int)})
except (Exception, Timeout) as err:
self.logger.increment('errors')
self.logger.exception(
_('Exception while deleting object %(container)s %(obj)s'
' %(err)s') % {'container': container,
'obj': obj, 'err': str(err)})
'Exception while deleting object %(container)s %(obj)s'
' %(err)s' % {'container': container,
'obj': obj, 'err': str(err)})
self.logger.timing_since('timing', start_time)
self.report()
@@ -302,7 +308,8 @@ class ObjectExpirer(Daemon):
perform the actual delete.
"""
path = '/v1/' + urllib.parse.quote(actual_obj.lstrip('/'))
self.swift.make_request('DELETE', path,
{'X-If-Delete-At': str(timestamp),
'X-Timestamp': str(timestamp)},
(2,))
self.swift.make_request(
'DELETE', path,
{'X-If-Delete-At': str(timestamp), 'X-Timestamp': str(timestamp),
'X-Backend-Clean-Expiring-Object-Queue': 'no'},
(2,))


@@ -418,19 +418,26 @@ class ObjectController(BaseStorageServer):
'x-trans-id': headers_in.get('x-trans-id', '-'),
'referer': request.as_referer()})
if op != 'DELETE':
hosts = headers_in.get('X-Delete-At-Host', None)
if hosts is None:
# If header is missing, no update needed as sufficient other
# object servers should perform the required update.
return
delete_at_container = headers_in.get('X-Delete-At-Container', None)
if not delete_at_container:
# older proxy servers did not send X-Delete-At-Container so for
# backwards compatibility calculate the value here, but also
# log a warning because this is prone to inconsistent
# expiring_objects_container_divisor configurations.
# See https://bugs.launchpad.net/swift/+bug/1187200
self.logger.warning(
'X-Delete-At-Container header must be specified for '
'expiring objects background %s to work properly. Making '
'best guess as to the container name for now.' % op)
# TODO(gholt): In a future release, change the above warning to
# a raised exception and remove the guess code below.
delete_at_container = get_expirer_container(
delete_at, self.expiring_objects_container_divisor,
account, container, obj)
partition = headers_in.get('X-Delete-At-Partition', None)
hosts = headers_in.get('X-Delete-At-Host', '')
contdevices = headers_in.get('X-Delete-At-Device', '')
updates = [upd for upd in
zip((h.strip() for h in hosts.split(',')),
@@ -442,6 +449,11 @@ class ObjectController(BaseStorageServer):
headers_out['x-content-type'] = 'text/plain'
headers_out['x-etag'] = 'd41d8cd98f00b204e9800998ecf8427e'
else:
if not config_true_value(
request.headers.get(
'X-Backend-Clean-Expiring-Object-Queue', 't')):
return
# DELETEs of old expiration data have no way of knowing what the
# old X-Delete-At-Container was at the time of the initial setting
# of the data, so a best guess is made here.


@@ -38,6 +38,47 @@ from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR, \
HTTP_MOVED_PERMANENTLY
class SweepStats(object):
"""
Stats bucket for an update sweep
"""
def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
unlinks=0):
self.errors = errors
self.failures = failures
self.quarantines = quarantines
self.successes = successes
self.unlinks = unlinks
def copy(self):
return type(self)(self.errors, self.failures, self.quarantines,
self.successes, self.unlinks)
def since(self, other):
return type(self)(self.errors - other.errors,
self.failures - other.failures,
self.quarantines - other.quarantines,
self.successes - other.successes,
self.unlinks - other.unlinks)
def reset(self):
self.errors = 0
self.failures = 0
self.quarantines = 0
self.successes = 0
self.unlinks = 0
def __str__(self):
keys = (
(self.successes, 'successes'),
(self.failures, 'failures'),
(self.quarantines, 'quarantines'),
(self.unlinks, 'unlinks'),
(self.errors, 'errors'),
)
return ', '.join('%d %s' % pair for pair in keys)
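
A quick sketch of how the updater uses this bucket (assuming the class
above): snapshot with ``copy()``, do some work, and ``since()`` yields the
delta that feeds the new per-interval progress log lines::

    stats = SweepStats()
    start = stats.copy()   # snapshot at the start of a reporting window

    stats.successes += 3   # ... a few updates succeed ...
    stats.unlinks += 3
    stats.failures += 1

    print(stats.since(start))
    # 3 successes, 1 failures, 0 quarantines, 3 unlinks, 0 errors
    stats.reset()          # zero the counters before the next sweep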
class ObjectUpdater(Daemon):
"""Update object information in container listings."""
@@ -65,16 +106,18 @@ class ObjectUpdater(Daemon):
objects_per_second))
self.node_timeout = float(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.successes = 0
self.failures = 0
self.report_interval = float(conf.get('report_interval', 300))
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
self.stats = SweepStats()
def _listdir(self, path):
try:
return os.listdir(path)
except OSError as e:
self.stats.errors += 1
self.logger.increment('errors')
self.logger.error(_('ERROR: Unable to access %(path)s: '
'%(error)s') %
{'path': path, 'error': e})
@@ -97,7 +140,9 @@ class ObjectUpdater(Daemon):
self.get_container_ring().get_nodes('')
for device in self._listdir(self.devices):
if not check_drive(self.devices, device, self.mount_check):
self.logger.increment('errors')
# We don't count this as an error. The occasional
# unmounted drive is part of normal cluster operations,
# so a simple warning is sufficient.
self.logger.warning(
_('Skipping %s as it is not mounted'), device)
continue
@@ -109,17 +154,15 @@ class ObjectUpdater(Daemon):
else:
signal.signal(signal.SIGTERM, signal.SIG_DFL)
eventlet_monkey_patch()
self.successes = 0
self.failures = 0
self.stats.reset()
forkbegin = time.time()
self.object_sweep(os.path.join(self.devices, device))
elapsed = time.time() - forkbegin
self.logger.info(
_('Object update sweep of %(device)s'
' completed: %(elapsed).02fs, %(success)s successes'
', %(fail)s failures'),
('Object update sweep of %(device)s '
'completed: %(elapsed).02fs, %(stats)s'),
{'device': device, 'elapsed': elapsed,
'success': self.successes, 'fail': self.failures})
'stats': self.stats})
sys.exit()
while pids:
pids.remove(os.wait()[0])
@@ -135,21 +178,21 @@ class ObjectUpdater(Daemon):
"""Run the updater once."""
self.logger.info(_('Begin object update single threaded sweep'))
begin = time.time()
self.successes = 0
self.failures = 0
self.stats.reset()
for device in self._listdir(self.devices):
if not check_drive(self.devices, device, self.mount_check):
self.logger.increment('errors')
# We don't count this as an error. The occasional unmounted
# drive is part of normal cluster operations, so a simple
# warning is sufficient.
self.logger.warning(
_('Skipping %s as it is not mounted'), device)
continue
self.object_sweep(os.path.join(self.devices, device))
elapsed = time.time() - begin
self.logger.info(
_('Object update single threaded sweep completed: '
'%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
{'elapsed': elapsed, 'success': self.successes,
'fail': self.failures})
('Object update single-threaded sweep completed: '
'%(elapsed).02fs, %(stats)s'),
{'elapsed': elapsed, 'stats': self.stats})
dump_recon_cache({'object_updater_sweep': elapsed},
self.rcache, self.logger)
@@ -160,6 +203,12 @@ class ObjectUpdater(Daemon):
:param device: path to device
"""
start_time = time.time()
last_status_update = start_time
start_stats = self.stats.copy()
my_pid = os.getpid()
self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid)
# loop through async pending dirs for all policies
for asyncdir in self._listdir(device):
# we only care about directories
@@ -172,6 +221,8 @@ class ObjectUpdater(Daemon):
try:
base, policy = split_policy_string(asyncdir)
except PolicyError as e:
# This isn't an error, but a misconfiguration. Logging a
# warning should be sufficient.
self.logger.warning(_('Directory %(directory)r does not map '
'to a valid policy (%(error)s)') % {
'directory': asyncdir, 'error': e})
@@ -188,6 +239,7 @@ class ObjectUpdater(Daemon):
try:
obj_hash, timestamp = update.split('-')
except ValueError:
self.stats.errors += 1
self.logger.increment('errors')
self.logger.error(
_('ERROR async pending file with unexpected '
@@ -195,7 +247,8 @@ class ObjectUpdater(Daemon):
% (update_path))
continue
if obj_hash == last_obj_hash:
self.logger.increment("unlinks")
self.stats.unlinks += 1
self.logger.increment('unlinks')
os.unlink(update_path)
else:
self.process_object_update(update_path, device,
@@ -205,11 +258,39 @@ class ObjectUpdater(Daemon):
self.objects_running_time = ratelimit_sleep(
self.objects_running_time,
self.max_objects_per_second)
now = time.time()
if now - last_status_update >= self.report_interval:
this_sweep = self.stats.since(start_stats)
self.logger.info(
('Object update sweep progress on %(device)s: '
'%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
{'device': device,
'elapsed': now - start_time,
'pid': my_pid,
'stats': this_sweep})
last_status_update = now
try:
os.rmdir(prefix_path)
except OSError:
pass
self.logger.timing_since('timing', start_time)
sweep_totals = self.stats.since(start_stats)
self.logger.info(
('Object update sweep completed on %(device)s '
'in %(elapsed).02fs seconds:, '
'%(successes)d successes, %(failures)d failures, '
'%(quarantines)d quarantines, '
'%(unlinks)d unlinks, %(errors)d errors '
'(pid: %(pid)d)'),
{'device': device,
'elapsed': time.time() - start_time,
'pid': my_pid,
'successes': sweep_totals.successes,
'failures': sweep_totals.failures,
'quarantines': sweep_totals.quarantines,
'unlinks': sweep_totals.unlinks,
'errors': sweep_totals.errors})
def process_object_update(self, update_path, device, policy):
"""
@ -224,6 +305,7 @@ class ObjectUpdater(Daemon):
except Exception:
self.logger.exception(
_('ERROR Pickle problem, quarantining %s'), update_path)
self.stats.quarantines += 1
self.logger.increment('quarantines')
target_path = os.path.join(device, 'quarantined', 'objects',
os.path.basename(update_path))
@@ -284,14 +366,15 @@ class ObjectUpdater(Daemon):
break
if success:
self.successes += 1
self.stats.successes += 1
self.logger.increment('successes')
self.logger.debug('Update sent for %(obj)s %(path)s',
{'obj': obj, 'path': update_path})
self.logger.increment("unlinks")
self.stats.unlinks += 1
self.logger.increment('unlinks')
os.unlink(update_path)
else:
self.failures += 1
self.stats.failures += 1
self.logger.increment('failures')
self.logger.debug('Update failed for %(obj)s %(path)s',
{'obj': obj, 'path': update_path})


@@ -1845,9 +1845,11 @@ class Controller(object):
if is_success(resp.status_int):
self.app.logger.info(_('autocreate account %r'), path)
clear_info_cache(self.app, req.environ, account)
return True
else:
self.app.logger.warning(_('Could not autocreate account %r'),
path)
return False
def GETorHEAD_base(self, req, server_type, node_iter, partition, path,
concurrency=1, client_chunk_size=None):


@ -27,7 +27,7 @@ from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation, set_info_cache, clear_info_cache
from swift.common.storage_policy import POLICIES
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
HTTPNotFound
HTTPNotFound, HTTPServerError
from swift.container.backend import DB_STATE_SHARDING, DB_STATE_UNSHARDED, \
DB_STATE_SHARDED
@ -254,7 +254,8 @@ class ContainerController(Controller):
account_partition, accounts, container_count = \
self.account_info(self.account_name, req)
if not accounts and self.app.account_autocreate:
self.autocreate_account(req, self.account_name)
if not self.autocreate_account(req, self.account_name):
return HTTPServerError(request=req)
account_partition, accounts, container_count = \
self.account_info(self.account_name, req)
if not accounts:


@ -86,6 +86,46 @@ def check_content_type(req):
return None
def num_container_updates(container_replicas, container_quorum,
object_replicas, object_quorum):
"""
We need to send container updates via enough object servers such
that, if the object PUT succeeds, then the container update is
durable (either it's synchronously updated or written to async
pendings).
Define:
Qc = the quorum size for the container ring
Qo = the quorum size for the object ring
Rc = the replica count for the container ring
Ro = the replica count (or EC N+K) for the object ring
A durable container update is one that's made it to at least Qc
nodes. To always be durable, we have to send enough container
updates so that, if only Qo object PUTs succeed, and all the
failed object PUTs had container updates, at least Qc updates
remain. Since (Ro - Qo) object PUTs may fail, we must have at
least Qc + Ro - Qo container updates to ensure that Qc of them
remain.
Also, each container replica is named in at least one object PUT
request so that, when all requests succeed, no work is generated
for the container replicator. Thus, at least Rc updates are
necessary.
:param container_replicas: replica count for the container ring (Rc)
:param container_quorum: quorum size for the container ring (Qc)
:param object_replicas: replica count for the object ring (Ro)
:param object_quorum: quorum size for the object ring (Qo)
"""
return max(
# Qc + Ro - Qo
container_quorum + object_replicas - object_quorum,
# Rc
container_replicas)
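
A worked example of the formula, restated for a self-contained check (the
quorum sizes are illustrative)::

    def num_container_updates(container_replicas, container_quorum,
                              object_replicas, object_quorum):
        # max(Qc + Ro - Qo, Rc), as derived in the docstring above
        return max(container_quorum + object_replicas - object_quorum,
                   container_replicas)

    # 3-replica container ring (Qc=2), 3-replica object ring (Qo=2):
    print(num_container_updates(3, 2, 3, 2))  # -> 3, Rc dominates
    # Same container ring, 6-fragment EC object ring with Qo=4:
    print(num_container_updates(3, 2, 6, 4))  # -> 4, Qc + Ro - Qo dominates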
class ObjectControllerRouter(object):
policy_type_to_controller_map = {}
@ -267,6 +307,8 @@ class BaseObjectController(Controller):
if error_response:
return error_response
req.headers['X-Timestamp'] = Timestamp.now().internal
req, delete_at_container, delete_at_part, \
delete_at_nodes = self._config_obj_expiration(req)
@ -281,8 +323,6 @@ class BaseObjectController(Controller):
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = Timestamp.now().internal
headers = self._backend_requests(
req, len(nodes), container_partition, container_nodes,
delete_at_container, delete_at_part, delete_at_nodes,
@ -309,31 +349,55 @@ class BaseObjectController(Controller):
if container_path:
headers[index]['X-Backend-Container-Path'] = container_path
for i, container in enumerate(containers):
i = i % len(headers)
set_container_update(i, container)
def set_delete_at_headers(index, delete_at_node):
headers[index]['X-Delete-At-Container'] = delete_at_container
headers[index]['X-Delete-At-Partition'] = delete_at_partition
headers[index]['X-Delete-At-Host'] = csv_append(
headers[index].get('X-Delete-At-Host'),
'%(ip)s:%(port)s' % delete_at_node)
headers[index]['X-Delete-At-Device'] = csv_append(
headers[index].get('X-Delete-At-Device'),
delete_at_node['device'])
n_updates_needed = num_container_updates(
len(containers), quorum_size(len(containers)),
n_outgoing, policy.quorum)
# if # of container_updates is not enough against # of replicas
# (or fragments). Fill them like as pigeon hole problem.
# TODO?: apply these to X-Delete-At-Container?
n_updates_needed = min(policy.quorum + 1, n_outgoing)
container_iter = itertools.cycle(containers)
existing_updates = len(containers)
dan_iter = itertools.cycle(delete_at_nodes or [])
existing_updates = 0
while existing_updates < n_updates_needed:
set_container_update(existing_updates, next(container_iter))
index = existing_updates % n_outgoing
set_container_update(index, next(container_iter))
if delete_at_nodes:
# We reverse the index in order to distribute the updates
# across all nodes.
set_delete_at_headers(n_outgoing - 1 - index, next(dan_iter))
existing_updates += 1
for i, node in enumerate(delete_at_nodes or []):
i = i % len(headers)
headers[i]['X-Delete-At-Container'] = delete_at_container
headers[i]['X-Delete-At-Partition'] = delete_at_partition
headers[i]['X-Delete-At-Host'] = csv_append(
headers[i].get('X-Delete-At-Host'),
'%(ip)s:%(port)s' % node)
headers[i]['X-Delete-At-Device'] = csv_append(
headers[i].get('X-Delete-At-Device'),
node['device'])
# Keep the number of expirer-queue deletes to a reasonable number.
#
# In the best case, at least one object server writes out an
# async_pending for an expirer-queue update. In the worst case, no
# object server does so, and an expirer-queue row remains that
# refers to an already-deleted object. In this case, upon attempting
# to delete the object, the object expirer will notice that the
# object does not exist and then remove the row from the expirer
# queue.
#
# In other words: expirer-queue updates on object DELETE are nice to
# have, but not strictly necessary for correct operation.
#
# Also, each queue update results in an async_pending record, which
# causes the object updater to talk to all container servers. If we
# have N async_pendings and Rc container replicas, we cause N * Rc
# requests from object updaters to container servers (possibly more,
# depending on retries). Thus, it is helpful to keep this number
# small.
n_desired_queue_updates = 2
for i in range(len(headers)):
headers[i].setdefault('X-Backend-Clean-Expiring-Object-Queue',
't' if i < n_desired_queue_updates else 'f')
return headers
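# For a rough sense of the arithmetic above (illustrative numbers only):
# with Rc = 3 container replicas, letting all 15 nodes of a hypothetical
# EC policy write queue-update async_pendings could cost ~15 * 3 = 45
# container requests, while capping it at two writers costs at most
# 2 * 3 = 6.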
@ -480,7 +544,9 @@ class BaseObjectController(Controller):
req.headers.pop('x-detect-content-type')
def _update_x_timestamp(self, req):
# Used by container sync feature
# The container sync feature includes an x-timestamp header with
# requests. If present, it is checked and preserved; otherwise a fresh
# timestamp is added.
if 'x-timestamp' in req.headers:
try:
req_timestamp = Timestamp(req.headers['X-Timestamp'])
@ -735,14 +801,14 @@ class BaseObjectController(Controller):
# update content type in case it is missing
self._update_content_type(req)
self._update_x_timestamp(req)
# check constraints on object name and request headers
error_response = check_object_creation(req, self.object_name) or \
check_content_type(req)
if error_response:
return error_response
self._update_x_timestamp(req)
def reader():
try:
return req.environ['wsgi.input'].read(
@ -794,18 +860,8 @@ class BaseObjectController(Controller):
return HTTPNotFound(request=req)
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
# Used by container sync feature
if 'x-timestamp' in req.headers:
try:
req_timestamp = Timestamp(req.headers['X-Timestamp'])
except ValueError:
return HTTPBadRequest(
request=req, content_type='text/plain',
body='X-Timestamp should be a UNIX timestamp float value; '
'was %r' % req.headers['x-timestamp'])
req.headers['X-Timestamp'] = req_timestamp.internal
else:
req.headers['X-Timestamp'] = Timestamp.now().internal
self._update_x_timestamp(req)
# Include local handoff nodes if write-affinity is enabled.
node_count = len(nodes)
@ -1049,7 +1105,11 @@ class ECAppIter(object):
"""
self.mime_boundary = resp.boundary
self.stashed_iter = reiterate(self._real_iter(req, resp.headers))
try:
self.stashed_iter = reiterate(self._real_iter(req, resp.headers))
except Exception:
self.close()
raise
if self.learned_content_type is not None:
resp.content_type = self.learned_content_type
@ -2109,7 +2169,7 @@ class ECGetResponseCollection(object):
Return the best bucket in the collection.
The "best" bucket is the newest timestamp with sufficient getters, or
the closest to having a sufficient getters, unless it is bettered by a
the closest to having sufficient getters, unless it is bettered by a
bucket with potential alternate nodes.
:return: An instance of :class:`~ECGetResponseBucket` or None if there

View File

@ -219,9 +219,21 @@ class TestSlo(Base):
"Expected slo_enabled to be True/False, got %r" %
(self.env.slo_enabled,))
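# An SLO manifest's etag is the md5 of the concatenated hex md5s of
# its segments, in order; build the expected value for manifest-abcde.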
manifest_abcde_hash = hashlib.md5()
manifest_abcde_hash.update(hashlib.md5('a' * 1024 * 1024).hexdigest())
manifest_abcde_hash.update(hashlib.md5('b' * 1024 * 1024).hexdigest())
manifest_abcde_hash.update(hashlib.md5('c' * 1024 * 1024).hexdigest())
manifest_abcde_hash.update(hashlib.md5('d' * 1024 * 1024).hexdigest())
manifest_abcde_hash.update(hashlib.md5('e').hexdigest())
self.manifest_abcde_etag = manifest_abcde_hash.hexdigest()
def test_slo_get_simple_manifest(self):
file_item = self.env.container.file('manifest-abcde')
file_contents = file_item.read()
self.assertEqual(file_item.conn.response.status, 200)
headers = dict(file_item.conn.response.getheaders())
self.assertIn('etag', headers)
self.assertEqual(headers['etag'], '"%s"' % self.manifest_abcde_etag)
self.assertEqual(4 * 1024 * 1024 + 1, len(file_contents))
self.assertEqual('a', file_contents[0])
self.assertEqual('a', file_contents[1024 * 1024 - 1])
@ -348,6 +360,10 @@ class TestSlo(Base):
file_item = self.env.container.file('manifest-abcde')
file_contents = file_item.read(size=1024 * 1024 + 2,
offset=1024 * 1024 - 1)
self.assertEqual(file_item.conn.response.status, 206)
headers = dict(file_item.conn.response.getheaders())
self.assertIn('etag', headers)
self.assertEqual(headers['etag'], '"%s"' % self.manifest_abcde_etag)
self.assertEqual('a', file_contents[0])
self.assertEqual('b', file_contents[1])
self.assertEqual('b', file_contents[-2])
@ -418,16 +434,10 @@ class TestSlo(Base):
self.assertEqual('d', file_contents[-1])
def test_slo_etag_is_hash_of_etags(self):
expected_hash = hashlib.md5()
expected_hash.update(hashlib.md5('a' * 1024 * 1024).hexdigest())
expected_hash.update(hashlib.md5('b' * 1024 * 1024).hexdigest())
expected_hash.update(hashlib.md5('c' * 1024 * 1024).hexdigest())
expected_hash.update(hashlib.md5('d' * 1024 * 1024).hexdigest())
expected_hash.update(hashlib.md5('e').hexdigest())
expected_etag = expected_hash.hexdigest()
# we have this check in test_slo_get_simple_manifest, too,
# but verify that it holds for HEAD requests
file_item = self.env.container.file('manifest-abcde')
self.assertEqual(expected_etag, file_item.info()['etag'])
self.assertEqual(self.manifest_abcde_etag, file_item.info()['etag'])
def test_slo_etag_is_hash_of_etags_submanifests(self):

View File

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import os
import random
import time
import uuid
@ -103,7 +105,7 @@ class TestObjectExpirer(ReplProbeTest):
# clear proxy cache
client.post_container(self.url, self.token, self.container_name, {})
# run the expirier again after replication
# run the expirer again after replication
self.expirer.once()
# object is not in the listing
@ -126,6 +128,80 @@ class TestObjectExpirer(ReplProbeTest):
self.assertGreater(Timestamp(metadata['x-backend-timestamp']),
create_timestamp)
def test_expirer_doesnt_make_async_pendings(self):
# The object expirer cleans up its own queue. The inner loop
# basically looks like this:
#
# for obj in stuff_to_delete:
# delete_the_object(obj)
# remove_the_queue_entry(obj)
#
# By default, upon receipt of a DELETE request for an expiring
# object, the object servers will create async_pending records to
# clean the expirer queue. Since the expirer cleans its own queue,
# this is unnecessary. The expirer can make requests in such a way
# that the object server does not write out any async pendings; this
# test asserts that this is the case.
def gather_async_pendings(onodes):
async_pendings = []
for onode in onodes:
device_dir = self.device_dir('', onode)
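# policy-0 updates land in 'async_pending'; other policies
# use 'async_pending-<policy-index>'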
for ap_pol_dir in os.listdir(device_dir):
if not ap_pol_dir.startswith('async_pending'):
# skip 'objects', 'containers', etc.
continue
async_pending_dir = os.path.join(device_dir, ap_pol_dir)
try:
ap_dirs = os.listdir(async_pending_dir)
except OSError as err:
if err.errno == errno.ENOENT:
pass
else:
raise
else:
for ap_dir in ap_dirs:
ap_dir_fullpath = os.path.join(
async_pending_dir, ap_dir)
async_pendings.extend([
os.path.join(ap_dir_fullpath, ent)
for ent in os.listdir(ap_dir_fullpath)])
return async_pendings
# Make an expiring object in each policy
for policy in ENABLED_POLICIES:
container_name = "expirer-test-%d" % policy.idx
container_headers = {'X-Storage-Policy': policy.name}
client.put_container(self.url, self.token, container_name,
headers=container_headers)
now = time.time()
delete_at = int(now + 2.0)
client.put_object(
self.url, self.token, container_name, "some-object",
headers={'X-Delete-At': str(delete_at),
'X-Timestamp': Timestamp(now).normal},
contents='dontcare')
time.sleep(2.0)
# make sure auto-created expirer-queue containers get into the account
# listing so the expirer can find them
Manager(['container-updater']).once()
# Make sure there's no async_pendings anywhere. Probe tests only run
# on single-node installs anyway, so this set should be small enough
# that an exhaustive check doesn't take too long.
all_obj_nodes = {}
for policy in ENABLED_POLICIES:
for dev in policy.object_ring.devs:
all_obj_nodes[dev['device']] = dev
pendings_before = gather_async_pendings(all_obj_nodes.values())
# expire the objects
Manager(['object-expirer']).once()
pendings_after = gather_async_pendings(all_obj_nodes.values())
self.assertEqual(pendings_after, pendings_before)
def test_expirer_object_should_not_be_expired(self):
# Current object-expirer checks the correctness via x-if-delete-at
@ -282,9 +358,6 @@ class TestObjectExpirer(ReplProbeTest):
# run expirer again, delete should now succeed
self.expirer.once()
# this is mainly to paper over lp bug #1652323
self.get_to_final_state()
# verify the deletion by checking the container listing
self.assertFalse(self._check_obj_in_container_listing(),
msg='Found listing for %s' % self.object_name)

View File

@ -133,7 +133,7 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
msg += '%3d: %s\n' % (i, line)
self.fail(msg)
def create_sample_ring(self, part_power=6, overload=None):
def create_sample_ring(self, part_power=6, overload=None, empty=False):
"""
Create a sample ring with four devices
@ -152,35 +152,36 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
ring = RingBuilder(part_power, 3, 1)
if overload is not None:
ring.set_overload(overload)
ring.add_dev({'weight': 100.0,
'region': 0,
'zone': 0,
'ip': '127.0.0.1',
'port': 6200,
'device': 'sda1',
'meta': 'some meta data',
})
ring.add_dev({'weight': 100.0,
'region': 1,
'zone': 1,
'ip': '127.0.0.2',
'port': 6201,
'device': 'sda2'
})
ring.add_dev({'weight': 100.0,
'region': 2,
'zone': 2,
'ip': '127.0.0.3',
'port': 6202,
'device': 'sdc3'
})
ring.add_dev({'weight': 100.0,
'region': 3,
'zone': 3,
'ip': '127.0.0.4',
'port': 6203,
'device': 'sdd4'
})
if not empty:
ring.add_dev({'weight': 100.0,
'region': 0,
'zone': 0,
'ip': '127.0.0.1',
'port': 6200,
'device': 'sda1',
'meta': 'some meta data',
})
ring.add_dev({'weight': 100.0,
'region': 1,
'zone': 1,
'ip': '127.0.0.2',
'port': 6201,
'device': 'sda2'
})
ring.add_dev({'weight': 100.0,
'region': 2,
'zone': 2,
'ip': '127.0.0.3',
'port': 6202,
'device': 'sdc3'
})
ring.add_dev({'weight': 100.0,
'region': 3,
'zone': 3,
'ip': '127.0.0.4',
'port': 6203,
'device': 'sdd4'
})
ring.save(self.tmpfile)
return ring
@ -1885,6 +1886,55 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
ring_invalid_re = re.compile("Ring file .*\.ring\.gz is invalid")
self.assertTrue(ring_invalid_re.findall(mock_stdout.getvalue()))
def test_default_no_device_ring_without_exception(self):
self.create_sample_ring()
# remove devices from ring file
mock_stdout = six.StringIO()
mock_stderr = six.StringIO()
for device in ["d0", "d1", "d2", "d3"]:
argv = ["", self.tmpfile, "remove", device]
with mock.patch("sys.stdout", mock_stdout):
with mock.patch("sys.stderr", mock_stderr):
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
# default ring file without exception
mock_stdout = six.StringIO()
mock_stderr = six.StringIO()
argv = ["", self.tmpfile, "default"]
with mock.patch("sys.stdout", mock_stdout):
with mock.patch("sys.stderr", mock_stderr):
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
deleted_dev_list = (
" 0 0 0 127.0.0.1:6200 127.0.0.1:6200 "
"sda1 0.00 0 0.00 DEL some meta data\n"
" 1 1 1 127.0.0.2:6201 127.0.0.2:6201 "
"sda2 0.00 0 0.00 DEL \n"
" 2 2 2 127.0.0.3:6202 127.0.0.3:6202 "
"sdc3 0.00 0 0.00 DEL \n"
" 3 3 3 127.0.0.4:6203 127.0.0.4:6203 "
"sdd4 0.00 0 0.00 DEL \n")
output = mock_stdout.getvalue()
self.assertIn("64 partitions", output)
self.assertIn("all devices have been deleted", output)
self.assertIn("all devices have been deleted", output)
self.assertIn(deleted_dev_list, output)
def test_empty_ring(self):
self.create_sample_ring(empty=True)
# default ring file without exception
mock_stdout = six.StringIO()
mock_stderr = six.StringIO()
argv = ["", self.tmpfile, "default"]
with mock.patch("sys.stdout", mock_stdout):
with mock.patch("sys.stderr", mock_stderr):
self.assertSystemExit(EXIT_SUCCESS, ringbuilder.main, argv)
output = mock_stdout.getvalue()
self.assertIn("64 partitions", output)
self.assertIn("There are no devices in this ring", output)
def test_pretend_min_part_hours_passed(self):
self.run_srb("create", 8, 3, 1)
argv_pretend = ["", self.tmpfile, "pretend_min_part_hours_passed"]
@ -1927,74 +1977,41 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
# device won't acquire any partitions, so the ring's balance won't
# change. However, dispersion will improve.
ring = RingBuilder(6, 5, 1)
ring.add_dev({
'region': 1, 'zone': 1,
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
'device': 'sda'})
ring.add_dev({
'region': 1, 'zone': 1,
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
'device': 'sdb'})
ring.add_dev({
'region': 1, 'zone': 1,
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
'device': 'sdc'})
ring.add_dev({
'region': 1, 'zone': 1,
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
'device': 'sdd'})
ring.add_dev({
'region': 1, 'zone': 1,
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
'device': 'sde'})
ring = RingBuilder(6, 6, 1)
devs = ('d%s' % i for i in itertools.count())
for i in range(6):
ring.add_dev({
'region': 1, 'zone': 1,
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
'device': next(devs)})
ring.rebalance()
# The last guy in zone 1
ring.add_dev({
'region': 1, 'zone': 1,
'ip': '10.0.0.1', 'port': 20001, 'weight': 1000,
'device': 'sdf'})
'device': next(devs)})
# Add zone 2 (same total weight as zone 1)
ring.add_dev({
'region': 1, 'zone': 2,
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
'device': 'sda'})
ring.add_dev({
'region': 1, 'zone': 2,
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
'device': 'sdb'})
ring.add_dev({
'region': 1, 'zone': 2,
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
'device': 'sdc'})
ring.add_dev({
'region': 1, 'zone': 2,
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
'device': 'sdd'})
ring.add_dev({
'region': 1, 'zone': 2,
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
'device': 'sde'})
ring.add_dev({
'region': 1, 'zone': 2,
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
'device': 'sdf'})
for i in range(7):
ring.add_dev({
'region': 1, 'zone': 2,
'ip': '10.0.0.2', 'port': 20001, 'weight': 1000,
'device': next(devs)})
ring.pretend_min_part_hours_passed()
ring.save(self.tmpfile)
del ring
# Rebalance once: this gets 1/5 replica into zone 2; the ring is
# Rebalance once: this gets 1/6th replica into zone 2; the ring is
# saved because devices changed.
argv = ["", self.tmpfile, "rebalance", "5759339"]
self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
rb = RingBuilder.load(self.tmpfile)
self.assertEqual(rb.dispersion, 100)
self.assertEqual(rb.dispersion, 33.333333333333336)
self.assertEqual(rb.get_balance(), 100)
self.run_srb('pretend_min_part_hours_passed')
# Rebalance again: this gets 2/5 replica into zone 2, but no devices
# Rebalance again: this gets 2/6th replica into zone 2, but no devices
# changed and the balance stays the same. The only improvement is
# dispersion.
@ -2009,7 +2026,7 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
with mock.patch('swift.common.ring.RingBuilder.save', capture_save):
self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
self.assertEqual(captured, {
'dispersion': 0,
'dispersion': 16.666666666666668,
'balance': 100,
})
@ -2212,6 +2229,14 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
argv = ["", backup_file, "write_builder", "24"]
self.assertIsNone(ringbuilder.main(argv))
rb = RingBuilder.load(self.tmpfile + '.builder')
self.assertIsNotNone(rb._last_part_moves)
rb._last_part_moves = None
rb.save(self.tmpfile)
argv = ["", self.tmpfile + '.builder', "rebalance"]
self.assertSystemExit(EXIT_WARNING, ringbuilder.main, argv)
def test_warn_at_risk(self):
# check that warning is generated when rebalance does not achieve
# satisfactory balance
@ -2288,6 +2313,28 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
self.assertIn('dispersion', out.lower())
self.assertFalse(err)
def test_dispersion_command_recalculate(self):
rb = RingBuilder(8, 3, 0)
for i in range(1, 4):
rb.add_dev({'region': 1, 'zone': i, 'weight': 1.0,
'ip': '127.0.0.%d' % i, 'port': 6000, 'device': 'sda'})
# extra device in z1
rb.add_dev({'region': 1, 'zone': 1, 'weight': 1.0,
'ip': '127.0.0.1', 'port': 6000, 'device': 'sdb'})
rb.rebalance()
self.assertEqual(rb.dispersion, 16.666666666666668)
# simulate an out-of-date dispersion calculation
rb.dispersion = 50
rb.save(self.tempfile)
old_version = rb.version
out, err = self.run_srb('dispersion')
self.assertIn('Dispersion is 50.000000', out)
out, err = self.run_srb('dispersion --recalculate')
self.assertIn('Dispersion is 16.666667', out)
rb = RingBuilder.load(self.tempfile)
self.assertEqual(rb.version, old_version + 1)
def test_use_ringfile_as_builderfile(self):
mock_stdout = six.StringIO()
mock_stderr = six.StringIO()

View File

@ -78,15 +78,16 @@ def mock_config_opts_side_effect(*args, **kwargs):
return dict()
def mock_keystone_password_side_effect(username, password, project_name,
user_domain_name, project_domain_name,
user_id, user_domain_id, trust_id,
def mock_keystone_password_side_effect(auth_url, username, password,
project_name, user_domain_name,
project_domain_name, user_id,
user_domain_id, trust_id,
domain_id, domain_name, project_id,
project_domain_id, reauthenticate):
return MockPassword(username, password, project_name, user_domain_name,
project_domain_name, user_id, user_domain_id, trust_id,
domain_id, domain_name, project_id, project_domain_id,
reauthenticate)
return MockPassword(auth_url, username, password, project_name,
user_domain_name, project_domain_name, user_id,
user_domain_id, trust_id, domain_id, domain_name,
project_id, project_domain_id, reauthenticate)
ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED = 'Secret incorrectly specified.'
ERR_MESSAGE_KEY_UUID_NOT_FOUND = 'Key not found, uuid: '
@ -154,10 +155,11 @@ class MockBarbicanKey(object):
class MockPassword(object):
def __init__(self, username, password, project_name, user_domain_name,
project_domain_name, user_id, user_domain_id, trust_id,
domain_id, domain_name, project_id, project_domain_id,
reauthenticate):
def __init__(self, auth_url, username, password, project_name,
user_domain_name, project_domain_name, user_id,
user_domain_id, trust_id, domain_id, domain_name, project_id,
project_domain_id, reauthenticate):
self.auth_url = auth_url
self.password = password
self.username = username
self.user_domain_name = user_domain_name

View File

@ -28,9 +28,9 @@ from test.unit import FakeLogger, FakeRing
class LeakTrackingIter(object):
def __init__(self, inner_iter, fake_swift, path):
def __init__(self, inner_iter, mark_closed, path):
self.inner_iter = inner_iter
self.fake_swift = fake_swift
self.mark_closed = mark_closed
self.path = path
def __iter__(self):
@ -38,7 +38,7 @@ class LeakTrackingIter(object):
yield x
def close(self):
self.fake_swift.mark_closed(self.path)
self.mark_closed(self.path)
FakeSwiftCall = namedtuple('FakeSwiftCall', ['method', 'path', 'headers'])
@ -173,7 +173,7 @@ class FakeSwift(object):
conditional_etag=conditional_etag)
wsgi_iter = resp(env, start_response)
self.mark_opened(path)
return LeakTrackingIter(wsgi_iter, self, path)
return LeakTrackingIter(wsgi_iter, self.mark_closed, path)
def mark_opened(self, path):
self._unclosed_req_paths[path] += 1

View File

@ -59,6 +59,9 @@ class SloTestCase(unittest.TestCase):
slo_conf = {'rate_limit_under_size': '0'}
self.slo = slo.filter_factory(slo_conf)(self.app)
self.slo.logger = self.app.logger
self.manifest_abcd_etag = md5hex(
md5hex("a" * 5) + md5hex(md5hex("b" * 10) + md5hex("c" * 15)) +
md5hex("d" * 20))
def call_app(self, req, app=None):
if app is None:
@ -1683,10 +1686,6 @@ class TestSloGetManifest(SloTestCase):
'Etag': md5hex(_abcdefghijkl_manifest_json)},
_abcdefghijkl_manifest_json)
self.manifest_abcd_etag = md5hex(
md5hex("a" * 5) + md5hex(md5hex("b" * 10) + md5hex("c" * 15)) +
md5hex("d" * 20))
_bc_ranges_manifest_json = json.dumps(
[{'name': '/gettest/b_10', 'hash': md5hex('b' * 10),
'content_type': 'text/plain', 'bytes': '10',
@ -1986,7 +1985,7 @@ class TestSloGetManifest(SloTestCase):
self.assertEqual(status, '206 Partial Content')
self.assertEqual(headers['Content-Length'], '15')
self.assertNotIn('Etag', headers)
self.assertEqual(headers['Etag'], '"%s"' % self.manifest_abcd_etag)
self.assertEqual(body, 'aabbbbbbbbbbccc')
self.assertEqual(
@ -2231,7 +2230,7 @@ class TestSloGetManifest(SloTestCase):
self.assertEqual(status, '206 Partial Content')
self.assertEqual(headers['Content-Length'], '25')
self.assertNotIn('Etag', headers)
self.assertEqual(headers['Etag'], '"%s"' % self.manifest_abcd_etag)
self.assertEqual(body, 'bbbbbbbbbbccccccccccccccc')
self.assertEqual(
@ -2434,7 +2433,7 @@ class TestSloGetManifest(SloTestCase):
self.assertEqual(status, '206 Partial Content')
self.assertEqual(headers['Content-Length'], '20')
self.assertEqual(headers['Content-Type'], 'application/json')
self.assertNotIn('Etag', headers)
self.assertIn('Etag', headers)
self.assertEqual(body, 'accccccccbbbbbbbbddd')
self.assertEqual(
@ -3235,9 +3234,7 @@ class TestSloConditionalGetOldManifest(SloTestCase):
self.assertEqual(status, '206 Partial Content')
self.assertIn(('Content-Length', '4'), headers)
# We intentionally drop Etag for ranged requests.
# Presumably because of broken clients?
self.assertNotIn('etag', [h.lower() for h, v in headers])
self.assertIn(('Etag', '"%s"' % self.manifest_abcd_etag), headers)
self.assertEqual(body, 'aabb')
expected_app_calls = [

View File

@ -27,6 +27,7 @@ from tempfile import mkdtemp
from shutil import rmtree
import random
import uuid
import itertools
from six.moves import range
@ -36,6 +37,16 @@ from swift.common.ring import utils
from swift.common.ring.builder import MAX_BALANCE
def _partition_counts(builder, key='id'):
"""
Returns a dictionary mapping the given device key to the number of
partitions assigned to devices with that key.
"""
return Counter(builder.devs[dev_id][key]
for part2dev_id in builder._replica2part2dev
for dev_id in part2dev_id)
class TestRingBuilder(unittest.TestCase):
def setUp(self):
@ -44,21 +55,12 @@ class TestRingBuilder(unittest.TestCase):
def tearDown(self):
rmtree(self.testdir, ignore_errors=1)
def _partition_counts(self, builder, key='id'):
"""
Returns a dictionary mapping the given device key to the number of
partitions assigned to devices with that key.
"""
return Counter(builder.devs[dev_id][key]
for part2dev_id in builder._replica2part2dev
for dev_id in part2dev_id)
def _get_population_by_region(self, builder):
"""
Returns a dictionary mapping region to number of partitions in that
region.
"""
return self._partition_counts(builder, key='region')
return _partition_counts(builder, key='region')
def test_init(self):
rb = ring.RingBuilder(8, 3, 1)
@ -320,7 +322,7 @@ class TestRingBuilder(unittest.TestCase):
'port': 10000 + region * 100 + zone,
'device': 'sda%d' % dev_id})
rb.rebalance()
self.assertEqual(self._partition_counts(rb, 'zone'),
self.assertEqual(_partition_counts(rb, 'zone'),
{0: 256, 10: 256, 11: 256})
wanted_by_zone = defaultdict(lambda: defaultdict(int))
for dev in rb._iter_devs():
@ -777,7 +779,7 @@ class TestRingBuilder(unittest.TestCase):
# replica should have been moved, therefore we expect 256 parts in zone
# 0 and 1, and a total of 256 in zone 2,3, and 4
expected = defaultdict(int, {0: 256, 1: 256, 2: 86, 3: 85, 4: 85})
self.assertEqual(expected, self._partition_counts(rb, key='zone'))
self.assertEqual(expected, _partition_counts(rb, key='zone'))
zone_histogram = defaultdict(int)
for part in range(rb.parts):
@ -804,7 +806,7 @@ class TestRingBuilder(unittest.TestCase):
self.assertAlmostEqual(rb.get_balance(), 0, delta=0.5)
# every zone has either 153 or 154 parts
for zone, count in self._partition_counts(
for zone, count in _partition_counts(
rb, key='zone').items():
self.assertAlmostEqual(153.5, count, delta=1)
@ -872,18 +874,18 @@ class TestRingBuilder(unittest.TestCase):
self.assertFalse(rb.ever_rebalanced)
rb.rebalance()
self.assertTrue(rb.ever_rebalanced)
counts = self._partition_counts(rb)
counts = _partition_counts(rb)
self.assertEqual(counts, {0: 256, 1: 256, 2: 256})
rb.add_dev({'id': 3, 'region': 0, 'zone': 3, 'weight': 1,
'ip': '127.0.0.1', 'port': 10003, 'device': 'sda1'})
rb.pretend_min_part_hours_passed()
rb.rebalance()
self.assertTrue(rb.ever_rebalanced)
counts = self._partition_counts(rb)
counts = _partition_counts(rb)
self.assertEqual(counts, {0: 192, 1: 192, 2: 192, 3: 192})
rb.set_dev_weight(3, 100)
rb.rebalance()
counts = self._partition_counts(rb)
counts = _partition_counts(rb)
self.assertEqual(counts[3], 256)
def test_add_rebalance_add_rebalance_delete_rebalance(self):
@ -1637,7 +1639,7 @@ class TestRingBuilder(unittest.TestCase):
rb.validate()
# sanity check: balance respects weights, so default
part_counts = self._partition_counts(rb, key='zone')
part_counts = _partition_counts(rb, key='zone')
self.assertEqual(part_counts[0], 192)
self.assertEqual(part_counts[1], 192)
self.assertEqual(part_counts[2], 384)
@ -1648,7 +1650,7 @@ class TestRingBuilder(unittest.TestCase):
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=12345)
part_counts = self._partition_counts(rb, key='zone')
part_counts = _partition_counts(rb, key='zone')
self.assertEqual({0: 212, 1: 211, 2: 345}, part_counts)
# Now, devices 0 and 1 take 50% more than their fair shares by
@ -1658,7 +1660,7 @@ class TestRingBuilder(unittest.TestCase):
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=12345)
part_counts = self._partition_counts(rb, key='zone')
part_counts = _partition_counts(rb, key='zone')
self.assertEqual({0: 256, 1: 256, 2: 256}, part_counts)
# Devices 0 and 1 may take up to 75% over their fair share, but the
@ -1669,7 +1671,7 @@ class TestRingBuilder(unittest.TestCase):
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=12345)
part_counts = self._partition_counts(rb, key='zone')
part_counts = _partition_counts(rb, key='zone')
self.assertEqual(part_counts[0], 256)
self.assertEqual(part_counts[1], 256)
self.assertEqual(part_counts[2], 256)
@ -1709,7 +1711,7 @@ class TestRingBuilder(unittest.TestCase):
rb.rebalance(seed=12345)
# sanity check: our overload is big enough to balance things
part_counts = self._partition_counts(rb, key='ip')
part_counts = _partition_counts(rb, key='ip')
self.assertEqual(part_counts['127.0.0.1'], 216)
self.assertEqual(part_counts['127.0.0.2'], 216)
self.assertEqual(part_counts['127.0.0.3'], 336)
@ -1721,7 +1723,7 @@ class TestRingBuilder(unittest.TestCase):
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=12345)
part_counts = self._partition_counts(rb, key='ip')
part_counts = _partition_counts(rb, key='ip')
self.assertEqual({
'127.0.0.1': 237,
@ -1737,7 +1739,7 @@ class TestRingBuilder(unittest.TestCase):
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=12345)
part_counts = self._partition_counts(rb, key='ip')
part_counts = _partition_counts(rb, key='ip')
self.assertEqual(part_counts['127.0.0.1'], 256)
self.assertEqual(part_counts['127.0.0.2'], 256)
self.assertEqual(part_counts['127.0.0.3'], 256)
@ -1770,7 +1772,7 @@ class TestRingBuilder(unittest.TestCase):
'127.0.0.4': 192,
}
part_counts = self._partition_counts(rb, key='ip')
part_counts = _partition_counts(rb, key='ip')
self.assertEqual(part_counts, expected)
def test_overload_keeps_balanceable_things_balanced_initially(self):
@ -1803,7 +1805,7 @@ class TestRingBuilder(unittest.TestCase):
rb.set_overload(99999)
rb.rebalance(seed=12345)
part_counts = self._partition_counts(rb)
part_counts = _partition_counts(rb)
self.assertEqual(part_counts, {
0: 128,
1: 128,
@ -1847,7 +1849,7 @@ class TestRingBuilder(unittest.TestCase):
rb.set_overload(99999)
rb.rebalance(seed=123)
part_counts = self._partition_counts(rb)
part_counts = _partition_counts(rb)
self.assertEqual(part_counts, {
0: 128,
1: 128,
@ -1868,7 +1870,7 @@ class TestRingBuilder(unittest.TestCase):
rb.set_dev_weight(1, 8)
rb.rebalance(seed=456)
part_counts = self._partition_counts(rb)
part_counts = _partition_counts(rb)
self.assertEqual(part_counts, {
0: 128,
1: 128,
@ -2273,7 +2275,7 @@ class TestRingBuilder(unittest.TestCase):
self.assertRaises(exceptions.RingValidationError, rb.validate)
rb.rebalance()
counts = self._partition_counts(rb, key='zone')
counts = _partition_counts(rb, key='zone')
self.assertEqual(counts, {0: 128, 1: 128, 2: 256, 3: 256})
dev_usage, worst = rb.validate()
@ -2455,7 +2457,7 @@ class TestRingBuilder(unittest.TestCase):
# we'll rebalance but can't move any parts
rb.rebalance(seed=1)
# zero weight tier has one copy of 1/4 part-replica
self.assertEqual(rb.dispersion, 75.0)
self.assertEqual(rb.dispersion, 25.0)
self.assertEqual(rb._dispersion_graph, {
(0,): [0, 0, 0, 256],
(0, 0): [0, 0, 0, 256],
@ -2514,7 +2516,7 @@ class TestRingBuilder(unittest.TestCase):
# so the first time, rings are still unbalanced because we'll only move
# one replica of each part.
self.assertEqual(rb.get_balance(), 50.1953125)
self.assertEqual(rb.dispersion, 99.609375)
self.assertEqual(rb.dispersion, 16.6015625)
# N.B. since we mostly end up grabbing parts by "weight forced" some
# seeds given some specific ring state will randomly pick bad
@ -2524,14 +2526,14 @@ class TestRingBuilder(unittest.TestCase):
# ... this isn't really a "desirable" behavior, but even with bad luck,
# things do get better
self.assertEqual(rb.get_balance(), 47.265625)
self.assertEqual(rb.dispersion, 99.609375)
self.assertEqual(rb.dispersion, 16.6015625)
# but if you stick with it, eventually the next rebalance, will get to
# move "the right" part-replicas, resulting in near optimal balance
changed_part, _, _ = rb.rebalance(seed=7)
self.assertEqual(changed_part, 240)
self.assertEqual(rb.get_balance(), 0.390625)
self.assertEqual(rb.dispersion, 99.609375)
self.assertEqual(rb.dispersion, 16.6015625)
def test_undispersable_server_converge_on_balance(self):
rb = ring.RingBuilder(8, 6, 0)
@ -2567,7 +2569,7 @@ class TestRingBuilder(unittest.TestCase):
# but the first time, those are still unbalanced because the ring builder
# can move only one replica for each part
self.assertEqual(rb.get_balance(), 16.9921875)
self.assertEqual(rb.dispersion, 59.765625)
self.assertEqual(rb.dispersion, 9.9609375)
rb.rebalance(seed=7)
@ -2575,7 +2577,7 @@ class TestRingBuilder(unittest.TestCase):
self.assertGreaterEqual(rb.get_balance(), 0)
self.assertLess(rb.get_balance(), 1)
# dispersion doesn't get any worse
self.assertEqual(rb.dispersion, 59.765625)
self.assertEqual(rb.dispersion, 9.9609375)
def test_effective_overload(self):
rb = ring.RingBuilder(8, 3, 1)
@ -3628,7 +3630,7 @@ class TestGetRequiredOverload(unittest.TestCase):
# when overload can not change the outcome none is required
self.assertEqual(0.0, rb.get_required_overload())
# even though dispersion is terrible (in z1 particularly)
self.assertEqual(100.0, rb.dispersion)
self.assertEqual(20.0, rb.dispersion)
def test_one_big_guy_does_not_spoil_his_buddy(self):
rb = ring.RingBuilder(8, 3, 0)
@ -4363,5 +4365,158 @@ class TestGetRequiredOverload(unittest.TestCase):
wr.items() if len(t) == tier_len})
class TestRingBuilderDispersion(unittest.TestCase):
def setUp(self):
self.devs = ('d%s' % i for i in itertools.count())
def assertAlmostPartCount(self, counts, expected, delta=3):
msgs = []
failed = False
for k, p in sorted(expected.items()):
try:
self.assertAlmostEqual(counts[k], p, delta=delta)
except KeyError:
self.fail('%r is missing the key %r' % (counts, k))
except AssertionError:
failed = True
state = '!='
else:
state = 'ok'
msgs.append('parts in %s was %s expected %s (%s)' % (
k, counts[k], p, state))
if failed:
self.fail('some part counts not close enough '
'to expected:\n' + '\n'.join(msgs))
def test_rebalance_dispersion(self):
rb = ring.RingBuilder(8, 6, 0)
for i in range(6):
rb.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.0.1',
'port': 6000, 'weight': 1.0,
'device': next(self.devs)})
rb.rebalance()
self.assertEqual(0, rb.dispersion)
for z in range(2):
for i in range(6):
rb.add_dev({'region': 0, 'zone': z + 1, 'ip': '127.0.1.1',
'port': 6000, 'weight': 1.0,
'device': next(self.devs)})
self.assertAlmostPartCount(_partition_counts(rb, 'zone'),
{0: 1536, 1: 0, 2: 0})
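# a rebalance moves at most one replica of any given part, so with 6
# replicas dispersion should improve by one replicanth (1/6 ~= 16.67
# points) per pass: 50% -> 33.3% -> 16.7% -> 0, as asserted below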
rb.rebalance()
self.assertEqual(rb.dispersion, 50.0)
expected = {0: 1280, 1: 128, 2: 128}
self.assertAlmostPartCount(_partition_counts(rb, 'zone'),
expected)
report = dict(utils.dispersion_report(
rb, r'r\d+z\d+$', verbose=True)['graph'])
counts = {int(k.split('z')[1]): d['placed_parts']
for k, d in report.items()}
self.assertAlmostPartCount(counts, expected)
rb.rebalance()
self.assertEqual(rb.dispersion, 33.333333333333336)
expected = {0: 1024, 1: 256, 2: 256}
self.assertAlmostPartCount(_partition_counts(rb, 'zone'),
expected)
report = dict(utils.dispersion_report(
rb, r'r\d+z\d+$', verbose=True)['graph'])
counts = {int(k.split('z')[1]): d['placed_parts']
for k, d in report.items()}
self.assertAlmostPartCount(counts, expected)
rb.rebalance()
self.assertEqual(rb.dispersion, 16.666666666666668)
expected = {0: 768, 1: 384, 2: 384}
self.assertAlmostPartCount(_partition_counts(rb, 'zone'),
expected)
report = dict(utils.dispersion_report(
rb, r'r\d+z\d+$', verbose=True)['graph'])
counts = {int(k.split('z')[1]): d['placed_parts']
for k, d in report.items()}
self.assertAlmostPartCount(counts, expected)
rb.rebalance()
self.assertEqual(0, rb.dispersion)
expected = {0: 512, 1: 512, 2: 512}
self.assertAlmostPartCount(_partition_counts(rb, 'zone'), expected)
report = dict(utils.dispersion_report(
rb, r'r\d+z\d+$', verbose=True)['graph'])
counts = {int(k.split('z')[1]): d['placed_parts']
for k, d in report.items()}
self.assertAlmostPartCount(counts, expected)
def test_weight_dispersion(self):
rb = ring.RingBuilder(8, 3, 0)
for i in range(2):
for d in range(3):
rb.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.%s.1' % i,
'port': 6000, 'weight': 1.0,
'device': next(self.devs)})
for d in range(3):
rb.add_dev({'region': 0, 'zone': 0, 'ip': '127.0.2.1',
'port': 6000, 'weight': 10.0,
'device': next(self.devs)})
rb.rebalance()
# each tier should only have 1 replicanth, but the big server has 2
# replicas of every part and 3 replicas of another half of them - so
# it holds 1.5 replicanths beyond the one it is due, and 1.5 of the 3
# total replicas works out to 50% dispersion
self.assertEqual(50.0, rb.dispersion)
expected = {
'127.0.0.1': 64,
'127.0.1.1': 64,
'127.0.2.1': 640,
}
self.assertAlmostPartCount(_partition_counts(rb, 'ip'),
expected)
report = dict(utils.dispersion_report(
rb, r'r\d+z\d+-[^/]*$', verbose=True)['graph'])
counts = {k.split('-')[1]: d['placed_parts']
for k, d in report.items()}
self.assertAlmostPartCount(counts, expected)
def test_multiple_tier_dispersion(self):
rb = ring.RingBuilder(10, 8, 0)
r_z_to_ip_count = {
(0, 0): 2,
(1, 1): 1,
(1, 2): 2,
}
ip_index = 0
for (r, z), ip_count in sorted(r_z_to_ip_count.items()):
for i in range(ip_count):
ip_index += 1
for d in range(3):
rb.add_dev({'region': r, 'zone': z,
'ip': '127.%s.%s.%s' % (r, z, ip_index),
'port': 6000, 'weight': 1.0,
'device': next(self.devs)})
for i in range(3):
# it might take a few rebalances for all the right part replicas to
# balance from r1z2 into r1z1
rb.rebalance()
self.assertAlmostEqual(15.52734375, rb.dispersion, delta=5.0)
self.assertAlmostEqual(0.0, rb.get_balance(), delta=0.5)
expected = {
'127.0.0.1': 1638,
'127.0.0.2': 1638,
'127.1.1.3': 1638,
'127.1.2.4': 1638,
'127.1.2.5': 1638,
}
delta = 10
self.assertAlmostPartCount(_partition_counts(rb, 'ip'), expected,
delta=delta)
report = dict(utils.dispersion_report(
rb, r'r\d+z\d+-[^/]*$', verbose=True)['graph'])
counts = {k.split('-')[1]: d['placed_parts']
for k, d in report.items()}
self.assertAlmostPartCount(counts, expected, delta=delta)
if __name__ == '__main__':
unittest.main()

View File

@ -14,7 +14,9 @@
# limitations under the License.
import unittest
from collections import defaultdict
from swift.common import exceptions
from swift.common import ring
from swift.common.ring.utils import (tiers_for_dev, build_tier_tree,
validate_and_normalize_ip,
@ -26,7 +28,8 @@ from swift.common.ring.utils import (tiers_for_dev, build_tier_tree,
validate_args, parse_args,
parse_builder_ring_filename_args,
build_dev_from_opts, dispersion_report,
parse_address, get_tier_name, pretty_dev)
parse_address, get_tier_name, pretty_dev,
validate_replicas_by_tier)
class TestUtils(unittest.TestCase):
@ -172,6 +175,81 @@ class TestUtils(unittest.TestCase):
self.assertRaises(ValueError,
validate_and_normalize_address, hostname)
def test_validate_replicas_by_tier_close(self):
one_ip_six_devices = \
defaultdict(float,
{(): 4.0,
(0,): 4.0,
(0, 0): 4.0,
(0, 0, '127.0.0.1'): 4.0,
(0, 0, '127.0.0.1', 0): 0.6666666670,
(0, 0, '127.0.0.1', 1): 0.6666666668,
(0, 0, '127.0.0.1', 2): 0.6666666667,
(0, 0, '127.0.0.1', 3): 0.6666666666,
(0, 0, '127.0.0.1', 4): 0.6666666665,
(0, 0, '127.0.0.1', 5): 0.6666666664,
})
try:
validate_replicas_by_tier(4, one_ip_six_devices)
except Exception as e:
self.fail('one_ip_six_devices is invalid for %s' % e)
def test_validate_replicas_by_tier_exact(self):
three_regions_three_devices = \
defaultdict(float,
{(): 3.0,
(0,): 1.0,
(0, 0): 1.0,
(0, 0, '127.0.0.1'): 1.0,
(0, 0, '127.0.0.1', 0): 1.0,
(1,): 1.0,
(1, 1): 1.0,
(1, 1, '127.0.0.1'): 1.0,
(1, 1, '127.0.0.1', 1): 1.0,
(2,): 1.0,
(2, 2): 1.0,
(2, 2, '127.0.0.1'): 1.0,
(2, 2, '127.0.0.1', 2): 1.0,
})
try:
validate_replicas_by_tier(3, three_regions_three_devices)
except Exception as e:
self.fail('three_regions_three_devices is invalid for %s' % e)
def test_validate_replicas_by_tier_errors(self):
pseudo_replicas = \
defaultdict(float,
{(): 3.0,
(0,): 1.0,
(0, 0): 1.0,
(0, 0, '127.0.0.1'): 1.0,
(0, 0, '127.0.0.1', 0): 1.0,
(1,): 1.0,
(1, 1): 1.0,
(1, 1, '127.0.0.1'): 1.0,
(1, 1, '127.0.0.1', 1): 1.0,
(2,): 1.0,
(2, 2): 1.0,
(2, 2, '127.0.0.1'): 1.0,
(2, 2, '127.0.0.1', 2): 1.0,
})
def do_test(bad_tier_key, bad_tier_name):
# invalidate a copy of pseudo_replicas at the given key and check for
# an exception to be raised
test_replicas = dict(pseudo_replicas)
test_replicas[bad_tier_key] += 0.1 # <- this is not fair!
with self.assertRaises(exceptions.RingValidationError) as ctx:
validate_replicas_by_tier(3, test_replicas)
self.assertEqual(
'3.1 != 3 at tier %s' % bad_tier_name, str(ctx.exception))
do_test((), 'cluster')
do_test((1,), 'regions')
do_test((0, 0), 'zones')
do_test((2, 2, '127.0.0.1'), 'servers')
do_test((1, 1, '127.0.0.1', 1), 'devices')
def test_parse_search_value(self):
res = parse_search_value('r0')
self.assertEqual(res, {'region': 0})
@ -619,10 +697,10 @@ class TestUtils(unittest.TestCase):
rb.rebalance(seed=100)
rb.validate()
self.assertEqual(rb.dispersion, 55.46875)
self.assertEqual(rb.dispersion, 18.489583333333332)
report = dispersion_report(rb)
self.assertEqual(report['worst_tier'], 'r1z1')
self.assertEqual(report['max_dispersion'], 44.921875)
self.assertEqual(report['worst_tier'], 'r1z1-127.0.0.1')
self.assertEqual(report['max_dispersion'], 22.68370607028754)
def build_tier_report(max_replicas, placed_parts, dispersion,
replicas):
@ -633,17 +711,15 @@ class TestUtils(unittest.TestCase):
'replicas': replicas,
}
# Each node should store less than or equal to 256 partitions to
# avoid multiple replicas.
# 2/5 of total weight * 768 ~= 307 -> 51 partitions on each node in
# zone 1 are stored at least twice on the nodes
# every partition has at least two replicas in this zone, unfortunately
# sometimes they're both on the same server.
expected = [
['r1z1', build_tier_report(
2, 256, 44.921875, [0, 0, 141, 115])],
2, 627, 18.341307814992025, [0, 0, 141, 115])],
['r1z1-127.0.0.1', build_tier_report(
1, 242, 29.33884297520661, [14, 171, 71, 0])],
1, 313, 22.68370607028754, [14, 171, 71, 0])],
['r1z1-127.0.0.2', build_tier_report(
1, 243, 29.218106995884774, [13, 172, 71, 0])],
1, 314, 22.611464968152866, [13, 172, 71, 0])],
]
report = dispersion_report(rb, 'r1z1[^/]*$', verbose=True)
graph = report['graph']
@ -668,15 +744,15 @@ class TestUtils(unittest.TestCase):
# can't move all the part-replicas in one rebalance
rb.rebalance(seed=100)
report = dispersion_report(rb, verbose=True)
self.assertEqual(rb.dispersion, 11.71875)
self.assertEqual(rb.dispersion, 3.90625)
self.assertEqual(report['worst_tier'], 'r1z1-127.0.0.2')
self.assertEqual(report['max_dispersion'], 8.875739644970414)
self.assertEqual(report['max_dispersion'], 8.152173913043478)
# do a second rebalance
rb.rebalance(seed=100)
report = dispersion_report(rb, verbose=True)
self.assertEqual(rb.dispersion, 50.0)
self.assertEqual(rb.dispersion, 16.666666666666668)
self.assertEqual(report['worst_tier'], 'r1z0-127.0.0.3')
self.assertEqual(report['max_dispersion'], 50.0)
self.assertEqual(report['max_dispersion'], 33.333333333333336)
# ... but overload can square it
rb.set_overload(rb.get_required_overload())

View File

@ -19,7 +19,6 @@ import tempfile
import time
from six.moves import range
from test import safe_repr
from test.unit import mock_check_drive
from swift.common.swob import Request, HTTPException
@ -30,14 +29,6 @@ from swift.common.constraints import MAX_OBJECT_NAME_LENGTH
class TestConstraints(unittest.TestCase):
def assertIn(self, member, container, msg=None):
"""Copied from 2.7"""
if member not in container:
standardMsg = '%s not found in %s' % (safe_repr(member),
safe_repr(container))
self.fail(self._formatMessage(msg, standardMsg))
def test_check_metadata_empty(self):
headers = {}
self.assertIsNone(constraints.check_metadata(Request.blank(
@ -145,49 +136,57 @@ class TestConstraints(unittest.TestCase):
def test_check_object_creation_content_length(self):
headers = {'Content-Length': str(constraints.MAX_FILE_SIZE),
'Content-Type': 'text/plain'}
'Content-Type': 'text/plain',
'X-Timestamp': str(time.time())}
self.assertIsNone(constraints.check_object_creation(Request.blank(
'/', headers=headers), 'object_name'))
headers = {'Content-Length': str(constraints.MAX_FILE_SIZE + 1),
'Content-Type': 'text/plain'}
'Content-Type': 'text/plain',
'X-Timestamp': str(time.time())}
resp = constraints.check_object_creation(
Request.blank('/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_REQUEST_ENTITY_TOO_LARGE)
headers = {'Transfer-Encoding': 'chunked',
'Content-Type': 'text/plain'}
'Content-Type': 'text/plain',
'X-Timestamp': str(time.time())}
self.assertIsNone(constraints.check_object_creation(Request.blank(
'/', headers=headers), 'object_name'))
headers = {'Transfer-Encoding': 'gzip',
'Content-Type': 'text/plain'}
'Content-Type': 'text/plain',
'X-Timestamp': str(time.time())}
resp = constraints.check_object_creation(Request.blank(
'/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Invalid Transfer-Encoding header value', resp.body)
headers = {'Content-Type': 'text/plain'}
headers = {'Content-Type': 'text/plain',
'X-Timestamp': str(time.time())}
resp = constraints.check_object_creation(
Request.blank('/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_LENGTH_REQUIRED)
headers = {'Content-Length': 'abc',
'Content-Type': 'text/plain'}
'Content-Type': 'text/plain',
'X-Timestamp': str(time.time())}
resp = constraints.check_object_creation(Request.blank(
'/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertIn('Invalid Content-Length header value', resp.body)
headers = {'Transfer-Encoding': 'gzip,chunked',
'Content-Type': 'text/plain'}
'Content-Type': 'text/plain',
'X-Timestamp': str(time.time())}
resp = constraints.check_object_creation(Request.blank(
'/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_NOT_IMPLEMENTED)
def test_check_object_creation_name_length(self):
headers = {'Transfer-Encoding': 'chunked',
'Content-Type': 'text/plain'}
'Content-Type': 'text/plain',
'X-Timestamp': str(time.time())}
name = 'o' * constraints.MAX_OBJECT_NAME_LENGTH
self.assertIsNone(constraints.check_object_creation(Request.blank(
'/', headers=headers), name))
@ -202,11 +201,13 @@ class TestConstraints(unittest.TestCase):
def test_check_object_creation_content_type(self):
headers = {'Transfer-Encoding': 'chunked',
'Content-Type': 'text/plain'}
'Content-Type': 'text/plain',
'X-Timestamp': str(time.time())}
self.assertIsNone(constraints.check_object_creation(Request.blank(
'/', headers=headers), 'object_name'))
headers = {'Transfer-Encoding': 'chunked'}
headers = {'Transfer-Encoding': 'chunked',
'X-Timestamp': str(time.time())}
resp = constraints.check_object_creation(
Request.blank('/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
@ -214,140 +215,171 @@ class TestConstraints(unittest.TestCase):
def test_check_object_creation_bad_content_type(self):
headers = {'Transfer-Encoding': 'chunked',
'Content-Type': '\xff\xff'}
'Content-Type': '\xff\xff',
'X-Timestamp': str(time.time())}
resp = constraints.check_object_creation(
Request.blank('/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertTrue('Content-Type' in resp.body)
self.assertIn('Content-Type', resp.body)
def test_check_object_creation_bad_delete_headers(self):
headers = {'Transfer-Encoding': 'chunked',
'Content-Type': 'text/plain',
'X-Delete-After': 'abc'}
'X-Delete-After': 'abc',
'X-Timestamp': str(time.time())}
resp = constraints.check_object_creation(
Request.blank('/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertTrue('Non-integer X-Delete-After' in resp.body)
self.assertIn('Non-integer X-Delete-After', resp.body)
t = str(int(time.time() - 60))
headers = {'Transfer-Encoding': 'chunked',
'Content-Type': 'text/plain',
'X-Delete-At': t}
'X-Delete-At': t,
'X-Timestamp': str(time.time())}
resp = constraints.check_object_creation(
Request.blank('/', headers=headers), 'object_name')
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
self.assertTrue('X-Delete-At in past' in resp.body)
self.assertIn('X-Delete-At in past', resp.body)
def test_check_delete_headers(self):
# x-delete-at value should be relative to the request timestamp rather
# than time.time() so separate the two to ensure the checks are robust
ts = utils.Timestamp(time.time() + 100)
# X-Delete-After
headers = {'X-Delete-After': '60'}
resp = constraints.check_delete_headers(
headers = {'X-Delete-After': '600',
'X-Timestamp': ts.internal}
req = constraints.check_delete_headers(
Request.blank('/', headers=headers))
self.assertTrue(isinstance(resp, Request))
self.assertTrue('x-delete-at' in resp.headers)
self.assertIsInstance(req, Request)
self.assertIn('x-delete-at', req.headers)
self.assertNotIn('x-delete-after', req.headers)
expected_delete_at = str(int(ts) + 600)
self.assertEqual(req.headers.get('X-Delete-At'), expected_delete_at)
headers = {'X-Delete-After': 'abc'}
try:
resp = constraints.check_delete_headers(
Request.blank('/', headers=headers))
except HTTPException as e:
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
self.assertTrue('Non-integer X-Delete-After' in e.body)
else:
self.fail("Should have failed with HTTPBadRequest")
headers = {'X-Delete-After': 'abc',
'X-Timestamp': ts.internal}
headers = {'X-Delete-After': '60.1'}
try:
resp = constraints.check_delete_headers(
with self.assertRaises(HTTPException) as cm:
constraints.check_delete_headers(
Request.blank('/', headers=headers))
except HTTPException as e:
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
self.assertTrue('Non-integer X-Delete-After' in e.body)
else:
self.fail("Should have failed with HTTPBadRequest")
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('Non-integer X-Delete-After', cm.exception.body)
headers = {'X-Delete-After': '-1'}
try:
resp = constraints.check_delete_headers(
headers = {'X-Delete-After': '60.1',
'X-Timestamp': ts.internal}
with self.assertRaises(HTTPException) as cm:
constraints.check_delete_headers(
Request.blank('/', headers=headers))
except HTTPException as e:
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
self.assertTrue('X-Delete-After in past' in e.body)
else:
self.fail("Should have failed with HTTPBadRequest")
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('Non-integer X-Delete-After', cm.exception.body)
headers = {'X-Delete-After': '-1',
'X-Timestamp': ts.internal}
with self.assertRaises(HTTPException) as cm:
constraints.check_delete_headers(
Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-After in past', cm.exception.body)
headers = {'X-Delete-After': '0',
'X-Timestamp': ts.internal}
with self.assertRaises(HTTPException) as cm:
constraints.check_delete_headers(
Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-After in past', cm.exception.body)
# x-delete-after = 0 disallowed when it results in x-delete-at equal to
# the timestamp
headers = {'X-Delete-After': '0',
'X-Timestamp': utils.Timestamp(int(ts)).internal}
with self.assertRaises(HTTPException) as cm:
constraints.check_delete_headers(
Request.blank('/', headers=headers))
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-After in past', cm.exception.body)
# X-Delete-At
t = str(int(time.time() + 100))
headers = {'X-Delete-At': t}
resp = constraints.check_delete_headers(
delete_at = str(int(ts) + 100)
headers = {'X-Delete-At': delete_at,
'X-Timestamp': ts.internal}
req = constraints.check_delete_headers(
Request.blank('/', headers=headers))
self.assertTrue(isinstance(resp, Request))
self.assertTrue('x-delete-at' in resp.headers)
self.assertEqual(resp.headers.get('X-Delete-At'), t)
self.assertIsInstance(req, Request)
self.assertIn('x-delete-at', req.headers)
self.assertEqual(req.headers.get('X-Delete-At'), delete_at)
headers = {'X-Delete-At': 'abc'}
try:
resp = constraints.check_delete_headers(
headers = {'X-Delete-At': 'abc',
'X-Timestamp': ts.internal}
with self.assertRaises(HTTPException) as cm:
constraints.check_delete_headers(
Request.blank('/', headers=headers))
except HTTPException as e:
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
self.assertTrue('Non-integer X-Delete-At' in e.body)
else:
self.fail("Should have failed with HTTPBadRequest")
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('Non-integer X-Delete-At', cm.exception.body)
t = str(int(time.time() + 100)) + '.1'
headers = {'X-Delete-At': t}
try:
resp = constraints.check_delete_headers(
delete_at = str(int(ts) + 100) + '.1'
headers = {'X-Delete-At': delete_at,
'X-Timestamp': ts.internal}
with self.assertRaises(HTTPException) as cm:
constraints.check_delete_headers(
Request.blank('/', headers=headers))
except HTTPException as e:
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
self.assertTrue('Non-integer X-Delete-At' in e.body)
else:
self.fail("Should have failed with HTTPBadRequest")
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('Non-integer X-Delete-At', cm.exception.body)
t = str(int(time.time()))
headers = {'X-Delete-At': t}
try:
resp = constraints.check_delete_headers(
delete_at = str(int(ts) - 1)
headers = {'X-Delete-At': delete_at,
'X-Timestamp': ts.internal}
with self.assertRaises(HTTPException) as cm:
constraints.check_delete_headers(
Request.blank('/', headers=headers))
except HTTPException as e:
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
self.assertTrue('X-Delete-At in past' in e.body)
else:
self.fail("Should have failed with HTTPBadRequest")
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-At in past', cm.exception.body)
t = str(int(time.time() - 1))
headers = {'X-Delete-At': t}
try:
resp = constraints.check_delete_headers(
# x-delete-at disallowed when exactly equal to timestamp
delete_at = str(int(ts))
headers = {'X-Delete-At': delete_at,
'X-Timestamp': utils.Timestamp(int(ts)).internal}
with self.assertRaises(HTTPException) as cm:
constraints.check_delete_headers(
Request.blank('/', headers=headers))
except HTTPException as e:
self.assertEqual(e.status_int, HTTP_BAD_REQUEST)
self.assertTrue('X-Delete-At in past' in e.body)
else:
self.fail("Should have failed with HTTPBadRequest")
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
self.assertIn('X-Delete-At in past', cm.exception.body)
def test_check_delete_headers_removes_delete_after(self):
ts = utils.Timestamp.now()
headers = {'Content-Length': '0',
'Content-Type': 'text/plain',
'X-Delete-After': '42',
'X-Delete-At': str(int(ts) + 40),
'X-Timestamp': ts.internal}
req = Request.blank('/', headers=headers)
constraints.check_delete_headers(req)
self.assertNotIn('X-Delete-After', req.headers)
self.assertEqual(req.headers['X-Delete-At'], str(int(ts) + 42))
def test_check_delete_headers_sets_delete_at(self):
t = time.time() + 1000
t = time.time()
expected = str(int(t) + 1000)
# check delete-at is passed through
headers = {'Content-Length': '0',
'Content-Type': 'text/plain',
'X-Delete-At': str(int(t))}
'X-Delete-At': expected,
'X-Timestamp': str(t)}
req = Request.blank('/', headers=headers)
constraints.check_delete_headers(req)
self.assertTrue('X-Delete-At' in req.headers)
self.assertEqual(req.headers['X-Delete-At'], str(int(t)))
self.assertIn('X-Delete-At', req.headers)
self.assertEqual(req.headers['X-Delete-At'], expected)
# check delete-after is converted to delete-at
headers = {'Content-Length': '0',
'Content-Type': 'text/plain',
'X-Delete-After': '42'}
'X-Delete-After': '42',
'X-Timestamp': str(t)}
req = Request.blank('/', headers=headers)
with mock.patch('time.time', lambda: t):
constraints.check_delete_headers(req)
self.assertTrue('X-Delete-At' in req.headers)
constraints.check_delete_headers(req)
self.assertIn('X-Delete-At', req.headers)
expected = str(int(t) + 42)
self.assertEqual(req.headers['X-Delete-At'], expected)
@ -355,21 +387,21 @@ class TestConstraints(unittest.TestCase):
headers = {'Content-Length': '0',
'Content-Type': 'text/plain',
'X-Delete-After': '42',
'X-Delete-At': str(int(t) + 40)}
'X-Delete-At': str(int(t) + 40),
'X-Timestamp': str(t)}
req = Request.blank('/', headers=headers)
with mock.patch('time.time', lambda: t):
constraints.check_delete_headers(req)
self.assertTrue('X-Delete-At' in req.headers)
constraints.check_delete_headers(req)
self.assertIn('X-Delete-At', req.headers)
self.assertEqual(req.headers['X-Delete-At'], expected)
headers = {'Content-Length': '0',
'Content-Type': 'text/plain',
'X-Delete-After': '42',
'X-Delete-At': str(int(t) + 44)}
'X-Delete-At': str(int(t) + 44),
'X-Timestamp': str(t)}
req = Request.blank('/', headers=headers)
with mock.patch('time.time', lambda: t):
constraints.check_delete_headers(req)
self.assertTrue('X-Delete-At' in req.headers)
constraints.check_delete_headers(req)
self.assertIn('X-Delete-At', req.headers)
self.assertEqual(req.headers['X-Delete-At'], expected)
def test_check_drive_invalid_path(self):
@ -473,10 +505,10 @@ class TestConstraints(unittest.TestCase):
def test_validate_constraints(self):
c = constraints
self.assertTrue(c.MAX_META_OVERALL_SIZE > c.MAX_META_NAME_LENGTH)
self.assertTrue(c.MAX_META_OVERALL_SIZE > c.MAX_META_VALUE_LENGTH)
self.assertTrue(c.MAX_HEADER_SIZE > c.MAX_META_NAME_LENGTH)
self.assertTrue(c.MAX_HEADER_SIZE > c.MAX_META_VALUE_LENGTH)
self.assertGreater(c.MAX_META_OVERALL_SIZE, c.MAX_META_NAME_LENGTH)
self.assertGreater(c.MAX_META_OVERALL_SIZE, c.MAX_META_VALUE_LENGTH)
self.assertGreater(c.MAX_HEADER_SIZE, c.MAX_META_NAME_LENGTH)
self.assertGreater(c.MAX_HEADER_SIZE, c.MAX_META_VALUE_LENGTH)
def test_check_account_format(self):
req = Request.blank(
@ -501,14 +533,11 @@ class TestConstraints(unittest.TestCase):
req = Request.blank(
'/v/a/c/o', headers={
'X-Versions-Location': versions_location})
try:
with self.assertRaises(HTTPException) as cm:
constraints.check_container_format(
req, req.headers['X-Versions-Location'])
except HTTPException as e:
self.assertTrue(e.body.startswith('Container name cannot'))
else:
self.fail('check_container_format did not raise error for %r' %
req.headers['X-Versions-Location'])
self.assertTrue(cm.exception.body.startswith(
'Container name cannot'))
def test_valid_api_version(self):
version = 'v1'

View File

@ -19,6 +19,7 @@ import unittest
import zlib
from textwrap import dedent
import os
from itertools import izip_longest
import six
from six import StringIO
@ -28,9 +29,11 @@ from test.unit import FakeLogger
from swift.common import exceptions, internal_client, swob
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import StoragePolicy
from swift.common.middleware.proxy_logging import ProxyLoggingMiddleware
from test.unit import with_tempdir, write_fake_ring, patch_policies
from test.unit.common.middleware.helpers import FakeSwift
from test.unit import with_tempdir, write_fake_ring, patch_policies, \
DebugLogger
from test.unit.common.middleware.helpers import FakeSwift, LeakTrackingIter
if six.PY3:
from eventlet.green.urllib import request as urllib2
@ -136,19 +139,23 @@ class SetMetadataInternalClient(internal_client.InternalClient):
class IterInternalClient(internal_client.InternalClient):
def __init__(
self, test, path, marker, end_marker, acceptable_statuses, items):
self, test, path, marker, end_marker, prefix, acceptable_statuses,
items):
self.test = test
self.path = path
self.marker = marker
self.end_marker = end_marker
self.prefix = prefix
self.acceptable_statuses = acceptable_statuses
self.items = items
def _iter_items(
self, path, marker='', end_marker='', acceptable_statuses=None):
self, path, marker='', end_marker='', prefix='',
acceptable_statuses=None):
self.test.assertEqual(self.path, path)
self.test.assertEqual(self.marker, marker)
self.test.assertEqual(self.end_marker, end_marker)
self.test.assertEqual(self.prefix, prefix)
self.test.assertEqual(self.acceptable_statuses, acceptable_statuses)
for item in self.items:
yield item
@ -434,6 +441,91 @@ class TestInternalClient(unittest.TestCase):
self.assertEqual(client.env['PATH_INFO'], path)
self.assertEqual(client.env['HTTP_X_TEST'], path)
def test_make_request_error_case(self):
class InternalClient(internal_client.InternalClient):
def __init__(self):
self.logger = DebugLogger()
# wrap the fake app with ProxyLoggingMiddleware
self.app = ProxyLoggingMiddleware(
self.fake_app, {}, self.logger)
self.user_agent = 'some_agent'
self.request_tries = 3
def fake_app(self, env, start_response):
body = 'fake error response'
start_response('409 Conflict',
[('Content-Length', str(len(body)))])
return [body]
client = InternalClient()
with self.assertRaises(internal_client.UnexpectedResponse), \
mock.patch('swift.common.internal_client.sleep'):
client.make_request('DELETE', '/container', {}, (200,))
# Since we didn't provide an X-Timestamp, retrying gives us a chance to
# succeed (assuming the failure was due to clock skew between servers)
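# (request_tries is 3 above, which is why three identical 409 log
# lines are expected below)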
expected = (' HTTP/1.0 409 ', ' HTTP/1.0 409 ', ' HTTP/1.0 409 ', )
loglines = client.logger.get_lines_for_level('info')
for expected, logline in izip_longest(expected, loglines):
if not expected:
self.fail('Unexpected extra log line: %r' % logline)
self.assertIn(expected, logline)
def test_make_request_acceptable_status_not_2xx(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, resp_status):
self.logger = DebugLogger()
# wrap the fake app with ProxyLoggingMiddleware
self.app = ProxyLoggingMiddleware(
self.fake_app, {}, self.logger)
self.user_agent = 'some_agent'
self.resp_status = resp_status
self.request_tries = 3
self.closed_paths = []
def fake_app(self, env, start_response):
body = 'fake error response'
start_response(self.resp_status,
[('Content-Length', str(len(body)))])
return LeakTrackingIter(body, self.closed_paths.append,
env['PATH_INFO'])
def do_test(resp_status):
client = InternalClient(resp_status)
with self.assertRaises(internal_client.UnexpectedResponse) as ctx, \
mock.patch('swift.common.internal_client.sleep'):
# It may seem strange to expect only 400 Bad Request here, but
# this test is intended to avoid an extra body drain when a 2xx
# response carries a correct object body.
client.make_request('GET', '/cont/obj', {}, (400,))
loglines = client.logger.get_lines_for_level('info')
return client.closed_paths, ctx.exception.resp, loglines
closed_paths, resp, loglines = do_test('200 OK')
# Since the 200 is considered "properly handled", it won't be retried
self.assertEqual(closed_paths, [])
# ...and it'll be on us (the caller) to close (for example, by using
# swob.Response's body property)
self.assertEqual(resp.body, 'fake error response')
self.assertEqual(closed_paths, ['/cont/obj'])
expected = (' HTTP/1.0 200 ', )
for expected, logline in izip_longest(expected, loglines):
if not expected:
self.fail('Unexpected extra log line: %r' % logline)
self.assertIn(expected, logline)
closed_paths, resp, loglines = do_test('503 Service Unavailable')
# But since 5xx is neither "properly handled" nor likely to include
# a large body, it will be retried and responses will already be closed
self.assertEqual(closed_paths, ['/cont/obj'] * 3)
expected = (' HTTP/1.0 503 ', ' HTTP/1.0 503 ', ' HTTP/1.0 503 ', )
for expected, logline in izip_longest(expected, loglines):
if not expected:
self.fail('Unexpected extra log line: %r' % logline)
self.assertIn(expected, logline)
def test_make_request_codes(self):
class InternalClient(internal_client.InternalClient):
def __init__(self):
@ -487,30 +579,33 @@ class TestInternalClient(unittest.TestCase):
self.test.assertEqual(0, whence)
class InternalClient(internal_client.InternalClient):
def __init__(self):
def __init__(self, status):
self.app = self.fake_app
self.user_agent = 'some_agent'
self.request_tries = 3
self.status = status
self.call_count = 0
def fake_app(self, env, start_response):
start_response('404 Not Found', [('Content-Length', '0')])
self.call_count += 1
start_response(self.status, [('Content-Length', '0')])
return []
fobj = FileObject(self)
client = InternalClient()
def do_test(status, expected_calls):
fobj = FileObject(self)
client = InternalClient(status)
try:
old_sleep = internal_client.sleep
internal_client.sleep = not_sleep
try:
client.make_request('PUT', '/', {}, (2,), fobj)
except Exception as err:
pass
self.assertEqual(404, err.resp.status_int)
finally:
internal_client.sleep = old_sleep
with mock.patch.object(internal_client, 'sleep', not_sleep):
with self.assertRaises(Exception) as exc_mgr:
client.make_request('PUT', '/', {}, (2,), fobj)
self.assertEqual(int(status[:3]),
exc_mgr.exception.resp.status_int)
self.assertEqual(client.request_tries, fobj.seek_called)
self.assertEqual(client.call_count, fobj.seek_called)
self.assertEqual(client.call_count, expected_calls)
do_test('404 Not Found', 1)
do_test('503 Service Unavailable', 3)
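# a 4xx is taken as a definitive answer, so there is a single attempt
# (and a single seek); a 5xx is retried up to request_tries times,
# rewinding the body file before each attempt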
def test_make_request_request_exception(self):
class InternalClient(internal_client.InternalClient):
@ -575,17 +670,15 @@ class TestInternalClient(unittest.TestCase):
self.assertEqual(1, client.make_request_called)
def test_get_metadata_invalid_status(self):
class FakeApp(object):
def __call__(self, environ, start_response):
start_response('404 Not Found', [('x-foo', 'bar')])
return ['nope']
class InternalClient(internal_client.InternalClient):
def __init__(self):
self.user_agent = 'test'
self.request_tries = 1
self.app = FakeApp()
self.app = self.fake_app
def fake_app(self, environ, start_response):
start_response('404 Not Found', [('x-foo', 'bar')])
return ['nope']
client = InternalClient()
self.assertRaises(internal_client.UnexpectedResponse,
@ -667,9 +760,9 @@ class TestInternalClient(unittest.TestCase):
return self.responses.pop(0)
paths = [
'/?format=json&marker=start&end_marker=end',
'/?format=json&marker=one%C3%A9&end_marker=end',
'/?format=json&marker=two&end_marker=end',
'/?format=json&marker=start&end_marker=end&prefix=',
'/?format=json&marker=one%C3%A9&end_marker=end&prefix=',
'/?format=json&marker=two&end_marker=end&prefix=',
]
responses = [
@ -685,6 +778,49 @@ class TestInternalClient(unittest.TestCase):
self.assertEqual('one\xc3\xa9 two'.split(), items)
def test_iter_items_with_markers_and_prefix(self):
class Response(object):
def __init__(self, status_int, body):
self.status_int = status_int
self.body = body
class InternalClient(internal_client.InternalClient):
def __init__(self, test, paths, responses):
self.test = test
self.paths = paths
self.responses = responses
def make_request(
self, method, path, headers, acceptable_statuses,
body_file=None):
exp_path = self.paths.pop(0)
self.test.assertEqual(exp_path, path)
return self.responses.pop(0)
paths = [
'/?format=json&marker=prefixed_start&end_marker=prefixed_end'
'&prefix=prefixed_',
'/?format=json&marker=prefixed_one%C3%A9&end_marker=prefixed_end'
'&prefix=prefixed_',
'/?format=json&marker=prefixed_two&end_marker=prefixed_end'
'&prefix=prefixed_',
]
responses = [
Response(200, json.dumps([{'name': 'prefixed_one\xc3\xa9'}, ])),
Response(200, json.dumps([{'name': 'prefixed_two'}, ])),
Response(204, ''),
]
items = []
client = InternalClient(self, paths, responses)
for item in client._iter_items('/', marker='prefixed_start',
end_marker='prefixed_end',
prefix='prefixed_'):
items.append(item['name'].encode('utf8'))
self.assertEqual('prefixed_one\xc3\xa9 prefixed_two'.split(), items)
def test_iter_item_read_response_if_status_is_acceptable(self):
class Response(object):
def __init__(self, status_int, body, app_iter):
@ -779,12 +915,13 @@ class TestInternalClient(unittest.TestCase):
items = '0 1 2'.split()
marker = 'some_marker'
end_marker = 'some_end_marker'
prefix = 'some_prefix'
acceptable_statuses = 'some_status_list'
client = IterInternalClient(
self, path, marker, end_marker, acceptable_statuses, items)
self, path, marker, end_marker, prefix, acceptable_statuses, items)
ret_items = []
for container in client.iter_containers(
account, marker, end_marker,
account, marker, end_marker, prefix,
acceptable_statuses=acceptable_statuses):
ret_items.append(container)
self.assertEqual(items, ret_items)
@ -987,13 +1124,15 @@ class TestInternalClient(unittest.TestCase):
path = make_path(account, container)
marker = 'some_marker'
end_marker = 'some_end_marker'
prefix = 'some_prefix'
acceptable_statuses = 'some_status_list'
items = '0 1 2'.split()
client = IterInternalClient(
self, path, marker, end_marker, acceptable_statuses, items)
self, path, marker, end_marker, prefix, acceptable_statuses, items)
ret_items = []
for obj in client.iter_objects(
account, container, marker, end_marker, acceptable_statuses):
account, container, marker, end_marker, prefix,
acceptable_statuses):
ret_items.append(obj)
self.assertEqual(items, ret_items)

View File

@ -151,15 +151,17 @@ class FakeInternalClient(reconciler.InternalClient):
container_listing_data.sort(key=operator.itemgetter('name'))
# register container listing response
container_headers = {}
container_qry_string = '?format=json&marker=&end_marker='
container_qry_string = \
'?format=json&marker=&end_marker=&prefix='
self.app.register('GET', container_path + container_qry_string,
swob.HTTPOk, container_headers,
json.dumps(container_listing_data))
if container_listing_data:
obj_name = container_listing_data[-1]['name']
# client should quote and encode marker
end_qry_string = '?format=json&marker=%s&end_marker=' % (
urllib.parse.quote(obj_name.encode('utf-8')))
end_qry_string = \
'?format=json&marker=%s&end_marker=&prefix=' % (
urllib.parse.quote(obj_name.encode('utf-8')))
self.app.register('GET', container_path + end_qry_string,
swob.HTTPOk, container_headers,
json.dumps([]))
@ -171,11 +173,11 @@ class FakeInternalClient(reconciler.InternalClient):
# register account response
account_listing_data.sort(key=operator.itemgetter('name'))
account_headers = {}
account_qry_string = '?format=json&marker=&end_marker='
account_qry_string = '?format=json&marker=&end_marker=&prefix='
self.app.register('GET', account_path + account_qry_string,
swob.HTTPOk, account_headers,
json.dumps(account_listing_data))
end_qry_string = '?format=json&marker=%s&end_marker=' % (
end_qry_string = '?format=json&marker=%s&end_marker=&prefix=' % (
urllib.parse.quote(account_listing_data[-1]['name']))
self.app.register('GET', account_path + end_qry_string,
swob.HTTPOk, account_headers,
@ -704,7 +706,7 @@ class TestReconcilerUtils(unittest.TestCase):
def listing_qs(marker):
return "?format=json&marker=%s&end_marker=" % \
return "?format=json&marker=%s&end_marker=&prefix=" % \
urllib.parse.quote(marker.encode('utf-8'))

View File

@ -252,8 +252,14 @@ class TestObjectExpirer(TestCase):
self.assertFalse(pop_queue.called)
self.assertEqual(start_reports, x.report_objects)
self.assertEqual(1, len(log_lines))
self.assertIn('Exception while deleting object container obj',
log_lines[0])
if isinstance(exc, internal_client.UnexpectedResponse):
self.assertEqual(
log_lines[0],
'Unexpected response while deleting object container '
'obj: %s' % exc.resp.status_int)
else:
self.assertTrue(log_lines[0].startswith(
'Exception while deleting object container obj'))
# verify pop_queue logic on exceptions
for exc, ts, should_pop in [(None, timestamp, True),
@ -735,30 +741,30 @@ class TestObjectExpirer(TestCase):
got_env[0]['HTTP_X_IF_DELETE_AT'])
self.assertEqual(got_env[0]['PATH_INFO'], '/v1/path/to/object name')
def test_delete_actual_object_raises_404(self):
def test_delete_actual_object_returns_expected_error(self):
def do_test(test_status):
calls = [0]
def fake_app(env, start_response):
start_response('404 Not Found', [('Content-Length', '0')])
return []
def fake_app(env, start_response):
calls[0] += 1
start_response(test_status, [('Content-Length', '0')])
return []
internal_client.loadapp = lambda *a, **kw: fake_app
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
self.assertRaises(internal_client.UnexpectedResponse,
x.delete_actual_object, '/path/to/object', '1234')
x = expirer.ObjectExpirer({})
self.assertRaises(internal_client.UnexpectedResponse,
x.delete_actual_object, '/path/to/object',
'1234')
self.assertEqual(calls[0], 1)
def test_delete_actual_object_raises_412(self):
def fake_app(env, start_response):
start_response('412 Precondition Failed',
[('Content-Length', '0')])
return []
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
self.assertRaises(internal_client.UnexpectedResponse,
x.delete_actual_object, '/path/to/object', '1234')
# object was deleted and tombstone reaped
do_test('404 Not Found')
# object was overwritten *after* the original expiration, or
# object was deleted but tombstone still exists, or
# object was overwritten ahead of the original expiration, or
# object was POSTed to with a new (or no) expiration, or ...
do_test('412 Precondition Failed')
def test_delete_actual_object_does_not_handle_odd_stuff(self):
@ -784,12 +790,25 @@ class TestObjectExpirer(TestCase):
name = 'this name should get quoted'
timestamp = '1366063156.863045'
x = expirer.ObjectExpirer({})
x.swift.make_request = mock.MagicMock()
x.swift.make_request = mock.Mock()
x.swift.make_request.return_value.status_int = 204
x.delete_actual_object(name, timestamp)
self.assertEqual(x.swift.make_request.call_count, 1)
self.assertEqual(x.swift.make_request.call_args[0][1],
'/v1/' + urllib.parse.quote(name))
def test_delete_actual_object_queue_cleaning(self):
name = 'something'
timestamp = '1515544858.80602'
x = expirer.ObjectExpirer({})
x.swift.make_request = mock.MagicMock()
x.delete_actual_object(name, timestamp)
self.assertEqual(x.swift.make_request.call_count, 1)
header = 'X-Backend-Clean-Expiring-Object-Queue'
self.assertEqual(
x.swift.make_request.call_args[0][2].get(header),
'no')
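# presumably the expirer pops its own queue entries (see pop_queue
# below), so it tells the object server to skip its own, redundant
# queue cleanup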
def test_pop_queue(self):
class InternalClient(object):
container_ring = FakeRing()

View File

@ -5515,35 +5515,50 @@ class TestObjectController(unittest.TestCase):
self.assertTrue('chost,badhost' in msg)
self.assertTrue('cdevice' in msg)
def test_delete_at_update_on_put(self):
# Test how delete_at_update works when issued a delete for old
# expiration info after a new put with no new expiration info.
def test_delete_at_update_cleans_old_entries(self):
# Test how delete_at_update works with a request to overwrite an object
# with delete-at metadata
policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
given_args.extend(args)
def do_test(method, headers, expected_args):
given_args = []
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': 1,
'X-Trans-Id': '123',
'X-Backend-Storage-Policy-Index': int(policy)})
with mock.patch.object(self.object_controller, 'async_update',
fake_async_update):
self.object_controller.delete_at_update(
'DELETE', 2, 'a', 'c', 'o', req, 'sda1', policy)
self.assertEqual(
given_args, [
def fake_async_update(*args):
given_args.extend(args)
headers.update({'X-Timestamp': 1,
'X-Trans-Id': '123',
'X-Backend-Storage-Policy-Index': int(policy)})
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': method},
headers=headers)
with mock.patch.object(self.object_controller, 'async_update',
fake_async_update):
self.object_controller.delete_at_update(
'DELETE', 2, 'a', 'c', 'o', req, 'sda1', policy)
self.assertEqual(expected_args, given_args)
for method in ('PUT', 'POST', 'DELETE'):
expected_args = [
'DELETE', '.expiring_objects', '0000000000',
'0000000002-a/c/o', None, None, None,
HeaderKeyDict({
'0000000002-a/c/o', None, None,
None, HeaderKeyDict({
'X-Backend-Storage-Policy-Index': 0,
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '123',
'referer': 'PUT http://localhost/v1/a/c/o'}),
'sda1', policy])
'referer': '%s http://localhost/v1/a/c/o' % method}),
'sda1', policy]
# async_update should be called by default...
do_test(method, {}, expected_args)
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 'true'},
expected_args)
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 't'},
expected_args)
# ...unless header has a false value
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 'false'},
[])
do_test(method, {'X-Backend-Clean-Expiring-Object-Queue': 'f'}, [])
def test_delete_at_negative(self):
# Test how delete_at_update works when issued a delete for old
@ -5675,6 +5690,83 @@ class TestObjectController(unittest.TestCase):
['X-Delete-At-Container header must be specified for expiring '
'objects background PUT to work properly. Making best guess as '
'to the container name for now.'])
self.assertEqual(
given_args, [
'PUT', '.expiring_objects', '0000000000', '0000000002-a/c/o',
'127.0.0.1:1234',
'3', 'sdc1', HeaderKeyDict({
# the .expiring_objects account is always policy-0
'X-Backend-Storage-Policy-Index': 0,
'x-size': '0',
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain',
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '1234',
'referer': 'PUT http://localhost/v1/a/c/o'}),
'sda1', policy])
def test_delete_at_update_put_with_info_but_missing_host(self):
# Same as test_delete_at_update_put_with_info, but just
# missing the X-Delete-At-Host header.
policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
given_args.extend(args)
self.object_controller.async_update = fake_async_update
self.object_controller.logger = self.logger
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': 1,
'X-Trans-Id': '1234',
'X-Delete-At-Container': '0',
'X-Delete-At-Partition': '3',
'X-Delete-At-Device': 'sdc1',
'X-Backend-Storage-Policy-Index': int(policy)})
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
req, 'sda1', policy)
self.assertFalse(self.logger.get_lines_for_level('warning'))
self.assertEqual(given_args, [])
def test_delete_at_update_put_with_info_but_empty_host(self):
# Same as test_delete_at_update_put_with_info, but empty
# X-Delete-At-Host header and no X-Delete-At-Partition nor
# X-Delete-At-Device.
policy = random.choice(list(POLICIES))
given_args = []
def fake_async_update(*args):
given_args.extend(args)
self.object_controller.async_update = fake_async_update
self.object_controller.logger = self.logger
req = Request.blank(
'/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': 1,
'X-Trans-Id': '1234',
'X-Delete-At-Container': '0',
'X-Delete-At-Host': '',
'X-Backend-Storage-Policy-Index': int(policy)})
self.object_controller.delete_at_update('PUT', 2, 'a', 'c', 'o',
req, 'sda1', policy)
self.assertFalse(self.logger.get_lines_for_level('warning'))
self.assertEqual(
given_args, [
'PUT', '.expiring_objects', '0000000000', '0000000002-a/c/o',
None,
None, None, HeaderKeyDict({
# the .expiring_objects account is always policy-0
'X-Backend-Storage-Policy-Index': 0,
'x-size': '0',
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain',
'x-timestamp': utils.Timestamp('1').internal,
'x-trans-id': '1234',
'referer': 'PUT http://localhost/v1/a/c/o'}),
'sda1', policy])
def test_delete_at_update_delete(self):
policy = random.choice(list(POLICIES))
@ -5861,16 +5953,16 @@ class TestObjectController(unittest.TestCase):
given_args[5], 'sda1', policy])
def test_GET_but_expired(self):
# Start off with an existing object that will expire
now = time()
test_time = now + 10000
delete_at_timestamp = int(test_time + 100)
delete_at_timestamp = int(now + 100)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(test_time - 2000),
headers={'X-Timestamp': normalize_timestamp(now),
'X-Delete-At': str(delete_at_timestamp),
'X-Delete-At-Container': delete_at_container,
'Content-Length': '4',
@ -5879,50 +5971,29 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
# It expires in the future, so it's accessible via GET
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': normalize_timestamp(test_time)})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
delete_at_timestamp = int(now + 1)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
put_timestamp = normalize_timestamp(test_time - 1000)
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': put_timestamp,
'X-Delete-At': str(delete_at_timestamp),
'X-Delete-At-Container': delete_at_container,
'Content-Length': '4',
'Content-Type': 'application/octet-stream'})
req.body = 'TEST'
# fix server time to now: delete-at is in future, verify GET is ok
headers={'X-Timestamp': normalize_timestamp(now)})
with mock.patch('swift.obj.server.time.time', return_value=now):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
self.assertEqual(resp.status_int, 200)
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': normalize_timestamp(test_time)})
# It expires in the past, so it's not accessible via GET...
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': normalize_timestamp(
delete_at_timestamp + 1)})
with mock.patch('swift.obj.server.time.time',
return_value=delete_at_timestamp + 1):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.status_int, 404)
self.assertEqual(resp.headers['X-Backend-Timestamp'],
utils.Timestamp(now))
# fix server time to now + 2: delete-at is in past, verify GET fails...
with mock.patch('swift.obj.server.time.time', return_value=now + 2):
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'},
headers={'X-Timestamp': normalize_timestamp(now + 2)})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 404)
self.assertEqual(resp.headers['X-Backend-Timestamp'],
utils.Timestamp(put_timestamp))
with mock.patch('swift.obj.server.time.time',
return_value=delete_at_timestamp + 1):
# ...unless X-Backend-Replication is sent
expected = {
'GET': 'TEST',
@ -5931,22 +6002,24 @@ class TestObjectController(unittest.TestCase):
for meth, expected_body in expected.items():
req = Request.blank(
'/sda1/p/a/c/o', method=meth,
headers={'X-Timestamp': normalize_timestamp(now + 2),
headers={'X-Timestamp':
normalize_timestamp(delete_at_timestamp + 1),
'X-Backend-Replication': 'True'})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
self.assertEqual(expected_body, resp.body)
def test_HEAD_but_expired(self):
test_time = time() + 10000
delete_at_timestamp = int(test_time + 100)
# We have an object that expires in the future
now = time()
delete_at_timestamp = int(now + 100)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(test_time - 2000),
headers={'X-Timestamp': normalize_timestamp(now),
'X-Delete-At': str(delete_at_timestamp),
'X-Delete-At-Container': delete_at_container,
'Content-Length': '4',
@ -5955,26 +6028,43 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
# It's accessible since it expires in the future
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'},
headers={'X-Timestamp': normalize_timestamp(test_time)})
resp = req.get_response(self.object_controller)
headers={'X-Timestamp': normalize_timestamp(now)})
with mock.patch('swift.obj.server.time.time', return_value=now):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
# fix server time to now: delete-at is in future, verify GET is ok
# It's not accessible now since it expires in the past
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'},
headers={'X-Timestamp': normalize_timestamp(
delete_at_timestamp + 1)})
with mock.patch('swift.obj.server.time.time',
return_value=delete_at_timestamp + 1):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 404)
self.assertEqual(resp.headers['X-Backend-Timestamp'],
utils.Timestamp(now))
def test_POST_but_expired(self):
now = time()
with mock.patch('swift.obj.server.time.time', return_value=now):
delete_at_timestamp = int(now + 1)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
put_timestamp = normalize_timestamp(test_time - 1000)
delete_at_timestamp = int(now + 100)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
# We recreate the test object every time to ensure a clean test; a
# POST may change attributes of the object, so it's not safe to
# re-use.
def recreate_test_object(when):
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': put_timestamp,
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(when),
'X-Delete-At': str(delete_at_timestamp),
'X-Delete-At-Container': delete_at_container,
'Content-Length': '4',
@ -5982,76 +6072,69 @@ class TestObjectController(unittest.TestCase):
req.body = 'TEST'
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'},
headers={'X-Timestamp': normalize_timestamp(test_time)})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 200)
with mock.patch('swift.obj.server.time.time', return_value=now + 2):
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'},
headers={'X-Timestamp': normalize_timestamp(now + 2)})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 404)
self.assertEqual(resp.headers['X-Backend-Timestamp'],
utils.Timestamp(put_timestamp))
def test_POST_but_expired(self):
test_time = time() + 10000
delete_at_timestamp = int(test_time + 100)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(test_time - 2000),
'X-Delete-At': str(delete_at_timestamp),
'X-Delete-At-Container': delete_at_container,
'Content-Length': '4',
'Content-Type': 'application/octet-stream'})
req.body = 'TEST'
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
# You can POST to a not-yet-expired object
recreate_test_object(now)
the_time = now + 1
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(test_time - 1500)})
resp = req.get_response(self.object_controller)
headers={'X-Timestamp': normalize_timestamp(the_time)})
with mock.patch('swift.obj.server.time.time', return_value=the_time):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 202)
delete_at_timestamp = int(time() + 2)
# You cannot POST to an expired object
now += 2
recreate_test_object(now)
the_time = delete_at_timestamp + 1
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(the_time)})
with mock.patch('swift.obj.server.time.time', return_value=the_time):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 404)
def test_DELETE_can_skip_updating_expirer_queue(self):
policy = POLICIES.get_by_index(0)
test_time = time()
put_time = test_time
delete_time = test_time + 1
delete_at_timestamp = int(test_time + 10000)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(test_time - 1000),
headers={'X-Timestamp': normalize_timestamp(put_time),
'X-Delete-At': str(delete_at_timestamp),
'X-Delete-At-Container': delete_at_container,
'Content-Length': '4',
'Content-Type': 'application/octet-stream'})
req.body = 'TEST'
resp = req.get_response(self.object_controller)
# Mock out async_update so we don't get any async_pending files.
with mock.patch.object(self.object_controller, 'async_update'):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
orig_time = object_server.time.time
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'DELETE'},
headers={'X-Timestamp': normalize_timestamp(delete_time),
'X-Backend-Clean-Expiring-Object-Queue': 'false',
'X-If-Delete-At': str(delete_at_timestamp)})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 204)
async_pending_dir = os.path.join(
self.testdir, 'sda1', diskfile.get_async_dir(policy))
# empty dir or absent dir, either is fine
try:
t = time() + 3
object_server.time.time = lambda: t
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(time())})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 404)
finally:
object_server.time.time = orig_time
self.assertEqual([], os.listdir(async_pending_dir))
except OSError as err:
self.assertEqual(err.errno, errno.ENOENT)
def test_DELETE_but_expired(self):
test_time = time() + 10000
@ -6118,7 +6201,7 @@ class TestObjectController(unittest.TestCase):
utils.Timestamp(test_timestamp).internal + '.data')
self.assertTrue(os.path.isfile(objfile))
# move time past expirery
# move time past expiry
with mock.patch('swift.obj.diskfile.time') as mock_time:
mock_time.time.return_value = test_time + 100
req = Request.blank(
@ -6298,6 +6381,111 @@ class TestObjectController(unittest.TestCase):
'DELETE', int(delete_at_timestamp1), 'a', 'c', 'o',
given_args[5], 'sda1', POLICIES[0]])
def test_PUT_can_skip_updating_expirer_queue(self):
policy = POLICIES.get_by_index(0)
test_time = time()
put_time = test_time
overwrite_time = test_time + 1
delete_at_timestamp = int(test_time + 10000)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(put_time),
'X-Delete-At': str(delete_at_timestamp),
'X-Delete-At-Container': delete_at_container,
'Content-Length': '4',
'Content-Type': 'application/octet-stream'})
req.body = 'TEST'
# Mock out async_update so we don't get any async_pending files.
with mock.patch.object(self.object_controller, 'async_update'):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
# Overwrite with a non-expiring object
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(overwrite_time),
'X-Backend-Clean-Expiring-Object-Queue': 'false',
'Content-Length': '9',
'Content-Type': 'application/octet-stream'})
req.body = 'new stuff'
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
async_pending_dir = os.path.join(
self.testdir, 'sda1', diskfile.get_async_dir(policy))
# empty dir or absent dir, either is fine
try:
self.assertEqual([], os.listdir(async_pending_dir))
except OSError as err:
self.assertEqual(err.errno, errno.ENOENT)
def test_PUT_can_skip_deleting_expirer_queue_but_still_inserts(self):
policy = POLICIES.get_by_index(0)
test_time = time()
put_time = test_time
overwrite_time = test_time + 1
delete_at_timestamp_1 = int(test_time + 10000)
delete_at_timestamp_2 = int(test_time + 20000)
delete_at_container_1 = str(
delete_at_timestamp_1 /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
delete_at_container_2 = str(
delete_at_timestamp_2 /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(put_time),
'X-Delete-At': str(delete_at_timestamp_1),
'X-Delete-At-Container': delete_at_container_1,
'X-Delete-At-Host': '1.2.3.4',
'Content-Length': '4',
'Content-Type': 'application/octet-stream'})
req.body = 'TEST'
# Mock out async_update so we don't get any async_pending files.
with mock.patch.object(self.object_controller, 'async_update'):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
# Overwrite with an expiring object
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(overwrite_time),
'X-Backend-Clean-Expiring-Object-Queue': 'false',
'X-Delete-At': str(delete_at_timestamp_2),
'X-Delete-At-Container': delete_at_container_2,
'X-Delete-At-Host': '1.2.3.4',
'Content-Length': '9',
'Content-Type': 'application/octet-stream'})
req.body = 'new stuff'
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
async_pendings = []
async_pending_dir = os.path.join(
self.testdir, 'sda1', diskfile.get_async_dir(policy))
for dirpath, _, filenames in os.walk(async_pending_dir):
for filename in filenames:
async_pendings.append(os.path.join(dirpath, filename))
self.assertEqual(len(async_pendings), 1)
async_pending_ops = []
for pending_file in async_pendings:
with open(pending_file) as fh:
async_pending = pickle.load(fh)
async_pending_ops.append(async_pending['op'])
self.assertEqual(async_pending_ops, ['PUT'])
def test_PUT_delete_at_in_past(self):
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
@ -6310,6 +6498,48 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(resp.status_int, 400)
self.assertTrue('X-Delete-At in past' in resp.body)
def test_POST_can_skip_updating_expirer_queue(self):
policy = POLICIES.get_by_index(0)
test_time = time()
put_time = test_time
overwrite_time = test_time + 1
delete_at_timestamp = int(test_time + 10000)
delete_at_container = str(
delete_at_timestamp /
self.object_controller.expiring_objects_container_divisor *
self.object_controller.expiring_objects_container_divisor)
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(put_time),
'X-Delete-At': str(delete_at_timestamp),
'X-Delete-At-Container': delete_at_container,
'Content-Length': '4',
'Content-Type': 'application/octet-stream'})
req.body = 'TEST'
# Mock out async_update so we don't get any async_pending files.
with mock.patch.object(self.object_controller, 'async_update'):
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
# POST to remove X-Delete-At
req = Request.blank(
'/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': normalize_timestamp(overwrite_time),
'X-Backend-Clean-Expiring-Object-Queue': 'false',
'X-Delete-At': ''})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 202)
async_pending_dir = os.path.join(
self.testdir, 'sda1', diskfile.get_async_dir(policy))
# empty dir or absent dir, either is fine
try:
self.assertEqual([], os.listdir(async_pending_dir))
except OSError as err:
self.assertEqual(err.errno, errno.ENOENT)
def test_POST_delete_at_in_past(self):
req = Request.blank(
'/sda1/p/a/c/o',

View File

@ -25,7 +25,8 @@ from tempfile import mkdtemp
from shutil import rmtree
from test import listen_zero
from test.unit import (
make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn)
make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn,
FakeLogger)
from time import time
from distutils.dir_util import mkpath
@ -250,6 +251,72 @@ class TestObjectUpdater(unittest.TestCase):
# a warning indicating that the '99' policy isn't valid
check_with_idx('99', 1, should_skip=True)
def test_sweep_logs(self):
asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE)
prefix_dir = os.path.join(asyncdir, 'abc')
mkpath(prefix_dir)
for o, t in [('abc', 123), ('def', 234), ('ghi', 345),
('jkl', 456), ('mno', 567)]:
ohash = hash_path('account', 'container', o)
o_path = os.path.join(prefix_dir, ohash + '-' +
normalize_timestamp(t))
write_pickle({}, o_path)
class MockObjectUpdater(object_updater.ObjectUpdater):
def process_object_update(self, update_path, device, policy):
os.unlink(update_path)
self.stats.successes += 1
self.stats.unlinks += 1
logger = FakeLogger()
ou = MockObjectUpdater({
'devices': self.devices_dir,
'mount_check': 'false',
'swift_dir': self.testdir,
'interval': '1',
'concurrency': '1',
'report_interval': '10.0',
'node_timeout': '5'}, logger=logger)
now = [time()]
def mock_time_function():
rv = now[0]
now[0] += 5
return rv
# With 10s between updates, time() advancing 5s every time we look,
# and 5 async_pendings on disk, we should get at least two progress
# lines.
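# (each pending file appears to cost one time() check, i.e. +5
# seconds, so the 10s report_interval elapses after every second
# update -- hence the progress lines at 2 and 4 successes below)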
with mock.patch('swift.obj.updater.time',
mock.MagicMock(time=mock_time_function)):
ou.object_sweep(self.sda1)
info_lines = logger.get_lines_for_level('info')
self.assertEqual(4, len(info_lines))
self.assertIn("sweep starting", info_lines[0])
self.assertIn(self.sda1, info_lines[0])
self.assertIn("sweep progress", info_lines[1])
# the space ensures it's a positive number
self.assertIn(
"2 successes, 0 failures, 0 quarantines, 2 unlinks, 0 error",
info_lines[1])
self.assertIn(self.sda1, info_lines[1])
self.assertIn("sweep progress", info_lines[2])
self.assertIn(
"4 successes, 0 failures, 0 quarantines, 4 unlinks, 0 error",
info_lines[2])
self.assertIn(self.sda1, info_lines[2])
self.assertIn("sweep complete", info_lines[3])
self.assertIn(
"5 successes, 0 failures, 0 quarantines, 5 unlinks, 0 error",
info_lines[3])
self.assertIn(self.sda1, info_lines[3])
@mock.patch.object(object_updater, 'check_drive')
def test_run_once_with_disk_unmounted(self, mock_check_drive):
mock_check_drive.return_value = False
@ -288,7 +355,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEqual([
mock.call(self.devices_dir, 'sda1', True),
], mock_check_drive.mock_calls)
self.assertEqual(ou.logger.get_increment_counts(), {'errors': 1})
self.assertEqual(ou.logger.get_increment_counts(), {})
@mock.patch.object(object_updater, 'check_drive')
def test_run_once(self, mock_check_drive):

View File

@ -34,7 +34,7 @@ from six.moves import range
import swift
from swift.common import utils, swob, exceptions
from swift.common.exceptions import ChunkWriteTimeout
from swift.common.utils import Timestamp
from swift.common.utils import Timestamp, list_from_csv
from swift.proxy import server as proxy_server
from swift.proxy.controllers import obj
from swift.proxy.controllers.base import \
@ -45,7 +45,7 @@ from swift.common.storage_policy import POLICIES, ECDriverError, \
from test.unit import FakeRing, FakeMemcache, fake_http_connect, \
debug_logger, patch_policies, SlowBody, FakeStatus, \
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies, make_ec_object_stub, \
fake_ec_node_response, StubResponse
fake_ec_node_response, StubResponse, mocked_http_conn
from test.unit.proxy.test_server import node_error_count
@ -452,6 +452,70 @@ class BaseObjectControllerMixin(object):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 204)
def test_DELETE_limits_expirer_queue_updates(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
codes = [204] * self.replicas()
captured_headers = []
def capture_headers(ip, port, device, part, method, path,
headers=None, **kwargs):
captured_headers.append(headers)
with set_http_connect(*codes, give_connect=capture_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 204) # sanity check
counts = {True: 0, False: 0, None: 0}
for headers in captured_headers:
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
norm_v = None if v is None else utils.config_true_value(v)
counts[norm_v] += 1
max_queue_updates = 2
o_replicas = self.replicas()
self.assertEqual(counts, {
True: min(max_queue_updates, o_replicas),
False: max(o_replicas - max_queue_updates, 0),
None: 0,
})
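# i.e. only a couple of the object servers are asked to clean up the
# expirer queue entry; the rest are explicitly told to skip the
# (redundant) update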
def test_expirer_DELETE_suppresses_expirer_queue_updates(self):
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method='DELETE', headers={
'X-Backend-Clean-Expiring-Object-Queue': 'no'})
codes = [204] * self.replicas()
captured_headers = []
def capture_headers(ip, port, device, part, method, path,
headers=None, **kwargs):
captured_headers.append(headers)
with set_http_connect(*codes, give_connect=capture_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 204) # sanity check
counts = {True: 0, False: 0, None: 0}
for headers in captured_headers:
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
norm_v = None if v is None else utils.config_true_value(v)
counts[norm_v] += 1
o_replicas = self.replicas()
self.assertEqual(counts, {
True: 0,
False: o_replicas,
None: 0,
})
# Make sure we're not sending any expirer-queue update headers here.
# Since we're not updating the expirer queue, these headers would be
# superfluous.
for headers in captured_headers:
self.assertNotIn('X-Delete-At-Container', headers)
self.assertNotIn('X-Delete-At-Partition', headers)
self.assertNotIn('X-Delete-At-Host', headers)
self.assertNotIn('X-Delete-At-Device', headers)
def test_DELETE_write_affinity_before_replication(self):
policy_conf = self.app.get_policy_options(self.policy)
policy_conf.write_affinity_handoff_delete_count = self.replicas() / 2
@ -482,6 +546,69 @@ class BaseObjectControllerMixin(object):
self.assertEqual(resp.status_int, 204)
def test_PUT_limits_expirer_queue_deletes(self):
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method='PUT', body='',
headers={'Content-Type': 'application/octet-stream'})
codes = [201] * self.replicas()
captured_headers = []
def capture_headers(ip, port, device, part, method, path,
headers=None, **kwargs):
captured_headers.append(headers)
expect_headers = {
'X-Obj-Metadata-Footer': 'yes',
'X-Obj-Multiphase-Commit': 'yes'
}
with set_http_connect(*codes, give_connect=capture_headers,
expect_headers=expect_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201) # sanity check
counts = {True: 0, False: 0, None: 0}
for headers in captured_headers:
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
norm_v = None if v is None else utils.config_true_value(v)
counts[norm_v] += 1
max_queue_updates = 2
o_replicas = self.replicas()
self.assertEqual(counts, {
True: min(max_queue_updates, o_replicas),
False: max(o_replicas - max_queue_updates, 0),
None: 0,
})
def test_POST_limits_expirer_queue_deletes(self):
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method='POST', body='',
headers={'Content-Type': 'application/octet-stream'})
codes = [201] * self.replicas()
captured_headers = []
def capture_headers(ip, port, device, part, method, path,
headers=None, **kwargs):
captured_headers.append(headers)
with set_http_connect(*codes, give_connect=capture_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201) # sanity check
counts = {True: 0, False: 0, None: 0}
for headers in captured_headers:
v = headers.get('X-Backend-Clean-Expiring-Object-Queue')
norm_v = None if v is None else utils.config_true_value(v)
counts[norm_v] += 1
max_queue_updates = 2
o_replicas = self.replicas()
self.assertEqual(counts, {
True: min(max_queue_updates, o_replicas),
False: max(o_replicas - max_queue_updates, 0),
None: 0,
})
def test_POST_non_int_delete_after(self):
t = str(int(time.time() + 100)) + '.1'
req = swob.Request.blank('/v1/a/c/o', method='POST',
@ -676,20 +803,131 @@ class BaseObjectControllerMixin(object):
req, self.replicas(policy), 1, containers)
# how many of the backend headers have a container update
container_updates = len(
n_container_updates = len(
[headers for headers in backend_headers
if 'X-Container-Partition' in headers])
if num_containers <= self.quorum(policy):
# filling case
expected = min(self.quorum(policy) + 1,
self.replicas(policy))
else:
# container updates >= object replicas
expected = min(num_containers,
self.replicas(policy))
# how many object-server PUTs can fail and still let the
# client PUT succeed
n_can_fail = self.replicas(policy) - self.quorum(policy)
n_expected_updates = (
n_can_fail + utils.quorum_size(num_containers))
self.assertEqual(container_updates, expected)
# you get at least one update per container no matter what
n_expected_updates = max(
n_expected_updates, num_containers)
# you can't have more object requests with updates than you
# have object requests (the container stuff gets doubled up,
# but that's not important for purposes of durability)
n_expected_updates = min(
n_expected_updates, self.replicas(policy))
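# worked example: a 3-replica policy (quorum 2) with 3 containers
# gives n_can_fail = 1 and quorum_size(3) = 2, so 3 updates -- already
# >= num_containers and <= the object replica count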
self.assertEqual(n_expected_updates, n_container_updates)
def test_delete_at_backend_requests(self):
t = str(int(time.time() + 100))
for policy in POLICIES:
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method='PUT',
headers={'Content-Length': '0',
'X-Backend-Storage-Policy-Index': int(policy),
'X-Delete-At': t})
controller = self.controller_cls(self.app, 'a', 'c', 'o')
for num_del_at_nodes in range(1, 16):
containers = [
{'ip': '2.0.0.%s' % i, 'port': '70%s' % str(i).zfill(2),
'device': 'sdc'} for i in range(num_del_at_nodes)]
del_at_nodes = [
{'ip': '1.0.0.%s' % i, 'port': '60%s' % str(i).zfill(2),
'device': 'sdb'} for i in range(num_del_at_nodes)]
backend_headers = controller._backend_requests(
req, self.replicas(policy), 1, containers,
delete_at_container='dac', delete_at_partition=2,
delete_at_nodes=del_at_nodes)
devices = []
hosts = []
part = ctr = 0
for given_headers in backend_headers:
self.assertEqual(given_headers.get('X-Delete-At'), t)
if 'X-Delete-At-Partition' in given_headers:
self.assertEqual(
given_headers.get('X-Delete-At-Partition'), '2')
part += 1
if 'X-Delete-At-Container' in given_headers:
self.assertEqual(
given_headers.get('X-Delete-At-Container'), 'dac')
ctr += 1
devices += (
list_from_csv(given_headers.get('X-Delete-At-Device')))
hosts += (
list_from_csv(given_headers.get('X-Delete-At-Host')))
# same as in test_container_update_backend_requests
n_can_fail = self.replicas(policy) - self.quorum(policy)
n_expected_updates = (
n_can_fail + utils.quorum_size(num_del_at_nodes))
n_expected_hosts = max(
n_expected_updates, num_del_at_nodes)
self.assertEqual(len(hosts), n_expected_hosts)
self.assertEqual(len(devices), n_expected_hosts)
# parts don't get doubled up, maximum is count of obj requests
n_expected_parts = min(
n_expected_hosts, self.replicas(policy))
self.assertEqual(part, n_expected_parts)
self.assertEqual(ctr, n_expected_parts)
# check that hosts are correct
self.assertEqual(
set(hosts),
set('%s:%s' % (h['ip'], h['port']) for h in del_at_nodes))
self.assertEqual(set(devices), set(('sdb',)))
def test_smooth_distributed_backend_requests(self):
t = str(int(time.time() + 100))
for policy in POLICIES:
req = swift.common.swob.Request.blank(
'/v1/a/c/o', method='PUT',
headers={'Content-Length': '0',
'X-Backend-Storage-Policy-Index': int(policy),
'X-Delete-At': t})
controller = self.controller_cls(self.app, 'a', 'c', 'o')
for num_containers in range(1, 16):
containers = [
{'ip': '2.0.0.%s' % i, 'port': '70%s' % str(i).zfill(2),
'device': 'sdc'} for i in range(num_containers)]
del_at_nodes = [
{'ip': '1.0.0.%s' % i, 'port': '60%s' % str(i).zfill(2),
'device': 'sdb'} for i in range(num_containers)]
backend_headers = controller._backend_requests(
req, self.replicas(policy), 1, containers,
delete_at_container='dac', delete_at_partition=2,
delete_at_nodes=del_at_nodes)
# calculate the number of expected updates; see
# test_container_update_backend_requests for the explanation
n_expected_updates = min(max(
self.replicas(policy) - self.quorum(policy) +
utils.quorum_size(num_containers), num_containers),
self.replicas(policy))
# the first n_expected_updates servers should have received
# a container update
self.assertTrue(
all([h.get('X-Container-Partition')
for h in backend_headers[:n_expected_updates]]))
# the last n_expected_updates servers should have received
# the x-delete-at* headers
self.assertTrue(
all([h.get('X-Delete-At-Container')
for h in backend_headers[-n_expected_updates:]]))
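# i.e. container updates are front-loaded and delete-at updates are
# back-loaded, spreading the extra update work across different object
# servers rather than piling it onto the same few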
def _check_write_affinity(
self, conf, policy_conf, policy, affinity_regions, affinity_count):
@ -1580,6 +1818,52 @@ class TestReplicatedObjController(BaseObjectControllerMixin,
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
def test_x_timestamp_not_overridden(self):
def do_test(method, base_headers, resp_code):
# no given x-timestamp
req = swob.Request.blank(
'/v1/a/c/o', method=method, headers=base_headers)
codes = [resp_code] * self.replicas()
with mocked_http_conn(*codes) as fake_conn:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, resp_code)
self.assertEqual(self.replicas(), len(fake_conn.requests))
for req in fake_conn.requests:
self.assertIn('X-Timestamp', req['headers'])
# check value can be parsed as valid timestamp
Timestamp(req['headers']['X-Timestamp'])
# given x-timestamp is retained
def do_check(ts):
headers = dict(base_headers)
headers['X-Timestamp'] = ts.internal
req = swob.Request.blank(
'/v1/a/c/o', method=method, headers=headers)
codes = [resp_code] * self.replicas()
with mocked_http_conn(*codes) as fake_conn:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, resp_code)
self.assertEqual(self.replicas(), len(fake_conn.requests))
for req in fake_conn.requests:
self.assertEqual(ts.internal,
req['headers']['X-Timestamp'])
do_check(Timestamp.now())
do_check(Timestamp.now(offset=123))
# given x-timestamp gets sanity checked
headers = dict(base_headers)
headers['X-Timestamp'] = 'bad timestamp'
req = swob.Request.blank(
'/v1/a/c/o', method=method, headers=headers)
with mocked_http_conn() as fake_conn:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 400)
self.assertIn('X-Timestamp should be a UNIX timestamp ', resp.body)
do_test('PUT', {'Content-Length': 0}, 200)
do_test('DELETE', {}, 204)
@patch_policies(
[StoragePolicy(0, '1-replica', True),
@ -4505,5 +4789,37 @@ class TestECDuplicationObjController(
self._test_determine_chunk_destinations_prioritize(1, 0)
class TestNumContainerUpdates(unittest.TestCase):
def test_it(self):
test_cases = [
# (container replicas, object replicas, object quorum, expected)
(3, 17, 13, 6), # EC 12+5
(3, 9, 4, 7), # EC 3+6
(3, 14, 11, 5), # EC 10+4
(5, 14, 11, 6), # EC 10+4, 5 container replicas
(7, 14, 11, 7), # EC 10+4, 7 container replicas
(3, 19, 16, 5), # EC 15+4
(5, 19, 16, 6), # EC 15+4, 5 container replicas
(3, 28, 22, 8), # EC (10+4)x2
(5, 28, 22, 9), # EC (10+4)x2, 5 container replicas
(3, 1, 1, 3), # 1 object replica
(3, 2, 1, 3), # 2 object replicas
(3, 3, 2, 3), # 3 object replicas
(3, 4, 2, 4), # 4 object replicas
(3, 5, 3, 4), # 5 object replicas
(3, 6, 3, 5), # 6 object replicas
(3, 7, 4, 5), # 7 object replicas
]
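# every row above appears consistent with
# max(c_quorum + (o_replica - o_quorum), c_replica);
# e.g. EC 12+5: quorum_size(3) + (17 - 13) == 2 + 4 == 6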
for c_replica, o_replica, o_quorum, exp in test_cases:
c_quorum = utils.quorum_size(c_replica)
got = obj.num_container_updates(c_replica, c_quorum,
o_replica, o_quorum)
self.assertEqual(
exp, got,
"Failed for c_replica=%d, o_replica=%d, o_quorum=%d" % (
c_replica, o_replica, o_quorum))
if __name__ == '__main__':
unittest.main()

View File

@ -62,6 +62,7 @@ from test.unit.helpers import setup_servers, teardown_servers
from swift.proxy import server as proxy_server
from swift.proxy.controllers.obj import ReplicatedObjectController
from swift.obj import server as object_server
from swift.common.bufferedhttp import BufferedHTTPResponse
from swift.common.middleware import proxy_logging, versioned_writes, \
copy, listing_formats
from swift.common.middleware.acl import parse_acl, format_acl
@ -5429,6 +5430,33 @@ class TestReplicatedObjectController(
self.assertEqual(
seen_headers, [
{'X-Container-Host': '10.0.0.0:1000',
'X-Container-Partition': '0',
'X-Container-Device': 'sda'},
{'X-Container-Host': '10.0.0.1:1001',
'X-Container-Partition': '0',
'X-Container-Device': 'sdb'},
{'X-Container-Host': None,
'X-Container-Partition': None,
'X-Container-Device': None}])
def test_PUT_x_container_headers_with_many_object_replicas(self):
POLICIES[0].object_ring.set_replicas(11)
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'Content-Length': '5'}, body='12345')
controller = ReplicatedObjectController(
self.app, 'a', 'c', 'o')
seen_headers = self._gather_x_container_headers(
controller.PUT, req,
# HEAD HEAD PUT PUT PUT PUT PUT PUT PUT PUT PUT PUT PUT
200, 200, 201, 201, 201, 201, 201, 201, 201, 201, 201, 201, 201)
self.assertEqual(
sorted(seen_headers), sorted([
{'X-Container-Host': '10.0.0.0:1000',
'X-Container-Partition': '0',
'X-Container-Device': 'sda'},
{'X-Container-Host': '10.0.0.0:1000',
'X-Container-Partition': '0',
'X-Container-Device': 'sda'},
@ -5437,7 +5465,29 @@ class TestReplicatedObjectController(
'X-Container-Device': 'sda'},
{'X-Container-Host': '10.0.0.1:1001',
'X-Container-Partition': '0',
'X-Container-Device': 'sdb'}])
'X-Container-Device': 'sdb'},
{'X-Container-Host': '10.0.0.1:1001',
'X-Container-Partition': '0',
'X-Container-Device': 'sdb'},
{'X-Container-Host': '10.0.0.2:1002',
'X-Container-Partition': '0',
'X-Container-Device': 'sdc'},
{'X-Container-Host': '10.0.0.2:1002',
'X-Container-Partition': '0',
'X-Container-Device': 'sdc'},
{'X-Container-Host': None,
'X-Container-Partition': None,
'X-Container-Device': None},
{'X-Container-Host': None,
'X-Container-Partition': None,
'X-Container-Device': None},
{'X-Container-Host': None,
'X-Container-Partition': None,
'X-Container-Device': None},
{'X-Container-Host': None,
'X-Container-Partition': None,
'X-Container-Device': None},
]))
def test_PUT_x_container_headers_with_more_container_replicas(self):
self.app.container_ring.set_replicas(4)
@ -5540,9 +5590,9 @@ class TestReplicatedObjectController(
'X-Delete-At-Partition': '0',
'X-Delete-At-Device': 'sdb'},
{'X-Delete-At-Host': None,
'X-Delete-At-Container': None,
'X-Delete-At-Partition': None,
'X-Delete-At-Device': None}
'X-Delete-At-Container': None,
'X-Delete-At-Device': None},
])
@mock.patch('time.time', new=lambda: STATIC_TIME)
@ -7272,6 +7322,48 @@ class TestObjectECRangedGET(unittest.TestCase):
self.assertIn('Content-Range', headers)
self.assertEqual('bytes */%d' % obj_len, headers['Content-Range'])
def test_unsatisfiable_socket_leak(self):
unclosed_http_responses = {}
tracked_responses = [0]
class LeakTrackingHTTPResponse(BufferedHTTPResponse):
def begin(self):
# no super(); we inherit from an old-style class (it's
# httplib's fault; don't try and fix it).
retval = BufferedHTTPResponse.begin(self)
if self.status != 204:
# This mock is overly broad and catches account and
# container HEAD requests too. We don't care about
# those; it's the object GETs that were leaky.
#
# Unfortunately, we don't have access to the request
# path here, so we use "status == 204" as a crude proxy
# for "not an object response".
unclosed_http_responses[id(self)] = self
tracked_responses[0] += 1
return retval
def close(self, *args, **kwargs):
rv = BufferedHTTPResponse.close(self, *args, **kwargs)
unclosed_http_responses.pop(id(self), None)
return rv
def __repr__(self):
swift_conn = getattr(self, 'swift_conn', None)
method = getattr(swift_conn, '_method', '<unknown>')
path = getattr(swift_conn, '_path', '<unknown>')
return '%s<method=%r path=%r>' % (
self.__class__.__name__, method, path)
obj_len = len(self.obj)
with mock.patch('swift.common.bufferedhttp.BufferedHTTPConnection'
'.response_class', LeakTrackingHTTPResponse):
status, headers, _junk = self._get_obj(
"bytes=%d-%d" % (obj_len, obj_len + 100))
self.assertEqual(status, 416) # sanity check
self.assertGreater(tracked_responses[0], 0) # ensure tracking happened
self.assertEqual(unclosed_http_responses, {})
def test_off_end(self):
# Ranged GET that's mostly off the end of the object, but overlaps
# it in just the last byte
@ -7837,7 +7929,7 @@ class TestContainerController(unittest.TestCase):
# fail to retrieve account info
test_status_map(
(503, 503, 503), # account_info fails on 503
404, missing_container=True)
500, missing_container=True)
# account fail after creation
test_status_map(
(404, 404, 404, # account_info fails on 404
@ -7848,7 +7940,7 @@ class TestContainerController(unittest.TestCase):
(503, 503, 404, # account_info fails on 404
503, 503, 503, # PUT account
503, 503, 404), # account_info fail
404, missing_container=True)
500, missing_container=True)
# put fails
test_status_map(
(404, 404, 404, # account_info fails on 404

tox.ini
View File

@ -59,11 +59,6 @@ commands =
basepython = python2.7
commands = ./.functests {posargs}
[testenv:func-post-as-copy]
commands = ./.functests {posargs}
setenv = SWIFT_TEST_IN_PROCESS=1
SWIFT_TEST_IN_PROCESS_OBJECT_POST_AS_COPY=True
[testenv:func-encryption]
commands = ./.functests {posargs}
setenv = SWIFT_TEST_IN_PROCESS=1
@ -120,3 +115,9 @@ commands = bindep test
[testenv:releasenotes]
commands = sphinx-build -a -W -E -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html
[testenv:func-post-as-copy]
skip_install = True
install_command = true {packages}
commands = true
whitelist_externals = true