merge master to feature/crypto

Change-Id: I5169ed18cecd37c5d700c5e74d8aa3a98e73835a
This commit is contained in:
John Dickinson 2015-11-11 09:36:47 -08:00
commit 19226e4449
95 changed files with 3106 additions and 2061 deletions

33
.alltests Executable file
View File

@ -0,0 +1,33 @@
#!/bin/bash
TOP_DIR=$(python -c "import os; print os.path.dirname(os.path.realpath('$0'))")
echo "==== Unit tests ===="
resetswift
$TOP_DIR/.unittests $@
rvalue=$?
if [ $rvalue != 0 ] ; then
exit $rvalue
fi
echo "==== Func tests ===="
resetswift
startmain
$TOP_DIR/.functests $@
rvalue=$?
if [ $rvalue != 0 ] ; then
exit $rvalue
fi
echo "==== Probe tests ===="
resetswift
$TOP_DIR/.probetests $@
rvalue=$?
if [ $rvalue != 0 ] ; then
exit $rvalue
fi
echo "All tests runs fine"
exit 0

View File

@ -177,7 +177,7 @@ Logging level. The default is INFO.
.IP \fBlog_address\fR
Logging address. The default is /dev/log.
.IP \fBper_diff\fR
The default is 1000.
Maximum number of database rows that will be sync'd in a single HTTP replication request. The default is 1000.
.IP \fBmax_diffs\fR
This caps how long the replicator will spend trying to sync a given database per pass so the other databases don't get starved. The default is 100.
.IP \fBconcurrency\fR

View File

@ -183,7 +183,7 @@ Logging level. The default is INFO.
.IP \fBlog_address\fR
Logging address. The default is /dev/log.
.IP \fBer_diff\fR
The default is 1000.
Maximum number of database rows that will be sync'd in a single HTTP replication request. The default is 1000.
.IP \fBmax_diffs\fR
This caps how long the replicator will spend trying to sync a given database per pass so the other databases don't get starved. The default is 100.
.IP \fBconcurrency\fR

View File

@ -83,7 +83,9 @@ Get drive audit error stats
.IP "\fB-T, --time\fR"
Check time synchronization
.IP "\fB--all\fR"
Perform all checks. Equivalent to \-arudlqT \-\-md5
Perform all checks. Equivalent to \-arudlqT
\-\-md5 \-\-sockstat \-\-auditor \-\-updater \-\-expirer
\-\-driveaudit \-\-validate\-servers
.IP "\fB--region=REGION\fR"
Only query servers in specified region
.IP "\fB-z ZONE, --zone=ZONE\fR"

View File

@ -110,8 +110,8 @@ You can create scripts to create the account and container rings and rebalance.
cd /etc/swift
rm -f account.builder account.ring.gz backups/account.builder backups/account.ring.gz
swift-ring-builder account.builder create 18 3 1
swift-ring-builder account.builder add z1-<account-server-1>:6002/sdb1 1
swift-ring-builder account.builder add z2-<account-server-2>:6002/sdb1 1
swift-ring-builder account.builder add r1z1-<account-server-1>:6002/sdb1 1
swift-ring-builder account.builder add r1z2-<account-server-2>:6002/sdb1 1
swift-ring-builder account.builder rebalance
You need to replace the values of <account-server-1>,
@ -121,7 +121,8 @@ You can create scripts to create the account and container rings and rebalance.
6002, and have a storage device called "sdb1" (this is a directory
name created under /drives when we setup the account server). The
"z1", "z2", etc. designate zones, and you can choose whether you
put devices in the same or different zones.
put devices in the same or different zones. The "r1" designates
the region, with different regions specified as "r1", "r2", etc.
2. Make the script file executable and run it to create the account ring file::
@ -588,7 +589,9 @@ This information can also be queried via the swift-recon command line utility::
--md5 Get md5sum of servers ring and compare to local copy
--sockstat Get cluster socket usage stats
-T, --time Check time synchronization
--all Perform all checks. Equal to -arudlqT --md5 --sockstat
--all Perform all checks. Equal to
-arudlqT --md5 --sockstat --auditor --updater
--expirer --driveaudit --validate-servers
-z ZONE, --zone=ZONE Only query servers in specified zone
-t SECONDS, --timeout=SECONDS
Time to wait for a response from a server

View File

@ -1,5 +1,3 @@
.. _formpost:
====================
Form POST middleware
====================

View File

@ -293,10 +293,12 @@ a manifest object but a normal object with content same as what you would
get on a **GET** request to original manifest object.
To duplicate a manifest object:
* Use the **GET** operation to read the value of ``X-Object-Manifest`` and
use this value in the ``X-Object-Manifest`` request header in a **PUT**
operation.
* Alternatively, you can include *``?multipart-manifest=get``* query
string in the **COPY** request.
This creates a new manifest object that shares the same set of segment
objects as the original manifest object.

View File

@ -60,7 +60,7 @@ Content Distribution Network Integration
Alternative API
---------------
* `Swift3 <https://github.com/stackforge/swift3>`_ - Amazon S3 API emulation.
* `Swift3 <https://github.com/openstack/swift3>`_ - Amazon S3 API emulation.
* `CDMI <https://github.com/osaddon/cdmi>`_ - CDMI support
@ -81,8 +81,8 @@ Custom Logger Hooks
Storage Backends (DiskFile API implementations)
-----------------------------------------------
* `Swift-on-File <https://github.com/stackforge/swiftonfile>`_ - Enables objects created using Swift API to be accessed as files on a POSIX filesystem and vice versa.
* `swift-ceph-backend <https://github.com/stackforge/swift-ceph-backend>`_ - Ceph RADOS object server implementation for Swift.
* `Swift-on-File <https://github.com/openstack/swiftonfile>`_ - Enables objects created using Swift API to be accessed as files on a POSIX filesystem and vice versa.
* `swift-ceph-backend <https://github.com/openstack/swift-ceph-backend>`_ - Ceph RADOS object server implementation for Swift.
* `kinetic-swift <https://github.com/swiftstack/kinetic-swift>`_ - Seagate Kinetic Drive as backend for Swift
* `swift-scality-backend <https://github.com/scality/ScalitySproxydSwift>`_ - Scality sproxyd object server implementation for Swift.
@ -95,7 +95,7 @@ Developer Tools
* `SAIO Ansible playbook <https://github.com/thiagodasilva/swift-aio>`_ -
Quickly setup a standard development environment using Vagrant and Ansible in
a Fedora virtual machine (with built-in `Swift-on-File
<https://github.com/stackforge/swiftonfile>`_ support).
<https://github.com/openstack/swiftonfile>`_ support).
Other
-----

View File

@ -559,53 +559,60 @@ replication_failure_ratio 1.0 If the value of failures /
[object-replicator]
================== ================= =======================================
Option Default Description
------------------ ----------------- ---------------------------------------
log_name object-replicator Label used when logging
log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
daemonize yes Whether or not to run replication as a
daemon
interval 30 Time in seconds to wait between
replication passes
concurrency 1 Number of replication workers to spawn
timeout 5 Timeout value sent to rsync --timeout
and --contimeout options
stats_interval 3600 Interval in seconds between logging
replication statistics
reclaim_age 604800 Time elapsed in seconds before an
object can be reclaimed
handoffs_first false If set to True, partitions that are
not supposed to be on the node will be
replicated first. The default setting
should not be changed, except for
extreme situations.
handoff_delete auto By default handoff partitions will be
removed when it has successfully
replicated to all the canonical nodes.
If set to an integer n, it will remove
the partition if it is successfully
replicated to n nodes. The default
setting should not be changed, except
for extreme situations.
node_timeout DEFAULT or 10 Request timeout to external services.
This uses what's set here, or what's set
in the DEFAULT section, or 10 (though
other sections use 3 as the final
default).
rsync_module {replication_ip}::object
Format of the rsync module where the
replicator will send data. The
configuration value can include some
variables that will be extracted from
the ring. Variables must follow the
format {NAME} where NAME is one of:
ip, port, replication_ip,
replication_port, region, zone, device,
meta. See etc/rsyncd.conf-sample for
some examples.
================== ================= =======================================
================== ======================== ================================
Option Default Description
------------------ ------------------------ --------------------------------
log_name object-replicator Label used when logging
log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
daemonize yes Whether or not to run replication
as a daemon
interval 30 Time in seconds to wait between
replication passes
concurrency 1 Number of replication workers to
spawn
timeout 5 Timeout value sent to rsync
--timeout and --contimeout
options
stats_interval 3600 Interval in seconds between
logging replication statistics
reclaim_age 604800 Time elapsed in seconds before an
object can be reclaimed
handoffs_first false If set to True, partitions that
are not supposed to be on the
node will be replicated first.
The default setting should not be
changed, except for extreme
situations.
handoff_delete auto By default handoff partitions
will be removed when it has
successfully replicated to all
the canonical nodes. If set to an
integer n, it will remove the
partition if it is successfully
replicated to n nodes. The
default setting should not be
changed, except for extreme
situations.
node_timeout DEFAULT or 10 Request timeout to external
services. This uses what's set
here, or what's set in the
DEFAULT section, or 10 (though
other sections use 3 as the final
default).
rsync_module {replication_ip}::object Format of the rsync module where
the replicator will send data.
The configuration value can
include some variables that will
be extracted from the ring.
Variables must follow the format
{NAME} where NAME is one of: ip,
port, replication_ip,
replication_port, region, zone,
device, meta. See
etc/rsyncd.conf-sample for some
examples.
================== ======================== ================================
[object-updater]
@ -718,35 +725,53 @@ allow_versions false Enable/Disable object versioning feature
[container-replicator]
================== ==================== ====================================
Option Default Description
------------------ -------------------- ------------------------------------
log_name container-replicator Label used when logging
log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
per_diff 1000
concurrency 8 Number of replication workers to
spawn
interval 30 Time in seconds to wait between
replication passes
node_timeout 10 Request timeout to external services
conn_timeout 0.5 Connection timeout to external
services
reclaim_age 604800 Time elapsed in seconds before a
container can be reclaimed
rsync_module {replication_ip}::container
Format of the rsync module where the
replicator will send data. The
configuration value can include some
variables that will be extracted from
the ring. Variables must follow the
format {NAME} where NAME is one of:
ip, port, replication_ip,
replication_port, region, zone,
device, meta. See
etc/rsyncd.conf-sample for some
examples.
================== ==================== ====================================
================== =========================== =============================
Option Default Description
------------------ --------------------------- -----------------------------
log_name container-replicator Label used when logging
log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
per_diff 1000 Maximum number of database
rows that will be sync'd in a
single HTTP replication
request. Databases with less
than or equal to this number
of differing rows will always
be sync'd using an HTTP
replication request rather
than using rsync.
max_diffs 100 Maximum number of HTTP
replication requests attempted
on each replication pass for
any one container. This caps
how long the replicator will
spend trying to sync a given
database per pass so the other
databases don't get starved.
concurrency 8 Number of replication workers
to spawn
interval 30 Time in seconds to wait
between replication passes
node_timeout 10 Request timeout to external
services
conn_timeout 0.5 Connection timeout to external
services
reclaim_age 604800 Time elapsed in seconds before
a container can be reclaimed
rsync_module {replication_ip}::container Format of the rsync module
where the replicator will send
data. The configuration value
can include some variables
that will be extracted from
the ring. Variables must
follow the format {NAME} where
NAME is one of: ip, port,
replication_ip,
replication_port, region,
zone, device, meta. See
etc/rsyncd.conf-sample for
some examples.
================== =========================== =============================
[container-updater]
@ -859,33 +884,51 @@ set log_level INFO Logging level
[account-replicator]
================== ================== ======================================
Option Default Description
------------------ ------------------ --------------------------------------
log_name account-replicator Label used when logging
log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
per_diff 1000
concurrency 8 Number of replication workers to spawn
interval 30 Time in seconds to wait between
replication passes
node_timeout 10 Request timeout to external services
conn_timeout 0.5 Connection timeout to external services
reclaim_age 604800 Time elapsed in seconds before an
account can be reclaimed
rsync_module {replication_ip}::account
Format of the rsync module where the
replicator will send data. The
configuration value can include some
variables that will be extracted from
the ring. Variables must follow the
format {NAME} where NAME is one of:
ip, port, replication_ip,
replication_port, region, zone,
device, meta. See
etc/rsyncd.conf-sample for some
examples.
================== ================== ======================================
================== ========================= ===============================
Option Default Description
------------------ ------------------------- -------------------------------
log_name account-replicator Label used when logging
log_facility LOG_LOCAL0 Syslog log facility
log_level INFO Logging level
per_diff 1000 Maximum number of database rows
that will be sync'd in a single
HTTP replication request.
Databases with less than or
equal to this number of
differing rows will always be
sync'd using an HTTP replication
request rather than using rsync.
max_diffs 100 Maximum number of HTTP
replication requests attempted
on each replication pass for any
one container. This caps how
long the replicator will spend
trying to sync a given database
per pass so the other databases
don't get starved.
concurrency 8 Number of replication workers
to spawn
interval 30 Time in seconds to wait between
replication passes
node_timeout 10 Request timeout to external
services
conn_timeout 0.5 Connection timeout to external
services
reclaim_age 604800 Time elapsed in seconds before
an account can be reclaimed
rsync_module {replication_ip}::account Format of the rsync module where
the replicator will send data.
The configuration value can
include some variables that will
be extracted from the ring.
Variables must follow the format
{NAME} where NAME is one of: ip,
port, replication_ip,
replication_port, region, zone,
device, meta. See
etc/rsyncd.conf-sample for some
examples.
================== ========================= ===============================
[account-auditor]

View File

@ -352,7 +352,7 @@ folks a start on their own code if they want to use repoze.what::
self.ssl = \
conf.get('ssl', 'false').lower() in ('true', 'on', '1', 'yes')
self.auth_prefix = conf.get('prefix', '/')
self.timeout = int(conf.get('node_timeout', 10))
self.timeout = float(conf.get('node_timeout', 10))
def authenticate(self, env, identity):
token = identity.get('token')

View File

@ -51,16 +51,16 @@ To execute the unit tests:
.. note::
As of tox version 2.0.0, most environment variables are not automatically
passed to the test environment. Swift's tox.ini overrides this default
behavior so that variable names matching SWIFT_* and *_proxy will be passed,
but you may need to run tox --recreate for this to take effect after
upgrading from tox<2.0.0.
passed to the test environment. Swift's `tox.ini` overrides this default
behavior so that variable names matching ``SWIFT_*`` and ``*_proxy`` will be
passed, but you may need to run `tox --recreate` for this to take effect
after upgrading from tox<2.0.0.
Conversely, if you do not want those environment variables to be passed to
the test environment then you will need to unset them before calling tox.
Also, if you ever encounter DistributionNotFound, try to use `tox --recreate`
or remove the .tox directory to force tox to recreate the dependency list.
or remove the `.tox` directory to force tox to recreate the dependency list.
The functional tests may be executed against a :doc:`development_saio` or
other running Swift cluster using the command:

View File

@ -41,7 +41,7 @@ changes to Swift.
Testing
-------
The `Development Guidelines <development_guidelines>`_ describes the testing
The :doc:`Development Guidelines <development_guidelines>` describes the testing
requirements before submitting Swift code.
In summary, you can execute tox from the swift home directory (where you

View File

@ -92,9 +92,9 @@ Domain Remap
Dynamic Large Objects
=====================
.. automodule:: swift.common.middleware.dlo
:members:
:show-inheritance:
DLO support centers around a user specified filter that matches
segments and concatenates them together in object listing order. Please see
the DLO docs for :ref:`dlo-doc` further details.
.. _formpost:
@ -187,14 +187,12 @@ Recon
:members:
:show-inheritance:
.. _slo-doc:
Static Large Objects
====================
.. automodule:: swift.common.middleware.slo
:members:
:show-inheritance:
Please see
the SLO docs for :ref:`slo-doc` further details.
.. _staticweb:

View File

@ -254,9 +254,11 @@ This configuration works as follows:
``admin`` or ``swiftoperator`` role(s). When validated, the service token
gives the ``service`` role.
* Swift interprets the above configuration as follows:
* Did the user token provide one of the roles listed in operator_roles?
* Did the service token have the ``service`` role as described by the
``SERVICE_service_roles`` options.
* If both conditions are met, the request is granted. Otherwise, Swift
rejects the request.

View File

@ -171,6 +171,7 @@ The sequence of events and actions are as follows:
a copy of the <user-token>. In the X-Service-Token header, place your
Service's token. If you use python-swiftclient you can achieve this
by:
* Putting the URL in the ``preauthurl`` parameter
* Putting the <user-token> in ``preauthtoken`` paramater
* Adding the X-Service-Token to the ``headers`` parameter
@ -251,7 +252,7 @@ However, if one Service is compromised, that Service can access
data created by another Service. To prevent this, multiple Service Prefixes may
be used. This also requires that the operator configure multiple service
roles. For example, in a system that has Glance and Cinder, the following
Swift configuration could be used:
Swift configuration could be used::
[keystoneauth]
reseller_prefix = AUTH_, IMAGE_, BLOCK_

View File

@ -17,112 +17,28 @@ with the possibility of parallel uploads of the segments.
.. _dynamic-large-objects:
.. _dlo-doc:
---------------------
Dynamic Large Objects
---------------------
---------------
Using ``swift``
---------------
The quickest way to try out this feature is use the ``swift`` Swift Tool
included with the `python-swiftclient`_ library. You can use the ``-S``
option to specify the segment size to use when splitting a large file. For
example::
swift upload test_container -S 1073741824 large_file
This would split the large_file into 1G segments and begin uploading those
segments in parallel. Once all the segments have been uploaded, ``swift`` will
then create the manifest file so the segments can be downloaded as one.
So now, the following ``swift`` command would download the entire large object::
swift download test_container large_file
``swift`` command uses a strict convention for its segmented object
support. In the above example it will upload all the segments into a
second container named test_container_segments. These segments will
have names like large_file/1290206778.25/21474836480/00000000,
large_file/1290206778.25/21474836480/00000001, etc.
The main benefit for using a separate container is that the main container
listings will not be polluted with all the segment names. The reason for using
the segment name format of <name>/<timestamp>/<size>/<segment> is so that an
upload of a new file with the same name won't overwrite the contents of the
first until the last moment when the manifest file is updated.
``swift`` will manage these segment files for you, deleting old segments on
deletes and overwrites, etc. You can override this behavior with the
``--leave-segments`` option if desired; this is useful if you want to have
multiple versions of the same large object available.
.. _`python-swiftclient`: http://github.com/openstack/python-swiftclient
----------
Direct API
----------
You can also work with the segments and manifests directly with HTTP
requests instead of having ``swift`` do that for you. You can just
upload the segments like you would any other object and the manifest
is just a zero-byte (not enforced) file with an extra
``X-Object-Manifest`` header.
All the object segments need to be in the same container, have a common object
name prefix, and sort in the order in which they should be concatenated.
Object names are sorted lexicographically as UTF-8 byte strings.
They don't have to be in the same container as the manifest file will be, which
is useful to keep container listings clean as explained above with ``swift``.
The manifest file is simply a zero-byte (not enforced) file with the extra
``X-Object-Manifest: <container>/<prefix>`` header, where ``<container>`` is
the container the object segments are in and ``<prefix>`` is the common prefix
for all the segments.
It is best to upload all the segments first and then create or update the
manifest. In this way, the full object won't be available for downloading until
the upload is complete. Also, you can upload a new set of segments to a second
location and then update the manifest to point to this new location. During the
upload of the new segments, the original manifest will still be available to
download the first set of segments.
.. note::
The manifest file should have no content. However, this is not enforced.
If the manifest path itself conforms to container/prefix specified in
X-Object-Manifest, and if manifest has some content/data in it, it would
also be considered as segment and manifest's content will be part of the
concatenated GET response. The order of concatenation follows the usual DLO
logic which is - the order of concatenation adheres to order returned when
segment names are sorted.
Here's an example using ``curl`` with tiny 1-byte segments::
# First, upload the segments
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/00000001 --data-binary '1'
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/00000002 --data-binary '2'
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/00000003 --data-binary '3'
# Next, create the manifest file
curl -X PUT -H 'X-Auth-Token: <token>' \
-H 'X-Object-Manifest: container/myobject/' \
http://<storage_url>/container/myobject --data-binary ''
# And now we can download the segments as a single object
curl -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject
.. automodule:: swift.common.middleware.dlo
:members:
:show-inheritance:
.. _static-large-objects:
.. _slo-doc:
--------------------
Static Large Objects
--------------------
.. automodule:: swift.common.middleware.slo
:members:
:show-inheritance:
----------
Direct API
----------

View File

@ -90,8 +90,19 @@ use = egg:swift#recon
# log_level = INFO
# log_address = /dev/log
#
# Maximum number of database rows that will be sync'd in a single HTTP
# replication request. Databases with less than or equal to this number of
# differing rows will always be sync'd using an HTTP replication request rather
# than using rsync.
# per_diff = 1000
#
# Maximum number of HTTP replication requests attempted on each replication
# pass for any one container. This caps how long the replicator will spend
# trying to sync a given database per pass so the other databases don't get
# starved.
# max_diffs = 100
#
# Number of replication workers to spawn.
# concurrency = 8
#
# Time in seconds to wait between replication passes

View File

@ -99,8 +99,19 @@ use = egg:swift#recon
# log_level = INFO
# log_address = /dev/log
#
# Maximum number of database rows that will be sync'd in a single HTTP
# replication request. Databases with less than or equal to this number of
# differing rows will always be sync'd using an HTTP replication request rather
# than using rsync.
# per_diff = 1000
#
# Maximum number of HTTP replication requests attempted on each replication
# pass for any one container. This caps how long the replicator will spend
# trying to sync a given database per pass so the other databases don't get
# starved.
# max_diffs = 100
#
# Number of replication workers to spawn.
# concurrency = 8
#
# Time in seconds to wait between replication passes

View File

@ -2,7 +2,9 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
dnspython>=1.9.4
pbr>=1.6
dnspython>=1.12.0;python_version<'3.0'
dnspython3>=1.12.0;python_version>='3.0'
eventlet>=0.16.1,!=0.17.0
greenlet>=0.3.1
netifaces>=0.5,!=0.10.0,!=0.10.1

View File

@ -70,7 +70,7 @@ class AccountReaper(Daemon):
self.account_ring = None
self.container_ring = None
self.object_ring = None
self.node_timeout = int(conf.get('node_timeout', 10))
self.node_timeout = float(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.myips = whataremyips(conf.get('bind_ip', '0.0.0.0'))
self.bind_port = int(conf.get('bind_port', 0))

View File

@ -978,7 +978,8 @@ class SwiftRecon(object):
order.')
args.add_option('--all', action="store_true",
help="Perform all checks. Equal to \t\t\t-arudlqT "
"--md5 --sockstat --auditor --updater --expirer")
"--md5 --sockstat --auditor --updater --expirer "
"--driveaudit --validate-servers")
args.add_option('--region', type="int",
help="Only query servers in specified region")
args.add_option('--zone', '-z', type="int",
@ -1018,22 +1019,21 @@ class SwiftRecon(object):
if options.all:
if self.server_type == 'object':
self.async_check(hosts)
self.replication_check(hosts)
self.object_auditor_check(hosts)
self.updater_check(hosts)
self.expirer_check(hosts)
elif self.server_type == 'container':
self.replication_check(hosts)
self.auditor_check(hosts)
self.updater_check(hosts)
elif self.server_type == 'account':
self.replication_check(hosts)
self.auditor_check(hosts)
self.replication_check(hosts)
self.umount_check(hosts)
self.load_check(hosts)
self.disk_usage(hosts, options.top, options.lowest,
options.human_readable)
self.get_ringmd5(hosts, swift_dir)
self.get_swiftconfmd5(hosts)
self.quarantine_check(hosts)
self.socket_usage(hosts)
self.server_type_check(hosts)

View File

@ -306,18 +306,20 @@ def run_scenario(scenario):
command_f(*command)
rebalance_number = 1
parts_moved, old_balance = rb.rebalance(seed=seed)
parts_moved, old_balance, removed_devs = rb.rebalance(seed=seed)
rb.pretend_min_part_hours_passed()
print "\tRebalance 1: moved %d parts, balance is %.6f" % (
parts_moved, old_balance)
print "\tRebalance 1: moved %d parts, balance is %.6f, \
%d removed devs" % (
parts_moved, old_balance, removed_devs)
while True:
rebalance_number += 1
parts_moved, new_balance = rb.rebalance(seed=seed)
parts_moved, new_balance, removed_devs = rb.rebalance(seed=seed)
rb.pretend_min_part_hours_passed()
print "\tRebalance %d: moved %d parts, balance is %.6f" % (
rebalance_number, parts_moved, new_balance)
if parts_moved == 0:
print "\tRebalance %d: moved %d parts, balance is %.6f, \
%d removed devs" % (
rebalance_number, parts_moved, new_balance, removed_devs)
if parts_moved == 0 and removed_devs == 0:
break
if abs(new_balance - old_balance) < 1 and not (
old_balance == builder.MAX_BALANCE and

View File

@ -18,7 +18,7 @@ from __future__ import print_function
import logging
from errno import EEXIST
from itertools import islice, izip
from itertools import islice
from operator import itemgetter
from os import mkdir
from os.path import basename, abspath, dirname, exists, join as pathjoin
@ -27,6 +27,8 @@ from textwrap import wrap
from time import time
import optparse
import math
from six.moves import zip as izip
from six.moves import input
from swift.common import exceptions
@ -419,6 +421,8 @@ swift-ring-builder <builder_file> create <part_power> <replicas>
"""
swift-ring-builder <builder_file>
Shows information about the ring and the devices within.
Flags:
DEL - marked for removal and will be removed next rebalance.
"""
print('%s, build version %d' % (builder_file, builder.version))
regions = 0
@ -446,7 +450,7 @@ swift-ring-builder <builder_file>
if builder.devs:
print('Devices: id region zone ip address port '
'replication ip replication port name '
'weight partitions balance meta')
'weight partitions balance flags meta')
weighted_parts = builder.parts * builder.replicas / \
sum(d['weight'] for d in builder.devs if d is not None)
for dev in builder.devs:
@ -460,12 +464,13 @@ swift-ring-builder <builder_file>
else:
balance = 100.0 * dev['parts'] / \
(dev['weight'] * weighted_parts) - 100.0
flags = 'DEL' if dev in builder._remove_devs else ''
print(' %5d %7d %5d %15s %5d %15s %17d %9s %6.02f '
'%10s %7.02f %s' %
'%10s %7.02f %5s %s' %
(dev['id'], dev['region'], dev['zone'], dev['ip'],
dev['port'], dev['replication_ip'],
dev['replication_port'], dev['device'], dev['weight'],
dev['parts'], balance, dev['meta']))
dev['parts'], balance, flags, dev['meta']))
exit(EXIT_SUCCESS)
def search():
@ -795,7 +800,7 @@ swift-ring-builder <builder_file> rebalance [options]
devs_changed = builder.devs_changed
try:
last_balance = builder.get_balance()
parts, balance = builder.rebalance(seed=get_seed(3))
parts, balance, removed_devs = builder.rebalance(seed=get_seed(3))
except exceptions.RingBuilderError as e:
print('-' * 79)
print("An error has occurred during ring validation. Common\n"
@ -805,7 +810,7 @@ swift-ring-builder <builder_file> rebalance [options]
(e,))
print('-' * 79)
exit(EXIT_ERROR)
if not (parts or options.force):
if not (parts or options.force or removed_devs):
print('No partitions could be reassigned.')
print('Either none need to be or none can be due to '
'min_part_hours [%s].' % builder.min_part_hours)
@ -1190,12 +1195,12 @@ def main(arguments=None):
if argv[0].endswith('-safe'):
try:
with lock_parent_directory(abspath(builder_file), 15):
Commands.__dict__.get(command, Commands.unknown.im_func)()
Commands.__dict__.get(command, Commands.unknown.__func__)()
except exceptions.LockTimeout:
print("Ring/builder dir currently locked.")
exit(2)
else:
Commands.__dict__.get(command, Commands.unknown.im_func)()
Commands.__dict__.get(command, Commands.unknown.__func__)()
if __name__ == '__main__':

View File

@ -146,7 +146,9 @@ def check_metadata(req, target_type):
meta_count = 0
meta_size = 0
for key, value in req.headers.items():
if isinstance(value, basestring) and len(value) > MAX_HEADER_SIZE:
if (isinstance(value, six.string_types)
and len(value) > MAX_HEADER_SIZE):
return HTTPBadRequest(body='Header value too long: %s' %
key[:MAX_META_NAME_LENGTH],
request=req, content_type='text/plain')

View File

@ -166,7 +166,7 @@ class Replicator(Daemon):
self.max_diffs = int(conf.get('max_diffs') or 100)
self.interval = int(conf.get('interval') or
conf.get('run_pause') or 30)
self.node_timeout = int(conf.get('node_timeout', 10))
self.node_timeout = float(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.rsync_compress = config_true_value(
conf.get('rsync_compress', 'no'))
@ -434,8 +434,12 @@ class Replicator(Daemon):
if self._in_sync(rinfo, info, broker, local_sync):
return True
# if the difference in rowids between the two differs by
# more than 50%, rsync then do a remote merge.
if rinfo['max_row'] / float(info['max_row']) < 0.5:
# more than 50% and the difference is greater than per_diff,
# rsync then do a remote merge.
# NOTE: difference > per_diff stops us from dropping to rsync
# on smaller containers, who have only a few rows to sync.
if rinfo['max_row'] / float(info['max_row']) < 0.5 and \
info['max_row'] - rinfo['max_row'] > self.per_diff:
self.stats['remote_merge'] += 1
self.logger.increment('remote_merges')
return self._rsync_db(broker, node, http, info['id'],

View File

@ -23,6 +23,7 @@ import socket
from time import time
from eventlet import sleep, Timeout
import six
from six.moves.http_client import HTTPException
from swift.common.bufferedhttp import http_connect
@ -399,7 +400,7 @@ def direct_put_object(node, part, account, container, name, contents,
headers['Content-Type'] = 'application/octet-stream'
if not contents:
headers['Content-Length'] = '0'
if isinstance(contents, basestring):
if isinstance(contents, six.string_types):
contents = [contents]
# Incase the caller want to insert an object with specific age
add_ts = 'X-Timestamp' not in headers

View File

@ -27,7 +27,7 @@ from time import gmtime, strftime, time
from zlib import compressobj
from swift.common.utils import quote
from swift.common.http import HTTP_NOT_FOUND
from swift.common.http import HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES
from swift.common.swob import Request
from swift.common.wsgi import loadapp, pipeline_property
@ -256,6 +256,8 @@ class InternalClient(object):
(path, quote(marker), quote(end_marker)),
{}, acceptable_statuses)
if not resp.status_int == 200:
if resp.status_int >= HTTP_MULTIPLE_CHOICES:
''.join(resp.app_iter)
break
data = json.loads(resp.body)
if not data:

View File

@ -121,6 +121,67 @@ class Bulk(object):
Only regular files will be uploaded. Empty directories, symlinks, etc will
not be uploaded.
Content Type:
If the content-type header is set in the extract-archive call, Swift will
assign that content-type to all the underlying files. The bulk middleware
will extract the archive file and send the internal files using PUT
operations using the same headers from the original request
(e.g. auth-tokens, content-Type, etc.). Notice that any middleware call
that follows the bulk middleware does not know if this was a bulk request
or if these were individual requests sent by the user.
In order to make Swift detect the content-type for the files based on the
file extension, the content-type in the extract-archive call should not be
set. Alternatively, it is possible to explicitly tell swift to detect the
content type using this header:
X-Detect-Content-Type:true
For example:
curl -X PUT http://127.0.0.1/v1/AUTH_acc/cont/$?extract-archive=tar -T
backup.tar -H "Content-Type: application/x-tar" -H "X-Auth-Token: xxx"
-H "X-Detect-Content-Type:true"
Assigning Metadata:
The tar file format (1) allows for UTF-8 key/value pairs to be associated
with each file in an archive. If a file has extended attributes, then tar
will store those as key/value pairs. The bulk middleware can read those
extended attributes and convert them to Swift object metadata. Attributes
starting with "user.meta" are converted to object metadata, and
"user.mime_type" is converted to Content-Type.
For example:
setfattr -n user.mime_type -v "application/python-setup" setup.py
setfattr -n user.meta.lunch -v "burger and fries" setup.py
setfattr -n user.meta.dinner -v "baked ziti" setup.py
setfattr -n user.stuff -v "whee" setup.py
Will get translated to headers:
Content-Type: application/python-setup
X-Object-Meta-Lunch: burger and fries
X-Object-Meta-Dinner: baked ziti
The bulk middleware will handle xattrs stored by both GNU and BSD tar (2).
Only xattrs user.mime_type and user.meta.* are processed. Other attributes
are ignored.
Notes:
(1) The POSIX 1003.1-2001 (pax) format. The default format on GNU tar
1.27.1 or later.
(2) Even with pax-format tarballs, different encoders store xattrs slightly
differently; for example, GNU tar stores the xattr "user.userattribute" as
pax header "SCHILY.xattr.user.userattribute", while BSD tar (which uses
libarchive) stores it as "LIBARCHIVE.xattr.user.userattribute".
Response:
The response from bulk operations functions differently from other swift
responses. This is because a short request body sent from the client could
result in many operations on the proxy server and precautions need to be

View File

@ -13,6 +13,107 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Middleware that will provide Dynamic Large Object (DLO) support.
---------------
Using ``swift``
---------------
The quickest way to try out this feature is use the ``swift`` Swift Tool
included with the `python-swiftclient`_ library. You can use the ``-S``
option to specify the segment size to use when splitting a large file. For
example::
swift upload test_container -S 1073741824 large_file
This would split the large_file into 1G segments and begin uploading those
segments in parallel. Once all the segments have been uploaded, ``swift`` will
then create the manifest file so the segments can be downloaded as one.
So now, the following ``swift`` command would download the entire large
object::
swift download test_container large_file
``swift`` command uses a strict convention for its segmented object
support. In the above example it will upload all the segments into a
second container named test_container_segments. These segments will
have names like large_file/1290206778.25/21474836480/00000000,
large_file/1290206778.25/21474836480/00000001, etc.
The main benefit for using a separate container is that the main container
listings will not be polluted with all the segment names. The reason for using
the segment name format of <name>/<timestamp>/<size>/<segment> is so that an
upload of a new file with the same name won't overwrite the contents of the
first until the last moment when the manifest file is updated.
``swift`` will manage these segment files for you, deleting old segments on
deletes and overwrites, etc. You can override this behavior with the
``--leave-segments`` option if desired; this is useful if you want to have
multiple versions of the same large object available.
.. _`python-swiftclient`: http://github.com/openstack/python-swiftclient
----------
Direct API
----------
You can also work with the segments and manifests directly with HTTP
requests instead of having ``swift`` do that for you. You can just
upload the segments like you would any other object and the manifest
is just a zero-byte (not enforced) file with an extra
``X-Object-Manifest`` header.
All the object segments need to be in the same container, have a common object
name prefix, and sort in the order in which they should be concatenated.
Object names are sorted lexicographically as UTF-8 byte strings.
They don't have to be in the same container as the manifest file will be, which
is useful to keep container listings clean as explained above with ``swift``.
The manifest file is simply a zero-byte (not enforced) file with the extra
``X-Object-Manifest: <container>/<prefix>`` header, where ``<container>`` is
the container the object segments are in and ``<prefix>`` is the common prefix
for all the segments.
It is best to upload all the segments first and then create or update the
manifest. In this way, the full object won't be available for downloading
until the upload is complete. Also, you can upload a new set of segments to
a second location and then update the manifest to point to this new location.
During the upload of the new segments, the original manifest will still be
available to download the first set of segments.
.. note::
The manifest file should have no content. However, this is not enforced.
If the manifest path itself conforms to container/prefix specified in
X-Object-Manifest, and if manifest has some content/data in it, it would
also be considered as segment and manifest's content will be part of the
concatenated GET response. The order of concatenation follows the usual DLO
logic which is - the order of concatenation adheres to order returned when
segment names are sorted.
Here's an example using ``curl`` with tiny 1-byte segments::
# First, upload the segments
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/00000001 --data-binary '1'
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/00000002 --data-binary '2'
curl -X PUT -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject/00000003 --data-binary '3'
# Next, create the manifest file
curl -X PUT -H 'X-Auth-Token: <token>' \
-H 'X-Object-Manifest: container/myobject/' \
http://<storage_url>/container/myobject --data-binary ''
# And now we can download the segments as a single object
curl -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject
"""
import os
import six

View File

@ -79,12 +79,7 @@ class NameCheckMiddleware(object):
self.logger.debug("name_check: self.forbidden_chars %s" %
self.forbidden_chars)
for c in unquote(req.path):
if c in self.forbidden_chars:
return True
else:
pass
return False
return any((c in unquote(req.path)) for c in self.forbidden_chars)
def check_length(self, req):
'''
@ -93,10 +88,7 @@ class NameCheckMiddleware(object):
Returns False if the length is <= the maximum
'''
length = len(unquote(req.path))
if length > self.maximum_length:
return True
else:
return False
return length > self.maximum_length
def check_regexp(self, req):
'''

View File

@ -26,26 +26,31 @@ defined manifest of the object segments is used.
Uploading the Manifest
----------------------
After the user has uploaded the objects to be concatenated a manifest is
After the user has uploaded the objects to be concatenated, a manifest is
uploaded. The request must be a PUT with the query parameter::
?multipart-manifest=put
The body of this request will be an ordered list of files in
json data format. The data to be supplied for each segment is::
The body of this request will be an ordered list of segment descriptions in
JSON format. The data to be supplied for each segment is:
path: the path to the segment object (not including account)
/container/object_name
etag: the etag given back when the segment object was PUT,
or null
size_bytes: the size of the complete segment object in
bytes, or null
range: (Optional) the range within the object to use as a
segment. If omitted, the entire object is used.
=========== ========================================================
Key Description
=========== ========================================================
path the path to the segment object (not including account)
/container/object_name
etag the ETag given back when the segment object was PUT,
or null
size_bytes the size of the complete segment object in
bytes, or null
range (optional) the (inclusive) range within the object to
use as a segment. If omitted, the entire object is used.
=========== ========================================================
The format of the list will be::
The format of the list will be:
.. code::
json:
[{"path": "/cont/object",
"etag": "etagoftheobjectsegment",
"size_bytes": 10485760,
@ -84,6 +89,42 @@ segments of a SLO manifest can even be other SLO manifests. Treat them as any
other object i.e., use the Etag and Content-Length given on the PUT of the
sub-SLO in the manifest to the parent SLO.
-------------------
Range Specification
-------------------
Users now have the ability to specify ranges for SLO segments.
Users can now include an optional 'range' field in segment descriptions
to specify which bytes from the underlying object should be used for the
segment data. Only one range may be specified per segment.
.. note::
The 'etag' and 'size_bytes' fields still describe the backing object as a
whole.
If a user uploads this manifest:
.. code::
[{"path": "/con/obj_seg_1", "etag": null, "size_bytes": 2097152,
"range": "0-1048576"},
{"path": "/con/obj_seg_2", "etag": null, "size_bytes": 2097152,
"range": "512-1550000"},
{"path": "/con/obj_seg_1", "etag": null, "size_bytes": 2097152,
"range": "-2048"}]
The segment will consist of the first 1048576 bytes of /con/obj_seg_1,
followed by bytes 513 through 1550000 (inclusive) of /con/obj_seg_2, and
finally bytes 2095104 through 2097152 (i.e., the last 2048 bytes) of
/con/obj_seg_1.
.. note::
The minimum sized range is min_segment_size, which by
default is 1048576 (1MB).
-------------------------
Retrieving a Large Object
-------------------------
@ -184,32 +225,132 @@ DEFAULT_MAX_MANIFEST_SEGMENTS = 1000
DEFAULT_MAX_MANIFEST_SIZE = 1024 * 1024 * 2 # 2 MiB
def parse_input(raw_data):
REQUIRED_SLO_KEYS = set(['path', 'etag', 'size_bytes'])
OPTIONAL_SLO_KEYS = set(['range'])
ALLOWED_SLO_KEYS = REQUIRED_SLO_KEYS | OPTIONAL_SLO_KEYS
def parse_and_validate_input(req_body, req_path, min_segment_size):
"""
Given a request will parse the body and return a list of dictionaries
:raises: HTTPException on parse errors
Given a request body, parses it and returns a list of dictionaries.
The output structure is nearly the same as the input structure, but it
is not an exact copy. Given a valid input dictionary `d_in`, its
corresponding output dictionary `d_out` will be as follows:
* d_out['etag'] == d_in['etag']
* d_out['path'] == d_in['path']
* d_in['size_bytes'] can be a string ("12") or an integer (12), but
d_out['size_bytes'] is an integer.
* (optional) d_in['range'] is a string of the form "M-N", "M-", or
"-N", where M and N are non-negative integers. d_out['range'] is the
corresponding swob.Range object. If d_in does not have a key
'range', neither will d_out.
:raises: HTTPException on parse errors or semantic errors (e.g. bogus
JSON structure, syntactically invalid ranges)
:returns: a list of dictionaries on success
"""
try:
parsed_data = json.loads(raw_data)
parsed_data = json.loads(req_body)
except ValueError:
raise HTTPBadRequest("Manifest must be valid json.")
raise HTTPBadRequest("Manifest must be valid JSON.\n")
req_keys = set(['path', 'etag', 'size_bytes'])
opt_keys = set(['range'])
try:
for seg_dict in parsed_data:
if (not (req_keys <= set(seg_dict) <= req_keys | opt_keys) or
'/' not in seg_dict['path'].lstrip('/')):
raise HTTPBadRequest('Invalid SLO Manifest File')
if not isinstance(parsed_data, list):
raise HTTPBadRequest("Manifest must be a list.\n")
if seg_dict.get('range'):
try:
seg_dict['range'] = Range('bytes=%s' % seg_dict['range'])
except ValueError:
raise HTTPBadRequest('Invalid SLO Manifest File')
except (AttributeError, TypeError):
raise HTTPBadRequest('Invalid SLO Manifest File')
# If we got here, req_path refers to an object, so this won't ever raise
# ValueError.
vrs, account, _junk = split_path(req_path, 3, 3, True)
errors = []
num_segs = len(parsed_data)
for seg_index, seg_dict in enumerate(parsed_data):
if not isinstance(seg_dict, dict):
errors.append("Index %d: not a JSON object" % seg_index)
continue
missing_keys = [k for k in REQUIRED_SLO_KEYS if k not in seg_dict]
if missing_keys:
errors.append(
"Index %d: missing keys %s"
% (seg_index,
", ".join('"%s"' % (mk,) for mk in sorted(missing_keys))))
continue
extraneous_keys = [k for k in seg_dict if k not in ALLOWED_SLO_KEYS]
if extraneous_keys:
errors.append(
"Index %d: extraneous keys %s"
% (seg_index,
", ".join('"%s"' % (ek,)
for ek in sorted(extraneous_keys))))
continue
if not isinstance(seg_dict['path'], basestring):
errors.append("Index %d: \"path\" must be a string" % seg_index)
continue
if not (seg_dict['etag'] is None or
isinstance(seg_dict['etag'], basestring)):
errors.append(
"Index %d: \"etag\" must be a string or null" % seg_index)
continue
if '/' not in seg_dict['path'].strip('/'):
errors.append(
"Index %d: path does not refer to an object. Path must be of "
"the form /container/object." % seg_index)
continue
seg_size = seg_dict['size_bytes']
if seg_size is not None:
try:
seg_size = int(seg_size)
seg_dict['size_bytes'] = seg_size
except (TypeError, ValueError):
errors.append("Index %d: invalid size_bytes" % seg_index)
continue
if (seg_size < min_segment_size and seg_index < num_segs - 1):
errors.append("Index %d: too small; each segment, except "
"the last, must be at least %d bytes."
% (seg_index, min_segment_size))
continue
obj_path = '/'.join(['', vrs, account, seg_dict['path'].lstrip('/')])
if req_path == quote(obj_path):
errors.append(
"Index %d: manifest must not include itself as a segment"
% seg_index)
continue
if seg_dict.get('range'):
try:
seg_dict['range'] = Range('bytes=%s' % seg_dict['range'])
except ValueError:
errors.append("Index %d: invalid range" % seg_index)
continue
if len(seg_dict['range'].ranges) > 1:
errors.append("Index %d: multiple ranges (only one allowed)"
% seg_index)
continue
# If the user *told* us the object's size, we can check range
# satisfiability right now. If they lied about the size, we'll
# fail that validation later.
if (seg_size is not None and
len(seg_dict['range'].ranges_for_length(seg_size)) != 1):
errors.append("Index %d: unsatisfiable range" % seg_index)
continue
if errors:
error_message = "".join(e + "\n" for e in errors)
raise HTTPBadRequest(error_message,
headers={"Content-Type": "text/plain"})
return parsed_data
@ -639,7 +780,9 @@ class StaticLargeObject(object):
if req.content_length is None and \
req.headers.get('transfer-encoding', '').lower() != 'chunked':
raise HTTPLengthRequired(request=req)
parsed_data = parse_input(req.body_file.read(self.max_manifest_size))
parsed_data = parse_and_validate_input(
req.body_file.read(self.max_manifest_size),
req.path, self.min_segment_size)
problem_segments = []
if len(parsed_data) > self.max_manifest_segments:
@ -658,23 +801,6 @@ class StaticLargeObject(object):
if isinstance(obj_name, six.text_type):
obj_name = obj_name.encode('utf-8')
obj_path = '/'.join(['', vrs, account, obj_name.lstrip('/')])
if req.path == quote(obj_path):
raise HTTPConflict(
'Manifest object name "%s" '
'cannot be included in the manifest'
% obj_name)
try:
seg_size = int(seg_dict['size_bytes'])
except (ValueError, TypeError):
if seg_dict['size_bytes'] is None:
seg_size = None
else:
raise HTTPBadRequest('Invalid Manifest File')
if seg_size is not None and seg_size < self.min_segment_size and \
index < len(parsed_data) - 1:
raise HTTPBadRequest(
'Each segment, except the last, must be at least '
'%d bytes.' % self.min_segment_size)
new_env = req.environ.copy()
new_env['PATH_INFO'] = obj_path
@ -693,34 +819,35 @@ class StaticLargeObject(object):
if head_seg_resp.is_success:
segment_length = head_seg_resp.content_length
if seg_dict.get('range'):
# Since we now know the length, we can normalize the ranges
# Since we now know the length, we can normalize the
# range. We know that there is exactly one range
# requested since we checked that earlier in
# parse_and_validate_input().
ranges = seg_dict['range'].ranges_for_length(
head_seg_resp.content_length)
if not ranges:
problem_segments.append([quote(obj_name),
'Unsatisfiable Range'])
elif len(ranges) > 1:
problem_segments.append([quote(obj_name),
'Multiple Ranges'])
elif ranges == [(0, head_seg_resp.content_length)]:
# Just one range, and it exactly matches the object.
# Why'd we do this again?
seg_dict['range'] = None
del seg_dict['range']
segment_length = head_seg_resp.content_length
else:
range = ranges[0]
seg_dict['range'] = '%d-%d' % (range[0], range[1] - 1)
segment_length = range[1] - range[0]
rng = ranges[0]
seg_dict['range'] = '%d-%d' % (rng[0], rng[1] - 1)
segment_length = rng[1] - rng[0]
if segment_length < self.min_segment_size and \
index < len(parsed_data) - 1:
raise HTTPBadRequest(
'Each segment, except the last, must be at least '
'%d bytes.' % self.min_segment_size)
problem_segments.append(
[quote(obj_name),
'Too small; each segment, except the last, must be '
'at least %d bytes.' % self.min_segment_size])
total_size += segment_length
if seg_size is not None and \
seg_size != head_seg_resp.content_length:
if seg_dict['size_bytes'] is not None and \
seg_dict['size_bytes'] != head_seg_resp.content_length:
problem_segments.append([quote(obj_name), 'Size Mismatch'])
if seg_dict['etag'] is None or \
seg_dict['etag'] == head_seg_resp.etag:

View File

@ -92,6 +92,7 @@ Example usage of this middleware via ``swift``:
Turn on listings::
swift post -r '.r:*,.rlistings' container
swift post -m 'web-listings: true' container
Now you should see object listings for paths and pseudo paths that have no
@ -121,8 +122,8 @@ import json
import time
from swift.common.utils import human_readable, split_path, config_true_value, \
quote, register_swift_info
from swift.common.wsgi import make_pre_authed_env, WSGIContext
quote, register_swift_info, get_logger
from swift.common.wsgi import make_env, WSGIContext
from swift.common.http import is_success, is_redirection, HTTP_NOT_FOUND
from swift.common.swob import Response, HTTPMovedPermanently, HTTPNotFound
from swift.proxy.controllers.base import get_container_info
@ -167,7 +168,7 @@ class _StaticWebContext(WSGIContext):
save_response_status = self._response_status
save_response_headers = self._response_headers
save_response_exc_info = self._response_exc_info
resp = self._app_call(make_pre_authed_env(
resp = self._app_call(make_env(
env, 'GET', '/%s/%s/%s/%s%s' % (
self.version, self.account, self.container,
self._get_status_int(), self._error),
@ -236,7 +237,7 @@ class _StaticWebContext(WSGIContext):
body += ' </body>\n</html>\n'
resp = HTTPNotFound(body=body)(env, self._start_response)
return self._error_response(resp, env, start_response)
tmp_env = make_pre_authed_env(
tmp_env = make_env(
env, 'GET', '/%s/%s/%s' % (
self.version, self.account, self.container),
self.agent, swift_source='SW')
@ -429,7 +430,7 @@ class _StaticWebContext(WSGIContext):
return resp
if status_int == HTTP_NOT_FOUND:
if env['PATH_INFO'][-1] != '/':
tmp_env = make_pre_authed_env(
tmp_env = make_env(
env, 'GET', '/%s/%s/%s' % (
self.version, self.account, self.container),
self.agent, swift_source='SW')
@ -463,6 +464,7 @@ class StaticWeb(object):
self.app = app
#: The filter configuration dict.
self.conf = conf
self.logger = get_logger(conf, log_route='staticweb')
def __call__(self, env, start_response):
"""
@ -472,6 +474,11 @@ class StaticWeb(object):
:param start_response: The WSGI start_response hook.
"""
env['staticweb.start_time'] = time.time()
if 'swift.authorize' not in env:
self.logger.warning(
'No authentication middleware authorized request yet. '
'Skipping staticweb')
return self.app(env, start_response)
try:
(version, account, container, obj) = \
split_path(env['PATH_INFO'], 2, 4, True)

View File

@ -23,6 +23,7 @@ import hmac
import base64
from eventlet import Timeout
import six
from six.moves.urllib.parse import unquote
from swift.common.swob import Response, Request
from swift.common.swob import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \
@ -71,16 +72,16 @@ class TempAuth(object):
The reseller prefix specifies which parts of the account namespace this
middleware is responsible for managing authentication and authorization.
By default, the prefix is AUTH so accounts and tokens are prefixed
by AUTH_. When a request's token and/or path start with AUTH_, this
By default, the prefix is 'AUTH' so accounts and tokens are prefixed
by 'AUTH\_'. When a request's token and/or path start with 'AUTH\_', this
middleware knows it is responsible.
We allow the reseller prefix to be a list. In tempauth, the first item
in the list is used as the prefix for tokens and user groups. The
other prefixes provide alternate accounts that user's can access. For
example if the reseller prefix list is 'AUTH, OTHER', a user with
admin access to AUTH_account also has admin access to
OTHER_account.
admin access to 'AUTH_account' also has admin access to
'OTHER_account'.
Required Group:
@ -98,7 +99,7 @@ class TempAuth(object):
is not processed.
The X-Service-Token is useful when combined with multiple reseller prefix
items. In the following configuration, accounts prefixed SERVICE_
items. In the following configuration, accounts prefixed 'SERVICE\_'
are only accessible if X-Auth-Token is from the end-user and
X-Service-Token is from the ``glance`` user::
@ -460,7 +461,7 @@ class TempAuth(object):
if not isinstance(result[key], list):
return "Value for key '%s' must be a list" % key
for grantee in result[key]:
if not isinstance(grantee, basestring):
if not isinstance(grantee, six.string_types):
return "Elements of '%s' list must be strings" % key
# Everything looks fine, no errors found

View File

@ -27,6 +27,7 @@ import warnings
from array import array
from collections import defaultdict
import six
from six.moves import range
from time import time
@ -395,9 +396,11 @@ class RingBuilder(object):
below 1% or doesn't change by more than 1% (only happens with ring that
can't be balanced no matter what).
:returns: (number_of_partitions_altered, resulting_balance)
:returns: (number_of_partitions_altered, resulting_balance,
number_of_removed_devices)
"""
num_devices = len([d for d in self._iter_devs() if d['weight'] > 0])
removed_devs = 0
if num_devices < self.replicas:
warnings.warn(RingValidationWarning(
"Replica count of %(replicas)s requires more "
@ -424,7 +427,7 @@ class RingBuilder(object):
self._initial_balance()
self.devs_changed = False
self._build_dispersion_graph()
return self.parts, self.get_balance()
return self.parts, self.get_balance(), removed_devs
changed_parts = 0
self._update_last_part_moves()
last_balance = 0
@ -447,6 +450,7 @@ class RingBuilder(object):
remove_dev_id = self._remove_devs.pop()['id']
self.logger.debug("Removing dev %d", remove_dev_id)
self.devs[remove_dev_id] = None
removed_devs += 1
balance = self.get_balance()
if balance < 1 or abs(last_balance - balance) < 1 or \
changed_parts == self.parts:
@ -456,7 +460,7 @@ class RingBuilder(object):
self.version += 1
changed_parts = self._build_dispersion_graph(old_replica2part2dev)
return changed_parts, balance
return changed_parts, balance, removed_devs
def _build_dispersion_graph(self, old_replica2part2dev=None):
"""
@ -501,7 +505,7 @@ class RingBuilder(object):
dispersion_graph = {}
# go over all the devices holding each replica part by part
for part_id, dev_ids in enumerate(
itertools.izip(*self._replica2part2dev)):
six.moves.zip(*self._replica2part2dev)):
# count the number of replicas of this part for each tier of each
# device, some devices may have overlapping tiers!
replicas_at_tier = defaultdict(int)

View File

@ -1324,9 +1324,9 @@ class Response(object):
if self.status_int in RESPONSE_REASONS:
title, exp = RESPONSE_REASONS[self.status_int]
if exp:
body = '<html><h1>%s</h1><p>%s</p></html>' % (title, exp)
if '%(' in body:
body = body % defaultdict(lambda: 'unknown', self.__dict__)
body = '<html><h1>%s</h1><p>%s</p></html>' % (
title,
exp % defaultdict(lambda: 'unknown', self.__dict__))
self.content_length = len(body)
return [body]
return ['']

View File

@ -249,7 +249,7 @@ def backward(f, blocksize=4096):
f.seek(0, os.SEEK_END)
if f.tell() == 0:
return
last_row = ''
last_row = b''
while f.tell() != 0:
try:
f.seek(-blocksize, os.SEEK_CUR)
@ -258,7 +258,7 @@ def backward(f, blocksize=4096):
f.seek(-blocksize, os.SEEK_CUR)
block = f.read(blocksize)
f.seek(-blocksize, os.SEEK_CUR)
rows = block.split('\n')
rows = block.split(b'\n')
rows[-1] = rows[-1] + last_row
while rows:
last_row = rows.pop(-1)
@ -1739,7 +1739,7 @@ def expand_ipv6(address):
def whataremyips(bind_ip=None):
"""
Get "our" IP addresses ("us" being the set of services configured by
one *.conf file). If our REST listens on a specific address, return it.
one `*.conf` file). If our REST listens on a specific address, return it.
Otherwise, if listen on '0.0.0.0' or '::' return all addresses, including
the loopback.
@ -3078,15 +3078,15 @@ class ThreadPool(object):
def run_in_thread(self, func, *args, **kwargs):
"""
Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
Runs ``func(*args, **kwargs)`` in a thread. Blocks the current greenlet
until results are available.
Exceptions thrown will be reraised in the calling thread.
If the threadpool was initialized with nthreads=0, it invokes
func(*args, **kwargs) directly, followed by eventlet.sleep() to ensure
the eventlet hub has a chance to execute. It is more likely the hub
will be invoked when queuing operations to an external thread.
``func(*args, **kwargs)`` directly, followed by eventlet.sleep() to
ensure the eventlet hub has a chance to execute. It is more likely the
hub will be invoked when queuing operations to an external thread.
:returns: result of calling func
:raises: whatever func raises
@ -3126,7 +3126,7 @@ class ThreadPool(object):
def force_run_in_thread(self, func, *args, **kwargs):
"""
Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
Runs ``func(*args, **kwargs)`` in a thread. Blocks the current greenlet
until results are available.
Exceptions thrown will be reraised in the calling thread.

View File

@ -597,6 +597,8 @@ class PortPidState(object):
def port_index_pairs(self):
"""
Returns current (port, server index) pairs.
:returns: A set of (port, server_idx) tuples for currently-tracked
ports, sockets, and PIDs.
"""
@ -711,6 +713,8 @@ class ServersPerPortStrategy(object):
def loop_timeout(self):
"""
Return timeout before checking for reloaded rings.
:returns: The time to wait for a child to exit before checking for
reloaded rings (new ports).
"""

View File

@ -86,7 +86,7 @@ class ContainerController(BaseStorageServer):
self.log_requests = config_true_value(conf.get('log_requests', 'true'))
self.root = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.node_timeout = int(conf.get('node_timeout', 3))
self.node_timeout = float(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
#: ContainerSyncCluster instance for validating sync-to values.
self.realms_conf = ContainerSyncRealms(

View File

@ -49,7 +49,7 @@ class ContainerUpdater(Daemon):
self.account_ring = None
self.concurrency = int(conf.get('concurrency', 4))
self.slowdown = float(conf.get('slowdown', 0.01))
self.node_timeout = int(conf.get('node_timeout', 3))
self.node_timeout = float(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.no_changes = 0
self.successes = 0

View File

@ -10,14 +10,13 @@
# Tom Cocozzello <tjcocozz@us.ibm.com>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-08-11 11:22+0000\n"
"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
"Language: de\n"
"Language-Team: German (http://www.transifex.com/openstack/swift/language/"
"de/)\n"
"Language-Team: German\n"
"Plural-Forms: nplurals=2; plural=(n != 1)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"

View File

@ -8,14 +8,13 @@
# Tom Cocozzello <tjcocozz@us.ibm.com>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-09-09 05:36+0000\n"
"Last-Translator: Carlos A. Muñoz <camunoz@redhat.com>\n"
"Language: es\n"
"Language-Team: Spanish (http://www.transifex.com/openstack/swift/language/"
"es/)\n"
"Language-Team: Spanish\n"
"Plural-Forms: nplurals=2; plural=(n != 1)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"

View File

@ -8,14 +8,13 @@
# Tom Cocozzello <tjcocozz@us.ibm.com>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-08-11 11:22+0000\n"
"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
"Language: fr\n"
"Language-Team: French (http://www.transifex.com/openstack/swift/language/"
"fr/)\n"
"Language-Team: French\n"
"Plural-Forms: nplurals=2; plural=(n > 1)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"

View File

@ -7,14 +7,13 @@
# Tom Cocozzello <tjcocozz@us.ibm.com>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-08-11 11:22+0000\n"
"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
"Language: it\n"
"Language-Team: Italian (http://www.transifex.com/openstack/swift/language/"
"it/)\n"
"Language-Team: Italian\n"
"Plural-Forms: nplurals=2; plural=(n != 1)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"

View File

@ -9,14 +9,13 @@
# Tom Cocozzello <tjcocozz@us.ibm.com>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-09-26 09:26+0000\n"
"Last-Translator: Akihiro Motoki <amotoki@gmail.com>\n"
"Language: ja\n"
"Language-Team: Japanese (http://www.transifex.com/openstack/swift/language/"
"ja/)\n"
"Language-Team: Japanese\n"
"Plural-Forms: nplurals=1; plural=0\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"

View File

@ -9,14 +9,13 @@
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-09-09 05:10+0000\n"
"Last-Translator: Ying Chun Guo <daisy.ycguo@gmail.com>\n"
"Language: ko_KR\n"
"Language-Team: Korean (Korea) (http://www.transifex.com/openstack/swift/"
"language/ko_KR/)\n"
"Language-Team: Korean (South Korea)\n"
"Plural-Forms: nplurals=1; plural=0\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"

View File

@ -11,14 +11,13 @@
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-08-11 11:22+0000\n"
"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
"Language: pt_BR\n"
"Language-Team: Portuguese (Brazil) (http://www.transifex.com/openstack/swift/"
"language/pt_BR/)\n"
"Language-Team: Portuguese (Brazil)\n"
"Plural-Forms: nplurals=2; plural=(n > 1)\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"

View File

@ -7,14 +7,13 @@
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-08-11 11:22+0000\n"
"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
"Language: ru\n"
"Language-Team: Russian (http://www.transifex.com/openstack/swift/language/"
"ru/)\n"
"Language-Team: Russian\n"
"Plural-Forms: nplurals=4; plural=(n%10==1 && n%100!=11 ? 0 : n%10>=2 && n"
"%10<=4 && (n%100<12 || n%100>14) ? 1 : n%10==0 || (n%10>=5 && n%10<=9) || (n"
"%100>=11 && n%100<=14)? 2 : 3)\n"

View File

@ -1,19 +0,0 @@
# Translations template for swift.
# Copyright (C) 2015 ORGANIZATION
# This file is distributed under the same license as the swift project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2015.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: swift 2.3.1.dev213\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-07-29 06:35+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"

View File

@ -1,19 +0,0 @@
# Translations template for swift.
# Copyright (C) 2015 ORGANIZATION
# This file is distributed under the same license as the swift project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2015.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: swift 2.3.1.dev213\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-07-29 06:35+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"

View File

@ -1,19 +0,0 @@
# Translations template for swift.
# Copyright (C) 2015 ORGANIZATION
# This file is distributed under the same license as the swift project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2015.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: swift 2.3.1.dev213\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-07-29 06:35+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"

View File

@ -1,19 +0,0 @@
# Translations template for swift.
# Copyright (C) 2015 ORGANIZATION
# This file is distributed under the same license as the swift project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2015.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: swift 2.3.1.dev213\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-07-29 06:35+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"

View File

@ -7,14 +7,13 @@
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-09-04 07:42+0000\n"
"Last-Translator: İşbaran Akçayır <isbaran@gmail.com>\n"
"Language: tr_TR\n"
"Language-Team: Turkish (Turkey) (http://www.transifex.com/openstack/swift/"
"language/tr_TR/)\n"
"Language-Team: Turkish (Turkey)\n"
"Plural-Forms: nplurals=1; plural=0\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"

View File

@ -8,14 +8,13 @@
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-08-11 11:22+0000\n"
"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
"Language: zh_Hans_CN\n"
"Language-Team: Chinese (China) (http://www.transifex.com/openstack/swift/"
"language/zh_CN/)\n"
"Language-Team: Chinese (China)\n"
"Plural-Forms: nplurals=1; plural=0\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"

View File

@ -7,14 +7,13 @@
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: swift 2.4.1.dev48\n"
"Project-Id-Version: swift 2.5.1.dev70\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-28 06:27+0000\n"
"POT-Creation-Date: 2015-10-23 06:34+0000\n"
"PO-Revision-Date: 2015-08-11 11:22+0000\n"
"Last-Translator: openstackjenkins <jenkins@openstack.org>\n"
"Language: zh_Hant_TW\n"
"Language-Team: Chinese (Taiwan) (http://www.transifex.com/openstack/swift/"
"language/zh_TW/)\n"
"Language-Team: Chinese (Taiwan)\n"
"Plural-Forms: nplurals=1; plural=0\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"

View File

@ -447,7 +447,7 @@ class BaseDiskFileManager(object):
Parse an on disk file name.
:param filename: the data file name including extension
:returns: a dict, with keys for timestamp, and ext::
:returns: a dict, with keys for timestamp, and ext:
* timestamp is a :class:`~swift.common.utils.Timestamp`
* ext is a string, the file extension including the leading dot or
@ -895,8 +895,10 @@ class BaseDiskFileManager(object):
be yielded.
timestamps is a dict which may contain items mapping:
ts_data -> timestamp of data or tombstone file,
ts_meta -> timestamp of meta file, if one exists
where timestamps are instances of
:class:`~swift.common.utils.Timestamp`
"""
@ -1961,7 +1963,7 @@ class DiskFileManager(BaseDiskFileManager):
Returns the timestamp extracted .data file name.
:param filename: the data file name including extension
:returns: a dict, with keys for timestamp, and ext::
:returns: a dict, with keys for timestamp, and ext:
* timestamp is a :class:`~swift.common.utils.Timestamp`
* ext is a string, the file extension including the leading dot or
@ -2241,12 +2243,12 @@ class ECDiskFileManager(BaseDiskFileManager):
be stripped off to retrieve the timestamp.
:param filename: the data file name including extension
:returns: a dict, with keys for timestamp, frag_index, and ext::
:returns: a dict, with keys for timestamp, frag_index, and ext:
* timestamp is a :class:`~swift.common.utils.Timestamp`
* frag_index is an int or None
* ext is a string, the file extension including the leading dot or
the empty string if the filename has no extenstion.
the empty string if the filename has no extension.
:raises DiskFileError: if any part of the filename is not able to be
validated.

View File

@ -19,6 +19,7 @@ import random
import time
import itertools
from collections import defaultdict
import six
import six.moves.cPickle as pickle
import shutil
@ -799,7 +800,7 @@ class ObjectReconstructor(Daemon):
self._diskfile_mgr = self._df_router[policy]
self.load_object_ring(policy)
data_dir = get_data_dir(policy)
local_devices = list(itertools.ifilter(
local_devices = list(six.moves.filter(
lambda dev: dev and is_local_device(
ips, self.port,
dev['replication_ip'], dev['replication_port']),

View File

@ -895,7 +895,10 @@ class ObjectController(BaseStorageServer):
container, obj, request, device,
policy)
if orig_timestamp < req_timestamp:
disk_file.delete(req_timestamp)
try:
disk_file.delete(req_timestamp)
except DiskFileNoSpace:
return HTTPInsufficientStorage(drive=device, request=request)
self.container_update(
'DELETE', account, container, obj, request,
HeaderKeyDict({'x-timestamp': req_timestamp.internal}),

View File

@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from six.moves import urllib
from itertools import ifilter
from swift.common import bufferedhttp
from swift.common import exceptions
from swift.common import http
@ -266,7 +266,7 @@ class Sender(object):
self.job['policy'], self.suffixes,
frag_index=self.job.get('frag_index'))
if self.remote_check_objs is not None:
hash_gen = ifilter(
hash_gen = six.moves.filter(
lambda path_objhash_timestamps:
path_objhash_timestamps[1] in
self.remote_check_objs, hash_gen)

View File

@ -48,7 +48,7 @@ class ObjectUpdater(Daemon):
self.container_ring = None
self.concurrency = int(conf.get('concurrency', 1))
self.slowdown = float(conf.get('slowdown', 0.01))
self.node_timeout = int(conf.get('node_timeout', 10))
self.node_timeout = float(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.successes = 0
self.failures = 0

View File

@ -317,6 +317,7 @@ def get_account_info(env, app, swift_source=None):
This call bypasses auth. Success does not imply that the request has
authorization to the account.
:raises ValueError: when path can't be split(path, 2, 4)
"""
(version, account, _junk, _junk) = \
@ -919,6 +920,7 @@ class ResumingGetter(object):
if nchunks % 5 == 0:
sleep()
part_iter = None
try:
while True:
start_byte, end_byte, length, headers, part = \
@ -930,9 +932,12 @@ class ResumingGetter(object):
'entity_length': length, 'headers': headers,
'part_iter': part_iter}
self.pop_range()
except GeneratorExit:
if part_iter:
part_iter.close()
raise
except StopIteration:
req.environ['swift.non_client_disconnect'] = True
return
except ChunkReadTimeout:
self.app.exception_occurred(node[0], _('Object'),
@ -1283,7 +1288,7 @@ class Controller(object):
def generate_request_headers(self, orig_req=None, additional=None,
transfer=False):
"""
Create a list of headers to be used in backend requets
Create a list of headers to be used in backend requests
:param orig_req: the original request sent by the client to the proxy
:param additional: additional headers to send to the backend

View File

@ -24,6 +24,7 @@
# These shenanigans are to ensure all related objects can be garbage
# collected. We've seen objects hang around forever otherwise.
import six
from six.moves.urllib.parse import unquote, quote
import collections
@ -67,7 +68,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, Request, HeaderKeyDict, \
HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException, \
HTTPRequestedRangeNotSatisfiable, Range
HTTPRequestedRangeNotSatisfiable, Range, HTTPInternalServerError
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
copy_header_subset, update_content_type
@ -163,15 +164,15 @@ class BaseObjectController(Controller):
all_nodes = itertools.chain(primary_nodes,
ring.get_more_nodes(partition))
first_n_local_nodes = list(itertools.islice(
itertools.ifilter(is_local, all_nodes), num_locals))
six.moves.filter(is_local, all_nodes), num_locals))
# refresh it; it moved when we computed first_n_local_nodes
all_nodes = itertools.chain(primary_nodes,
ring.get_more_nodes(partition))
local_first_node_iter = itertools.chain(
first_n_local_nodes,
itertools.ifilter(lambda node: node not in first_n_local_nodes,
all_nodes))
six.moves.filter(lambda node: node not in first_n_local_nodes,
all_nodes))
return self.app.iter_nodes(
ring, partition, node_iter=local_first_node_iter)
@ -975,10 +976,15 @@ class ReplicatedObjectController(BaseObjectController):
_('Client disconnected without sending last chunk'))
self.app.logger.increment('client_disconnects')
raise HTTPClientDisconnect(request=req)
except (Exception, Timeout):
except Timeout:
self.app.logger.exception(
_('ERROR Exception causing client disconnect'))
raise HTTPClientDisconnect(request=req)
except Exception:
self.app.logger.exception(
_('ERROR Exception transferring data to object servers %s'),
{'path': req.path})
raise HTTPInternalServerError(request=req)
if req.content_length and bytes_transferred < req.content_length:
req.client_disconnect = True
self.app.logger.warn(
@ -2266,10 +2272,15 @@ class ECObjectController(BaseObjectController):
raise HTTPClientDisconnect(request=req)
except HTTPException:
raise
except (Exception, Timeout):
except Timeout:
self.app.logger.exception(
_('ERROR Exception causing client disconnect'))
raise HTTPClientDisconnect(request=req)
except Exception:
self.app.logger.exception(
_('ERROR Exception transferring data to object servers %s'),
{'path': req.path})
raise HTTPInternalServerError(request=req)
def _have_adequate_responses(
self, statuses, min_responses, conditional_func):

View File

@ -87,8 +87,8 @@ class Application(object):
swift_dir = conf.get('swift_dir', '/etc/swift')
self.swift_dir = swift_dir
self.node_timeout = int(conf.get('node_timeout', 10))
self.recoverable_node_timeout = int(
self.node_timeout = float(conf.get('node_timeout', 10))
self.recoverable_node_timeout = float(
conf.get('recoverable_node_timeout', self.node_timeout))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.client_timeout = int(conf.get('client_timeout', 60))

View File

@ -2440,7 +2440,7 @@ class TestSlo(Base):
def test_slo_overwrite_segment_with_manifest(self):
file_item = self.env.container.file("seg_b")
try:
with self.assertRaises(ResponseError) as catcher:
file_item.write(
json.dumps([
{'size_bytes': 1024 * 1024,
@ -2453,10 +2453,7 @@ class TestSlo(Base):
'etag': hashlib.md5('c' * 1024 * 1024).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_c')}]),
parms={'multipart-manifest': 'put'})
except ResponseError as err:
self.assertEqual(409, err.status)
else:
self.fail("Expected ResponseError but didn't get it")
self.assertEqual(400, catcher.exception.status)
def test_slo_copy(self):
file_item = self.env.container.file("manifest-abcde")

View File

@ -41,7 +41,8 @@ import logging.handlers
from six.moves.http_client import HTTPException
from swift.common import constraints, storage_policy
from swift.common.storage_policy import StoragePolicy, ECStoragePolicy
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
VALID_EC_TYPES)
import functools
import six.moves.cPickle as pickle
from gzip import GzipFile
@ -56,6 +57,22 @@ if not os.path.basename(sys.argv[0]).startswith('swift'):
utils.HASH_PATH_SUFFIX = 'endcap'
EC_TYPE_PREFERENCE = [
'liberasurecode_rs_vand',
'jerasure_rs_vand',
]
for eclib_name in EC_TYPE_PREFERENCE:
if eclib_name in VALID_EC_TYPES:
break
else:
raise SystemExit('ERROR: unable to find suitable PyECLib type'
' (none of %r found in %r)' % (
EC_TYPE_PREFERENCE,
VALID_EC_TYPES,
))
DEFAULT_TEST_EC_TYPE = eclib_name
def patch_policies(thing_or_policies=None, legacy_only=False,
with_ec_default=False, fake_ring_args=None):
if isinstance(thing_or_policies, (
@ -70,7 +87,7 @@ def patch_policies(thing_or_policies=None, legacy_only=False,
elif with_ec_default:
default_policies = [
ECStoragePolicy(0, name='ec', is_default=True,
ec_type='jerasure_rs_vand', ec_ndata=10,
ec_type=DEFAULT_TEST_EC_TYPE, ec_ndata=10,
ec_nparity=4, ec_segment_size=4096),
StoragePolicy(1, name='unu'),
]

View File

@ -22,7 +22,6 @@ import unittest
from logging import DEBUG
from mock import patch, call, DEFAULT
from contextlib import nested
import six
from swift.account import reaper
@ -226,6 +225,11 @@ class TestReaper(unittest.TestCase):
def fake_account_ring(self):
return FakeRing()
def test_creation(self):
# later config should be extended to assert more config options
r = reaper.AccountReaper({'node_timeout': '3.5'})
self.assertEqual(r.node_timeout, 3.5)
def test_delay_reaping_conf_default(self):
r = reaper.AccountReaper({})
self.assertEqual(r.delay_reaping, 0)
@ -415,15 +419,14 @@ class TestReaper(unittest.TestCase):
self.reap_obj_fail = False
self.amount_delete_fail = 0
self.max_delete_fail = 0
ctx = [patch('swift.account.reaper.direct_get_container',
self.fake_direct_get_container),
patch('swift.account.reaper.direct_delete_container',
self.fake_direct_delete_container),
patch('swift.account.reaper.AccountReaper.get_container_ring',
self.fake_container_ring),
patch('swift.account.reaper.AccountReaper.reap_object',
self.fake_reap_object)]
with nested(*ctx):
with patch('swift.account.reaper.direct_get_container',
self.fake_direct_get_container), \
patch('swift.account.reaper.direct_delete_container',
self.fake_direct_delete_container), \
patch('swift.account.reaper.AccountReaper.get_container_ring',
self.fake_container_ring), \
patch('swift.account.reaper.AccountReaper.reap_object',
self.fake_reap_object):
r.reap_container('a', 'partition', acc_nodes, 'c')
self.assertEqual(r.logger.get_increment_counts()['return_codes.4'], 1)
self.assertEqual(r.stats_containers_deleted, 1)
@ -434,15 +437,14 @@ class TestReaper(unittest.TestCase):
self.reap_obj_fail = False
self.amount_delete_fail = 0
self.max_delete_fail = 2
ctx = [patch('swift.account.reaper.direct_get_container',
self.fake_direct_get_container),
patch('swift.account.reaper.direct_delete_container',
self.fake_direct_delete_container),
patch('swift.account.reaper.AccountReaper.get_container_ring',
self.fake_container_ring),
patch('swift.account.reaper.AccountReaper.reap_object',
self.fake_reap_object)]
with nested(*ctx):
with patch('swift.account.reaper.direct_get_container',
self.fake_direct_get_container), \
patch('swift.account.reaper.direct_delete_container',
self.fake_direct_delete_container), \
patch('swift.account.reaper.AccountReaper.get_container_ring',
self.fake_container_ring), \
patch('swift.account.reaper.AccountReaper.reap_object',
self.fake_reap_object):
r.reap_container('a', 'partition', acc_nodes, 'c')
self.assertEqual(r.logger.get_increment_counts()['return_codes.4'], 2)
self.assertEqual(r.stats_containers_possibly_remaining, 1)
@ -453,15 +455,14 @@ class TestReaper(unittest.TestCase):
self.reap_obj_fail = False
self.amount_delete_fail = 0
self.max_delete_fail = 3
ctx = [patch('swift.account.reaper.direct_get_container',
self.fake_direct_get_container),
patch('swift.account.reaper.direct_delete_container',
self.fake_direct_delete_container),
patch('swift.account.reaper.AccountReaper.get_container_ring',
self.fake_container_ring),
patch('swift.account.reaper.AccountReaper.reap_object',
self.fake_reap_object)]
with nested(*ctx):
with patch('swift.account.reaper.direct_get_container',
self.fake_direct_get_container), \
patch('swift.account.reaper.direct_delete_container',
self.fake_direct_delete_container), \
patch('swift.account.reaper.AccountReaper.get_container_ring',
self.fake_container_ring), \
patch('swift.account.reaper.AccountReaper.reap_object',
self.fake_reap_object):
r.reap_container('a', 'partition', acc_nodes, 'c')
self.assertEqual(r.logger.get_increment_counts()['return_codes.4'], 3)
self.assertEqual(r.stats_containers_remaining, 1)
@ -532,11 +533,10 @@ class TestReaper(unittest.TestCase):
self.r = r = self.init_reaper({}, fakelogger=True)
self.called_amount = 0
r.start_time = time.time()
ctx = [patch('swift.account.reaper.AccountReaper.reap_container',
self.fake_reap_container),
patch('swift.account.reaper.AccountReaper.get_account_ring',
self.fake_account_ring)]
with nested(*ctx):
with patch('swift.account.reaper.AccountReaper.reap_container',
self.fake_reap_container), \
patch('swift.account.reaper.AccountReaper.get_account_ring',
self.fake_account_ring):
nodes = r.get_account_ring().get_part_nodes()
self.assertTrue(r.reap_account(broker, 'partition', nodes))
self.assertTrue(r.logger.get_lines_for_level(
@ -548,13 +548,12 @@ class TestReaper(unittest.TestCase):
self.called_amount = 0
conf = {'devices': devices}
r = self.init_reaper(conf)
ctx = [patch('swift.account.reaper.AccountBroker',
FakeAccountBroker),
patch('swift.account.reaper.AccountReaper.get_account_ring',
self.fake_account_ring),
patch('swift.account.reaper.AccountReaper.reap_account',
self.fake_reap_account)]
with nested(*ctx):
with patch('swift.account.reaper.AccountBroker',
FakeAccountBroker), \
patch('swift.account.reaper.AccountReaper.get_account_ring',
self.fake_account_ring), \
patch('swift.account.reaper.AccountReaper.reap_account',
self.fake_reap_account):
r.reap_device('sda1')
self.assertEqual(self.called_amount, 1)
@ -563,13 +562,12 @@ class TestReaper(unittest.TestCase):
self.called_amount = 0
conf = {'devices': devices}
r = self.init_reaper(conf=conf)
ctx = [patch('swift.account.reaper.AccountBroker',
FakeAccountBroker),
patch('swift.account.reaper.AccountReaper.get_account_ring',
self.fake_account_ring),
patch('swift.account.reaper.AccountReaper.reap_account',
self.fake_reap_account)]
with nested(*ctx):
with patch('swift.account.reaper.AccountBroker',
FakeAccountBroker), \
patch('swift.account.reaper.AccountReaper.get_account_ring',
self.fake_account_ring), \
patch('swift.account.reaper.AccountReaper.reap_account',
self.fake_reap_account):
r.reap_device('sda1')
self.assertEqual(self.called_amount, 0)
@ -578,13 +576,12 @@ class TestReaper(unittest.TestCase):
self.called_amount = 0
conf = {'devices': devices}
r = self.init_reaper(conf, myips=['10.10.1.2'])
ctx = [patch('swift.account.reaper.AccountBroker',
FakeAccountBroker),
patch('swift.account.reaper.AccountReaper.get_account_ring',
self.fake_account_ring),
patch('swift.account.reaper.AccountReaper.reap_account',
self.fake_reap_account)]
with nested(*ctx):
with patch('swift.account.reaper.AccountBroker',
FakeAccountBroker), \
patch('swift.account.reaper.AccountReaper.get_account_ring',
self.fake_account_ring), \
patch('swift.account.reaper.AccountReaper.reap_account',
self.fake_reap_account):
r.reap_device('sda1')
self.assertEqual(self.called_amount, 0)
@ -627,14 +624,14 @@ class TestReaper(unittest.TestCase):
account_nodes, container):
container_reaped[0] += 1
ctx = [patch('swift.account.reaper.AccountBroker',
FakeAccountBroker),
patch('swift.account.reaper.AccountBroker.list_containers_iter',
fake_list_containers_iter),
patch('swift.account.reaper.AccountReaper.reap_container',
fake_reap_container), ]
fake_ring = FakeRing()
with nested(*ctx):
with patch('swift.account.reaper.AccountBroker',
FakeAccountBroker), \
patch(
'swift.account.reaper.AccountBroker.list_containers_iter',
fake_list_containers_iter), \
patch('swift.account.reaper.AccountReaper.reap_container',
fake_reap_container):
fake_broker = FakeAccountBroker(['c', 'd', 'e'])
r.reap_account(fake_broker, 10, fake_ring.nodes, 0)
self.assertEqual(container_reaped[0], 1)

View File

@ -78,7 +78,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
broker.put_container('/a/c', time.time(), 0, 0, 0,
POLICIES.default.idx)
# replicate
daemon = replicator.AccountReplicator({})
daemon = replicator.AccountReplicator({'per_diff': 1})
def _rsync_file(db_file, remote_file, **kwargs):
remote_server, remote_path = remote_file.split('/', 1)

View File

@ -25,7 +25,8 @@ from swift.common import ring, utils
from swift.common.swob import Request
from swift.common.storage_policy import StoragePolicy, POLICIES
from swift.cli.info import print_db_info_metadata, print_ring_locations, \
print_info, print_obj_metadata, print_obj, InfoSystemExit
print_info, print_obj_metadata, print_obj, InfoSystemExit, \
print_item_locations
from swift.account.server import AccountController
from swift.container.server import ContainerController
from swift.obj.diskfile import write_metadata
@ -231,6 +232,171 @@ No user metadata found in db file''' % POLICIES[0].name
self.assertTrue(exp_obj1 in out.getvalue())
self.assertTrue(exp_obj2 in out.getvalue())
def test_print_item_locations_invalid_args(self):
# No target specified
self.assertRaises(InfoSystemExit, print_item_locations,
None)
# Need a ring or policy
self.assertRaises(InfoSystemExit, print_item_locations,
None, account='account', obj='object')
# No account specified
self.assertRaises(InfoSystemExit, print_item_locations,
None, container='con')
# No policy named 'xyz' (unrecognized policy)
self.assertRaises(InfoSystemExit, print_item_locations,
None, obj='object', policy_name='xyz')
# No container specified
objring = ring.Ring(self.testdir, ring_name='object')
self.assertRaises(InfoSystemExit, print_item_locations,
objring, account='account', obj='object')
def test_print_item_locations_ring_policy_mismatch_no_target(self):
out = StringIO()
with mock.patch('sys.stdout', out):
objring = ring.Ring(self.testdir, ring_name='object')
# Test mismatch of ring and policy name (valid policy)
self.assertRaises(InfoSystemExit, print_item_locations,
objring, policy_name='zero')
self.assertTrue('Warning: mismatch between ring and policy name!'
in out.getvalue())
self.assertTrue('No target specified' in out.getvalue())
def test_print_item_locations_invalid_policy_no_target(self):
out = StringIO()
policy_name = 'nineteen'
with mock.patch('sys.stdout', out):
objring = ring.Ring(self.testdir, ring_name='object')
self.assertRaises(InfoSystemExit, print_item_locations,
objring, policy_name=policy_name)
exp_msg = 'Warning: Policy %s is not valid' % policy_name
self.assertTrue(exp_msg in out.getvalue())
self.assertTrue('No target specified' in out.getvalue())
def test_print_item_locations_policy_object(self):
out = StringIO()
part = '1'
with mock.patch('sys.stdout', out):
print_item_locations(None, partition=part, policy_name='zero',
swift_dir=self.testdir)
exp_part_msg = 'Partition\t%s' % part
exp_acct_msg = 'Account \tNone'
exp_cont_msg = 'Container\tNone'
exp_obj_msg = 'Object \tNone'
self.assertTrue(exp_part_msg in out.getvalue())
self.assertTrue(exp_acct_msg in out.getvalue())
self.assertTrue(exp_cont_msg in out.getvalue())
self.assertTrue(exp_obj_msg in out.getvalue())
def test_print_item_locations_dashed_ring_name_partition(self):
out = StringIO()
part = '1'
with mock.patch('sys.stdout', out):
print_item_locations(None, policy_name='one',
ring_name='foo-bar', partition=part,
swift_dir=self.testdir)
exp_part_msg = 'Partition\t%s' % part
exp_acct_msg = 'Account \tNone'
exp_cont_msg = 'Container\tNone'
exp_obj_msg = 'Object \tNone'
self.assertTrue(exp_part_msg in out.getvalue())
self.assertTrue(exp_acct_msg in out.getvalue())
self.assertTrue(exp_cont_msg in out.getvalue())
self.assertTrue(exp_obj_msg in out.getvalue())
def test_print_item_locations_account_with_ring(self):
out = StringIO()
account = 'account'
with mock.patch('sys.stdout', out):
account_ring = ring.Ring(self.testdir, ring_name=account)
print_item_locations(account_ring, account=account)
exp_msg = 'Account \t%s' % account
self.assertTrue(exp_msg in out.getvalue())
exp_warning = 'Warning: account specified ' + \
'but ring not named "account"'
self.assertTrue(exp_warning in out.getvalue())
exp_acct_msg = 'Account \t%s' % account
exp_cont_msg = 'Container\tNone'
exp_obj_msg = 'Object \tNone'
self.assertTrue(exp_acct_msg in out.getvalue())
self.assertTrue(exp_cont_msg in out.getvalue())
self.assertTrue(exp_obj_msg in out.getvalue())
def test_print_item_locations_account_no_ring(self):
out = StringIO()
account = 'account'
with mock.patch('sys.stdout', out):
print_item_locations(None, account=account,
swift_dir=self.testdir)
exp_acct_msg = 'Account \t%s' % account
exp_cont_msg = 'Container\tNone'
exp_obj_msg = 'Object \tNone'
self.assertTrue(exp_acct_msg in out.getvalue())
self.assertTrue(exp_cont_msg in out.getvalue())
self.assertTrue(exp_obj_msg in out.getvalue())
def test_print_item_locations_account_container_ring(self):
out = StringIO()
account = 'account'
container = 'container'
with mock.patch('sys.stdout', out):
container_ring = ring.Ring(self.testdir, ring_name='container')
print_item_locations(container_ring, account=account,
container=container)
exp_acct_msg = 'Account \t%s' % account
exp_cont_msg = 'Container\t%s' % container
exp_obj_msg = 'Object \tNone'
self.assertTrue(exp_acct_msg in out.getvalue())
self.assertTrue(exp_cont_msg in out.getvalue())
self.assertTrue(exp_obj_msg in out.getvalue())
def test_print_item_locations_account_container_no_ring(self):
out = StringIO()
account = 'account'
container = 'container'
with mock.patch('sys.stdout', out):
print_item_locations(None, account=account,
container=container, swift_dir=self.testdir)
exp_acct_msg = 'Account \t%s' % account
exp_cont_msg = 'Container\t%s' % container
exp_obj_msg = 'Object \tNone'
self.assertTrue(exp_acct_msg in out.getvalue())
self.assertTrue(exp_cont_msg in out.getvalue())
self.assertTrue(exp_obj_msg in out.getvalue())
def test_print_item_locations_account_container_object_ring(self):
out = StringIO()
account = 'account'
container = 'container'
obj = 'object'
with mock.patch('sys.stdout', out):
object_ring = ring.Ring(self.testdir, ring_name='object')
print_item_locations(object_ring, ring_name='object',
account=account, container=container,
obj=obj)
exp_acct_msg = 'Account \t%s' % account
exp_cont_msg = 'Container\t%s' % container
exp_obj_msg = 'Object \t%s' % obj
self.assertTrue(exp_acct_msg in out.getvalue())
self.assertTrue(exp_cont_msg in out.getvalue())
self.assertTrue(exp_obj_msg in out.getvalue())
def test_print_item_locations_account_container_object_dashed_ring(self):
out = StringIO()
account = 'account'
container = 'container'
obj = 'object'
with mock.patch('sys.stdout', out):
object_ring = ring.Ring(self.testdir, ring_name='object-1')
print_item_locations(object_ring, ring_name='object-1',
account=account, container=container,
obj=obj)
exp_acct_msg = 'Account \t%s' % account
exp_cont_msg = 'Container\t%s' % container
exp_obj_msg = 'Object \t%s' % obj
self.assertTrue(exp_acct_msg in out.getvalue())
self.assertTrue(exp_cont_msg in out.getvalue())
self.assertTrue(exp_obj_msg in out.getvalue())
def test_print_info(self):
db_file = 'foo'
self.assertRaises(InfoSystemExit, print_info, 'object', db_file)

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import nested
import json
import mock
import os
@ -240,11 +239,8 @@ class TestRecon(unittest.TestCase):
mock_scout.return_value = scout_instance
stdout = StringIO()
mock_hash = mock.MagicMock()
patches = [
mock.patch('sys.stdout', new=stdout),
mock.patch('swift.cli.recon.md5', new=mock_hash),
]
with nested(*patches):
with mock.patch('sys.stdout', new=stdout), \
mock.patch('swift.cli.recon.md5', new=mock_hash):
mock_hash.return_value.hexdigest.return_value = \
empty_file_hash
self.recon_instance.get_ringmd5(hosts, self.swift_dir)
@ -295,11 +291,9 @@ class TestRecon(unittest.TestCase):
return url, response, status, 0, 0
stdout = StringIO()
patches = [
mock.patch('swift.cli.recon.Scout.scout', mock_scout_quarantine),
mock.patch('sys.stdout', new=stdout),
]
with nested(*patches):
with mock.patch('swift.cli.recon.Scout.scout',
mock_scout_quarantine), \
mock.patch('sys.stdout', new=stdout):
self.recon_instance.quarantine_check(hosts)
output = stdout.getvalue()
@ -332,11 +326,9 @@ class TestRecon(unittest.TestCase):
return url, response, status, 0, 0
stdout = StringIO()
patches = [
mock.patch('swift.cli.recon.Scout.scout', mock_scout_driveaudit),
mock.patch('sys.stdout', new=stdout),
]
with nested(*patches):
with mock.patch('swift.cli.recon.Scout.scout',
mock_scout_driveaudit), \
mock.patch('sys.stdout', new=stdout):
self.recon_instance.driveaudit_check(hosts)
output = stdout.getvalue()
@ -394,19 +386,15 @@ class TestReconCommands(unittest.TestCase):
return url, response, status
stdout = StringIO()
patches = [
mock.patch('swift.cli.recon.Scout.scout_server_type',
mock_scout_server_type),
mock.patch('sys.stdout', new=stdout),
]
res_object = 'Invalid: http://127.0.0.1:6010/ is object-server'
res_container = 'Invalid: http://127.0.0.1:6011/ is container-server'
res_account = 'Invalid: http://127.0.0.1:6012/ is account-server'
valid = "1/1 hosts ok, 0 error[s] while checking hosts."
# Test for object server type - default
with nested(*patches):
with mock.patch('swift.cli.recon.Scout.scout_server_type',
mock_scout_server_type), \
mock.patch('sys.stdout', new=stdout):
self.recon.server_type_check(hosts)
output = stdout.getvalue()
@ -415,7 +403,9 @@ class TestReconCommands(unittest.TestCase):
stdout.truncate(0)
# Test ok for object server type - default
with nested(*patches):
with mock.patch('swift.cli.recon.Scout.scout_server_type',
mock_scout_server_type), \
mock.patch('sys.stdout', new=stdout):
self.recon.server_type_check([hosts[0]])
output = stdout.getvalue()
@ -423,7 +413,9 @@ class TestReconCommands(unittest.TestCase):
stdout.truncate(0)
# Test for account server type
with nested(*patches):
with mock.patch('swift.cli.recon.Scout.scout_server_type',
mock_scout_server_type), \
mock.patch('sys.stdout', new=stdout):
self.recon.server_type = 'account'
self.recon.server_type_check(hosts)
@ -433,7 +425,9 @@ class TestReconCommands(unittest.TestCase):
stdout.truncate(0)
# Test ok for account server type
with nested(*patches):
with mock.patch('swift.cli.recon.Scout.scout_server_type',
mock_scout_server_type), \
mock.patch('sys.stdout', new=stdout):
self.recon.server_type = 'account'
self.recon.server_type_check([hosts[2]])
@ -442,7 +436,9 @@ class TestReconCommands(unittest.TestCase):
stdout.truncate(0)
# Test for container server type
with nested(*patches):
with mock.patch('swift.cli.recon.Scout.scout_server_type',
mock_scout_server_type), \
mock.patch('sys.stdout', new=stdout):
self.recon.server_type = 'container'
self.recon.server_type_check(hosts)
@ -452,7 +448,9 @@ class TestReconCommands(unittest.TestCase):
stdout.truncate(0)
# Test ok for container server type
with nested(*patches):
with mock.patch('swift.cli.recon.Scout.scout_server_type',
mock_scout_server_type), \
mock.patch('sys.stdout', new=stdout):
self.recon.server_type = 'container'
self.recon.server_type_check([hosts[1]])

View File

@ -30,7 +30,7 @@ from swift.common.ring import RingBuilder
class RunSwiftRingBuilderMixin(object):
def run_srb(self, *argv):
if len(argv) == 1 and isinstance(argv[0], basestring):
if len(argv) == 1 and isinstance(argv[0], six.string_types):
# convert a single string to a list
argv = shlex.split(argv[0])
mock_stdout = six.StringIO()
@ -1663,6 +1663,49 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
argv = ["", self.tmpfile]
self.assertRaises(SystemExit, ringbuilder.main, argv)
def test_default_show_removed(self):
mock_stdout = six.StringIO()
mock_stderr = six.StringIO()
self.create_sample_ring()
# Note: it also sets device's weight to zero.
argv = ["", self.tmpfile, "remove", "--id", "1"]
self.assertRaises(SystemExit, ringbuilder.main, argv)
# Setting another device's weight to zero to be sure we distinguish
# real removed device and device with zero weight.
argv = ["", self.tmpfile, "set_weight", "0", "--id", "3"]
self.assertRaises(SystemExit, ringbuilder.main, argv)
argv = ["", self.tmpfile]
with mock.patch("sys.stdout", mock_stdout):
with mock.patch("sys.stderr", mock_stderr):
self.assertRaises(SystemExit, ringbuilder.main, argv)
expected = "%s, build version 6\n" \
"64 partitions, 3.000000 replicas, 4 regions, 4 zones, " \
"4 devices, 100.00 balance, 0.00 dispersion\n" \
"The minimum number of hours before a partition can be " \
"reassigned is 1\n" \
"The overload factor is 0.00%% (0.000000)\n" \
"Devices: id region zone ip address port " \
"replication ip replication port name weight " \
"partitions balance flags meta\n" \
" 0 0 0 127.0.0.1 6000 " \
"127.0.0.1 6000 sda1 100.00" \
" 0 -100.00 some meta data\n" \
" 1 1 1 127.0.0.2 6001 " \
"127.0.0.2 6001 sda2 0.00" \
" 0 0.00 DEL \n" \
" 2 2 2 127.0.0.3 6002 " \
"127.0.0.3 6002 sdc3 100.00" \
" 0 -100.00 \n" \
" 3 3 3 127.0.0.4 6003 " \
"127.0.0.4 6003 sdd4 0.00" \
" 0 0.00 \n" % self.tmpfile
self.assertEqual(expected, mock_stdout.getvalue())
def test_rebalance(self):
self.create_sample_ring()
argv = ["", self.tmpfile, "rebalance", "3"]
@ -1696,6 +1739,21 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
err = e
self.assertEqual(err.code, 2)
def test_rebalance_remove_zero_weighted_device(self):
self.create_sample_ring()
ring = RingBuilder.load(self.tmpfile)
ring.set_dev_weight(3, 0.0)
ring.rebalance()
ring.remove_dev(3)
ring.save(self.tmpfile)
# Test rebalance after remove 0 weighted device
argv = ["", self.tmpfile, "rebalance", "3"]
self.assertRaises(SystemExit, ringbuilder.main, argv)
ring = RingBuilder.load(self.tmpfile)
self.assertTrue(ring.validate())
self.assertEqual(ring.devs[3], None)
def test_write_ring(self):
self.create_sample_ring()
argv = ["", self.tmpfile, "rebalance"]
@ -1738,7 +1796,7 @@ class TestCommands(unittest.TestCase, RunSwiftRingBuilderMixin):
os.path.basename(self.tmpfile) + ".ring.gz")
os.remove(self.tmpfile) # loses file...
argv = ["", backup_file, "write_builder"]
argv = ["", backup_file, "write_builder", "24"]
self.assertEqual(ringbuilder.main(argv), None)
def test_warn_at_risk(self):

View File

@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import hashlib
import json
import mock
@ -701,12 +700,11 @@ class TestDloGetManifest(DloTestCase):
'/v1/AUTH_test/mancon/manifest',
environ={'REQUEST_METHOD': 'GET'})
with contextlib.nested(
mock.patch('swift.common.request_helpers.time.time',
mock_time),
with mock.patch('swift.common.request_helpers.time.time',
mock_time), \
mock.patch('swift.common.request_helpers.is_success',
mock_is_success),
mock.patch.object(dlo, 'is_success', mock_is_success)):
mock_is_success), \
mock.patch.object(dlo, 'is_success', mock_is_success):
status, headers, body, exc = self.call_dlo(
req, expect_exception=True)

View File

@ -19,7 +19,6 @@ from six.moves import range
import hashlib
import time
import unittest
from contextlib import nested
from mock import patch
from hashlib import md5
from swift.common import swob, utils
@ -119,28 +118,171 @@ class TestSloMiddleware(SloTestCase):
self.assertTrue(
resp.startswith('X-Static-Large-Object is a reserved header'))
def test_parse_input(self):
self.assertRaises(HTTPException, slo.parse_input, 'some non json')
self.assertRaises(HTTPException, slo.parse_input, '[{}]')
self.assertRaises(HTTPException, slo.parse_input, json.dumps(
[{'path': '/cont/object', 'etag': 'etagoftheobjecitsegment',
'size_bytes': 100, 'foo': 'bar'}]))
self.assertRaises(HTTPException, slo.parse_input, json.dumps(
[{'path': '/cont/object', 'etag': 'etagoftheobjecitsegment',
'size_bytes': 100, 'range': 'non-range value'}]))
def _put_bogus_slo(self, manifest_text,
manifest_path='/v1/a/c/the-manifest',
min_segment_size=1):
with self.assertRaises(HTTPException) as catcher:
slo.parse_and_validate_input(manifest_text, manifest_path,
min_segment_size)
self.assertEqual(400, catcher.exception.status_int)
return catcher.exception.body
def _put_slo(self, manifest_text, manifest_path='/v1/a/c/the-manifest',
min_segment_size=1):
return slo.parse_and_validate_input(manifest_text, manifest_path,
min_segment_size)
def test_bogus_input(self):
self.assertEqual('Manifest must be valid JSON.\n',
self._put_bogus_slo('some non json'))
self.assertEqual('Manifest must be a list.\n',
self._put_bogus_slo('{}'))
self.assertEqual('Index 0: not a JSON object\n',
self._put_bogus_slo('["zombocom"]'))
def test_bogus_input_bad_keys(self):
self.assertEqual(
"Index 0: extraneous keys \"baz\", \"foo\"\n",
self._put_bogus_slo(json.dumps(
[{'path': '/cont/object', 'etag': 'etagoftheobjectsegment',
'size_bytes': 100,
'foo': 'bar', 'baz': 'quux'}])))
def test_bogus_input_ranges(self):
self.assertEqual(
"Index 0: invalid range\n",
self._put_bogus_slo(json.dumps(
[{'path': '/cont/object', 'etag': 'blah',
'size_bytes': 100, 'range': 'non-range value'}])))
self.assertEqual(
"Index 0: multiple ranges (only one allowed)\n",
self._put_bogus_slo(json.dumps(
[{'path': '/cont/object', 'etag': 'blah',
'size_bytes': 100, 'range': '1-20,30-40'}])))
def test_bogus_input_unsatisfiable_range(self):
self.assertEqual(
"Index 0: unsatisfiable range\n",
self._put_bogus_slo(json.dumps(
[{'path': '/cont/object', 'etag': 'blah',
'size_bytes': 100, 'range': '8888-9999'}])))
# since size is optional, we have to be able to defer this check
segs = self._put_slo(json.dumps(
[{'path': '/cont/object', 'etag': 'blah',
'size_bytes': None, 'range': '8888-9999'}]))
self.assertEqual(1, len(segs))
def test_bogus_input_path(self):
self.assertEqual(
"Index 0: path does not refer to an object. Path must be of the "
"form /container/object.\n"
"Index 1: path does not refer to an object. Path must be of the "
"form /container/object.\n",
self._put_bogus_slo(json.dumps(
[{'path': '/cont', 'etag': 'etagoftheobjectsegment',
'size_bytes': 100},
{'path': '/c-trailing-slash/', 'etag': 'e',
'size_bytes': 100},
{'path': '/con/obj', 'etag': 'e',
'size_bytes': 100},
{'path': '/con/obj-trailing-slash/', 'etag': 'e',
'size_bytes': 100},
{'path': '/con/obj/with/slashes', 'etag': 'e',
'size_bytes': 100}])))
def test_bogus_input_multiple(self):
self.assertEqual(
"Index 0: invalid range\nIndex 1: not a JSON object\n",
self._put_bogus_slo(json.dumps(
[{'path': '/cont/object', 'etag': 'etagoftheobjectsegment',
'size_bytes': 100, 'range': 'non-range value'},
None])))
def test_bogus_input_size_bytes(self):
self.assertEqual(
"Index 0: invalid size_bytes\n",
self._put_bogus_slo(json.dumps(
[{'path': '/cont/object', 'etag': 'blah', 'size_bytes': "fht"},
{'path': '/cont/object', 'etag': 'blah', 'size_bytes': None},
{'path': '/cont/object', 'etag': 'blah', 'size_bytes': 100}],
)))
self.assertEqual(
"Index 0: invalid size_bytes\n",
self._put_bogus_slo(json.dumps(
[{'path': '/cont/object', 'etag': 'blah', 'size_bytes': []}],
)))
def test_bogus_input_self_referential(self):
self.assertEqual(
"Index 0: manifest must not include itself as a segment\n",
self._put_bogus_slo(json.dumps(
[{'path': '/c/the-manifest', 'etag': 'gate',
'size_bytes': 100, 'range': 'non-range value'}])))
def test_bogus_input_self_referential_non_ascii(self):
self.assertEqual(
"Index 0: manifest must not include itself as a segment\n",
self._put_bogus_slo(
json.dumps([{'path': u'/c/あ_1',
'etag': 'a', 'size_bytes': 1}]),
manifest_path=quote(u'/v1/a/c/あ_1')))
def test_bogus_input_self_referential_last_segment(self):
test_json_data = json.dumps([
{'path': '/c/seg_1', 'etag': 'a', 'size_bytes': 1},
{'path': '/c/seg_2', 'etag': 'a', 'size_bytes': 1},
{'path': '/c/seg_3', 'etag': 'a', 'size_bytes': 1},
{'path': '/c/the-manifest', 'etag': 'a', 'size_bytes': 1},
])
self.assertEqual(
"Index 3: manifest must not include itself as a segment\n",
self._put_bogus_slo(
test_json_data,
manifest_path=quote('/v1/a/c/the-manifest')))
def test_bogus_input_undersize_segment(self):
self.assertEqual(
"Index 1: too small; each segment, except the last, "
"must be at least 1000 bytes.\n"
"Index 2: too small; each segment, except the last, "
"must be at least 1000 bytes.\n",
self._put_bogus_slo(
json.dumps([
{'path': u'/c/s1', 'etag': 'a', 'size_bytes': 1000},
{'path': u'/c/s2', 'etag': 'b', 'size_bytes': 999},
{'path': u'/c/s3', 'etag': 'c', 'size_bytes': 998},
# No error for this one since size_bytes is unspecified
{'path': u'/c/s4', 'etag': 'd', 'size_bytes': None},
{'path': u'/c/s5', 'etag': 'e', 'size_bytes': 996}]),
min_segment_size=1000))
def test_valid_input(self):
data = json.dumps(
[{'path': '/cont/object', 'etag': 'etagoftheobjecitsegment',
[{'path': '/cont/object', 'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
self.assertEqual('/cont/object',
slo.parse_input(data)[0]['path'])
self.assertEqual(
'/cont/object',
slo.parse_and_validate_input(data, '/v1/a/cont/man', 1)[0]['path'])
data = json.dumps(
[{'path': '/cont/object', 'etag': 'etagoftheobjecitsegment',
'size_bytes': 100, 'range': '0-40,30-90'}])
parsed = slo.parse_input(data)
[{'path': '/cont/object', 'etag': 'etagoftheobjectsegment',
'size_bytes': 100, 'range': '0-40'}])
parsed = slo.parse_and_validate_input(data, '/v1/a/cont/man', 1)
self.assertEqual('/cont/object', parsed[0]['path'])
self.assertEqual([(0, 40), (30, 90)], parsed[0]['range'].ranges)
self.assertEqual([(0, 40)], parsed[0]['range'].ranges)
data = json.dumps(
[{'path': '/cont/object', 'etag': 'etagoftheobjectsegment',
'size_bytes': None, 'range': '0-40'}])
parsed = slo.parse_and_validate_input(data, '/v1/a/cont/man', 1)
self.assertEqual('/cont/object', parsed[0]['path'])
self.assertEqual(None, parsed[0]['size_bytes'])
self.assertEqual([(0, 40)], parsed[0]['range'].ranges)
class TestSloPutManifest(SloTestCase):
@ -331,7 +473,7 @@ class TestSloPutManifest(SloTestCase):
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_xml_data)
no_xml = self.slo(req.environ, fake_start_response)
self.assertEqual(no_xml, ['Manifest must be valid json.'])
self.assertEqual(no_xml, ['Manifest must be valid JSON.\n'])
def test_handle_multipart_put_bad_data(self):
bad_data = json.dumps([{'path': '/cont/object',
@ -358,6 +500,7 @@ class TestSloPutManifest(SloTestCase):
'etag': 'etagoftheobj', 'size_bytes': 100}]),
json.dumps([{'path': 12, 'size_bytes': 100}]),
json.dumps([{'path': 12, 'size_bytes': 100}]),
json.dumps([{'path': '/c/o', 'etag': 123, 'size_bytes': 100}]),
json.dumps([{'path': None, 'etag': 'etagoftheobj',
'size_bytes': 100}])]:
req = Request.blank(
@ -421,46 +564,6 @@ class TestSloPutManifest(SloTestCase):
self.assertEqual(errors[4][0], '/checktest/slob')
self.assertEqual(errors[4][1], 'Etag Mismatch')
def test_handle_multipart_put_manifest_equal_slo(self):
test_json_data = json.dumps([{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
req = Request.blank(
'/v1/AUTH_test/cont/object?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
status, headers, body = self.call_slo(req)
self.assertEqual(status, '409 Conflict')
self.assertEqual(self.app.call_count, 0)
def test_handle_multipart_put_manifest_equal_slo_non_ascii(self):
test_json_data = json.dumps([{'path': u'/cont/あ_1',
'etag': 'a',
'size_bytes': 1}])
path = quote(u'/v1/AUTH_test/cont/あ_1')
req = Request.blank(
path + '?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
status, headers, body = self.call_slo(req)
self.assertEqual(status, '409 Conflict')
self.assertEqual(self.app.call_count, 0)
def test_handle_multipart_put_manifest_equal_last_segment(self):
test_json_data = json.dumps([{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100},
{'path': '/cont/object2',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
req = Request.blank(
'/v1/AUTH_test/cont/object2?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
status, headers, body = self.call_slo(req)
self.assertEqual(status, '409 Conflict')
self.assertEqual(self.app.call_count, 1)
def test_handle_multipart_put_skip_size_check(self):
good_data = json.dumps(
[{'path': '/checktest/a_1', 'etag': 'a', 'size_bytes': None},
@ -495,6 +598,24 @@ class TestSloPutManifest(SloTestCase):
self.slo.handle_multipart_put(req, fake_start_response)
self.assertEqual(cm.exception.status_int, 400)
def test_handle_multipart_put_skip_size_check_no_early_bailout(self):
with patch.object(self.slo, 'min_segment_size', 50):
# The first is too small (it's 10 bytes but min size is 50), and
# the second has a bad etag. Make sure both errors show up in
# the response.
test_json_data = json.dumps([{'path': '/cont/small_object',
'etag': 'etagoftheobjectsegment',
'size_bytes': None},
{'path': '/cont/object2',
'etag': 'wrong wrong wrong',
'size_bytes': 100}])
req = Request.blank('/v1/AUTH_test/c/o', body=test_json_data)
with self.assertRaises(HTTPException) as cm:
self.slo.handle_multipart_put(req, fake_start_response)
self.assertEqual(cm.exception.status_int, 400)
self.assertIn('at least 50 bytes', cm.exception.body)
self.assertIn('Etag Mismatch', cm.exception.body)
def test_handle_multipart_put_skip_etag_check(self):
good_data = json.dumps(
[{'path': '/checktest/a_1', 'etag': None, 'size_bytes': 1},
@ -526,6 +647,7 @@ class TestSloPutManifest(SloTestCase):
with self.assertRaises(HTTPException) as catcher:
self.slo.handle_multipart_put(req, fake_start_response)
self.assertEqual(400, catcher.exception.status_int)
self.assertIn("Unsatisfiable Range", catcher.exception.body)
def test_handle_single_ranges(self):
good_data = json.dumps(
@ -572,25 +694,6 @@ class TestSloPutManifest(SloTestCase):
self.assertEqual('etagoftheobjectsegment', manifest_data[3]['hash'])
self.assertEqual('10-40', manifest_data[3]['range'])
def test_handle_multiple_ranges_error(self):
good_data = json.dumps(
[{'path': '/checktest/a_1', 'etag': None,
'size_bytes': 1, 'range': '0-100'},
{'path': '/checktest/b_2', 'etag': None,
'size_bytes': 2, 'range': '-1,0-0'},
{'path': '/cont/object', 'etag': None,
'size_bytes': None, 'range': '10-30,20-40'}])
req = Request.blank(
'/v1/AUTH_test/checktest/man_3?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, body=good_data)
status, headers, body = self.call_slo(req)
self.assertEqual(status, '400 Bad Request')
self.assertEqual(self.app.call_count, 3)
self.assertEqual(body, '\n'.join([
'Errors:',
'/checktest/b_2, Multiple Ranges',
'/cont/object, Multiple Ranges']))
class TestSloDeleteManifest(SloTestCase):
@ -2146,13 +2249,13 @@ class TestSloGetManifest(SloTestCase):
'/v1/AUTH_test/gettest/manifest-abcd',
environ={'REQUEST_METHOD': 'GET'})
with nested(patch.object(slo, 'is_success', mock_is_success),
patch('swift.common.request_helpers.time.time',
mock_time),
patch('swift.common.request_helpers.is_success',
mock_is_success)):
status, headers, body, exc = self.call_slo(
req, expect_exception=True)
with patch.object(slo, 'is_success', mock_is_success), \
patch('swift.common.request_helpers.time.time',
mock_time), \
patch('swift.common.request_helpers.is_success',
mock_is_success):
status, headers, body, exc = self.call_slo(
req, expect_exception=True)
self.assertIsInstance(exc, SegmentError)
self.assertEqual(status, '200 OK')

View File

@ -15,8 +15,9 @@
import json
import unittest
import mock
from swift.common.swob import Request, Response
from swift.common.swob import Request, Response, HTTPUnauthorized
from swift.common.middleware import staticweb
@ -36,7 +37,8 @@ meta_map = {
'web-error': 'error.html',
'web-listings': 't',
'web-listings-css': 'listing.css'}},
'c6': {'meta': {'web-listings': 't'}},
'c6': {'meta': {'web-listings': 't',
'web-error': 'error.html'}},
'c7': {'meta': {'web-listings': 'f'}},
'c8': {'meta': {'web-error': 'error.html',
'web-listings': 't',
@ -73,6 +75,10 @@ class FakeApp(object):
def __call__(self, env, start_response):
self.calls += 1
if 'swift.authorize' in env:
resp = env['swift.authorize'](Request(env))
if resp:
return resp(env, start_response)
if env['PATH_INFO'] == '/':
return Response(status='404 Not Found')(env, start_response)
elif env['PATH_INFO'] == '/v1':
@ -182,6 +188,14 @@ class FakeApp(object):
return self.listing(env, start_response)
elif env['PATH_INFO'] == '/v1/a/c6/subdir':
return Response(status='404 Not Found')(env, start_response)
elif env['PATH_INFO'] == '/v1/a/c6/401error.html':
return Response(status='200 Ok', body='''
<html>
<body style="background: #000000; color: #ffaaaa">
<p>Hey, you're not authorized to see this!</p>
</body>
</html>
'''.strip())(env, start_response)
elif env['PATH_INFO'] in ('/v1/a/c7', '/v1/a/c7/'):
return self.listing(env, start_response)
elif env['PATH_INFO'] in ('/v1/a/c8', '/v1/a/c8/'):
@ -379,11 +393,30 @@ class FakeApp(object):
body=body)(env, start_response)
class FakeAuthFilter(object):
def __init__(self, app, deny_objects=False, deny_listing=False):
self.app = app
self.deny_objects = deny_objects
self.deny_listing = deny_listing
def authorize(self, req):
path_parts = req.path.strip('/').split('/')
if ((self.deny_objects and len(path_parts) > 3)
or (self.deny_listing and len(path_parts) == 3)):
return HTTPUnauthorized()
def __call__(self, env, start_response):
env['swift.authorize'] = self.authorize
return self.app(env, start_response)
class TestStaticWeb(unittest.TestCase):
def setUp(self):
self.app = FakeApp()
self.test_staticweb = staticweb.filter_factory({})(self.app)
self.test_staticweb = FakeAuthFilter(
staticweb.filter_factory({})(self.app))
self._orig_get_container_info = staticweb.get_container_info
staticweb.get_container_info = mock_get_container_info
@ -597,6 +630,27 @@ class TestStaticWeb(unittest.TestCase):
'/v1/a/c6/subdir').get_response(self.test_staticweb)
self.assertEqual(resp.status_int, 301)
def test_container6listing(self):
# container6 has web-listings = t, web-error=error.html
resp = Request.blank('/v1/a/c6/').get_response(self.test_staticweb)
self.assertEqual(resp.status_int, 200)
# expect custom 401 if request is not auth'd for listing but is auth'd
# to GET objects
test_staticweb = FakeAuthFilter(
staticweb.filter_factory({})(self.app), deny_listing=True)
resp = Request.blank('/v1/a/c6/').get_response(test_staticweb)
self.assertEqual(resp.status_int, 401)
self.assertIn("Hey, you're not authorized to see this!", resp.body)
# expect default 401 if request is not auth'd for listing or object GET
test_staticweb = FakeAuthFilter(
staticweb.filter_factory({})(self.app), deny_listing=True,
deny_objects=True)
resp = Request.blank('/v1/a/c6/').get_response(test_staticweb)
self.assertEqual(resp.status_int, 401)
self.assertNotIn("Hey, you're not authorized to see this!", resp.body)
def test_container7listing(self):
resp = Request.blank('/v1/a/c7/').get_response(self.test_staticweb)
self.assertEqual(resp.status_int, 404)
@ -701,6 +755,41 @@ class TestStaticWeb(unittest.TestCase):
self.assertEqual(resp.body, '1')
self.assertEqual(self.app.calls, 1)
def test_no_auth_middleware(self):
resp = Request.blank('/v1/a/c3').get_response(self.test_staticweb)
self.assertEqual(resp.status_int, 301)
# Test without an authentication middleware before staticweb
# This is no longer handled by staticweb middleware, thus not returning
# a 301 redirect
self.test_staticweb = staticweb.filter_factory({})(self.app)
resp = Request.blank('/v1/a/c3').get_response(self.test_staticweb)
self.assertEqual(resp.status_int, 200)
def test_subrequest_not_override_auth(self):
app_call = \
'swift.common.middleware.staticweb._StaticWebContext._app_call'
orig_app_call = staticweb._StaticWebContext._app_call
_fail = self.fail
def hook_app_call(self, env):
if 'swift.authorize_override' in env:
_fail('staticweb must not create authorize info by itself')
return orig_app_call(self, env)
with mock.patch(app_call, hook_app_call):
# testing for _listing container
resp = Request.blank('/v1/a/c4/').get_response(self.test_staticweb)
self.assertEqual(resp.status_int, 200) # sanity
# testing for _listing object subdir
resp = Request.blank(
'/v1/a/c4/unknown').get_response(self.test_staticweb)
self.assertEqual(resp.status_int, 404)
# testing for _error_response
resp = Request.blank('/v1/a/c5/').get_response(self.test_staticweb)
self.assertEqual(resp.status_int, 503) # sanity
if __name__ == '__main__':
unittest.main()

View File

@ -16,7 +16,7 @@
import json
import unittest
from contextlib import contextmanager, nested
from contextlib import contextmanager
from base64 import b64encode
from time import time
import mock
@ -273,8 +273,8 @@ class TestAuth(unittest.TestCase):
headers={'X-Auth-Token': 't',
'AUTHORIZATION': 'AWS s3:s3:pass'})
with nested(mock.patch('base64.urlsafe_b64decode'),
mock.patch('base64.encodestring')) as (msg, sign):
with mock.patch('base64.urlsafe_b64decode') as msg, \
mock.patch('base64.encodestring') as sign:
msg.return_value = ''
sign.return_value = 'pass'
resp = req.get_response(local_auth)

View File

@ -309,6 +309,22 @@ class TestRingBuilder(unittest.TestCase):
rb.rebalance()
rb.validate()
def test_remove_zero_weighted(self):
rb = ring.RingBuilder(8, 3, 0)
rb.add_dev({'id': 0, 'device': 'd0', 'ip': '10.0.0.1',
'port': 6002, 'weight': 1000.0, 'region': 0, 'zone': 1})
rb.add_dev({'id': 1, 'device': 'd1', 'ip': '10.0.0.2',
'port': 6002, 'weight': 0.0, 'region': 0, 'zone': 2})
rb.add_dev({'id': 2, 'device': 'd2', 'ip': '10.0.0.3',
'port': 6002, 'weight': 1000.0, 'region': 0, 'zone': 3})
rb.add_dev({'id': 3, 'device': 'd3', 'ip': '10.0.0.1',
'port': 6002, 'weight': 1000.0, 'region': 0, 'zone': 1})
rb.rebalance()
rb.remove_dev(1)
parts, balance, removed = rb.rebalance()
self.assertEqual(removed, 1)
def test_shuffled_gather(self):
if self._shuffled_gather_helper() and \
self._shuffled_gather_helper():
@ -366,7 +382,7 @@ class TestRingBuilder(unittest.TestCase):
rb.add_dev({'region': 1, 'zone': 2, 'weight': 4000.0,
'ip': '10.1.1.3', 'port': 10000, 'device': 'sdb'})
_, balance = rb.rebalance(seed=2)
_, balance, _ = rb.rebalance(seed=2)
# maybe not *perfect*, but should be close
self.assertTrue(balance <= 1)
@ -795,7 +811,7 @@ class TestRingBuilder(unittest.TestCase):
# it's as balanced as it gets, so nothing moves anymore
rb.pretend_min_part_hours_passed()
parts_moved, _balance = rb.rebalance(seed=1)
parts_moved, _balance, _removed = rb.rebalance(seed=1)
self.assertEqual(parts_moved, 0)
def test_region_fullness_with_balanceable_ring(self):
@ -867,7 +883,7 @@ class TestRingBuilder(unittest.TestCase):
rb.add_dev({'id': 3, 'region': 1, 'zone': 1, 'weight': 0.25,
'ip': '127.0.0.1', 'port': 10004, 'device': 'sda1'})
rb.pretend_min_part_hours_passed()
changed_parts, _balance = rb.rebalance(seed=2)
changed_parts, _balance, _removed = rb.rebalance(seed=2)
# there's not enough room in r1 for every partition to have a replica
# in it, so only 86 assignments occur in r1 (that's ~1/5 of the total,
@ -920,7 +936,7 @@ class TestRingBuilder(unittest.TestCase):
for weight in range(0, 101, 10):
rb.set_dev_weight(5, weight)
rb.pretend_min_part_hours_passed()
changed_parts, _balance = rb.rebalance(seed=2)
changed_parts, _balance, _removed = rb.rebalance(seed=2)
rb.validate()
moved_partitions.append(changed_parts)
# Ensure that the second region has enough partitions

View File

@ -280,6 +280,11 @@ class TestDBReplicator(unittest.TestCase):
def stub_delete_db(self, broker):
self.delete_db_calls.append('/path/to/file')
def test_creation(self):
# later config should be extended to assert more config options
replicator = TestReplicator({'node_timeout': '3.5'})
self.assertEqual(replicator.node_timeout, 3.5)
def test_repl_connection(self):
node = {'replication_ip': '127.0.0.1', 'replication_port': 80,
'device': 'sdb1'}
@ -1187,9 +1192,9 @@ class TestReplToNode(unittest.TestCase):
db_replicator.ring = FakeRing()
self.delete_db_calls = []
self.broker = FakeBroker()
self.replicator = TestReplicator({})
self.replicator = TestReplicator({'per_diff': 10})
self.fake_node = {'ip': '127.0.0.1', 'device': 'sda1', 'port': 1000}
self.fake_info = {'id': 'a', 'point': -1, 'max_row': 10, 'hash': 'b',
self.fake_info = {'id': 'a', 'point': -1, 'max_row': 20, 'hash': 'b',
'created_at': 100, 'put_timestamp': 0,
'delete_timestamp': 0, 'count': 0,
'metadata': {
@ -1201,7 +1206,7 @@ class TestReplToNode(unittest.TestCase):
self.replicator._http_connect = lambda *args: self.http
def test_repl_to_node_usync_success(self):
rinfo = {"id": 3, "point": -1, "max_row": 5, "hash": "c"}
rinfo = {"id": 3, "point": -1, "max_row": 10, "hash": "c"}
self.http = ReplHttp(simplejson.dumps(rinfo))
local_sync = self.broker.get_sync()
self.assertEqual(self.replicator._repl_to_node(
@ -1212,7 +1217,7 @@ class TestReplToNode(unittest.TestCase):
])
def test_repl_to_node_rsync_success(self):
rinfo = {"id": 3, "point": -1, "max_row": 4, "hash": "c"}
rinfo = {"id": 3, "point": -1, "max_row": 9, "hash": "c"}
self.http = ReplHttp(simplejson.dumps(rinfo))
self.broker.get_sync()
self.assertEqual(self.replicator._repl_to_node(
@ -1229,7 +1234,7 @@ class TestReplToNode(unittest.TestCase):
])
def test_repl_to_node_already_in_sync(self):
rinfo = {"id": 3, "point": -1, "max_row": 10, "hash": "b"}
rinfo = {"id": 3, "point": -1, "max_row": 20, "hash": "b"}
self.http = ReplHttp(simplejson.dumps(rinfo))
self.broker.get_sync()
self.assertEqual(self.replicator._repl_to_node(
@ -1266,6 +1271,26 @@ class TestReplToNode(unittest.TestCase):
self.assertEqual(self.replicator._repl_to_node(
self.fake_node, FakeBroker(), '0', self.fake_info), False)
def test_repl_to_node_small_container_always_usync(self):
# Tests that a small container that is > 50% out of sync will
# still use usync.
rinfo = {"id": 3, "point": -1, "hash": "c"}
# Turn per_diff back to swift's default.
self.replicator.per_diff = 1000
for r, l in ((5, 20), (40, 100), (450, 1000), (550, 1500)):
rinfo['max_row'] = r
self.fake_info['max_row'] = l
self.replicator._usync_db = mock.Mock(return_value=True)
self.http = ReplHttp(simplejson.dumps(rinfo))
local_sync = self.broker.get_sync()
self.assertEqual(self.replicator._repl_to_node(
self.fake_node, self.broker, '0', self.fake_info), True)
self.replicator._usync_db.assert_has_calls([
mock.call(max(rinfo['point'], local_sync), self.broker,
self.http, rinfo['id'], self.fake_info['id'])
])
class FakeHTTPResponse(object):

View File

@ -628,6 +628,59 @@ class TestInternalClient(unittest.TestCase):
self.assertEqual('one\xc3\xa9 two'.split(), items)
def test_iter_item_read_response_if_status_is_acceptable(self):
class Response(object):
def __init__(self, status_int, body, app_iter):
self.status_int = status_int
self.body = body
self.app_iter = app_iter
class InternalClient(internal_client.InternalClient):
def __init__(self, test, responses):
self.test = test
self.responses = responses
def make_request(
self, method, path, headers, acceptable_statuses,
body_file=None):
resp = self.responses.pop(0)
if resp.status_int in acceptable_statuses or \
resp.status_int // 100 in acceptable_statuses:
return resp
if resp:
raise internal_client.UnexpectedResponse(
'Unexpected response: %s' % resp.status_int, resp)
num_list = []
def generate_resp_body():
for i in range(1, 5):
yield str(i)
num_list.append(i)
exp_items = []
responses = [Response(204, json.dumps([]), generate_resp_body())]
items = []
client = InternalClient(self, responses)
for item in client._iter_items('/'):
items.append(item)
self.assertEqual(exp_items, items)
self.assertEqual(len(num_list), 0)
responses = [Response(300, json.dumps([]), generate_resp_body())]
client = InternalClient(self, responses)
self.assertRaises(internal_client.UnexpectedResponse,
next, client._iter_items('/'))
exp_items = []
responses = [Response(404, json.dumps([]), generate_resp_body())]
items = []
client = InternalClient(self, responses)
for item in client._iter_items('/'):
items.append(item)
self.assertEqual(exp_items, items)
self.assertEqual(len(num_list), 4)
def test_set_metadata(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, test, path, exp_headers):

View File

@ -20,7 +20,7 @@ from functools import partial
from six.moves.configparser import ConfigParser
from tempfile import NamedTemporaryFile
from test.unit import patch_policies, FakeRing, temptree
from test.unit import patch_policies, FakeRing, temptree, DEFAULT_TEST_EC_TYPE
from swift.common.storage_policy import (
StoragePolicyCollection, POLICIES, PolicyError, parse_storage_policies,
reload_storage_policies, get_policy_string, split_policy_string,
@ -70,7 +70,7 @@ class TestStoragePolicies(unittest.TestCase):
StoragePolicy(1, 'one'),
StoragePolicy(2, 'two'),
StoragePolicy(3, 'three', is_deprecated=True),
ECStoragePolicy(10, 'ten', ec_type='jerasure_rs_vand',
ECStoragePolicy(10, 'ten', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4),
])
def test_swift_info(self):
@ -144,7 +144,8 @@ class TestStoragePolicies(unittest.TestCase):
test_policies = [StoragePolicy(0, 'aay', True),
StoragePolicy(1, 'bee', False),
StoragePolicy(2, 'cee', False),
ECStoragePolicy(10, 'ten', ec_type='jerasure_rs_vand',
ECStoragePolicy(10, 'ten',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=3)]
policies = StoragePolicyCollection(test_policies)
for policy in policies:
@ -295,7 +296,7 @@ class TestStoragePolicies(unittest.TestCase):
StoragePolicy(1, 'one'),
StoragePolicy(2, 'two'),
StoragePolicy(3, 'three', is_deprecated=True),
ECStoragePolicy(10, 'ten', ec_type='jerasure_rs_vand',
ECStoragePolicy(10, 'ten', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=3),
]
policies = StoragePolicyCollection(test_policies)
@ -561,9 +562,9 @@ class TestStoragePolicies(unittest.TestCase):
[storage-policy:1]
name = ec10-4
policy_type = erasure_coding
ec_type = jerasure_rs_vand
ec_type = %(ec_type)s
ec_num_data_fragments = 10
""")
""" % {'ec_type': DEFAULT_TEST_EC_TYPE})
self.assertRaisesWithMessage(PolicyError,
'Invalid ec_num_parity_fragments',
@ -576,10 +577,11 @@ class TestStoragePolicies(unittest.TestCase):
[storage-policy:1]
name = ec10-4
policy_type = erasure_coding
ec_type = jerasure_rs_vand
ec_type = %(ec_type)s
ec_num_data_fragments = 10
ec_num_parity_fragments = %s
""" % num_parity)
ec_num_parity_fragments = %(num_parity)s
""" % {'ec_type': DEFAULT_TEST_EC_TYPE,
'num_parity': num_parity})
self.assertRaisesWithMessage(PolicyError,
'Invalid ec_num_parity_fragments',
@ -592,9 +594,9 @@ class TestStoragePolicies(unittest.TestCase):
[storage-policy:1]
name = ec10-4
policy_type = erasure_coding
ec_type = jerasure_rs_vand
ec_type = %(ec_type)s
ec_num_parity_fragments = 4
""")
""" % {'ec_type': DEFAULT_TEST_EC_TYPE})
self.assertRaisesWithMessage(PolicyError,
'Invalid ec_num_data_fragments',
@ -607,10 +609,10 @@ class TestStoragePolicies(unittest.TestCase):
[storage-policy:1]
name = ec10-4
policy_type = erasure_coding
ec_type = jerasure_rs_vand
ec_num_data_fragments = %s
ec_type = %(ec_type)s
ec_num_data_fragments = %(num_data)s
ec_num_parity_fragments = 4
""" % num_data)
""" % {'num_data': num_data, 'ec_type': DEFAULT_TEST_EC_TYPE})
self.assertRaisesWithMessage(PolicyError,
'Invalid ec_num_data_fragments',
@ -624,11 +626,12 @@ class TestStoragePolicies(unittest.TestCase):
[storage-policy:1]
name = ec10-4
policy_type = erasure_coding
ec_object_segment_size = %s
ec_type = jerasure_rs_vand
ec_object_segment_size = %(segment_size)s
ec_type = %(ec_type)s
ec_num_data_fragments = 10
ec_num_parity_fragments = 4
""" % segment_size)
""" % {'segment_size': segment_size,
'ec_type': DEFAULT_TEST_EC_TYPE})
self.assertRaisesWithMessage(PolicyError,
'Invalid ec_object_segment_size',
@ -900,7 +903,7 @@ class TestStoragePolicies(unittest.TestCase):
def test_quorum_size_erasure_coding(self):
test_ec_policies = [
ECStoragePolicy(10, 'ec8-2', ec_type='jerasure_rs_vand',
ECStoragePolicy(10, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=8, ec_nparity=2),
ECStoragePolicy(11, 'df10-6', ec_type='flat_xor_hd_4',
ec_ndata=10, ec_nparity=6),
@ -913,14 +916,14 @@ class TestStoragePolicies(unittest.TestCase):
def test_validate_ring(self):
test_policies = [
ECStoragePolicy(0, 'ec8-2', ec_type='jerasure_rs_vand',
ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=8, ec_nparity=2,
object_ring=FakeRing(replicas=8),
is_default=True),
ECStoragePolicy(1, 'ec10-4', ec_type='jerasure_rs_vand',
ECStoragePolicy(1, 'ec10-4', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4,
object_ring=FakeRing(replicas=10)),
ECStoragePolicy(2, 'ec4-2', ec_type='jerasure_rs_vand',
ECStoragePolicy(2, 'ec4-2', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2,
object_ring=FakeRing(replicas=7)),
]
@ -939,10 +942,10 @@ class TestStoragePolicies(unittest.TestCase):
StoragePolicy(0, 'zero', is_default=True),
StoragePolicy(1, 'one', is_deprecated=True),
ECStoragePolicy(10, 'ten',
ec_type='jerasure_rs_vand',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=3),
ECStoragePolicy(11, 'done', is_deprecated=True,
ec_type='jerasure_rs_vand',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=3),
]
policies = StoragePolicyCollection(test_policies)
@ -975,7 +978,7 @@ class TestStoragePolicies(unittest.TestCase):
'default': False,
'deprecated': False,
'policy_type': EC_POLICY,
'ec_type': 'jerasure_rs_vand',
'ec_type': DEFAULT_TEST_EC_TYPE,
'ec_num_data_fragments': 10,
'ec_num_parity_fragments': 3,
'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE,
@ -989,7 +992,7 @@ class TestStoragePolicies(unittest.TestCase):
'default': False,
'deprecated': True,
'policy_type': EC_POLICY,
'ec_type': 'jerasure_rs_vand',
'ec_type': DEFAULT_TEST_EC_TYPE,
'ec_num_data_fragments': 10,
'ec_num_parity_fragments': 3,
'ec_object_segment_size': DEFAULT_EC_OBJECT_SEGMENT_SIZE,

View File

@ -46,7 +46,6 @@ import traceback
import unittest
import fcntl
import shutil
from contextlib import nested
from getpass import getuser
from shutil import rmtree
@ -54,6 +53,7 @@ from functools import partial
from tempfile import TemporaryFile, NamedTemporaryFile, mkdtemp
from netifaces import AF_INET6
from mock import MagicMock, patch
from six.moves.configparser import NoSectionError, NoOptionError
from swift.common.exceptions import (Timeout, MessageTimeout,
ConnectionTimeout, LockTimeout,
@ -955,23 +955,23 @@ class TestUtils(unittest.TestCase):
# all of the boundary conditions and typical conditions.
# Block boundaries are marked with '<>' characters
blocksize = 25
lines = ['123456789x12345678><123456789\n', # block larger than rest
'123456789x123>\n', # block ends just before \n character
'123423456789\n',
'123456789x\n', # block ends at the end of line
'<123456789x123456789x123\n',
'<6789x123\n', # block ends at the beginning of the line
'6789x1234\n',
'1234><234\n', # block ends typically in the middle of line
'123456789x123456789\n']
lines = [b'123456789x12345678><123456789\n', # block larger than rest
b'123456789x123>\n', # block ends just before \n character
b'123423456789\n',
b'123456789x\n', # block ends at the end of line
b'<123456789x123456789x123\n',
b'<6789x123\n', # block ends at the beginning of the line
b'6789x1234\n',
b'1234><234\n', # block ends typically in the middle of line
b'123456789x123456789\n']
with TemporaryFile('r+w') as f:
with TemporaryFile() as f:
for line in lines:
f.write(line)
count = len(lines) - 1
for line in utils.backward(f, blocksize):
self.assertEqual(line, lines[count].split('\n')[0])
self.assertEqual(line, lines[count].split(b'\n')[0])
count -= 1
# Empty file case
@ -1546,9 +1546,8 @@ class TestUtils(unittest.TestCase):
def my_ifaddress_error(interface):
raise ValueError
with nested(
patch('netifaces.interfaces', my_interfaces),
patch('netifaces.ifaddresses', my_ifaddress_error)):
with patch('netifaces.interfaces', my_interfaces), \
patch('netifaces.ifaddresses', my_ifaddress_error):
self.assertEqual(utils.whataremyips(), [])
def test_whataremyips_ipv6(self):
@ -1562,19 +1561,16 @@ class TestUtils(unittest.TestCase):
return {AF_INET6:
[{'netmask': 'ffff:ffff:ffff:ffff::',
'addr': '%s%%%s' % (test_ipv6_address, test_interface)}]}
with nested(
patch('netifaces.interfaces', my_ipv6_interfaces),
patch('netifaces.ifaddresses', my_ipv6_ifaddresses)):
with patch('netifaces.interfaces', my_ipv6_interfaces), \
patch('netifaces.ifaddresses', my_ipv6_ifaddresses):
myips = utils.whataremyips()
self.assertEqual(len(myips), 1)
self.assertEqual(myips[0], test_ipv6_address)
def test_hash_path(self):
_prefix = utils.HASH_PATH_PREFIX
utils.HASH_PATH_PREFIX = ''
# Yes, these tests are deliberately very fragile. We want to make sure
# that if someones changes the results hash_path produces, they know it
try:
with mock.patch('swift.common.utils.HASH_PATH_PREFIX', ''):
self.assertEqual(utils.hash_path('a'),
'1c84525acb02107ea475dcd3d09c2c58')
self.assertEqual(utils.hash_path('a', 'c'),
@ -1590,8 +1586,60 @@ class TestUtils(unittest.TestCase):
utils.HASH_PATH_PREFIX = 'abcdef'
self.assertEqual(utils.hash_path('a', 'c', 'o', raw_digest=False),
'363f9b535bfb7d17a43a46a358afca0e')
finally:
utils.HASH_PATH_PREFIX = _prefix
def test_validate_hash_conf(self):
# no section causes InvalidHashPathConfigError
self._test_validate_hash_conf([], [], True)
# 'swift-hash' section is there but no options causes
# InvalidHashPathConfigError
self._test_validate_hash_conf(['swift-hash'], [], True)
# if we have the section and either of prefix or suffix,
# InvalidHashPathConfigError doesn't occur
self._test_validate_hash_conf(
['swift-hash'], ['swift_hash_path_prefix'], False)
self._test_validate_hash_conf(
['swift-hash'], ['swift_hash_path_suffix'], False)
# definitely, we have the section and both of them,
# InvalidHashPathConfigError doesn't occur
self._test_validate_hash_conf(
['swift-hash'],
['swift_hash_path_suffix', 'swift_hash_path_prefix'], False)
# But invalid section name should make an error even if valid
# options are there
self._test_validate_hash_conf(
['swift-hash-xxx'],
['swift_hash_path_suffix', 'swift_hash_path_prefix'], True)
def _test_validate_hash_conf(self, sections, options, should_raise_error):
class FakeConfigParser(object):
def read(self, conf_path):
return True
def get(self, section, option):
if section not in sections:
raise NoSectionError('section error')
elif option not in options:
raise NoOptionError('option error', 'this option')
else:
return 'some_option_value'
with mock.patch('swift.common.utils.HASH_PATH_PREFIX', ''), \
mock.patch('swift.common.utils.HASH_PATH_SUFFIX', ''), \
mock.patch('swift.common.utils.ConfigParser',
FakeConfigParser):
try:
utils.validate_hash_conf()
except utils.InvalidHashPathConfigError:
if not should_raise_error:
self.fail('validate_hash_conf should not raise an error')
else:
if should_raise_error:
self.fail('validate_hash_conf should raise an error')
def test_load_libc_function(self):
self.assertTrue(callable(
@ -1879,10 +1927,9 @@ log_name = %(yarr)s'''
curr_time[0] += 0.001
curr_time[0] += duration
with nested(
patch('time.time', my_time),
patch('time.sleep', my_sleep),
patch('eventlet.sleep', my_sleep)):
with patch('time.time', my_time), \
patch('time.sleep', my_sleep), \
patch('eventlet.sleep', my_sleep):
start = time.time()
func(*args, **kwargs)
# make sure it's accurate to 10th of a second, converting the time
@ -3828,9 +3875,8 @@ class TestRateLimitedIterator(unittest.TestCase):
curr_time[0] += 0.001
curr_time[0] += duration
with nested(
patch('time.time', my_time),
patch('eventlet.sleep', my_sleep)):
with patch('time.time', my_time), \
patch('eventlet.sleep', my_sleep):
return func(*args, **kwargs)
def test_rate_limiting(self):

View File

@ -22,7 +22,6 @@ import socket
import unittest
import os
from textwrap import dedent
from contextlib import nested
from collections import defaultdict
from eventlet import listen
@ -413,13 +412,12 @@ class TestWSGI(unittest.TestCase):
with open(conf_file, 'w') as f:
f.write(contents.replace('TEMPDIR', t))
_fake_rings(t)
with nested(
mock.patch('swift.proxy.server.Application.'
'modify_wsgi_pipeline'),
mock.patch('swift.common.wsgi.wsgi'),
mock.patch('swift.common.wsgi.eventlet'),
mock.patch('swift.common.wsgi.inspect',
getargspec=argspec_stub)) as (_, _wsgi, _, _):
with mock.patch('swift.proxy.server.Application.'
'modify_wsgi_pipeline'), \
mock.patch('swift.common.wsgi.wsgi') as _wsgi, \
mock.patch('swift.common.wsgi.eventlet'), \
mock.patch('swift.common.wsgi.inspect',
getargspec=argspec_stub):
conf = wsgi.appconfig(conf_file)
logger = logging.getLogger('test')
sock = listen(('localhost', 0))
@ -658,15 +656,15 @@ class TestWSGI(unittest.TestCase):
self.assertEqual(kwargs['global_conf'],
{'log_name': 'log_name', 'test1': 'one'})
with nested(
mock.patch.object(wsgi, '_initrp', _initrp),
mock.patch.object(wsgi, 'get_socket'),
mock.patch.object(wsgi, 'drop_privileges'),
mock.patch.object(wsgi, 'loadapp', _loadapp),
mock.patch.object(wsgi, 'capture_stdio'),
mock.patch.object(wsgi, 'run_server')):
with mock.patch.object(wsgi, '_initrp', _initrp), \
mock.patch.object(wsgi, 'get_socket'), \
mock.patch.object(wsgi, 'drop_privileges'), \
mock.patch.object(wsgi, 'loadapp', _loadapp), \
mock.patch.object(wsgi, 'capture_stdio'), \
mock.patch.object(wsgi, 'run_server'):
wsgi.run_wsgi('conf_file', 'app_section',
global_conf_callback=_global_conf_callback)
self.assertEqual(calls['_global_conf_callback'], 1)
self.assertEqual(calls['_loadapp'], 1)
@ -683,13 +681,12 @@ class TestWSGI(unittest.TestCase):
def _loadapp(uri, name=None, **kwargs):
calls['_loadapp'] += 1
with nested(
mock.patch.object(wsgi, '_initrp', _initrp),
mock.patch.object(wsgi, 'get_socket'),
mock.patch.object(wsgi, 'drop_privileges'),
mock.patch.object(wsgi, 'loadapp', _loadapp),
mock.patch.object(wsgi, 'capture_stdio'),
mock.patch.object(wsgi, 'run_server')):
with mock.patch.object(wsgi, '_initrp', _initrp), \
mock.patch.object(wsgi, 'get_socket'), \
mock.patch.object(wsgi, 'drop_privileges'), \
mock.patch.object(wsgi, 'loadapp', _loadapp), \
mock.patch.object(wsgi, 'capture_stdio'), \
mock.patch.object(wsgi, 'run_server'):
rc = wsgi.run_wsgi('conf_file', 'app_section')
self.assertEqual(calls['_initrp'], 1)
self.assertEqual(calls['_loadapp'], 1)
@ -764,13 +761,12 @@ class TestWSGI(unittest.TestCase):
def _loadapp(uri, name=None, **kwargs):
calls['_loadapp'] += 1
with nested(
mock.patch.object(wsgi, '_initrp', _initrp),
mock.patch.object(wsgi, 'get_socket'),
mock.patch.object(wsgi, 'drop_privileges'),
mock.patch.object(wsgi, 'loadapp', _loadapp),
mock.patch.object(wsgi, 'capture_stdio'),
mock.patch.object(wsgi, 'run_server')):
with mock.patch.object(wsgi, '_initrp', _initrp), \
mock.patch.object(wsgi, 'get_socket'), \
mock.patch.object(wsgi, 'drop_privileges'), \
mock.patch.object(wsgi, 'loadapp', _loadapp), \
mock.patch.object(wsgi, 'capture_stdio'), \
mock.patch.object(wsgi, 'run_server'):
rc = wsgi.run_wsgi('conf_file', 'app_section')
self.assertEqual(calls['_initrp'], 1)
self.assertEqual(calls['_loadapp'], 0)

View File

@ -12,7 +12,6 @@
# limitations under the License.
import json
import contextlib
import mock
import operator
import time
@ -567,10 +566,8 @@ class TestReconcilerUtils(unittest.TestCase):
mock_direct_delete = mock.MagicMock()
mock_direct_delete.side_effect = stub_resp
with contextlib.nested(
mock.patch(mock_path, mock_direct_delete),
mock.patch('eventlet.greenpool.DEBUG', False),
):
with mock.patch(mock_path, mock_direct_delete), \
mock.patch('eventlet.greenpool.DEBUG', False):
rv = reconciler.direct_delete_container_entry(
self.fake_ring, 'a', 'c', 'o')
self.assertEqual(rv, None)
@ -623,11 +620,9 @@ class TestReconcilerUtils(unittest.TestCase):
fake_hc = fake_http_connect(200, 200, 200, give_connect=test_connect)
now = time.time()
with contextlib.nested(
mock.patch(mock_path, fake_hc),
with mock.patch(mock_path, fake_hc), \
mock.patch('swift.container.reconciler.time.time',
lambda: now),
):
lambda: now):
ret = reconciler.add_to_reconciler_queue(
self.fake_ring, 'a', 'c', 'o', 17, 5948918.63946, 'PUT',
force=True)

View File

@ -192,7 +192,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
storage_policy_index=broker.storage_policy_index)
# replicate
node = {'device': 'sdc', 'replication_ip': '127.0.0.1'}
daemon = replicator.ContainerReplicator({})
daemon = replicator.ContainerReplicator({'per_diff': 1})
def _rsync_file(db_file, remote_file, **kwargs):
remote_server, remote_path = remote_file.split('/', 1)

View File

@ -92,6 +92,12 @@ class TestContainerController(unittest.TestCase):
self.assertEqual(str(policy_index),
resp.headers['X-Backend-Storage-Policy-Index'])
def test_creation(self):
# later config should be extended to assert more config options
replicator = container_server.ContainerController(
{'node_timeout': '3.5'})
self.assertEqual(replicator.node_timeout, 3.5)
def test_get_and_validate_policy_index(self):
# no policy is OK
req = Request.blank('/sda1/p/a/container_default', method='PUT',

View File

@ -16,7 +16,6 @@
import os
import unittest
from contextlib import nested
from textwrap import dedent
import mock
@ -492,10 +491,9 @@ class TestContainerSync(unittest.TestCase):
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
with nested(
mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb),
mock.patch('swift.container.sync.hash_path', fake_hash_path)):
with mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb), \
mock.patch('swift.container.sync.hash_path', fake_hash_path):
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
@ -520,10 +518,9 @@ class TestContainerSync(unittest.TestCase):
'x-container-sync-key':
('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
with nested(
mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb),
mock.patch('swift.container.sync.hash_path', fake_hash_path)):
with mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb), \
mock.patch('swift.container.sync.hash_path', fake_hash_path):
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
@ -567,11 +564,10 @@ class TestContainerSync(unittest.TestCase):
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
'deleted': True}])
with nested(
mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb),
with mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb), \
mock.patch('swift.container.sync.delete_object',
fake_delete_object)):
fake_delete_object):
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
@ -592,11 +588,10 @@ class TestContainerSync(unittest.TestCase):
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
'deleted': True}])
with nested(
mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb),
with mock.patch('swift.container.sync.ContainerBroker',
lambda p: fcb), \
mock.patch('swift.container.sync.delete_object',
lambda *x, **y: None)):
lambda *x, **y: None):
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']

View File

@ -64,14 +64,14 @@ class TestContainerUpdater(unittest.TestCase):
'swift_dir': self.testdir,
'interval': '1',
'concurrency': '2',
'node_timeout': '5',
'node_timeout': '5.5',
})
self.assertTrue(hasattr(cu, 'logger'))
self.assertTrue(cu.logger is not None)
self.assertEqual(cu.devices, self.devices_dir)
self.assertEqual(cu.interval, 1)
self.assertEqual(cu.concurrency, 2)
self.assertEqual(cu.node_timeout, 5)
self.assertEqual(cu.node_timeout, 5.5)
self.assertTrue(cu.get_account_ring() is not None)
@mock.patch.object(container_updater, 'ismount')

80
test/unit/obj/common.py Normal file
View File

@ -0,0 +1,80 @@
# Copyright (c) 2013 - 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import shutil
import tempfile
import unittest
import time
from swift.common.storage_policy import POLICIES
from swift.common.utils import Timestamp
from swift.obj import diskfile
from test.unit import debug_logger
class FakeReplicator(object):
def __init__(self, testdir, policy=None):
self.logger = debug_logger('test-ssync-sender')
self.conn_timeout = 1
self.node_timeout = 2
self.http_timeout = 3
self.network_chunk_size = 65536
self.disk_chunk_size = 4096
conf = {
'devices': testdir,
'mount_check': 'false',
}
policy = POLICIES.default if policy is None else policy
self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
self._diskfile_mgr = self._diskfile_router[policy]
class BaseTest(unittest.TestCase):
def setUp(self):
# daemon will be set in subclass setUp
self.daemon = None
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=True)
def _make_open_diskfile(self, device='dev', partition='9',
account='a', container='c', obj='o', body='test',
extra_metadata=None, policy=None,
frag_index=None, timestamp=None, df_mgr=None):
policy = policy or POLICIES.legacy
object_parts = account, container, obj
timestamp = Timestamp(time.time()) if timestamp is None else timestamp
if df_mgr is None:
df_mgr = self.daemon._diskfile_router[policy]
df = df_mgr.get_diskfile(
device, partition, *object_parts, policy=policy,
frag_index=frag_index)
content_length = len(body)
etag = hashlib.md5(body).hexdigest()
with df.create() as writer:
writer.write(body)
metadata = {
'X-Timestamp': timestamp.internal,
'Content-Length': str(content_length),
'ETag': etag,
}
if extra_metadata:
metadata.update(extra_metadata)
writer.put(metadata)
writer.commit(timestamp)
df.open()
return df

View File

@ -33,13 +33,13 @@ from shutil import rmtree
from time import time
from tempfile import mkdtemp
from hashlib import md5
from contextlib import closing, nested, contextmanager
from contextlib import closing, contextmanager
from gzip import GzipFile
from eventlet import hubs, timeout, tpool
from test.unit import (FakeLogger, mock as unit_mock, temptree,
patch_policies, debug_logger, EMPTY_ETAG,
make_timestamp_iter)
make_timestamp_iter, DEFAULT_TEST_EC_TYPE)
from nose import SkipTest
from swift.obj import diskfile
@ -59,7 +59,7 @@ from swift.common.storage_policy import (
test_policies = [
StoragePolicy(0, name='zero', is_default=True),
ECStoragePolicy(1, name='one', is_default=False,
ec_type='jerasure_rs_vand',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4),
]
@ -689,11 +689,10 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
def test_get_diskfile_from_hash_dev_path_fail(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value=None)
with nested(
mock.patch(self._manager_mock('diskfile_cls')),
mock.patch(self._manager_mock('hash_cleanup_listdir')),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
with mock.patch(self._manager_mock('diskfile_cls')), \
mock.patch(self._manager_mock(
'hash_cleanup_listdir')) as hclistdir, \
mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
hclistdir.return_value = ['1381679759.90941.data']
readmeta.return_value = {'name': '/a/c/o'}
self.assertRaises(
@ -703,12 +702,12 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
def test_get_diskfile_from_hash_not_dir(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch(self._manager_mock('diskfile_cls')),
mock.patch(self._manager_mock('hash_cleanup_listdir')),
mock.patch('swift.obj.diskfile.read_metadata'),
mock.patch(self._manager_mock('quarantine_renamer'))) as \
(dfclass, hclistdir, readmeta, quarantine_renamer):
with mock.patch(self._manager_mock('diskfile_cls')), \
mock.patch(self._manager_mock(
'hash_cleanup_listdir')) as hclistdir, \
mock.patch('swift.obj.diskfile.read_metadata') as readmeta, \
mock.patch(self._manager_mock(
'quarantine_renamer')) as quarantine_renamer:
osexc = OSError()
osexc.errno = errno.ENOTDIR
hclistdir.side_effect = osexc
@ -723,11 +722,10 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
def test_get_diskfile_from_hash_no_dir(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch(self._manager_mock('diskfile_cls')),
mock.patch(self._manager_mock('hash_cleanup_listdir')),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
with mock.patch(self._manager_mock('diskfile_cls')), \
mock.patch(self._manager_mock(
'hash_cleanup_listdir')) as hclistdir, \
mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
osexc = OSError()
osexc.errno = errno.ENOENT
hclistdir.side_effect = osexc
@ -739,11 +737,10 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
def test_get_diskfile_from_hash_other_oserror(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch(self._manager_mock('diskfile_cls')),
mock.patch(self._manager_mock('hash_cleanup_listdir')),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
with mock.patch(self._manager_mock('diskfile_cls')), \
mock.patch(self._manager_mock(
'hash_cleanup_listdir')) as hclistdir, \
mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
osexc = OSError()
hclistdir.side_effect = osexc
readmeta.return_value = {'name': '/a/c/o'}
@ -754,11 +751,10 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
def test_get_diskfile_from_hash_no_actual_files(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch(self._manager_mock('diskfile_cls')),
mock.patch(self._manager_mock('hash_cleanup_listdir')),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
with mock.patch(self._manager_mock('diskfile_cls')), \
mock.patch(self._manager_mock(
'hash_cleanup_listdir')) as hclistdir, \
mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
hclistdir.return_value = []
readmeta.return_value = {'name': '/a/c/o'}
self.assertRaises(
@ -768,11 +764,10 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
def test_get_diskfile_from_hash_read_metadata_problem(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch(self._manager_mock('diskfile_cls')),
mock.patch(self._manager_mock('hash_cleanup_listdir')),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
with mock.patch(self._manager_mock('diskfile_cls')), \
mock.patch(self._manager_mock(
'hash_cleanup_listdir')) as hclistdir, \
mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
hclistdir.return_value = ['1381679759.90941.data']
readmeta.side_effect = EOFError()
self.assertRaises(
@ -782,11 +777,10 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
def test_get_diskfile_from_hash_no_meta_name(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch(self._manager_mock('diskfile_cls')),
mock.patch(self._manager_mock('hash_cleanup_listdir')),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
with mock.patch(self._manager_mock('diskfile_cls')), \
mock.patch(self._manager_mock(
'hash_cleanup_listdir')) as hclistdir, \
mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
hclistdir.return_value = ['1381679759.90941.data']
readmeta.return_value = {}
try:
@ -799,11 +793,10 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
def test_get_diskfile_from_hash_bad_meta_name(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch(self._manager_mock('diskfile_cls')),
mock.patch(self._manager_mock('hash_cleanup_listdir')),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
with mock.patch(self._manager_mock('diskfile_cls')), \
mock.patch(self._manager_mock(
'hash_cleanup_listdir')) as hclistdir, \
mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
hclistdir.return_value = ['1381679759.90941.data']
readmeta.return_value = {'name': 'bad'}
try:
@ -816,11 +809,10 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
def test_get_diskfile_from_hash(self):
self.df_mgr.get_dev_path = mock.MagicMock(return_value='/srv/dev/')
with nested(
mock.patch(self._manager_mock('diskfile_cls')),
mock.patch(self._manager_mock('hash_cleanup_listdir')),
mock.patch('swift.obj.diskfile.read_metadata')) as \
(dfclass, hclistdir, readmeta):
with mock.patch(self._manager_mock('diskfile_cls')) as dfclass, \
mock.patch(self._manager_mock(
'hash_cleanup_listdir')) as hclistdir, \
mock.patch('swift.obj.diskfile.read_metadata') as readmeta:
hclistdir.return_value = ['1381679759.90941.data']
readmeta.return_value = {'name': '/a/c/o'}
self.df_mgr.get_diskfile_from_hash(
@ -924,9 +916,8 @@ class DiskFileManagerMixin(BaseDiskFileTestMixin):
expected_items = [
(os.path.join(part_path, hash_[-3:], hash_), hash_, timestamps)
for hash_, timestamps in expected.items()]
with nested(
mock.patch('os.listdir', _listdir),
mock.patch('os.unlink')):
with mock.patch('os.listdir', _listdir), \
mock.patch('os.unlink'):
df_mgr = self.df_router[policy]
hash_items = list(df_mgr.yield_hashes(
device, part, policy, **kwargs))

View File

@ -26,7 +26,7 @@ import random
import struct
from eventlet import Timeout, sleep
from contextlib import closing, nested, contextmanager
from contextlib import closing, contextmanager
from gzip import GzipFile
from shutil import rmtree
from swift.common import utils
@ -39,7 +39,8 @@ from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
from swift.obj.reconstructor import REVERT
from test.unit import (patch_policies, debug_logger, mocked_http_conn,
FabricatedRing, make_timestamp_iter)
FabricatedRing, make_timestamp_iter,
DEFAULT_TEST_EC_TYPE)
@contextmanager
@ -131,7 +132,8 @@ def get_header_frag_index(self, body):
@patch_policies([StoragePolicy(0, name='zero', is_default=True),
ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand',
ECStoragePolicy(1, name='one',
ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=2, ec_nparity=1)])
class TestGlobalSetupObjectReconstructor(unittest.TestCase):
@ -1060,25 +1062,24 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def test_process_job_all_timeout(self):
self.reconstructor._reset_stats()
with mock_ssync_sender():
with nested(mocked_http_conn(*[Timeout()] * 8)):
found_jobs = []
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(
part_info)
found_jobs.extend(jobs)
for job in jobs:
self.logger._clear()
self.reconstructor.process_job(job)
for line in self.logger.get_lines_for_level('error'):
self.assertTrue('Timeout (Nones)' in line)
self.assertStatCount(
'update_stats', 'suffix.hashes', 0)
self.assertStatCount(
'update_stats', 'suffix.syncs', 0)
self.assertEqual(self.reconstructor.suffix_sync, 0)
self.assertEqual(self.reconstructor.suffix_count, 0)
self.assertEqual(len(found_jobs), 6)
with mock_ssync_sender(), mocked_http_conn(*[Timeout()] * 8):
found_jobs = []
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(
part_info)
found_jobs.extend(jobs)
for job in jobs:
self.logger._clear()
self.reconstructor.process_job(job)
for line in self.logger.get_lines_for_level('error'):
self.assertTrue('Timeout (Nones)' in line)
self.assertStatCount(
'update_stats', 'suffix.hashes', 0)
self.assertStatCount(
'update_stats', 'suffix.syncs', 0)
self.assertEqual(self.reconstructor.suffix_sync, 0)
self.assertEqual(self.reconstructor.suffix_count, 0)
self.assertEqual(len(found_jobs), 6)
@patch_policies(with_ec_default=True)
@ -1174,10 +1175,10 @@ class TestObjectReconstructor(unittest.TestCase):
'replication_port': self.port,
})
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs)):
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs):
part_infos = list(self.reconstructor.collect_parts())
found_parts = sorted(int(p['partition']) for p in part_infos)
expected_parts = sorted(itertools.chain(
@ -1226,10 +1227,10 @@ class TestObjectReconstructor(unittest.TestCase):
'replication_port': self.port,
})
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs)):
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs):
part_infos = list(self.reconstructor.collect_parts())
found_parts = sorted(int(p['partition']) for p in part_infos)
expected_parts = sorted(itertools.chain(
@ -1266,10 +1267,10 @@ class TestObjectReconstructor(unittest.TestCase):
'replication_port': self.port,
} for dev in local_devs]
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs)):
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs):
part_infos = list(self.reconstructor.collect_parts())
found_parts = sorted(int(p['partition']) for p in part_infos)
expected_parts = sorted(itertools.chain(
@ -1297,10 +1298,10 @@ class TestObjectReconstructor(unittest.TestCase):
'replication_ip': self.ip,
'replication_port': self.port
} for dev in local_devs]
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs)):
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs):
part_infos = list(self.reconstructor.collect_parts())
self.assertEqual(2, len(part_infos)) # sanity
self.assertEqual(set(int(p['partition']) for p in part_infos),
@ -1312,12 +1313,12 @@ class TestObjectReconstructor(unittest.TestCase):
paths.append(os.path.join(devices, device))
return False
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs),
mock.patch('swift.obj.diskfile.check_mount',
fake_check_mount)):
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs), \
mock.patch('swift.obj.diskfile.check_mount',
fake_check_mount):
part_infos = list(self.reconstructor.collect_parts())
self.assertEqual(2, len(part_infos)) # sanity, same jobs
self.assertEqual(set(int(p['partition']) for p in part_infos),
@ -1331,12 +1332,12 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertTrue(self.reconstructor.mount_check)
for policy in POLICIES:
self.assertTrue(self.reconstructor._df_router[policy].mount_check)
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs),
mock.patch('swift.obj.diskfile.check_mount',
fake_check_mount)):
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs), \
mock.patch('swift.obj.diskfile.check_mount',
fake_check_mount):
part_infos = list(self.reconstructor.collect_parts())
self.assertEqual([], part_infos) # sanity, no jobs
@ -1351,12 +1352,12 @@ class TestObjectReconstructor(unittest.TestCase):
else:
return False
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs),
mock.patch('swift.obj.diskfile.check_mount',
fake_check_mount)):
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs), \
mock.patch('swift.obj.diskfile.check_mount',
fake_check_mount):
part_infos = list(self.reconstructor.collect_parts())
self.assertEqual(1, len(part_infos)) # only sda picked up (part 0)
self.assertEqual(part_infos[0]['partition'], 0)
@ -1373,14 +1374,14 @@ class TestObjectReconstructor(unittest.TestCase):
fake_unlink = mock.MagicMock()
self.reconstructor.reclaim_age = 1000
now = time.time()
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch('swift.obj.reconstructor.time.time',
return_value=now),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs),
mock.patch('swift.obj.reconstructor.unlink_older_than',
fake_unlink)):
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch('swift.obj.reconstructor.time.time',
return_value=now), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs), \
mock.patch('swift.obj.reconstructor.unlink_older_than',
fake_unlink):
self.assertEqual([], list(self.reconstructor.collect_parts()))
# each local device hash unlink_older_than called on it,
# with now - self.reclaim_age
@ -1406,10 +1407,10 @@ class TestObjectReconstructor(unittest.TestCase):
datadir_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy))
utils.mkdirs(os.path.dirname(datadir_path))
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch('swift.obj.reconstructor.mkdirs',
side_effect=OSError('kaboom!'))):
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch('swift.obj.reconstructor.mkdirs',
side_effect=OSError('kaboom!')):
self.assertEqual([], list(self.reconstructor.collect_parts()))
error_lines = self.logger.get_lines_for_level('error')
self.assertEqual(len(error_lines), 1)
@ -1511,10 +1512,10 @@ class TestObjectReconstructor(unittest.TestCase):
('sda', 843),
]),
)
with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs)):
with mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), \
mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs):
for kwargs, expected_parts in expected:
part_infos = list(self.reconstructor.collect_parts(**kwargs))
expected_paths = set(
@ -1851,12 +1852,11 @@ class TestObjectReconstructor(unittest.TestCase):
ssync_calls = []
with nested(
mock_ssync_sender(ssync_calls),
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes))):
with mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
return_value=(None, stub_hashes)), \
mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'),
@ -1904,12 +1904,11 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter = zip(*responses)
ssync_calls = []
with nested(
mock_ssync_sender(ssync_calls),
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes))):
with mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
return_value=(None, stub_hashes)), \
mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'),
@ -1975,12 +1974,11 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter = zip(*responses)
ssync_calls = []
with nested(
mock_ssync_sender(ssync_calls),
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes))):
with mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
return_value=(None, stub_hashes)), \
mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'),
@ -2041,12 +2039,11 @@ class TestObjectReconstructor(unittest.TestCase):
ssync_calls = []
with nested(
mock_ssync_sender(ssync_calls),
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes))):
with mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
return_value=(None, stub_hashes)), \
mocked_http_conn(*codes, body_iter=body_iter) as request_log:
self.reconstructor.process_job(job)
expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'),
@ -2114,14 +2111,13 @@ class TestObjectReconstructor(unittest.TestCase):
])
ssync_calls = []
with nested(
mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback),
with mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes))):
with mocked_http_conn(*[200] * len(expected_suffix_calls),
body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
return_value=(None, stub_hashes)), \
mocked_http_conn(*[200] * len(expected_suffix_calls),
body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
found_suffix_calls = set((r['ip'], r['path'])
for r in request_log.requests)
@ -2176,12 +2172,11 @@ class TestObjectReconstructor(unittest.TestCase):
for r in expected_suffix_calls]
ssync_calls = []
with nested(
mock_ssync_sender(ssync_calls),
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes))):
with mocked_http_conn(*codes) as request_log:
self.reconstructor.process_job(job)
return_value=(None, stub_hashes)), \
mocked_http_conn(*codes) as request_log:
self.reconstructor.process_job(job)
found_suffix_calls = set((r['ip'], r['path'])
for r in request_log.requests)
@ -2217,12 +2212,11 @@ class TestObjectReconstructor(unittest.TestCase):
}
ssync_calls = []
with nested(
mock_ssync_sender(ssync_calls),
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes))):
with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
return_value=(None, stub_hashes)), \
mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
expected_suffix_calls = set([
(sync_to[0]['ip'], '/%s/0/123-abc' % sync_to[0]['device']),
@ -2279,14 +2273,13 @@ class TestObjectReconstructor(unittest.TestCase):
])
ssync_calls = []
with nested(
mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback),
with mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes))):
with mocked_http_conn(*[200] * len(expected_suffix_calls),
body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
return_value=(None, stub_hashes)), \
mocked_http_conn(*[200] * len(expected_suffix_calls),
body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
found_suffix_calls = set((r['ip'], r['path'])
for r in request_log.requests)
@ -2339,14 +2332,13 @@ class TestObjectReconstructor(unittest.TestCase):
])
ssync_calls = []
with nested(
mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback),
with mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes))):
with mocked_http_conn(*[200] * len(expected_suffix_calls),
body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
return_value=(None, stub_hashes)), \
mocked_http_conn(*[200] * len(expected_suffix_calls),
body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
found_suffix_calls = set((r['ip'], r['path'])
for r in request_log.requests)

View File

@ -45,7 +45,7 @@ from nose import SkipTest
from swift import __version__ as swift_version
from swift.common.http import is_success
from test.unit import FakeLogger, debug_logger, mocked_http_conn, \
make_timestamp_iter
make_timestamp_iter, DEFAULT_TEST_EC_TYPE
from test.unit import connect_tcp, readuntil2crlfs, patch_policies
from swift.obj import server as object_server
from swift.obj import diskfile
@ -57,7 +57,7 @@ from swift.common.swob import Request, HeaderKeyDict, WsgiBytesIO
from swift.common.splice import splice
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
POLICIES, EC_POLICY)
from swift.common.exceptions import DiskFileDeviceUnavailable
from swift.common.exceptions import DiskFileDeviceUnavailable, DiskFileNoSpace
def mock_time(*args, **kwargs):
@ -66,7 +66,7 @@ def mock_time(*args, **kwargs):
test_policies = [
StoragePolicy(0, name='zero', is_default=True),
ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand',
ECStoragePolicy(1, name='one', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=10, ec_nparity=4),
]
@ -2605,6 +2605,29 @@ class TestObjectController(unittest.TestCase):
finally:
self.object_controller.container_update = orig_cu
def test_DELETE_full_drive(self):
def mock_diskfile_delete(self, timestamp):
raise DiskFileNoSpace()
t_put = utils.Timestamp(time())
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': t_put.internal,
'Content-Length': 0,
'Content-Type': 'plain/text'})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 201)
with mock.patch('swift.obj.diskfile.BaseDiskFile.delete',
mock_diskfile_delete):
t_delete = utils.Timestamp(time())
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'X-Timestamp': t_delete.internal})
resp = req.get_response(self.object_controller)
self.assertEqual(resp.status_int, 507)
def test_object_update_with_offset(self):
ts = (utils.Timestamp(t).internal for t in
itertools.count(int(time())))

909
test/unit/obj/test_ssync.py Normal file
View File

@ -0,0 +1,909 @@
# Copyright (c) 2013 - 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
import mock
import os
import time
import unittest
import eventlet
import itertools
from six.moves import urllib
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
DiskFileDeleted
from swift.common import utils
from swift.common.storage_policy import POLICIES
from swift.common.utils import Timestamp
from swift.obj import ssync_sender, server
from swift.obj.reconstructor import RebuildingECDiskFileStream
from test.unit import patch_policies
from test.unit.obj.common import BaseTest, FakeReplicator
class TestBaseSsync(BaseTest):
"""
Provides a framework to test end to end interactions between sender and
receiver. The basis for each test is actual diskfile state on either side.
The connection between sender and receiver is wrapped to capture ssync
traffic for subsequent verification of the protocol. Assertions are made
about the final state of the sender and receiver diskfiles.
"""
def setUp(self):
super(TestBaseSsync, self).setUp()
self.device = 'dev'
self.partition = '9'
# sender side setup
self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
utils.mkdirs(os.path.join(self.tx_testdir, self.device))
self.daemon = FakeReplicator(self.tx_testdir)
# rx side setup
self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
utils.mkdirs(os.path.join(self.rx_testdir, self.device))
conf = {
'devices': self.rx_testdir,
'mount_check': 'false',
'replication_one_per_device': 'false',
'log_requests': 'false'}
self.rx_controller = server.ObjectController(conf)
self.ts_iter = (Timestamp(t)
for t in itertools.count(int(time.time())))
self.rx_ip = '127.0.0.1'
sock = eventlet.listen((self.rx_ip, 0))
self.rx_server = eventlet.spawn(
eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger())
self.rx_port = sock.getsockname()[1]
self.rx_node = {'replication_ip': self.rx_ip,
'replication_port': self.rx_port,
'device': self.device}
def tearDown(self):
self.rx_server.kill()
super(TestBaseSsync, self).tearDown()
def make_connect_wrapper(self, sender):
"""
Make a wrapper function for the ssync_sender.Sender.connect() method
that will in turn wrap the HTTConnection.send() and the
Sender.readline() so that ssync protocol messages can be captured.
"""
orig_connect = sender.connect
trace = dict(messages=[])
def add_trace(type, msg):
# record a protocol event for later analysis
if msg.strip():
trace['messages'].append((type, msg.strip()))
def make_send_wrapper(send):
def wrapped_send(msg):
_msg = msg.split('\r\n', 1)[1]
_msg = _msg.rsplit('\r\n', 1)[0]
add_trace('tx', _msg)
send(msg)
return wrapped_send
def make_readline_wrapper(readline):
def wrapped_readline():
data = readline()
add_trace('rx', data)
bytes_read = trace.setdefault('readline_bytes', 0)
trace['readline_bytes'] = bytes_read + len(data)
return data
return wrapped_readline
def wrapped_connect():
orig_connect()
sender.connection.send = make_send_wrapper(
sender.connection.send)
sender.readline = make_readline_wrapper(sender.readline)
return wrapped_connect, trace
def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
frag_indexes=None):
frag_indexes = [None] if frag_indexes is None else frag_indexes
metadata = {'Content-Type': 'plain/text'}
diskfiles = []
for frag_index in frag_indexes:
object_data = '/a/c/%s___%s' % (obj_name, frag_index)
if frag_index is not None:
metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index)
df = self._make_open_diskfile(
device=self.device, partition=self.partition, account='a',
container='c', obj=obj_name, body=object_data,
extra_metadata=metadata, timestamp=timestamp, policy=policy,
frag_index=frag_index, df_mgr=df_mgr)
# sanity checks
listing = os.listdir(df._datadir)
self.assertTrue(listing)
for filename in listing:
self.assertTrue(filename.startswith(timestamp.internal))
diskfiles.append(df)
return diskfiles
def _open_tx_diskfile(self, obj_name, policy, frag_index=None):
df_mgr = self.daemon._diskfile_router[policy]
df = df_mgr.get_diskfile(
self.device, self.partition, account='a', container='c',
obj=obj_name, policy=policy, frag_index=frag_index)
df.open()
return df
def _open_rx_diskfile(self, obj_name, policy, frag_index=None):
df = self.rx_controller.get_diskfile(
self.device, self.partition, 'a', 'c', obj_name, policy=policy,
frag_index=frag_index)
df.open()
return df
def _verify_diskfile_sync(self, tx_df, rx_df, frag_index, same_etag=False):
# verify that diskfiles' metadata match
# sanity check, they are not the same ondisk files!
self.assertNotEqual(tx_df._datadir, rx_df._datadir)
rx_metadata = dict(rx_df.get_metadata())
for k, v in tx_df.get_metadata().items():
if k == 'X-Object-Sysmeta-Ec-Frag-Index':
# if tx_df had a frag_index then rx_df should also have one
self.assertTrue(k in rx_metadata)
self.assertEqual(frag_index, int(rx_metadata.pop(k)))
elif k == 'ETag' and not same_etag:
self.assertNotEqual(v, rx_metadata.pop(k, None))
continue
else:
self.assertEqual(v, rx_metadata.pop(k), k)
self.assertFalse(rx_metadata)
expected_body = '%s___%s' % (tx_df._name, frag_index)
actual_body = ''.join([chunk for chunk in rx_df.reader()])
self.assertEqual(expected_body, actual_body)
def _analyze_trace(self, trace):
"""
Parse protocol trace captured by fake connection, making some
assertions along the way, and return results as a dict of form:
results = {'tx_missing': <list of messages>,
'rx_missing': <list of messages>,
'tx_updates': <list of subreqs>,
'rx_updates': <list of messages>}
Each subreq is a dict with keys: 'method', 'path', 'headers', 'body'
"""
def tx_missing(results, line):
self.assertEqual('tx', line[0])
results['tx_missing'].append(line[1])
def rx_missing(results, line):
self.assertEqual('rx', line[0])
parts = line[1].split('\r\n')
for part in parts:
results['rx_missing'].append(part)
def tx_updates(results, line):
self.assertEqual('tx', line[0])
subrequests = results['tx_updates']
if line[1].startswith(('PUT', 'DELETE', 'POST')):
parts = line[1].split('\r\n')
method, path = parts[0].split()
subreq = {'method': method, 'path': path, 'req': line[1],
'headers': parts[1:]}
subrequests.append(subreq)
else:
self.assertTrue(subrequests)
body = (subrequests[-1]).setdefault('body', '')
body += line[1]
subrequests[-1]['body'] = body
def rx_updates(results, line):
self.assertEqual('rx', line[0])
results.setdefault['rx_updates'].append(line[1])
def unexpected(results, line):
results.setdefault('unexpected', []).append(line)
# each trace line is a tuple of ([tx|rx], msg)
handshakes = iter([(('tx', ':MISSING_CHECK: START'), tx_missing),
(('tx', ':MISSING_CHECK: END'), unexpected),
(('rx', ':MISSING_CHECK: START'), rx_missing),
(('rx', ':MISSING_CHECK: END'), unexpected),
(('tx', ':UPDATES: START'), tx_updates),
(('tx', ':UPDATES: END'), unexpected),
(('rx', ':UPDATES: START'), rx_updates),
(('rx', ':UPDATES: END'), unexpected)])
expect_handshake = next(handshakes)
phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
results = dict((k, []) for k in phases)
handler = unexpected
lines = list(trace.get('messages', []))
lines.reverse()
while lines:
line = lines.pop()
if line == expect_handshake[0]:
handler = expect_handshake[1]
try:
expect_handshake = next(handshakes)
except StopIteration:
# should be the last line
self.assertFalse(
lines, 'Unexpected trailing lines %s' % lines)
continue
handler(results, line)
try:
# check all handshakes occurred
missed = next(handshakes)
self.fail('Handshake %s not found' % str(missed[0]))
except StopIteration:
pass
# check no message outside of a phase
self.assertFalse(results.get('unexpected'),
'Message outside of a phase: %s' % results.get(None))
return results
def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None,
rx_frag_index=None):
"""
Verify tx and rx files that should be in sync.
:param tx_objs: sender diskfiles
:param policy: storage policy instance
:param tx_frag_index: the fragment index of tx diskfiles that should
have been used as a source for sync'ing
:param rx_frag_index: the fragment index of expected rx diskfiles
"""
for o_name, diskfiles in tx_objs.items():
for tx_df in diskfiles:
# check tx file still intact - ssync does not do any cleanup!
tx_df.open()
if tx_frag_index is None or tx_df._frag_index == tx_frag_index:
# this diskfile should have been sync'd,
# check rx file is ok
rx_df = self._open_rx_diskfile(
o_name, policy, rx_frag_index)
# for EC revert job or replication etags should match
match_etag = (tx_frag_index == rx_frag_index)
self._verify_diskfile_sync(
tx_df, rx_df, rx_frag_index, match_etag)
else:
# this diskfile should not have been sync'd,
# check no rx file,
self.assertRaises(DiskFileNotExist, self._open_rx_diskfile,
o_name, policy,
frag_index=tx_df._frag_index)
def _verify_tombstones(self, tx_objs, policy):
# verify tx and rx tombstones that should be in sync
for o_name, diskfiles in tx_objs.items():
try:
self._open_tx_diskfile(o_name, policy)
self.fail('DiskFileDeleted expected')
except DiskFileDeleted as exc:
tx_delete_time = exc.timestamp
try:
self._open_rx_diskfile(o_name, policy)
self.fail('DiskFileDeleted expected')
except DiskFileDeleted as exc:
rx_delete_time = exc.timestamp
self.assertEqual(tx_delete_time, rx_delete_time)
@patch_policies(with_ec_default=True)
class TestSsyncEC(TestBaseSsync):
def test_handoff_fragment_revert(self):
# test that a sync_revert type job does send the correct frag archives
# to the receiver
policy = POLICIES.default
rx_node_index = 0
tx_node_index = 1
# for a revert job we iterate over frag index that belongs on
# remote node
frag_index = rx_node_index
# create sender side diskfiles...
tx_objs = {}
rx_objs = {}
tx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 has primary and handoff fragment archives
t1 = next(self.ts_iter)
tx_objs['o1'] = self._create_ondisk_files(
tx_df_mgr, 'o1', policy, t1, (rx_node_index, tx_node_index))
# o2 only has primary
t2 = next(self.ts_iter)
tx_objs['o2'] = self._create_ondisk_files(
tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
# o3 only has handoff
t3 = next(self.ts_iter)
tx_objs['o3'] = self._create_ondisk_files(
tx_df_mgr, 'o3', policy, t3, (rx_node_index,))
# o4 primary and handoff fragment archives on tx, handoff in sync on rx
t4 = next(self.ts_iter)
tx_objs['o4'] = self._create_ondisk_files(
tx_df_mgr, 'o4', policy, t4, (tx_node_index, rx_node_index,))
rx_objs['o4'] = self._create_ondisk_files(
rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
# o5 is a tombstone, missing on receiver
t5 = next(self.ts_iter)
tx_tombstones['o5'] = self._create_ondisk_files(
tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
tx_tombstones['o5'][0].delete(t5)
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy,
'frag_index': frag_index}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
sender()
# verify protocol
results = self._analyze_trace(trace)
# sender has handoff frags for o1, o3 and o4 and ts for o5
self.assertEqual(4, len(results['tx_missing']))
# receiver is missing frags for o1, o3 and ts for o5
self.assertEqual(3, len(results['rx_missing']))
self.assertEqual(3, len(results['tx_updates']))
self.assertFalse(results['rx_updates'])
sync_paths = []
for subreq in results.get('tx_updates'):
if subreq.get('method') == 'PUT':
self.assertTrue(
'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
in subreq.get('headers'))
expected_body = '%s___%s' % (subreq['path'], rx_node_index)
self.assertEqual(expected_body, subreq['body'])
elif subreq.get('method') == 'DELETE':
self.assertEqual('/a/c/o5', subreq['path'])
sync_paths.append(subreq.get('path'))
self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths))
# verify on disk files...
self._verify_ondisk_files(
tx_objs, policy, frag_index, rx_node_index)
self._verify_tombstones(tx_tombstones, policy)
def test_fragment_sync(self):
# check that a sync_only type job does call reconstructor to build a
# diskfile to send, and continues making progress despite an error
# when building one diskfile
policy = POLICIES.default
rx_node_index = 0
tx_node_index = 1
# for a sync job we iterate over frag index that belongs on local node
frag_index = tx_node_index
# create sender side diskfiles...
tx_objs = {}
tx_tombstones = {}
rx_objs = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 only has primary
t1 = next(self.ts_iter)
tx_objs['o1'] = self._create_ondisk_files(
tx_df_mgr, 'o1', policy, t1, (tx_node_index,))
# o2 only has primary
t2 = next(self.ts_iter)
tx_objs['o2'] = self._create_ondisk_files(
tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
# o3 only has primary
t3 = next(self.ts_iter)
tx_objs['o3'] = self._create_ondisk_files(
tx_df_mgr, 'o3', policy, t3, (tx_node_index,))
# o4 primary fragment archives on tx, handoff in sync on rx
t4 = next(self.ts_iter)
tx_objs['o4'] = self._create_ondisk_files(
tx_df_mgr, 'o4', policy, t4, (tx_node_index,))
rx_objs['o4'] = self._create_ondisk_files(
rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
# o5 is a tombstone, missing on receiver
t5 = next(self.ts_iter)
tx_tombstones['o5'] = self._create_ondisk_files(
tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
tx_tombstones['o5'][0].delete(t5)
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
reconstruct_fa_calls = []
def fake_reconstruct_fa(job, node, metadata):
reconstruct_fa_calls.append((job, node, policy, metadata))
if len(reconstruct_fa_calls) == 2:
# simulate second reconstruct failing
raise DiskFileError
content = '%s___%s' % (metadata['name'], rx_node_index)
return RebuildingECDiskFileStream(
metadata, rx_node_index, iter([content]))
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy,
'frag_index': frag_index,
'sync_diskfile_builder': fake_reconstruct_fa}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
sender()
# verify protocol
results = self._analyze_trace(trace)
# sender has primary for o1, o2 and o3, o4 and ts for o5
self.assertEqual(5, len(results['tx_missing']))
# receiver is missing o1, o2 and o3 and ts for o5
self.assertEqual(4, len(results['rx_missing']))
# sender can only construct 2 out of 3 missing frags
self.assertEqual(3, len(results['tx_updates']))
self.assertEqual(3, len(reconstruct_fa_calls))
self.assertFalse(results['rx_updates'])
actual_sync_paths = []
for subreq in results.get('tx_updates'):
if subreq.get('method') == 'PUT':
self.assertTrue(
'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
in subreq.get('headers'))
expected_body = '%s___%s' % (subreq['path'], rx_node_index)
self.assertEqual(expected_body, subreq['body'])
elif subreq.get('method') == 'DELETE':
self.assertEqual('/a/c/o5', subreq['path'])
actual_sync_paths.append(subreq.get('path'))
# remove the failed df from expected synced df's
expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5']
failed_path = reconstruct_fa_calls[1][3]['name']
expect_sync_paths.remove(failed_path)
failed_obj = None
for obj, diskfiles in tx_objs.items():
if diskfiles[0]._name == failed_path:
failed_obj = obj
# sanity check
self.assertTrue(tx_objs.pop(failed_obj))
# verify on disk files...
self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths))
self._verify_ondisk_files(
tx_objs, policy, frag_index, rx_node_index)
self._verify_tombstones(tx_tombstones, policy)
def test_send_with_frag_index_none(self):
policy = POLICIES.default
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# create an ec fragment on the remote node
ts1 = next(self.ts_iter)
remote_df = self._create_ondisk_files(
rx_df_mgr, 'o', policy, ts1, (3,))[0]
# create a tombstone on the local node
df = self._create_ondisk_files(
tx_df_mgr, 'o', policy, ts1, (3,))[0]
suffix = os.path.basename(os.path.dirname(df._datadir))
ts2 = next(self.ts_iter)
df.delete(ts2)
# a reconstructor revert job with only tombstones will have frag_index
# explicitly set to None
job = {
'frag_index': None,
'partition': self.partition,
'policy': policy,
'device': self.device,
}
sender = ssync_sender.Sender(
self.daemon, self.rx_node, job, [suffix])
success, _ = sender()
self.assertTrue(success)
try:
remote_df.read_metadata()
except DiskFileDeleted as e:
self.assertEqual(e.timestamp, ts2)
else:
self.fail('Successfully opened remote DiskFile')
def test_send_invalid_frag_index(self):
policy = POLICIES.default
job = {'frag_index': 'Not a number',
'device': self.device,
'partition': self.partition,
'policy': policy}
sender = ssync_sender.Sender(
self.daemon, self.rx_node, job, ['abc'])
success, _ = sender()
self.assertFalse(success)
error_log_lines = self.daemon.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_log_lines))
error_msg = error_log_lines[0]
self.assertIn("Expected status 200; got 400", error_msg)
self.assertIn("Invalid X-Backend-Ssync-Frag-Index 'Not a number'",
error_msg)
@patch_policies
class TestSsyncReplication(TestBaseSsync):
def test_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create sender side diskfiles...
tx_objs = {}
rx_objs = {}
tx_tombstones = {}
rx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 and o2 are on tx only
t1 = next(self.ts_iter)
tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
t2 = next(self.ts_iter)
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
# o3 is on tx and older copy on rx
t3a = next(self.ts_iter)
rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3a)
t3b = next(self.ts_iter)
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b)
# o4 in sync on rx and tx
t4 = next(self.ts_iter)
tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4)
rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4)
# o5 is a tombstone, missing on receiver
t5 = next(self.ts_iter)
tx_tombstones['o5'] = self._create_ondisk_files(
tx_df_mgr, 'o5', policy, t5)
tx_tombstones['o5'][0].delete(t5)
# o6 is a tombstone, in sync on tx and rx
t6 = next(self.ts_iter)
tx_tombstones['o6'] = self._create_ondisk_files(
tx_df_mgr, 'o6', policy, t6)
tx_tombstones['o6'][0].delete(t6)
rx_tombstones['o6'] = self._create_ondisk_files(
rx_df_mgr, 'o6', policy, t6)
rx_tombstones['o6'][0].delete(t6)
# o7 is a tombstone on tx, older data on rx
t7a = next(self.ts_iter)
rx_objs['o7'] = self._create_ondisk_files(rx_df_mgr, 'o7', policy, t7a)
t7b = next(self.ts_iter)
tx_tombstones['o7'] = self._create_ondisk_files(
tx_df_mgr, 'o7', policy, t7b)
tx_tombstones['o7'][0].delete(t7b)
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
success, in_sync_objs = sender()
self.assertEqual(7, len(in_sync_objs))
self.assertTrue(success)
# verify protocol
results = self._analyze_trace(trace)
self.assertEqual(7, len(results['tx_missing']))
self.assertEqual(5, len(results['rx_missing']))
self.assertEqual(5, len(results['tx_updates']))
self.assertFalse(results['rx_updates'])
sync_paths = []
for subreq in results.get('tx_updates'):
if subreq.get('method') == 'PUT':
self.assertTrue(
subreq['path'] in ('/a/c/o1', '/a/c/o2', '/a/c/o3'))
expected_body = '%s___None' % subreq['path']
self.assertEqual(expected_body, subreq['body'])
elif subreq.get('method') == 'DELETE':
self.assertTrue(subreq['path'] in ('/a/c/o5', '/a/c/o7'))
sync_paths.append(subreq.get('path'))
self.assertEqual(
['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5', '/a/c/o7'],
sorted(sync_paths))
# verify on disk files...
self._verify_ondisk_files(tx_objs, policy)
self._verify_tombstones(tx_tombstones, policy)
def test_nothing_to_sync(self):
job = {'device': self.device,
'partition': self.partition,
'policy': POLICIES.default}
node = {'replication_ip': self.rx_ip,
'replication_port': self.rx_port,
'device': self.device,
'index': 0}
sender = ssync_sender.Sender(self.daemon, node, job, ['abc'])
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
result, in_sync_objs = sender()
self.assertTrue(result)
self.assertFalse(in_sync_objs)
results = self._analyze_trace(trace)
self.assertFalse(results['tx_missing'])
self.assertFalse(results['rx_missing'])
self.assertFalse(results['tx_updates'])
self.assertFalse(results['rx_updates'])
# Minimal receiver response as read by sender:
# 2 <-- initial \r\n to start ssync exchange
# + 23 <-- :MISSING CHECK START\r\n
# + 2 <-- \r\n (minimal missing check response)
# + 21 <-- :MISSING CHECK END\r\n
# + 17 <-- :UPDATES START\r\n
# + 15 <-- :UPDATES END\r\n
# TOTAL = 80
self.assertEqual(80, trace.get('readline_bytes'))
def test_meta_file_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_objs = {}
rx_objs = {}
tx_tombstones = {}
rx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
expected_subreqs = defaultdict(list)
# o1 on tx only with meta file
t1 = next(self.ts_iter)
tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
t1_meta = next(self.ts_iter)
metadata = {'X-Timestamp': t1_meta.internal,
'X-Object-Meta-Test': 'o1',
'X-Object-Sysmeta-Test': 'sys_o1'}
tx_objs['o1'][0].write_metadata(metadata)
expected_subreqs['PUT'].append('o1')
expected_subreqs['POST'].append('o1')
# o2 on tx with meta, on rx without meta
t2 = next(self.ts_iter)
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
t2_meta = next(self.ts_iter)
metadata = {'X-Timestamp': t2_meta.internal,
'X-Object-Meta-Test': 'o2',
'X-Object-Sysmeta-Test': 'sys_o2'}
tx_objs['o2'][0].write_metadata(metadata)
rx_objs['o2'] = self._create_ondisk_files(rx_df_mgr, 'o2', policy, t2)
expected_subreqs['POST'].append('o2')
# o3 is on tx with meta, rx has newer data but no meta
t3a = next(self.ts_iter)
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a)
t3b = next(self.ts_iter)
rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3b)
t3_meta = next(self.ts_iter)
metadata = {'X-Timestamp': t3_meta.internal,
'X-Object-Meta-Test': 'o3',
'X-Object-Sysmeta-Test': 'sys_o3'}
tx_objs['o3'][0].write_metadata(metadata)
expected_subreqs['POST'].append('o3')
# o4 is on tx with meta, rx has older data and up to date meta
t4a = next(self.ts_iter)
rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4a)
t4b = next(self.ts_iter)
tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4b)
t4_meta = next(self.ts_iter)
metadata = {'X-Timestamp': t4_meta.internal,
'X-Object-Meta-Test': 'o4',
'X-Object-Sysmeta-Test': 'sys_o4'}
tx_objs['o4'][0].write_metadata(metadata)
rx_objs['o4'][0].write_metadata(metadata)
expected_subreqs['PUT'].append('o4')
# o5 is on tx with meta, rx is in sync with data and meta
t5 = next(self.ts_iter)
rx_objs['o5'] = self._create_ondisk_files(rx_df_mgr, 'o5', policy, t5)
tx_objs['o5'] = self._create_ondisk_files(tx_df_mgr, 'o5', policy, t5)
t5_meta = next(self.ts_iter)
metadata = {'X-Timestamp': t5_meta.internal,
'X-Object-Meta-Test': 'o5',
'X-Object-Sysmeta-Test': 'sys_o5'}
tx_objs['o5'][0].write_metadata(metadata)
rx_objs['o5'][0].write_metadata(metadata)
# o6 is tombstone on tx, rx has older data and meta
t6 = next(self.ts_iter)
tx_tombstones['o6'] = self._create_ondisk_files(
tx_df_mgr, 'o6', policy, t6)
rx_tombstones['o6'] = self._create_ondisk_files(
rx_df_mgr, 'o6', policy, t6)
metadata = {'X-Timestamp': next(self.ts_iter).internal,
'X-Object-Meta-Test': 'o6',
'X-Object-Sysmeta-Test': 'sys_o6'}
rx_tombstones['o6'][0].write_metadata(metadata)
tx_tombstones['o6'][0].delete(next(self.ts_iter))
expected_subreqs['DELETE'].append('o6')
# o7 is tombstone on rx, tx has older data and meta,
# no subreqs expected...
t7 = next(self.ts_iter)
tx_objs['o7'] = self._create_ondisk_files(tx_df_mgr, 'o7', policy, t7)
rx_tombstones['o7'] = self._create_ondisk_files(
rx_df_mgr, 'o7', policy, t7)
metadata = {'X-Timestamp': next(self.ts_iter).internal,
'X-Object-Meta-Test': 'o7',
'X-Object-Sysmeta-Test': 'sys_o7'}
tx_objs['o7'][0].write_metadata(metadata)
rx_tombstones['o7'][0].delete(next(self.ts_iter))
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
success, in_sync_objs = sender()
self.assertEqual(7, len(in_sync_objs))
self.assertTrue(success)
# verify protocol
results = self._analyze_trace(trace)
self.assertEqual(7, len(results['tx_missing']))
self.assertEqual(5, len(results['rx_missing']))
for subreq in results.get('tx_updates'):
obj = subreq['path'].split('/')[3]
method = subreq['method']
self.assertTrue(obj in expected_subreqs[method],
'Unexpected %s subreq for object %s, expected %s'
% (method, obj, expected_subreqs[method]))
expected_subreqs[method].remove(obj)
if method == 'PUT':
expected_body = '%s___None' % subreq['path']
self.assertEqual(expected_body, subreq['body'])
# verify all expected subreqs consumed
for _method, expected in expected_subreqs.items():
self.assertFalse(expected)
self.assertFalse(results['rx_updates'])
# verify on disk files...
del tx_objs['o7'] # o7 not expected to be sync'd
self._verify_ondisk_files(tx_objs, policy)
self._verify_tombstones(tx_tombstones, policy)
for oname, rx_obj in rx_objs.items():
df = rx_obj[0].open()
metadata = df.get_metadata()
self.assertEqual(metadata['X-Object-Meta-Test'], oname)
self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)
def test_meta_file_not_synced_to_legacy_receiver(self):
# verify that the sender does sync a data file to a legacy receiver,
# but does not PUT meta file content to a legacy receiver
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# rx has data at t1 but no meta
# object is on tx with data at t2, meta at t3,
t1 = next(self.ts_iter)
self._create_ondisk_files(rx_df_mgr, 'o1', policy, t1)
t2 = next(self.ts_iter)
tx_obj = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t2)[0]
t3 = next(self.ts_iter)
metadata = {'X-Timestamp': t3.internal,
'X-Object-Meta-Test': 'o3',
'X-Object-Sysmeta-Test': 'sys_o3'}
tx_obj.write_metadata(metadata)
suffixes = [os.path.basename(os.path.dirname(tx_obj._datadir))]
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
def _legacy_check_missing(self, line):
# reproduces behavior of 'legacy' ssync receiver missing_checks()
parts = line.split()
object_hash = urllib.parse.unquote(parts[0])
timestamp = urllib.parse.unquote(parts[1])
want = False
try:
df = self.diskfile_mgr.get_diskfile_from_hash(
self.device, self.partition, object_hash, self.policy,
frag_index=self.frag_index)
except DiskFileNotExist:
want = True
else:
try:
df.open()
except DiskFileDeleted as err:
want = err.timestamp < timestamp
except DiskFileError:
want = True
else:
want = df.timestamp < timestamp
if want:
return urllib.parse.quote(object_hash)
return None
# run the sync protocol...
func = 'swift.obj.ssync_receiver.Receiver._check_missing'
with mock.patch(func, _legacy_check_missing):
success, in_sync_objs = sender()
self.assertEqual(1, len(in_sync_objs))
self.assertTrue(success)
# verify protocol, expecting only a PUT to legacy receiver
results = self._analyze_trace(trace)
self.assertEqual(1, len(results['tx_missing']))
self.assertEqual(1, len(results['rx_missing']))
self.assertEqual(1, len(results['tx_updates']))
self.assertEqual('PUT', results['tx_updates'][0]['method'])
self.assertFalse(results['rx_updates'])
# verify on disk files...
rx_obj = self._open_rx_diskfile('o1', policy)
tx_obj = self._open_tx_diskfile('o1', policy)
# with legacy behavior rx_obj data and meta timestamps are equal
self.assertEqual(t2, rx_obj.data_timestamp)
self.assertEqual(t2, rx_obj.timestamp)
# with legacy behavior rx_obj data timestamp should equal tx_obj
self.assertEqual(rx_obj.data_timestamp, tx_obj.data_timestamp)
# tx meta file should not have been sync'd to rx data file
self.assertNotIn('X-Object-Meta-Test', rx_obj.get_metadata())
if __name__ == '__main__':
unittest.main()

View File

@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import hashlib
import os
import shutil
import tempfile
@ -27,7 +25,7 @@ import six
from swift.common import bufferedhttp
from swift.common import exceptions
from swift.common import swob
from swift.common.storage_policy import POLICIES, REPL_POLICY
from swift.common.storage_policy import POLICIES
from swift.common import utils
from swift.common.swob import HTTPException
from swift.obj import diskfile
@ -367,17 +365,12 @@ class TestReceiver(unittest.TestCase):
self.assertFalse(mocked_replication_semaphore.release.called)
def test_SSYNC_mount_check(self):
with contextlib.nested(
mock.patch.object(
self.controller, 'replication_semaphore'),
with mock.patch.object(self.controller, 'replication_semaphore'), \
mock.patch.object(
self.controller._diskfile_router[POLICIES.legacy],
'mount_check', False),
'mount_check', False), \
mock.patch('swift.obj.diskfile.check_mount',
return_value=False)) as (
mocked_replication_semaphore,
mocked_mount_check,
mocked_check_mount):
return_value=False) as mocked_check_mount:
req = swob.Request.blank(
'/device/partition', environ={'REQUEST_METHOD': 'SSYNC'})
resp = req.get_response(self.controller)
@ -387,17 +380,12 @@ class TestReceiver(unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertFalse(mocked_check_mount.called)
with contextlib.nested(
mock.patch.object(
self.controller, 'replication_semaphore'),
with mock.patch.object(self.controller, 'replication_semaphore'), \
mock.patch.object(
self.controller._diskfile_router[POLICIES.legacy],
'mount_check', True),
'mount_check', True), \
mock.patch('swift.obj.diskfile.check_mount',
return_value=False)) as (
mocked_replication_semaphore,
mocked_mount_check,
mocked_check_mount):
return_value=False) as mocked_check_mount:
req = swob.Request.blank(
'/device/partition', environ={'REQUEST_METHOD': 'SSYNC'})
resp = req.get_response(self.controller)
@ -932,13 +920,11 @@ class TestReceiver(unittest.TestCase):
return self.mock_socket
self.controller.client_timeout = 0.01
with contextlib.nested(
mock.patch.object(
ssync_receiver.eventlet.greenio, 'shutdown_safe'),
with mock.patch.object(ssync_receiver.eventlet.greenio,
'shutdown_safe') as mock_shutdown_safe, \
mock.patch.object(
self.controller, 'DELETE',
return_value=swob.HTTPNoContent())) as (
mock_shutdown_safe, mock_delete):
return_value=swob.HTTPNoContent()):
req = swob.Request.blank(
'/device/partition',
environ={'REQUEST_METHOD': 'SSYNC'},
@ -1584,10 +1570,9 @@ class TestReceiver(unittest.TestCase):
_requests.append(request)
return swob.HTTPNoContent()
with contextlib.nested(
mock.patch.object(self.controller, 'PUT', _PUT),
mock.patch.object(self.controller, 'POST', _POST),
mock.patch.object(self.controller, 'DELETE', _DELETE)):
with mock.patch.object(self.controller, 'PUT', _PUT), \
mock.patch.object(self.controller, 'POST', _POST), \
mock.patch.object(self.controller, 'DELETE', _DELETE):
self.controller.logger = mock.MagicMock()
req = swob.Request.blank(
'/device/partition',
@ -1823,51 +1808,28 @@ class TestSsyncRxServer(unittest.TestCase):
# server socket.
def setUp(self):
self.ts = unit.make_timestamp_iter()
self.rx_ip = '127.0.0.1'
# dirs
self.tmpdir = tempfile.mkdtemp()
self.tempdir = os.path.join(self.tmpdir, 'tmp_test_obj_server')
self.rx_devices = os.path.join(self.tempdir, 'rx/node')
self.tx_devices = os.path.join(self.tempdir, 'tx/node')
self.devices = os.path.join(self.tempdir, 'srv/node')
for device in ('sda1', 'sdb1'):
for root in (self.rx_devices, self.tx_devices):
os.makedirs(os.path.join(root, device))
os.makedirs(os.path.join(self.devices, device))
self.conf = {
'devices': self.rx_devices,
'devices': self.devices,
'swift_dir': self.tempdir,
'mount_check': False,
}
self.rx_logger = debug_logger('test-object-server')
self.rx_app = server.ObjectController(self.conf, logger=self.rx_logger)
rx_server = server.ObjectController(self.conf, logger=self.rx_logger)
self.sock = eventlet.listen((self.rx_ip, 0))
self.rx_server = eventlet.spawn(
eventlet.wsgi.server, self.sock, self.rx_app, utils.NullLogger())
eventlet.wsgi.server, self.sock, rx_server, utils.NullLogger())
self.rx_port = self.sock.getsockname()[1]
self.tx_logger = debug_logger('test-daemon')
self.policy = POLICIES[0]
self.conf['devices'] = self.tx_devices
self.tx_logger = debug_logger('test-reconstructor')
self.daemon = ObjectReconstructor(self.conf, self.tx_logger)
self.daemon._diskfile_mgr = self.daemon._df_router[self.policy]
self.nodes = [
{
'device': 'sda1',
'ip': '127.0.0.1',
'replication_ip': '127.0.0.1',
'port': self.rx_port,
'replication_port': self.rx_port,
},
{
'device': 'sdb1',
'ip': '127.0.0.1',
'replication_ip': '127.0.0.1',
'port': self.rx_port,
'replication_port': self.rx_port,
},
]
self.daemon._diskfile_mgr = self.daemon._df_router[POLICIES[0]]
def tearDown(self):
self.rx_server.kill()
@ -1940,89 +1902,6 @@ class TestSsyncRxServer(unittest.TestCase):
# sanity check that the receiver did not proceed to missing_check
self.assertFalse(mock_missing_check.called)
def test_sender_job_missing_frag_node_indexes(self):
# replication jobs don't send frag_index, so we'll use a REPL_POLICY
repl_policy = POLICIES[1]
self.assertEqual(repl_policy.policy_type, REPL_POLICY)
repl_mgr = self.daemon._df_router[repl_policy]
self.daemon._diskfile_mgr = repl_mgr
device = self.nodes[0]['device']
# create a replicated object, on sender
df = repl_mgr.get_diskfile(device, '0', 'a', 'c', 'o',
policy=repl_policy)
now = next(self.ts)
metadata = {
'X-Timestamp': now.internal,
'Content-Type': 'text/plain',
'Content-Length': '0',
'ETag': hashlib.md5('').hexdigest(),
}
with df.create() as writer:
writer.write('')
writer.put(metadata)
# sanity the object is on the sender
self.assertTrue(df._datadir.startswith(self.tx_devices))
# setup a ssync job
suffix = os.path.basename(os.path.dirname(df._datadir))
job = {
'partition': 0,
'policy': repl_policy,
'device': device,
}
sender = ssync_sender.Sender(
self.daemon, self.nodes[0], job, [suffix])
success, _ = sender()
self.assertTrue(success)
# sanity object is synced to receiver
remote_df = self.rx_app._diskfile_router[repl_policy].get_diskfile(
device, '0', 'a', 'c', 'o', policy=repl_policy)
self.assertTrue(remote_df._datadir.startswith(self.rx_devices))
self.assertEqual(remote_df.read_metadata(), metadata)
def test_send_frag_index_none(self):
# create an ec fragment on the remote node
device = self.nodes[1]['device']
remote_df = self.rx_app._diskfile_router[self.policy].get_diskfile(
device, '1', 'a', 'c', 'o', policy=self.policy)
ts1 = next(self.ts)
data = 'frag_archive'
metadata = {
'ETag': hashlib.md5(data).hexdigest(),
'X-Timestamp': ts1.internal,
'Content-Length': len(data),
'X-Object-Sysmeta-Ec-Frag-Index': '3',
}
with remote_df.create() as writer:
writer.write(data)
writer.put(metadata)
writer.commit(ts1)
# create a tombstone on the local node
df = self.daemon._df_router[self.policy].get_diskfile(
device, '1', 'a', 'c', 'o', policy=self.policy)
suffix = os.path.basename(os.path.dirname(df._datadir))
ts2 = next(self.ts)
df.delete(ts2)
# a reconstructor revert job with only tombstones will have frag_index
# explicitly set to None
job = {
'frag_index': None,
'partition': 1,
'policy': self.policy,
'device': device,
}
sender = ssync_sender.Sender(
self.daemon, self.nodes[1], job, [suffix])
success, _ = sender()
self.assertTrue(success)
# diskfile tombstone synced to receiver's datadir with timestamp
self.assertTrue(remote_df._datadir.startswith(self.rx_devices))
try:
remote_df.read_metadata()
except exceptions.DiskFileDeleted as e:
self.assertEqual(e.timestamp, ts2)
else:
self.fail('Successfully opened remote DiskFile')
def test_bad_request_invalid_frag_index(self):
with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check')\
as mock_missing_check:

View File

@ -12,47 +12,21 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
import hashlib
import os
import shutil
import tempfile
import time
import unittest
import eventlet
import itertools
import mock
import six
from six.moves import urllib
from swift.common import exceptions, utils
from swift.common.storage_policy import POLICIES
from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
DiskFileDeleted
from swift.common.utils import Timestamp
from swift.obj import ssync_sender, diskfile, server, ssync_receiver
from swift.obj.reconstructor import RebuildingECDiskFileStream
from swift.obj import ssync_sender, diskfile, ssync_receiver
from test.unit import debug_logger, patch_policies, make_timestamp_iter
class FakeReplicator(object):
def __init__(self, testdir, policy=None):
self.logger = debug_logger('test-ssync-sender')
self.conn_timeout = 1
self.node_timeout = 2
self.http_timeout = 3
self.network_chunk_size = 65536
self.disk_chunk_size = 4096
conf = {
'devices': testdir,
'mount_check': 'false',
}
policy = POLICIES.default if policy is None else policy
self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
self._diskfile_mgr = self._diskfile_router[policy]
from test.unit import patch_policies, make_timestamp_iter
from test.unit.obj.common import FakeReplicator, BaseTest
class NullBufferedHTTPConnection(object):
@ -105,49 +79,16 @@ class FakeConnection(object):
self.closed = True
class BaseTestSender(unittest.TestCase):
@patch_policies()
class TestSender(BaseTest):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
super(TestSender, self).setUp()
self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
utils.mkdirs(os.path.join(self.testdir, 'dev'))
self.daemon = FakeReplicator(self.testdir)
self.sender = ssync_sender.Sender(self.daemon, None, None, None)
def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=True)
def _make_open_diskfile(self, device='dev', partition='9',
account='a', container='c', obj='o', body='test',
extra_metadata=None, policy=None,
frag_index=None, timestamp=None, df_mgr=None):
policy = policy or POLICIES.legacy
object_parts = account, container, obj
timestamp = Timestamp(time.time()) if timestamp is None else timestamp
if df_mgr is None:
df_mgr = self.daemon._diskfile_router[policy]
df = df_mgr.get_diskfile(
device, partition, *object_parts, policy=policy,
frag_index=frag_index)
content_length = len(body)
etag = hashlib.md5(body).hexdigest()
with df.create() as writer:
writer.write(body)
metadata = {
'X-Timestamp': timestamp.internal,
'Content-Length': str(content_length),
'ETag': etag,
}
if extra_metadata:
metadata.update(extra_metadata)
writer.put(metadata)
writer.commit(timestamp)
df.open()
return df
@patch_policies()
class TestSender(BaseTestSender):
def test_call_catches_MessageTimeout(self):
def connect(self):
@ -1598,826 +1539,6 @@ class TestSender(BaseTestSender):
self.assertTrue(self.sender.connection.closed)
class TestBaseSsync(BaseTestSender):
"""
Provides a framework to test end to end interactions between sender and
receiver. The basis for each test is actual diskfile state on either side.
The connection between sender and receiver is wrapped to capture ssync
traffic for subsequent verification of the protocol. Assertions are made
about the final state of the sender and receiver diskfiles.
"""
def make_connect_wrapper(self, sender):
"""
Make a wrapper function for the ssync_sender.Sender.connect() method
that will in turn wrap the HTTConnection.send() and the
Sender.readline() so that ssync protocol messages can be captured.
"""
orig_connect = sender.connect
trace = dict(messages=[])
def add_trace(type, msg):
# record a protocol event for later analysis
if msg.strip():
trace['messages'].append((type, msg.strip()))
def make_send_wrapper(send):
def wrapped_send(msg):
_msg = msg.split('\r\n', 1)[1]
_msg = _msg.rsplit('\r\n', 1)[0]
add_trace('tx', _msg)
send(msg)
return wrapped_send
def make_readline_wrapper(readline):
def wrapped_readline():
data = readline()
add_trace('rx', data)
bytes_read = trace.setdefault('readline_bytes', 0)
trace['readline_bytes'] = bytes_read + len(data)
return data
return wrapped_readline
def wrapped_connect():
orig_connect()
sender.connection.send = make_send_wrapper(
sender.connection.send)
sender.readline = make_readline_wrapper(sender.readline)
return wrapped_connect, trace
def setUp(self):
self.device = 'dev'
self.partition = '9'
self.tmpdir = tempfile.mkdtemp()
# sender side setup
self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
utils.mkdirs(os.path.join(self.tx_testdir, self.device))
self.daemon = FakeReplicator(self.tx_testdir)
# rx side setup
self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
utils.mkdirs(os.path.join(self.rx_testdir, self.device))
conf = {
'devices': self.rx_testdir,
'mount_check': 'false',
'replication_one_per_device': 'false',
'log_requests': 'false'}
self.rx_controller = server.ObjectController(conf)
self.ts_iter = (Timestamp(t)
for t in itertools.count(int(time.time())))
self.rx_ip = '127.0.0.1'
sock = eventlet.listen((self.rx_ip, 0))
self.rx_server = eventlet.spawn(
eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger())
self.rx_port = sock.getsockname()[1]
self.rx_node = {'replication_ip': self.rx_ip,
'replication_port': self.rx_port,
'device': self.device}
def tearDown(self):
self.rx_server.kill()
shutil.rmtree(self.tmpdir, ignore_errors=True)
def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
frag_indexes=None):
frag_indexes = [None] if frag_indexes is None else frag_indexes
metadata = {'Content-Type': 'plain/text'}
diskfiles = []
for frag_index in frag_indexes:
object_data = '/a/c/%s___%s' % (obj_name, frag_index)
if frag_index is not None:
metadata['X-Object-Sysmeta-Ec-Frag-Index'] = str(frag_index)
df = self._make_open_diskfile(
device=self.device, partition=self.partition, account='a',
container='c', obj=obj_name, body=object_data,
extra_metadata=metadata, timestamp=timestamp, policy=policy,
frag_index=frag_index, df_mgr=df_mgr)
# sanity checks
listing = os.listdir(df._datadir)
self.assertTrue(listing)
for filename in listing:
self.assertTrue(filename.startswith(timestamp.internal))
diskfiles.append(df)
return diskfiles
def _open_tx_diskfile(self, obj_name, policy, frag_index=None):
df_mgr = self.daemon._diskfile_router[policy]
df = df_mgr.get_diskfile(
self.device, self.partition, account='a', container='c',
obj=obj_name, policy=policy, frag_index=frag_index)
df.open()
return df
def _open_rx_diskfile(self, obj_name, policy, frag_index=None):
df = self.rx_controller.get_diskfile(
self.device, self.partition, 'a', 'c', obj_name, policy=policy,
frag_index=frag_index)
df.open()
return df
def _verify_diskfile_sync(self, tx_df, rx_df, frag_index, same_etag=False):
# verify that diskfiles' metadata match
# sanity check, they are not the same ondisk files!
self.assertNotEqual(tx_df._datadir, rx_df._datadir)
rx_metadata = dict(rx_df.get_metadata())
for k, v in tx_df.get_metadata().items():
if k == 'X-Object-Sysmeta-Ec-Frag-Index':
# if tx_df had a frag_index then rx_df should also have one
self.assertTrue(k in rx_metadata)
self.assertEqual(frag_index, int(rx_metadata.pop(k)))
elif k == 'ETag' and not same_etag:
self.assertNotEqual(v, rx_metadata.pop(k, None))
continue
else:
self.assertEqual(v, rx_metadata.pop(k), k)
self.assertFalse(rx_metadata)
expected_body = '%s___%s' % (tx_df._name, frag_index)
actual_body = ''.join([chunk for chunk in rx_df.reader()])
self.assertEqual(expected_body, actual_body)
def _analyze_trace(self, trace):
"""
Parse protocol trace captured by fake connection, making some
assertions along the way, and return results as a dict of form:
results = {'tx_missing': <list of messages>,
'rx_missing': <list of messages>,
'tx_updates': <list of subreqs>,
'rx_updates': <list of messages>}
Each subreq is a dict with keys: 'method', 'path', 'headers', 'body'
"""
def tx_missing(results, line):
self.assertEqual('tx', line[0])
results['tx_missing'].append(line[1])
def rx_missing(results, line):
self.assertEqual('rx', line[0])
parts = line[1].split('\r\n')
for part in parts:
results['rx_missing'].append(part)
def tx_updates(results, line):
self.assertEqual('tx', line[0])
subrequests = results['tx_updates']
if line[1].startswith(('PUT', 'DELETE', 'POST')):
parts = line[1].split('\r\n')
method, path = parts[0].split()
subreq = {'method': method, 'path': path, 'req': line[1],
'headers': parts[1:]}
subrequests.append(subreq)
else:
self.assertTrue(subrequests)
body = (subrequests[-1]).setdefault('body', '')
body += line[1]
subrequests[-1]['body'] = body
def rx_updates(results, line):
self.assertEqual('rx', line[0])
results.setdefault['rx_updates'].append(line[1])
def unexpected(results, line):
results.setdefault('unexpected', []).append(line)
# each trace line is a tuple of ([tx|rx], msg)
handshakes = iter([(('tx', ':MISSING_CHECK: START'), tx_missing),
(('tx', ':MISSING_CHECK: END'), unexpected),
(('rx', ':MISSING_CHECK: START'), rx_missing),
(('rx', ':MISSING_CHECK: END'), unexpected),
(('tx', ':UPDATES: START'), tx_updates),
(('tx', ':UPDATES: END'), unexpected),
(('rx', ':UPDATES: START'), rx_updates),
(('rx', ':UPDATES: END'), unexpected)])
expect_handshake = next(handshakes)
phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
results = dict((k, []) for k in phases)
handler = unexpected
lines = list(trace.get('messages', []))
lines.reverse()
while lines:
line = lines.pop()
if line == expect_handshake[0]:
handler = expect_handshake[1]
try:
expect_handshake = next(handshakes)
except StopIteration:
# should be the last line
self.assertFalse(
lines, 'Unexpected trailing lines %s' % lines)
continue
handler(results, line)
try:
# check all handshakes occurred
missed = next(handshakes)
self.fail('Handshake %s not found' % str(missed[0]))
except StopIteration:
pass
# check no message outside of a phase
self.assertFalse(results.get('unexpected'),
'Message outside of a phase: %s' % results.get(None))
return results
def _verify_ondisk_files(self, tx_objs, policy, tx_frag_index=None,
rx_frag_index=None):
"""
Verify tx and rx files that should be in sync.
:param tx_objs: sender diskfiles
:param policy: storage policy instance
:param tx_frag_index: the fragment index of tx diskfiles that should
have been used as a source for sync'ing
:param rx_frag_index: the fragment index of expected rx diskfiles
"""
for o_name, diskfiles in tx_objs.items():
for tx_df in diskfiles:
# check tx file still intact - ssync does not do any cleanup!
tx_df.open()
if tx_frag_index is None or tx_df._frag_index == tx_frag_index:
# this diskfile should have been sync'd,
# check rx file is ok
rx_df = self._open_rx_diskfile(
o_name, policy, rx_frag_index)
# for EC revert job or replication etags should match
match_etag = (tx_frag_index == rx_frag_index)
self._verify_diskfile_sync(
tx_df, rx_df, rx_frag_index, match_etag)
else:
# this diskfile should not have been sync'd,
# check no rx file,
self.assertRaises(DiskFileNotExist, self._open_rx_diskfile,
o_name, policy,
frag_index=tx_df._frag_index)
def _verify_tombstones(self, tx_objs, policy):
# verify tx and rx tombstones that should be in sync
for o_name, diskfiles in tx_objs.items():
try:
self._open_tx_diskfile(o_name, policy)
self.fail('DiskFileDeleted expected')
except DiskFileDeleted as exc:
tx_delete_time = exc.timestamp
try:
self._open_rx_diskfile(o_name, policy)
self.fail('DiskFileDeleted expected')
except DiskFileDeleted as exc:
rx_delete_time = exc.timestamp
self.assertEqual(tx_delete_time, rx_delete_time)
@patch_policies(with_ec_default=True)
class TestSsyncEC(TestBaseSsync):
def test_handoff_fragment_revert(self):
# test that a sync_revert type job does send the correct frag archives
# to the receiver
policy = POLICIES.default
rx_node_index = 0
tx_node_index = 1
# for a revert job we iterate over frag index that belongs on
# remote node
frag_index = rx_node_index
# create sender side diskfiles...
tx_objs = {}
rx_objs = {}
tx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 has primary and handoff fragment archives
t1 = next(self.ts_iter)
tx_objs['o1'] = self._create_ondisk_files(
tx_df_mgr, 'o1', policy, t1, (rx_node_index, tx_node_index))
# o2 only has primary
t2 = next(self.ts_iter)
tx_objs['o2'] = self._create_ondisk_files(
tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
# o3 only has handoff
t3 = next(self.ts_iter)
tx_objs['o3'] = self._create_ondisk_files(
tx_df_mgr, 'o3', policy, t3, (rx_node_index,))
# o4 primary and handoff fragment archives on tx, handoff in sync on rx
t4 = next(self.ts_iter)
tx_objs['o4'] = self._create_ondisk_files(
tx_df_mgr, 'o4', policy, t4, (tx_node_index, rx_node_index,))
rx_objs['o4'] = self._create_ondisk_files(
rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
# o5 is a tombstone, missing on receiver
t5 = next(self.ts_iter)
tx_tombstones['o5'] = self._create_ondisk_files(
tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
tx_tombstones['o5'][0].delete(t5)
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy,
'frag_index': frag_index}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
sender()
# verify protocol
results = self._analyze_trace(trace)
# sender has handoff frags for o1, o3 and o4 and ts for o5
self.assertEqual(4, len(results['tx_missing']))
# receiver is missing frags for o1, o3 and ts for o5
self.assertEqual(3, len(results['rx_missing']))
self.assertEqual(3, len(results['tx_updates']))
self.assertFalse(results['rx_updates'])
sync_paths = []
for subreq in results.get('tx_updates'):
if subreq.get('method') == 'PUT':
self.assertTrue(
'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
in subreq.get('headers'))
expected_body = '%s___%s' % (subreq['path'], rx_node_index)
self.assertEqual(expected_body, subreq['body'])
elif subreq.get('method') == 'DELETE':
self.assertEqual('/a/c/o5', subreq['path'])
sync_paths.append(subreq.get('path'))
self.assertEqual(['/a/c/o1', '/a/c/o3', '/a/c/o5'], sorted(sync_paths))
# verify on disk files...
self._verify_ondisk_files(
tx_objs, policy, frag_index, rx_node_index)
self._verify_tombstones(tx_tombstones, policy)
def test_fragment_sync(self):
# check that a sync_only type job does call reconstructor to build a
# diskfile to send, and continues making progress despite an error
# when building one diskfile
policy = POLICIES.default
rx_node_index = 0
tx_node_index = 1
# for a sync job we iterate over frag index that belongs on local node
frag_index = tx_node_index
# create sender side diskfiles...
tx_objs = {}
tx_tombstones = {}
rx_objs = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 only has primary
t1 = next(self.ts_iter)
tx_objs['o1'] = self._create_ondisk_files(
tx_df_mgr, 'o1', policy, t1, (tx_node_index,))
# o2 only has primary
t2 = next(self.ts_iter)
tx_objs['o2'] = self._create_ondisk_files(
tx_df_mgr, 'o2', policy, t2, (tx_node_index,))
# o3 only has primary
t3 = next(self.ts_iter)
tx_objs['o3'] = self._create_ondisk_files(
tx_df_mgr, 'o3', policy, t3, (tx_node_index,))
# o4 primary fragment archives on tx, handoff in sync on rx
t4 = next(self.ts_iter)
tx_objs['o4'] = self._create_ondisk_files(
tx_df_mgr, 'o4', policy, t4, (tx_node_index,))
rx_objs['o4'] = self._create_ondisk_files(
rx_df_mgr, 'o4', policy, t4, (rx_node_index,))
# o5 is a tombstone, missing on receiver
t5 = next(self.ts_iter)
tx_tombstones['o5'] = self._create_ondisk_files(
tx_df_mgr, 'o5', policy, t5, (tx_node_index,))
tx_tombstones['o5'][0].delete(t5)
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
reconstruct_fa_calls = []
def fake_reconstruct_fa(job, node, metadata):
reconstruct_fa_calls.append((job, node, policy, metadata))
if len(reconstruct_fa_calls) == 2:
# simulate second reconstruct failing
raise DiskFileError
content = '%s___%s' % (metadata['name'], rx_node_index)
return RebuildingECDiskFileStream(
metadata, rx_node_index, iter([content]))
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy,
'frag_index': frag_index,
'sync_diskfile_builder': fake_reconstruct_fa}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
sender()
# verify protocol
results = self._analyze_trace(trace)
# sender has primary for o1, o2 and o3, o4 and ts for o5
self.assertEqual(5, len(results['tx_missing']))
# receiver is missing o1, o2 and o3 and ts for o5
self.assertEqual(4, len(results['rx_missing']))
# sender can only construct 2 out of 3 missing frags
self.assertEqual(3, len(results['tx_updates']))
self.assertEqual(3, len(reconstruct_fa_calls))
self.assertFalse(results['rx_updates'])
actual_sync_paths = []
for subreq in results.get('tx_updates'):
if subreq.get('method') == 'PUT':
self.assertTrue(
'X-Object-Sysmeta-Ec-Frag-Index: %s' % rx_node_index
in subreq.get('headers'))
expected_body = '%s___%s' % (subreq['path'], rx_node_index)
self.assertEqual(expected_body, subreq['body'])
elif subreq.get('method') == 'DELETE':
self.assertEqual('/a/c/o5', subreq['path'])
actual_sync_paths.append(subreq.get('path'))
# remove the failed df from expected synced df's
expect_sync_paths = ['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5']
failed_path = reconstruct_fa_calls[1][3]['name']
expect_sync_paths.remove(failed_path)
failed_obj = None
for obj, diskfiles in tx_objs.items():
if diskfiles[0]._name == failed_path:
failed_obj = obj
# sanity check
self.assertTrue(tx_objs.pop(failed_obj))
# verify on disk files...
self.assertEqual(sorted(expect_sync_paths), sorted(actual_sync_paths))
self._verify_ondisk_files(
tx_objs, policy, frag_index, rx_node_index)
self._verify_tombstones(tx_tombstones, policy)
@patch_policies
class TestSsyncReplication(TestBaseSsync):
def test_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create sender side diskfiles...
tx_objs = {}
rx_objs = {}
tx_tombstones = {}
rx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# o1 and o2 are on tx only
t1 = next(self.ts_iter)
tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
t2 = next(self.ts_iter)
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
# o3 is on tx and older copy on rx
t3a = next(self.ts_iter)
rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3a)
t3b = next(self.ts_iter)
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3b)
# o4 in sync on rx and tx
t4 = next(self.ts_iter)
tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4)
rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4)
# o5 is a tombstone, missing on receiver
t5 = next(self.ts_iter)
tx_tombstones['o5'] = self._create_ondisk_files(
tx_df_mgr, 'o5', policy, t5)
tx_tombstones['o5'][0].delete(t5)
# o6 is a tombstone, in sync on tx and rx
t6 = next(self.ts_iter)
tx_tombstones['o6'] = self._create_ondisk_files(
tx_df_mgr, 'o6', policy, t6)
tx_tombstones['o6'][0].delete(t6)
rx_tombstones['o6'] = self._create_ondisk_files(
rx_df_mgr, 'o6', policy, t6)
rx_tombstones['o6'][0].delete(t6)
# o7 is a tombstone on tx, older data on rx
t7a = next(self.ts_iter)
rx_objs['o7'] = self._create_ondisk_files(rx_df_mgr, 'o7', policy, t7a)
t7b = next(self.ts_iter)
tx_tombstones['o7'] = self._create_ondisk_files(
tx_df_mgr, 'o7', policy, t7b)
tx_tombstones['o7'][0].delete(t7b)
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
success, in_sync_objs = sender()
self.assertEqual(7, len(in_sync_objs), trace['messages'])
self.assertTrue(success)
# verify protocol
results = self._analyze_trace(trace)
self.assertEqual(7, len(results['tx_missing']))
self.assertEqual(5, len(results['rx_missing']))
self.assertEqual(5, len(results['tx_updates']))
self.assertFalse(results['rx_updates'])
sync_paths = []
for subreq in results.get('tx_updates'):
if subreq.get('method') == 'PUT':
self.assertTrue(
subreq['path'] in ('/a/c/o1', '/a/c/o2', '/a/c/o3'))
expected_body = '%s___None' % subreq['path']
self.assertEqual(expected_body, subreq['body'])
elif subreq.get('method') == 'DELETE':
self.assertTrue(subreq['path'] in ('/a/c/o5', '/a/c/o7'))
sync_paths.append(subreq.get('path'))
self.assertEqual(
['/a/c/o1', '/a/c/o2', '/a/c/o3', '/a/c/o5', '/a/c/o7'],
sorted(sync_paths))
# verify on disk files...
self._verify_ondisk_files(tx_objs, policy)
self._verify_tombstones(tx_tombstones, policy)
def test_nothing_to_sync(self):
job = {'device': self.device,
'partition': self.partition,
'policy': POLICIES.default}
node = {'replication_ip': self.rx_ip,
'replication_port': self.rx_port,
'device': self.device,
'index': 0}
sender = ssync_sender.Sender(self.daemon, node, job, ['abc'])
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
result, in_sync_objs = sender()
self.assertTrue(result)
self.assertFalse(in_sync_objs)
results = self._analyze_trace(trace)
self.assertFalse(results['tx_missing'])
self.assertFalse(results['rx_missing'])
self.assertFalse(results['tx_updates'])
self.assertFalse(results['rx_updates'])
# Minimal receiver response as read by sender:
# 2 <-- initial \r\n to start ssync exchange
# + 23 <-- :MISSING CHECK START\r\n
# + 2 <-- \r\n (minimal missing check response)
# + 21 <-- :MISSING CHECK END\r\n
# + 17 <-- :UPDATES START\r\n
# + 15 <-- :UPDATES END\r\n
# TOTAL = 80
self.assertEqual(80, trace.get('readline_bytes'))
def test_meta_file_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_objs = {}
rx_objs = {}
tx_tombstones = {}
rx_tombstones = {}
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
expected_subreqs = defaultdict(list)
# o1 on tx only with meta file
t1 = next(self.ts_iter)
tx_objs['o1'] = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t1)
t1_meta = next(self.ts_iter)
metadata = {'X-Timestamp': t1_meta.internal,
'X-Object-Meta-Test': 'o1',
'X-Object-Sysmeta-Test': 'sys_o1'}
tx_objs['o1'][0].write_metadata(metadata)
expected_subreqs['PUT'].append('o1')
expected_subreqs['POST'].append('o1')
# o2 on tx with meta, on rx without meta
t2 = next(self.ts_iter)
tx_objs['o2'] = self._create_ondisk_files(tx_df_mgr, 'o2', policy, t2)
t2_meta = next(self.ts_iter)
metadata = {'X-Timestamp': t2_meta.internal,
'X-Object-Meta-Test': 'o2',
'X-Object-Sysmeta-Test': 'sys_o2'}
tx_objs['o2'][0].write_metadata(metadata)
rx_objs['o2'] = self._create_ondisk_files(rx_df_mgr, 'o2', policy, t2)
expected_subreqs['POST'].append('o2')
# o3 is on tx with meta, rx has newer data but no meta
t3a = next(self.ts_iter)
tx_objs['o3'] = self._create_ondisk_files(tx_df_mgr, 'o3', policy, t3a)
t3b = next(self.ts_iter)
rx_objs['o3'] = self._create_ondisk_files(rx_df_mgr, 'o3', policy, t3b)
t3_meta = next(self.ts_iter)
metadata = {'X-Timestamp': t3_meta.internal,
'X-Object-Meta-Test': 'o3',
'X-Object-Sysmeta-Test': 'sys_o3'}
tx_objs['o3'][0].write_metadata(metadata)
expected_subreqs['POST'].append('o3')
# o4 is on tx with meta, rx has older data and up to date meta
t4a = next(self.ts_iter)
rx_objs['o4'] = self._create_ondisk_files(rx_df_mgr, 'o4', policy, t4a)
t4b = next(self.ts_iter)
tx_objs['o4'] = self._create_ondisk_files(tx_df_mgr, 'o4', policy, t4b)
t4_meta = next(self.ts_iter)
metadata = {'X-Timestamp': t4_meta.internal,
'X-Object-Meta-Test': 'o4',
'X-Object-Sysmeta-Test': 'sys_o4'}
tx_objs['o4'][0].write_metadata(metadata)
rx_objs['o4'][0].write_metadata(metadata)
expected_subreqs['PUT'].append('o4')
# o5 is on tx with meta, rx is in sync with data and meta
t5 = next(self.ts_iter)
rx_objs['o5'] = self._create_ondisk_files(rx_df_mgr, 'o5', policy, t5)
tx_objs['o5'] = self._create_ondisk_files(tx_df_mgr, 'o5', policy, t5)
t5_meta = next(self.ts_iter)
metadata = {'X-Timestamp': t5_meta.internal,
'X-Object-Meta-Test': 'o5',
'X-Object-Sysmeta-Test': 'sys_o5'}
tx_objs['o5'][0].write_metadata(metadata)
rx_objs['o5'][0].write_metadata(metadata)
# o6 is tombstone on tx, rx has older data and meta
t6 = next(self.ts_iter)
tx_tombstones['o6'] = self._create_ondisk_files(
tx_df_mgr, 'o6', policy, t6)
rx_tombstones['o6'] = self._create_ondisk_files(
rx_df_mgr, 'o6', policy, t6)
metadata = {'X-Timestamp': next(self.ts_iter).internal,
'X-Object-Meta-Test': 'o6',
'X-Object-Sysmeta-Test': 'sys_o6'}
rx_tombstones['o6'][0].write_metadata(metadata)
tx_tombstones['o6'][0].delete(next(self.ts_iter))
expected_subreqs['DELETE'].append('o6')
# o7 is tombstone on rx, tx has older data and meta,
# no subreqs expected...
t7 = next(self.ts_iter)
tx_objs['o7'] = self._create_ondisk_files(tx_df_mgr, 'o7', policy, t7)
rx_tombstones['o7'] = self._create_ondisk_files(
rx_df_mgr, 'o7', policy, t7)
metadata = {'X-Timestamp': next(self.ts_iter).internal,
'X-Object-Meta-Test': 'o7',
'X-Object-Sysmeta-Test': 'sys_o7'}
tx_objs['o7'][0].write_metadata(metadata)
rx_tombstones['o7'][0].delete(next(self.ts_iter))
suffixes = set()
for diskfiles in (tx_objs.values() + tx_tombstones.values()):
for df in diskfiles:
suffixes.add(os.path.basename(os.path.dirname(df._datadir)))
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
# run the sync protocol...
success, in_sync_objs = sender()
self.assertEqual(7, len(in_sync_objs))
self.assertTrue(success)
# verify protocol
results = self._analyze_trace(trace)
self.assertEqual(7, len(results['tx_missing']))
self.assertEqual(5, len(results['rx_missing']))
for subreq in results.get('tx_updates'):
obj = subreq['path'].split('/')[3]
method = subreq['method']
self.assertTrue(obj in expected_subreqs[method],
'Unexpected %s subreq for object %s, expected %s'
% (method, obj, expected_subreqs[method]))
expected_subreqs[method].remove(obj)
if method == 'PUT':
expected_body = '%s___None' % subreq['path']
self.assertEqual(expected_body, subreq['body'])
# verify all expected subreqs consumed
for _method, expected in expected_subreqs.items():
self.assertFalse(expected)
self.assertFalse(results['rx_updates'])
# verify on disk files...
del tx_objs['o7'] # o7 not expected to be sync'd
self._verify_ondisk_files(tx_objs, policy)
self._verify_tombstones(tx_tombstones, policy)
for oname, rx_obj in rx_objs.items():
df = rx_obj[0].open()
metadata = df.get_metadata()
self.assertEqual(metadata['X-Object-Meta-Test'], oname)
self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)
def test_meta_file_not_synced_to_legacy_receiver(self):
# verify that the sender does sync a data file to a legacy receiver,
# but does not PUT meta file content to a legacy receiver
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_df_mgr = self.daemon._diskfile_router[policy]
rx_df_mgr = self.rx_controller._diskfile_router[policy]
# rx has data at t1 but no meta
# object is on tx with data at t2, meta at t3,
t1 = next(self.ts_iter)
self._create_ondisk_files(rx_df_mgr, 'o1', policy, t1)
t2 = next(self.ts_iter)
tx_obj = self._create_ondisk_files(tx_df_mgr, 'o1', policy, t2)[0]
t3 = next(self.ts_iter)
metadata = {'X-Timestamp': t3.internal,
'X-Object-Meta-Test': 'o3',
'X-Object-Sysmeta-Test': 'sys_o3'}
tx_obj.write_metadata(metadata)
suffixes = [os.path.basename(os.path.dirname(tx_obj._datadir))]
# create ssync sender instance...
job = {'device': self.device,
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
def _legacy_check_missing(self, line):
# reproduces behavior of 'legacy' ssync receiver missing_checks()
parts = line.split()
object_hash = urllib.parse.unquote(parts[0])
timestamp = urllib.parse.unquote(parts[1])
want = False
try:
df = self.diskfile_mgr.get_diskfile_from_hash(
self.device, self.partition, object_hash, self.policy,
frag_index=self.frag_index)
except exceptions.DiskFileNotExist:
want = True
else:
try:
df.open()
except exceptions.DiskFileDeleted as err:
want = err.timestamp < timestamp
except exceptions.DiskFileError as err:
want = True
else:
want = df.timestamp < timestamp
if want:
return urllib.parse.quote(object_hash)
return None
# run the sync protocol...
func = 'swift.obj.ssync_receiver.Receiver._check_missing'
with mock.patch(func, _legacy_check_missing):
success, in_sync_objs = sender()
self.assertEqual(1, len(in_sync_objs))
self.assertTrue(success)
# verify protocol, expecting only a PUT to legacy receiver
results = self._analyze_trace(trace)
self.assertEqual(1, len(results['tx_missing']))
self.assertEqual(1, len(results['rx_missing']))
self.assertEqual(1, len(results['tx_updates']))
self.assertEqual('PUT', results['tx_updates'][0]['method'])
self.assertFalse(results['rx_updates'])
# verify on disk files...
rx_obj = self._open_rx_diskfile('o1', policy)
tx_obj = self._open_tx_diskfile('o1', policy)
# with legacy behavior rx_obj data and meta timestamps are equal
self.assertEqual(t2, rx_obj.data_timestamp)
self.assertEqual(t2, rx_obj.timestamp)
# with legacy behavior rx_obj data timestamp should equal tx_obj
self.assertEqual(rx_obj.data_timestamp, tx_obj.data_timestamp)
# tx meta file should not have been sync'd to rx data file
self.assertNotIn('X-Object-Meta-Test', rx_obj.get_metadata())
class TestModuleMethods(unittest.TestCase):
def test_encode_missing(self):
object_hash = '9d41d8cd98f00b204e9800998ecf0abc'
@ -2458,7 +1579,7 @@ class TestModuleMethods(unittest.TestCase):
expected = {'data': True, 'meta': True}
self.assertEqual(ssync_sender.decode_wanted(parts), expected)
# you don't really these next few...
# you don't really expect these next few...
parts = ['md']
expected = {'data': True, 'meta': True}
self.assertEqual(ssync_sender.decode_wanted(parts), expected)

View File

@ -84,13 +84,13 @@ class TestObjectUpdater(unittest.TestCase):
'swift_dir': self.testdir,
'interval': '1',
'concurrency': '2',
'node_timeout': '5'})
'node_timeout': '5.5'})
self.assertTrue(hasattr(cu, 'logger'))
self.assertTrue(cu.logger is not None)
self.assertEqual(cu.devices, self.devices_dir)
self.assertEqual(cu.interval, 1)
self.assertEqual(cu.concurrency, 2)
self.assertEqual(cu.node_timeout, 5)
self.assertEqual(cu.node_timeout, 5.5)
self.assertTrue(cu.get_container_ring() is not None)
@mock.patch('os.listdir')

View File

@ -452,6 +452,28 @@ class TestFuncs(unittest.TestCase):
resp = base.OPTIONS(req)
self.assertEqual(resp.status_int, 200)
def test_options_with_null_allow_origin(self):
base = Controller(self.app)
base.account_name = 'a'
base.container_name = 'c'
def my_container_info(*args):
return {
'cors': {
'allow_origin': '*',
}
}
base.container_info = my_container_info
req = Request.blank('/v1/a/c/o',
environ={'swift.cache': FakeCache()},
headers={'Origin': '*',
'Access-Control-Request-Method': 'GET'})
with patch('swift.proxy.controllers.base.'
'http_connect', fake_http_connect(200)):
resp = base.OPTIONS(req)
self.assertEqual(resp.status_int, 200)
def test_options_unauthorized(self):
base = Controller(self.app)
base.account_name = 'a'
@ -507,6 +529,16 @@ class TestFuncs(unittest.TestCase):
resp,
headers_to_container_info(headers.items(), 200))
def test_container_info_without_req(self):
base = Controller(self.app)
base.account_name = 'a'
base.container_name = 'c'
container_info = \
base.container_info(base.account_name,
base.container_name)
self.assertEqual(container_info['status'], 0)
def test_headers_to_account_info_missing(self):
resp = headers_to_account_info({}, 404)
self.assertEqual(resp['status'], 404)
@ -684,6 +716,19 @@ class TestFuncs(unittest.TestCase):
for k, v in bad_hdrs.items():
self.assertFalse(k.lower() in dst_headers)
def test_generate_request_headers_with_no_orig_req(self):
base = Controller(self.app)
src_headers = {'x-remove-base-meta-owner': 'x',
'x-base-meta-size': '151M',
'new-owner': 'Kun'}
dst_headers = base.generate_request_headers(None,
additional=src_headers)
expected_headers = {'x-base-meta-size': '151M',
'connection': 'close'}
for k, v in expected_headers.items():
self.assertDictContainsSubset(expected_headers, dst_headers)
self.assertEqual('', dst_headers['Referer'])
def test_client_chunk_size(self):
class TestSource(object):

View File

@ -30,7 +30,7 @@ from six import BytesIO
from six.moves import range
import swift
from swift.common import utils, swob
from swift.common import utils, swob, exceptions
from swift.proxy import server as proxy_server
from swift.proxy.controllers import obj
from swift.proxy.controllers.base import get_info as _real_get_info
@ -612,6 +612,66 @@ class TestReplicatedObjController(BaseObjectControllerMixin,
node_error_count(self.app, object_ring.devs[1]),
self.app.error_suppression_limit + 1)
def test_PUT_error_during_transfer_data(self):
class FakeReader(object):
def read(self, size):
raise exceptions.ChunkReadError(None)
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
body='test body')
req.environ['wsgi.input'] = FakeReader()
req.headers['content-length'] = '6'
with set_http_connect(201, 201, 201):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 499)
def test_PUT_chunkreadtimeout_during_transfer_data(self):
class FakeReader(object):
def read(self, size):
raise exceptions.ChunkReadTimeout(None)
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
body='test body')
req.environ['wsgi.input'] = FakeReader()
req.headers['content-length'] = '6'
with set_http_connect(201, 201, 201):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 408)
def test_PUT_timeout_during_transfer_data(self):
class FakeReader(object):
def read(self, size):
raise exceptions.Timeout(None)
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
body='test body')
req.environ['wsgi.input'] = FakeReader()
req.headers['content-length'] = '6'
with set_http_connect(201, 201, 201):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 499)
def test_PUT_exception_during_transfer_data(self):
class FakeReader(object):
def read(self, size):
raise Exception
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
body='test body')
req.environ['wsgi.input'] = FakeReader()
req.headers['content-length'] = '6'
with set_http_connect(201, 201, 201):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 500)
def test_GET_simple(self):
req = swift.common.swob.Request.blank('/v1/a/c/o')
with set_http_connect(200):
@ -1266,6 +1326,86 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 201)
def test_PUT_ec_error_during_transfer_data(self):
class FakeReader(object):
def read(self, size):
raise exceptions.ChunkReadError(None)
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
body='test body')
req.environ['wsgi.input'] = FakeReader()
req.headers['content-length'] = '6'
codes = [201] * self.replicas()
expect_headers = {
'X-Obj-Metadata-Footer': 'yes',
'X-Obj-Multiphase-Commit': 'yes'
}
with set_http_connect(*codes, expect_headers=expect_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 499)
def test_PUT_ec_chunkreadtimeout_during_transfer_data(self):
class FakeReader(object):
def read(self, size):
raise exceptions.ChunkReadTimeout(None)
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
body='test body')
req.environ['wsgi.input'] = FakeReader()
req.headers['content-length'] = '6'
codes = [201] * self.replicas()
expect_headers = {
'X-Obj-Metadata-Footer': 'yes',
'X-Obj-Multiphase-Commit': 'yes'
}
with set_http_connect(*codes, expect_headers=expect_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 408)
def test_PUT_ec_timeout_during_transfer_data(self):
class FakeReader(object):
def read(self, size):
raise exceptions.Timeout(None)
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
body='test body')
req.environ['wsgi.input'] = FakeReader()
req.headers['content-length'] = '6'
codes = [201] * self.replicas()
expect_headers = {
'X-Obj-Metadata-Footer': 'yes',
'X-Obj-Multiphase-Commit': 'yes'
}
with set_http_connect(*codes, expect_headers=expect_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 499)
def test_PUT_ec_exception_during_transfer_data(self):
class FakeReader(object):
def read(self, size):
raise Exception(None)
req = swob.Request.blank('/v1/a/c/o.jpg', method='PUT',
body='test body')
req.environ['wsgi.input'] = FakeReader()
req.headers['content-length'] = '6'
codes = [201] * self.replicas()
expect_headers = {
'X-Obj-Metadata-Footer': 'yes',
'X-Obj-Multiphase-Commit': 'yes'
}
with set_http_connect(*codes, expect_headers=expect_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 500)
def test_PUT_with_body(self):
req = swift.common.swob.Request.blank('/v1/a/c/o', method='PUT')
segment_size = self.policy.ec_segment_size
@ -2001,7 +2141,7 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
try:
resp.body
except ECDriverError:
pass
resp._app_iter.close()
else:
self.fail('invalid ec fragment response body did not blow up!')
error_lines = self.logger.get_lines_for_level('error')

View File

@ -22,7 +22,7 @@ import os
import pickle
import sys
import unittest
from contextlib import closing, contextmanager, nested
from contextlib import closing, contextmanager
from gzip import GzipFile
from shutil import rmtree
import gc
@ -55,7 +55,7 @@ from swift.common.utils import hash_path, json, storage_directory, \
from test.unit import (
connect_tcp, readuntil2crlfs, FakeLogger, FakeRing, fake_http_connect,
FakeMemcache, debug_logger, patch_policies, write_fake_ring,
mocked_http_conn, generate_bad_metadata_headers)
mocked_http_conn, generate_bad_metadata_headers, DEFAULT_TEST_EC_TYPE)
from swift.proxy import server as proxy_server
from swift.proxy.controllers.obj import ReplicatedObjectController
from swift.account import server as account_server
@ -139,7 +139,7 @@ def do_setup(the_object_server):
StoragePolicy(0, 'zero', True),
StoragePolicy(1, 'one', False),
StoragePolicy(2, 'two', False),
ECStoragePolicy(3, 'ec', ec_type='jerasure_rs_vand',
ECStoragePolicy(3, 'ec', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=2, ec_nparity=1, ec_segment_size=4096)])
obj_rings = {
0: ('sda1', 'sdb1'),
@ -684,6 +684,16 @@ class TestController(unittest.TestCase):
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestProxyServer(unittest.TestCase):
def test_creation(self):
# later config should be extended to assert more config options
app = proxy_server.Application({'node_timeout': '3.5',
'recoverable_node_timeout': '1.5'},
FakeMemcache(),
container_ring=FakeRing(),
account_ring=FakeRing())
self.assertEqual(app.node_timeout, 3.5)
self.assertEqual(app.recoverable_node_timeout, 1.5)
def test_get_object_ring(self):
baseapp = proxy_server.Application({},
FakeMemcache(),
@ -1821,7 +1831,7 @@ class TestObjectController(unittest.TestCase):
'4096')
self.assertEqual(
lmeta['x-object-sysmeta-ec-scheme'],
'jerasure_rs_vand 2+1')
'%s 2+1' % DEFAULT_TEST_EC_TYPE)
self.assertEqual(
lmeta['etag'],
md5(contents).hexdigest())
@ -2039,10 +2049,8 @@ class TestObjectController(unittest.TestCase):
commit_confirmation = \
'swift.proxy.controllers.obj.ECPutter.send_commit_confirmation'
with nested(
mock.patch('swift.obj.server.md5', busted_md5_constructor),
mock.patch(commit_confirmation, mock_committer)) as \
(_junk, commit_call):
with mock.patch('swift.obj.server.md5', busted_md5_constructor), \
mock.patch(commit_confirmation, mock_committer):
fd = sock.makefile()
fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n'
'Host: localhost\r\n'
@ -2092,10 +2100,8 @@ class TestObjectController(unittest.TestCase):
commit_confirmation = \
'swift.proxy.controllers.obj.ECPutter.send_commit_confirmation'
with nested(
mock.patch(read_footer),
mock.patch(commit_confirmation, mock_committer)) as \
(read_footer_call, commit_call):
with mock.patch(read_footer) as read_footer_call, \
mock.patch(commit_confirmation, mock_committer):
# Emulate missing footer MIME doc in all object-servers
read_footer_call.side_effect = HTTPBadRequest(
body="couldn't find footer MIME doc")
@ -3587,17 +3593,17 @@ class TestObjectController(unittest.TestCase):
dev['ip'] = '127.0.0.1'
dev['port'] = 1
class SlowBody(object):
class DisconnectedBody(object):
def __init__(self):
self.sent = 0
def read(self, size=-1):
raise Exception('Disconnected')
return ''
req = Request.blank('/v1/a/c/o',
environ={'REQUEST_METHOD': 'PUT',
'wsgi.input': SlowBody()},
'wsgi.input': DisconnectedBody()},
headers={'Content-Length': '4',
'Content-Type': 'text/plain'})
self.app.update_request(req)
@ -3881,11 +3887,10 @@ class TestObjectController(unittest.TestCase):
def test_iter_nodes_gives_extra_if_error_limited_inline(self):
object_ring = self.app.get_object_ring(None)
with nested(
mock.patch.object(self.app, 'sort_nodes', lambda n: n),
with mock.patch.object(self.app, 'sort_nodes', lambda n: n), \
mock.patch.object(self.app, 'request_node_count',
lambda r: 6),
mock.patch.object(object_ring, 'max_more_nodes', 99)):
lambda r: 6), \
mock.patch.object(object_ring, 'max_more_nodes', 99):
first_nodes = list(self.app.iter_nodes(object_ring, 0))
second_nodes = []
for node in self.app.iter_nodes(object_ring, 0):
@ -3899,18 +3904,16 @@ class TestObjectController(unittest.TestCase):
object_ring = self.app.get_object_ring(None)
node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D')
for n in range(10)]
with nested(
mock.patch.object(self.app, 'sort_nodes', lambda n: n),
with mock.patch.object(self.app, 'sort_nodes', lambda n: n), \
mock.patch.object(self.app, 'request_node_count',
lambda r: 3)):
lambda r: 3):
got_nodes = list(self.app.iter_nodes(object_ring, 0,
node_iter=iter(node_list)))
self.assertEqual(node_list[:3], got_nodes)
with nested(
mock.patch.object(self.app, 'sort_nodes', lambda n: n),
with mock.patch.object(self.app, 'sort_nodes', lambda n: n), \
mock.patch.object(self.app, 'request_node_count',
lambda r: 1000000)):
lambda r: 1000000):
got_nodes = list(self.app.iter_nodes(object_ring, 0,
node_iter=iter(node_list)))
self.assertEqual(node_list, got_nodes)
@ -5638,9 +5641,8 @@ class TestObjectController(unittest.TestCase):
# read most of the object, and disconnect
fd.read(10)
fd.close()
sock.close()
sleep(0)
sock.fd._sock.close()
sleep(0.1)
# check for disconnect message!
expected = ['Client disconnected on read'] * 2
@ -5648,6 +5650,45 @@ class TestObjectController(unittest.TestCase):
_test_servers[0].logger.get_lines_for_level('warning'),
expected)
@unpatch_policies
def test_ec_client_put_disconnect(self):
prolis = _test_sockets[0]
# create connection
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
# create container
fd.write('PUT /v1/a/ec-discon HTTP/1.1\r\n'
'Host: localhost\r\n'
'Content-Length: 0\r\n'
'X-Storage-Token: t\r\n'
'X-Storage-Policy: ec\r\n'
'\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 2'
self.assertEqual(headers[:len(exp)], exp)
# create object
obj = 'a' * 4 * 64 * 2 ** 10
fd.write('PUT /v1/a/ec-discon/test HTTP/1.1\r\n'
'Host: localhost\r\n'
'Content-Length: %d\r\n'
'X-Storage-Token: t\r\n'
'Content-Type: donuts\r\n'
'\r\n%s' % (len(obj), obj[:-10]))
fd.flush()
fd.close()
sock.close()
# sleep to trampoline enough
sleep(0.1)
expected = ['Client disconnected without sending enough data']
warns = _test_servers[0].logger.get_lines_for_level('warning')
self.assertEqual(expected, warns)
errors = _test_servers[0].logger.get_lines_for_level('error')
self.assertEqual([], errors)
@unpatch_policies
def test_leak_1(self):
_request_instances = weakref.WeakKeyDictionary()
@ -5696,8 +5737,7 @@ class TestObjectController(unittest.TestCase):
exp = 'HTTP/1.1 200'
self.assertEqual(headers[:len(exp)], exp)
fd.read(1)
fd.close()
sock.close()
sock.fd._sock.close()
# Make sure the GC is run again for pythons without reference
# counting
for i in range(4):
@ -6160,20 +6200,18 @@ class TestECMismatchedFA(unittest.TestCase):
# Server obj1 will have the first version of the object (obj2 also
# gets it, but that gets stepped on later)
prosrv._error_limiting = {}
with nested(
mock.patch.object(obj3srv, 'PUT', bad_disk),
with mock.patch.object(obj3srv, 'PUT', bad_disk), \
mock.patch(
'swift.common.storage_policy.ECStoragePolicy.quorum')):
'swift.common.storage_policy.ECStoragePolicy.quorum'):
type(ec_policy).quorum = mock.PropertyMock(return_value=2)
resp = put_req1.get_response(prosrv)
self.assertEqual(resp.status_int, 201)
# Servers obj2 and obj3 will have the second version of the object.
prosrv._error_limiting = {}
with nested(
mock.patch.object(obj1srv, 'PUT', bad_disk),
with mock.patch.object(obj1srv, 'PUT', bad_disk), \
mock.patch(
'swift.common.storage_policy.ECStoragePolicy.quorum')):
'swift.common.storage_policy.ECStoragePolicy.quorum'):
type(ec_policy).quorum = mock.PropertyMock(return_value=2)
resp = put_req2.get_response(prosrv)
self.assertEqual(resp.status_int, 201)
@ -6183,9 +6221,8 @@ class TestECMismatchedFA(unittest.TestCase):
environ={"REQUEST_METHOD": "GET"},
headers={"X-Auth-Token": "t"})
prosrv._error_limiting = {}
with nested(
mock.patch.object(obj1srv, 'GET', bad_disk),
mock.patch.object(obj2srv, 'GET', bad_disk)):
with mock.patch.object(obj1srv, 'GET', bad_disk), \
mock.patch.object(obj2srv, 'GET', bad_disk):
resp = get_req.get_response(prosrv)
self.assertEqual(resp.status_int, 503)