Merge branch 'master' into feature/crypto
Change-Id: I42a749efbbe36e96f5cbf871bb51ca1fc9424514
This commit is contained in:
commit
1936299943
|
@ -3,4 +3,4 @@ branch = True
|
|||
omit = /usr*,setup.py,*egg*,.venv/*,.tox/*,test/*
|
||||
|
||||
[report]
|
||||
ignore-errors = True
|
||||
ignore_errors = True
|
||||
|
|
|
@ -176,8 +176,6 @@ Syslog log facility. The default is LOG_LOCAL0.
|
|||
Logging level. The default is INFO.
|
||||
.IP \fBlog_address\fR
|
||||
Logging address. The default is /dev/log.
|
||||
.IP \fBvm_test_mode\fR
|
||||
Indicates that you are using a VM environment. The default is no.
|
||||
.IP \fBper_diff\fR
|
||||
The default is 1000.
|
||||
.IP \fBmax_diffs\fR
|
||||
|
|
|
@ -182,8 +182,6 @@ Syslog log facility. The default is LOG_LOCAL0.
|
|||
Logging level. The default is INFO.
|
||||
.IP \fBlog_address\fR
|
||||
Logging address. The default is /dev/log.
|
||||
.IP \fBvm_test_mode\fR
|
||||
Indicates that you are using a VM environment. The default is no.
|
||||
.IP \fBer_diff\fR
|
||||
The default is 1000.
|
||||
.IP \fBmax_diffs\fR
|
||||
|
|
|
@ -185,8 +185,6 @@ Syslog log facility. The default is LOG_LOCAL0.
|
|||
Logging level. The default is INFO.
|
||||
.IP \fBlog_address\fR
|
||||
Logging address. The default is /dev/log.
|
||||
.IP \fBvm_test_mode\fR
|
||||
Indicates that you are using a VM environment. The default is no.
|
||||
.IP \fBdaemonize\fR
|
||||
Whether or not to run replication as a daemon. The default is yes.
|
||||
.IP "\fBrun_pause [deprecated]\fR"
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#account
|
|||
use = egg:swift#recon
|
||||
|
||||
[account-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::account{replication_port}
|
||||
|
||||
[account-auditor]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#account
|
|||
use = egg:swift#recon
|
||||
|
||||
[account-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::account{replication_port}
|
||||
|
||||
[account-auditor]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#account
|
|||
use = egg:swift#recon
|
||||
|
||||
[account-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::account{replication_port}
|
||||
|
||||
[account-auditor]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#account
|
|||
use = egg:swift#recon
|
||||
|
||||
[account-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::account{replication_port}
|
||||
|
||||
[account-auditor]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#container
|
|||
use = egg:swift#recon
|
||||
|
||||
[container-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::container{replication_port}
|
||||
|
||||
[container-updater]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#container
|
|||
use = egg:swift#recon
|
||||
|
||||
[container-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::container{replication_port}
|
||||
|
||||
[container-updater]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#container
|
|||
use = egg:swift#recon
|
||||
|
||||
[container-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::container{replication_port}
|
||||
|
||||
[container-updater]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#container
|
|||
use = egg:swift#recon
|
||||
|
||||
[container-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::container{replication_port}
|
||||
|
||||
[container-updater]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#object
|
|||
use = egg:swift#recon
|
||||
|
||||
[object-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::object{replication_port}
|
||||
|
||||
[object-reconstructor]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#object
|
|||
use = egg:swift#recon
|
||||
|
||||
[object-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::object{replication_port}
|
||||
|
||||
[object-reconstructor]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#object
|
|||
use = egg:swift#recon
|
||||
|
||||
[object-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::object{replication_port}
|
||||
|
||||
[object-reconstructor]
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ use = egg:swift#object
|
|||
use = egg:swift#recon
|
||||
|
||||
[object-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::object{replication_port}
|
||||
|
||||
[object-reconstructor]
|
||||
|
||||
|
|
|
@ -1065,7 +1065,7 @@ proxy-server controller responsible for the request: "account", "container",
|
|||
middleware. The `<verb>` portion will be one of "GET", "HEAD", "POST", "PUT",
|
||||
"DELETE", "COPY", "OPTIONS", or "BAD_METHOD". The list of valid HTTP methods
|
||||
is configurable via the `log_statsd_valid_http_methods` config variable and
|
||||
the default setting yields the above behavior.
|
||||
the default setting yields the above behavior):
|
||||
|
||||
.. _Swift Origin Server: https://github.com/dpgoetz/sos
|
||||
|
||||
|
@ -1087,6 +1087,21 @@ Metric Name Description
|
|||
like the main timing metric.
|
||||
==================================================== ============================================
|
||||
|
||||
The `proxy-logging` middleware also groups these metrics by policy. The
|
||||
`<policy-index>` portion represents a policy index):
|
||||
|
||||
========================================================================== =====================================
|
||||
Metric Name Description
|
||||
-------------------------------------------------------------------------- -------------------------------------
|
||||
`proxy-server.object.policy.<policy-index>.<verb>.<status>.timing` Timing data for requests, aggregated
|
||||
by policy index.
|
||||
`proxy-server.object.policy.<policy-index>.GET.<status>.first-byte.timing` Timing data up to completion of
|
||||
sending the response headers,
|
||||
aggregated by policy index.
|
||||
`proxy-server.object.policy.<policy-index>.<verb>.<status>.xfer` Sum of bytes transferred in and out,
|
||||
aggregated by policy index.
|
||||
========================================================================== =====================================
|
||||
|
||||
Metrics for `tempauth` middleware (in the table, `<reseller_prefix>` represents
|
||||
the actual configured reseller_prefix or "`NONE`" if the reseller_prefix is the
|
||||
empty string):
|
||||
|
|
|
@ -7,7 +7,7 @@ metadata by using the Object Storage API, which is implemented as a set
|
|||
of Representational State Transfer (REST) web services.
|
||||
|
||||
For an introduction to OpenStack Object Storage, see `Object
|
||||
Storage <http://docs.openstack.org/admin-guide-cloud/content/ch_admin-openstack-object-storage.html>`__
|
||||
Storage <http://docs.openstack.org/admin-guide-cloud/objectstorage.html>`
|
||||
in the *OpenStack Cloud Administrator Guide*.
|
||||
|
||||
You use the HTTPS (SSL) protocol to interact with Object Storage, and
|
||||
|
|
|
@ -594,6 +594,17 @@ node_timeout DEFAULT or 10 Request timeout to external services.
|
|||
in the DEFAULT section, or 10 (though
|
||||
other sections use 3 as the final
|
||||
default).
|
||||
rsync_module {replication_ip}::object
|
||||
Format of the rsync module where the
|
||||
replicator will send data. The
|
||||
configuration value can include some
|
||||
variables that will be extracted from
|
||||
the ring. Variables must follow the
|
||||
format {NAME} where NAME is one of:
|
||||
ip, port, replication_ip,
|
||||
replication_port, region, zone, device,
|
||||
meta. See etc/rsyncd.conf-sample for
|
||||
some examples.
|
||||
================== ================= =======================================
|
||||
|
||||
[object-updater]
|
||||
|
@ -723,6 +734,18 @@ conn_timeout 0.5 Connection timeout to external
|
|||
services
|
||||
reclaim_age 604800 Time elapsed in seconds before a
|
||||
container can be reclaimed
|
||||
rsync_module {replication_ip}::container
|
||||
Format of the rsync module where the
|
||||
replicator will send data. The
|
||||
configuration value can include some
|
||||
variables that will be extracted from
|
||||
the ring. Variables must follow the
|
||||
format {NAME} where NAME is one of:
|
||||
ip, port, replication_ip,
|
||||
replication_port, region, zone,
|
||||
device, meta. See
|
||||
etc/rsyncd.conf-sample for some
|
||||
examples.
|
||||
================== ==================== ====================================
|
||||
|
||||
[container-updater]
|
||||
|
@ -850,6 +873,18 @@ node_timeout 10 Request timeout to external services
|
|||
conn_timeout 0.5 Connection timeout to external services
|
||||
reclaim_age 604800 Time elapsed in seconds before an
|
||||
account can be reclaimed
|
||||
rsync_module {replication_ip}::account
|
||||
Format of the rsync module where the
|
||||
replicator will send data. The
|
||||
configuration value can include some
|
||||
variables that will be extracted from
|
||||
the ring. Variables must follow the
|
||||
format {NAME} where NAME is one of:
|
||||
ip, port, replication_ip,
|
||||
replication_port, region, zone,
|
||||
device, meta. See
|
||||
etc/rsyncd.conf-sample for some
|
||||
examples.
|
||||
================== ================== ======================================
|
||||
|
||||
[account-auditor]
|
||||
|
@ -1241,13 +1276,13 @@ RAID it is also important to make sure that the proper sunit and swidth
|
|||
settings get set so that XFS can make most efficient use of the RAID array.
|
||||
|
||||
For a standard swift install, all data drives are mounted directly under
|
||||
/srv/node (as can be seen in the above example of mounting /def/sda1 as
|
||||
/srv/node/sda). If you choose to mount the drives in another directory,
|
||||
``/srv/node`` (as can be seen in the above example of mounting ``/dev/sda1`` as
|
||||
``/srv/node/sda``). If you choose to mount the drives in another directory,
|
||||
be sure to set the `devices` config option in all of the server configs to
|
||||
point to the correct directory.
|
||||
|
||||
The mount points for each drive in /srv/node/ should be owned by the root user
|
||||
almost exclusively (root:root 755). This is required to prevent rsync from
|
||||
The mount points for each drive in ``/srv/node/`` should be owned by the root user
|
||||
almost exclusively (``root:root 755``). This is required to prevent rsync from
|
||||
syncing files into the root drive in the event a drive is unmounted.
|
||||
|
||||
Swift uses system calls to reserve space for new objects being written into
|
||||
|
|
|
@ -251,9 +251,22 @@ of them will be made of parity data (calculations depending on ec_type).
|
|||
When deciding which devices to use in the EC policy's object ring, be sure to
|
||||
carefully consider the performance impacts. Running some performance
|
||||
benchmarking in a test environment for your configuration is highly recommended
|
||||
before deployment. Once you have configured your EC policy in `swift.conf` and
|
||||
created your object ring, your application is ready to start using EC simply by
|
||||
creating a container with the specified policy name and interacting as usual.
|
||||
before deployment.
|
||||
|
||||
To create the EC policy's object ring, the only difference in the usage of the
|
||||
``swift-ring-builder create`` command is the ``replicas`` parameter. The
|
||||
``replicas`` value is the number of fragments spread across the object servers
|
||||
associated with the ring; ``replicas`` must be equal to the sum of
|
||||
``ec_num_data_fragments`` and ``ec_num_parity_fragments``. For example::
|
||||
|
||||
swift-ring-builder object-1.builder create 10 14 1
|
||||
|
||||
Note that in this example the ``replicas`` value of 14 is based on the sum of
|
||||
10 EC data fragments and 4 EC parity fragments.
|
||||
|
||||
Once you have configured your EC policy in `swift.conf` and created your object
|
||||
ring, your application is ready to start using EC simply by creating a container
|
||||
with the specified policy name and interacting as usual.
|
||||
|
||||
.. note::
|
||||
|
||||
|
|
|
@ -168,6 +168,21 @@ on them than the disks in nodes A and B. If 80% full is the warning
|
|||
threshold for the cluster, node C's disks will reach 80% full while A
|
||||
and B's disks are only 72.7% full.
|
||||
|
||||
**********
|
||||
Dispersion
|
||||
**********
|
||||
|
||||
With each rebalance, the ring builder calculates a dispersion metric. This is
|
||||
the percentage of partitions in the ring that have too many replicas within a
|
||||
particular failure domain.
|
||||
|
||||
For example, if you have three servers in a cluster but two replicas for a
|
||||
partition get placed onto the same server, that partition will count towards the
|
||||
dispersion metric.
|
||||
|
||||
A lower dispersion value is better, and the value can be used to find the proper
|
||||
value for "overload".
|
||||
|
||||
*********************
|
||||
Partition Shift Value
|
||||
*********************
|
||||
|
|
|
@ -146,7 +146,7 @@ For SAIO replication
|
|||
|
||||
delete all configuration options in section [<*>-replicator]
|
||||
|
||||
#. Add configuration files for object-server, in /etc/swift/objec-server/
|
||||
#. Add configuration files for object-server, in /etc/swift/object-server/
|
||||
|
||||
* 5.conf::
|
||||
|
||||
|
@ -170,7 +170,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[object-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::object{replication_port}
|
||||
|
||||
* 6.conf::
|
||||
|
||||
|
@ -194,7 +194,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[object-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::object{replication_port}
|
||||
|
||||
* 7.conf::
|
||||
|
||||
|
@ -218,7 +218,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[object-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::object{replication_port}
|
||||
|
||||
* 8.conf::
|
||||
|
||||
|
@ -242,7 +242,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[object-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::object{replication_port}
|
||||
|
||||
#. Add configuration files for container-server, in /etc/swift/container-server/
|
||||
|
||||
|
@ -268,7 +268,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[container-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::container{replication_port}
|
||||
|
||||
* 6.conf::
|
||||
|
||||
|
@ -292,7 +292,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[container-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::container{replication_port}
|
||||
|
||||
* 7.conf::
|
||||
|
||||
|
@ -316,7 +316,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[container-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::container{replication_port}
|
||||
|
||||
* 8.conf::
|
||||
|
||||
|
@ -340,7 +340,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[container-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::container{replication_port}
|
||||
|
||||
#. Add configuration files for account-server, in /etc/swift/account-server/
|
||||
|
||||
|
@ -366,7 +366,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[account-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::account{replication_port}
|
||||
|
||||
* 6.conf::
|
||||
|
||||
|
@ -390,7 +390,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[account-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::account{replication_port}
|
||||
|
||||
* 7.conf::
|
||||
|
||||
|
@ -414,7 +414,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[account-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::account{replication_port}
|
||||
|
||||
* 8.conf::
|
||||
|
||||
|
@ -438,7 +438,7 @@ For SAIO replication
|
|||
use = egg:swift#recon
|
||||
|
||||
[account-replicator]
|
||||
vm_test_mode = yes
|
||||
rsync_module = {replication_ip}::account{replication_port}
|
||||
|
||||
|
||||
---------------------------------
|
||||
|
|
|
@ -90,7 +90,6 @@ use = egg:swift#recon
|
|||
# log_level = INFO
|
||||
# log_address = /dev/log
|
||||
#
|
||||
# vm_test_mode = no
|
||||
# per_diff = 1000
|
||||
# max_diffs = 100
|
||||
# concurrency = 8
|
||||
|
@ -111,6 +110,10 @@ use = egg:swift#recon
|
|||
# a different region than the local one.
|
||||
# rsync_compress = no
|
||||
#
|
||||
# Format of the rysnc module where the replicator will send data. See
|
||||
# etc/rsyncd.conf-sample for some usage examples.
|
||||
# rsync_module = {replication_ip}::account
|
||||
#
|
||||
# recon_cache_path = /var/cache/swift
|
||||
|
||||
[account-auditor]
|
||||
|
|
|
@ -99,7 +99,6 @@ use = egg:swift#recon
|
|||
# log_level = INFO
|
||||
# log_address = /dev/log
|
||||
#
|
||||
# vm_test_mode = no
|
||||
# per_diff = 1000
|
||||
# max_diffs = 100
|
||||
# concurrency = 8
|
||||
|
@ -120,6 +119,10 @@ use = egg:swift#recon
|
|||
# a different region than the local one.
|
||||
# rsync_compress = no
|
||||
#
|
||||
# Format of the rysnc module where the replicator will send data. See
|
||||
# etc/rsyncd.conf-sample for some usage examples.
|
||||
# rsync_module = {replication_ip}::container
|
||||
#
|
||||
# recon_cache_path = /var/cache/swift
|
||||
|
||||
[container-updater]
|
||||
|
|
|
@ -162,7 +162,6 @@ use = egg:swift#recon
|
|||
# log_level = INFO
|
||||
# log_address = /dev/log
|
||||
#
|
||||
# vm_test_mode = no
|
||||
# daemonize = on
|
||||
#
|
||||
# Time in seconds to wait between replication passes
|
||||
|
@ -195,6 +194,10 @@ use = egg:swift#recon
|
|||
# slow down the syncing process.
|
||||
# rsync_compress = no
|
||||
#
|
||||
# Format of the rysnc module where the replicator will send data. See
|
||||
# etc/rsyncd.conf-sample for some usage examples.
|
||||
# rsync_module = {replication_ip}::object
|
||||
#
|
||||
# node_timeout = <whatever's in the DEFAULT section or 10>
|
||||
# max duration of an http request; this is for REPLICATE finalization calls and
|
||||
# so should be longer than node_timeout
|
||||
|
|
|
@ -20,3 +20,59 @@ max connections = 8
|
|||
path = /srv/node
|
||||
read only = false
|
||||
lock file = /var/lock/object.lock
|
||||
|
||||
|
||||
# If rsync_module includes the device, you can tune rsyncd to permit 4
|
||||
# connections per device instead of simply allowing 8 connections for all
|
||||
# devices:
|
||||
# rsync_module = {replication_ip}::object_{device}
|
||||
#
|
||||
# (if devices in your object ring are named sda, sdb and sdc)
|
||||
#
|
||||
#[object_sda]
|
||||
#max connections = 4
|
||||
#path = /srv/node
|
||||
#read only = false
|
||||
#lock file = /var/lock/object_sda.lock
|
||||
#
|
||||
#[object_sdb]
|
||||
#max connections = 4
|
||||
#path = /srv/node
|
||||
#read only = false
|
||||
#lock file = /var/lock/object_sdb.lock
|
||||
#
|
||||
#[object_sdc]
|
||||
#max connections = 4
|
||||
#path = /srv/node
|
||||
#read only = false
|
||||
#lock file = /var/lock/object_sdc.lock
|
||||
|
||||
|
||||
# To emulate the deprecated option vm_test_mode = yes, set:
|
||||
# rsync_module = {replication_ip}::object{replication_port}
|
||||
#
|
||||
# So, on your SAIO, you have to set the following rsyncd configuration:
|
||||
#
|
||||
#[object6010]
|
||||
#max connections = 25
|
||||
#path = /srv/1/node/
|
||||
#read only = false
|
||||
#lock file = /var/lock/object6010.lock
|
||||
#
|
||||
#[object6020]
|
||||
#max connections = 25
|
||||
#path = /srv/2/node/
|
||||
#read only = false
|
||||
#lock file = /var/lock/object6020.lock
|
||||
#
|
||||
#[object6030]
|
||||
#max connections = 25
|
||||
#path = /srv/3/node/
|
||||
#read only = false
|
||||
#lock file = /var/lock/object6030.lock
|
||||
#
|
||||
#[object6040]
|
||||
#max connections = 25
|
||||
#path = /srv/4/node/
|
||||
#read only = false
|
||||
#lock file = /var/lock/object6040.lock
|
||||
|
|
|
@ -370,10 +370,10 @@ class AccountReaper(Daemon):
|
|||
if self.logger.getEffectiveLevel() <= DEBUG:
|
||||
self.logger.exception(
|
||||
_('Exception with %(ip)s:%(port)s/%(device)s'), node)
|
||||
self.stats_return_codes[err.http_status / 100] = \
|
||||
self.stats_return_codes.get(err.http_status / 100, 0) + 1
|
||||
self.stats_return_codes[err.http_status // 100] = \
|
||||
self.stats_return_codes.get(err.http_status // 100, 0) + 1
|
||||
self.logger.increment(
|
||||
'return_codes.%d' % (err.http_status / 100,))
|
||||
'return_codes.%d' % (err.http_status // 100,))
|
||||
except (Timeout, socket.error) as err:
|
||||
self.logger.error(
|
||||
_('Timeout Exception with %(ip)s:%(port)s/%(device)s'),
|
||||
|
@ -426,10 +426,10 @@ class AccountReaper(Daemon):
|
|||
_('Exception with %(ip)s:%(port)s/%(device)s'), node)
|
||||
failures += 1
|
||||
self.logger.increment('containers_failures')
|
||||
self.stats_return_codes[err.http_status / 100] = \
|
||||
self.stats_return_codes.get(err.http_status / 100, 0) + 1
|
||||
self.stats_return_codes[err.http_status // 100] = \
|
||||
self.stats_return_codes.get(err.http_status // 100, 0) + 1
|
||||
self.logger.increment(
|
||||
'return_codes.%d' % (err.http_status / 100,))
|
||||
'return_codes.%d' % (err.http_status // 100,))
|
||||
except (Timeout, socket.error) as err:
|
||||
self.logger.error(
|
||||
_('Timeout Exception with %(ip)s:%(port)s/%(device)s'),
|
||||
|
@ -502,10 +502,10 @@ class AccountReaper(Daemon):
|
|||
_('Exception with %(ip)s:%(port)s/%(device)s'), node)
|
||||
failures += 1
|
||||
self.logger.increment('objects_failures')
|
||||
self.stats_return_codes[err.http_status / 100] = \
|
||||
self.stats_return_codes.get(err.http_status / 100, 0) + 1
|
||||
self.stats_return_codes[err.http_status // 100] = \
|
||||
self.stats_return_codes.get(err.http_status // 100, 0) + 1
|
||||
self.logger.increment(
|
||||
'return_codes.%d' % (err.http_status / 100,))
|
||||
'return_codes.%d' % (err.http_status // 100,))
|
||||
except (Timeout, socket.error) as err:
|
||||
failures += 1
|
||||
self.logger.increment('objects_failures')
|
||||
|
|
|
@ -51,7 +51,7 @@ def size_suffix(size):
|
|||
for suffix in suffixes:
|
||||
if size < 1000:
|
||||
return "%s %s" % (size, suffix)
|
||||
size = size / 1000
|
||||
size = size // 1000
|
||||
return "%s %s" % (size, suffix)
|
||||
|
||||
|
||||
|
|
|
@ -31,7 +31,8 @@ import swift.common.db
|
|||
from swift.common.direct_client import quote
|
||||
from swift.common.utils import get_logger, whataremyips, storage_directory, \
|
||||
renamer, mkdirs, lock_parent_directory, config_true_value, \
|
||||
unlink_older_than, dump_recon_cache, rsync_ip, ismount, json, Timestamp
|
||||
unlink_older_than, dump_recon_cache, rsync_module_interpolation, ismount, \
|
||||
json, Timestamp
|
||||
from swift.common import ring
|
||||
from swift.common.ring.utils import is_local_device
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE
|
||||
|
@ -165,11 +166,20 @@ class Replicator(Daemon):
|
|||
self.max_diffs = int(conf.get('max_diffs') or 100)
|
||||
self.interval = int(conf.get('interval') or
|
||||
conf.get('run_pause') or 30)
|
||||
self.vm_test_mode = config_true_value(conf.get('vm_test_mode', 'no'))
|
||||
self.node_timeout = int(conf.get('node_timeout', 10))
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.rsync_compress = config_true_value(
|
||||
conf.get('rsync_compress', 'no'))
|
||||
self.rsync_module = conf.get('rsync_module', '').rstrip('/')
|
||||
if not self.rsync_module:
|
||||
self.rsync_module = '{replication_ip}::%s' % self.server_type
|
||||
if config_true_value(conf.get('vm_test_mode', 'no')):
|
||||
self.logger.warn('Option %(type)s-replicator/vm_test_mode is '
|
||||
'deprecated and will be removed in a future '
|
||||
'version. Update your configuration to use '
|
||||
'option %(type)s-replicator/rsync_module.'
|
||||
% {'type': self.server_type})
|
||||
self.rsync_module += '{replication_port}'
|
||||
self.reclaim_age = float(conf.get('reclaim_age', 86400 * 7))
|
||||
swift.common.db.DB_PREALLOCATION = \
|
||||
config_true_value(conf.get('db_preallocation', 'f'))
|
||||
|
@ -267,14 +277,9 @@ class Replicator(Daemon):
|
|||
:param different_region: if True, the destination node is in a
|
||||
different region
|
||||
"""
|
||||
device_ip = rsync_ip(device['replication_ip'])
|
||||
if self.vm_test_mode:
|
||||
remote_file = '%s::%s%s/%s/tmp/%s' % (
|
||||
device_ip, self.server_type, device['replication_port'],
|
||||
device['device'], local_id)
|
||||
else:
|
||||
remote_file = '%s::%s/%s/tmp/%s' % (
|
||||
device_ip, self.server_type, device['device'], local_id)
|
||||
rsync_module = rsync_module_interpolation(self.rsync_module, device)
|
||||
rsync_path = '%s/tmp/%s' % (device['device'], local_id)
|
||||
remote_file = '%s/%s' % (rsync_module, rsync_path)
|
||||
mtime = os.path.getmtime(broker.db_file)
|
||||
if not self._rsync_file(broker.db_file, remote_file,
|
||||
different_region=different_region):
|
||||
|
|
|
@ -80,6 +80,8 @@ from swift.common.utils import (get_logger, get_remote_client,
|
|||
get_valid_utf8_str, config_true_value,
|
||||
InputProxy, list_from_csv, get_policy_index)
|
||||
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
QUOTE_SAFE = '/:'
|
||||
|
||||
|
||||
|
@ -195,17 +197,27 @@ class ProxyLoggingMiddleware(object):
|
|||
end_time_str,
|
||||
policy_index
|
||||
)))
|
||||
|
||||
# Log timing and bytes-transferred data to StatsD
|
||||
metric_name = self.statsd_metric_name(req, status_int, method)
|
||||
metric_name_policy = self.statsd_metric_name_policy(req, status_int,
|
||||
method,
|
||||
policy_index)
|
||||
# Only log data for valid controllers (or SOS) to keep the metric count
|
||||
# down (egregious errors will get logged by the proxy server itself).
|
||||
|
||||
if metric_name:
|
||||
self.access_logger.timing(metric_name + '.timing',
|
||||
(end_time - start_time) * 1000)
|
||||
self.access_logger.update_stats(metric_name + '.xfer',
|
||||
bytes_received + bytes_sent)
|
||||
if metric_name_policy:
|
||||
self.access_logger.timing(metric_name_policy + '.timing',
|
||||
(end_time - start_time) * 1000)
|
||||
self.access_logger.update_stats(metric_name_policy + '.xfer',
|
||||
bytes_received + bytes_sent)
|
||||
|
||||
def statsd_metric_name(self, req, status_int, method):
|
||||
def get_metric_name_type(self, req):
|
||||
if req.path.startswith('/v1/'):
|
||||
try:
|
||||
stat_type = [None, 'account', 'container',
|
||||
|
@ -214,12 +226,33 @@ class ProxyLoggingMiddleware(object):
|
|||
stat_type = 'object'
|
||||
else:
|
||||
stat_type = req.environ.get('swift.source')
|
||||
return stat_type
|
||||
|
||||
def statsd_metric_name(self, req, status_int, method):
|
||||
stat_type = self.get_metric_name_type(req)
|
||||
if stat_type is None:
|
||||
return None
|
||||
stat_method = method if method in self.valid_methods \
|
||||
else 'BAD_METHOD'
|
||||
return '.'.join((stat_type, stat_method, str(status_int)))
|
||||
|
||||
def statsd_metric_name_policy(self, req, status_int, method, policy_index):
|
||||
if policy_index is None:
|
||||
return None
|
||||
stat_type = self.get_metric_name_type(req)
|
||||
if stat_type == 'object':
|
||||
stat_method = method if method in self.valid_methods \
|
||||
else 'BAD_METHOD'
|
||||
# The policy may not exist
|
||||
policy = POLICIES.get_by_index(policy_index)
|
||||
if policy:
|
||||
return '.'.join((stat_type, 'policy', str(policy_index),
|
||||
stat_method, str(status_int)))
|
||||
else:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
if self.req_already_logged(env):
|
||||
return self.app(env, start_response)
|
||||
|
@ -270,10 +303,16 @@ class ProxyLoggingMiddleware(object):
|
|||
method = self.method_from_req(req)
|
||||
if method == 'GET':
|
||||
status_int = status_int_for_logging()
|
||||
policy_index = get_policy_index(req.headers, resp_headers)
|
||||
metric_name = self.statsd_metric_name(req, status_int, method)
|
||||
metric_name_policy = self.statsd_metric_name_policy(
|
||||
req, status_int, method, policy_index)
|
||||
if metric_name:
|
||||
self.access_logger.timing_since(
|
||||
metric_name + '.first-byte.timing', start_time)
|
||||
if metric_name_policy:
|
||||
self.access_logger.timing_since(
|
||||
metric_name_policy + '.first-byte.timing', start_time)
|
||||
|
||||
bytes_sent = 0
|
||||
client_disconnect = False
|
||||
|
|
|
@ -19,6 +19,7 @@ import time
|
|||
from swift import gettext_ as _
|
||||
|
||||
from swift import __version__ as swiftver
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.swob import Request, Response
|
||||
from swift.common.utils import get_logger, config_true_value, json, \
|
||||
SWIFT_CONF_FILE
|
||||
|
@ -58,11 +59,13 @@ class ReconMiddleware(object):
|
|||
'drive.recon')
|
||||
self.account_ring_path = os.path.join(swift_dir, 'account.ring.gz')
|
||||
self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
|
||||
|
||||
self.rings = [self.account_ring_path, self.container_ring_path]
|
||||
# include all object ring files (for all policies)
|
||||
for f in os.listdir(swift_dir):
|
||||
if f.startswith('object') and f.endswith('ring.gz'):
|
||||
self.rings.append(os.path.join(swift_dir, f))
|
||||
for policy in POLICIES:
|
||||
self.rings.append(os.path.join(swift_dir,
|
||||
policy.ring_name + '.ring.gz'))
|
||||
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
|
||||
def _from_recon_cache(self, cache_keys, cache_file, openr=open):
|
||||
|
|
|
@ -1415,7 +1415,7 @@ class SwiftLogFormatter(logging.Formatter):
|
|||
if self.max_line_length < 7:
|
||||
msg = msg[:self.max_line_length]
|
||||
else:
|
||||
approxhalf = (self.max_line_length - 5) / 2
|
||||
approxhalf = (self.max_line_length - 5) // 2
|
||||
msg = msg[:approxhalf] + " ... " + msg[-approxhalf:]
|
||||
return msg
|
||||
|
||||
|
@ -2703,6 +2703,33 @@ def rsync_ip(ip):
|
|||
return '[%s]' % ip
|
||||
|
||||
|
||||
def rsync_module_interpolation(template, device):
|
||||
"""
|
||||
Interpolate devices variables inside a rsync module template
|
||||
|
||||
:param template: rsync module template as a string
|
||||
:param device: a device from a ring
|
||||
|
||||
:returns: a string with all variables replaced by device attributes
|
||||
"""
|
||||
replacements = {
|
||||
'ip': rsync_ip(device.get('ip', '')),
|
||||
'port': device.get('port', ''),
|
||||
'replication_ip': rsync_ip(device.get('replication_ip', '')),
|
||||
'replication_port': device.get('replication_port', ''),
|
||||
'region': device.get('region', ''),
|
||||
'zone': device.get('zone', ''),
|
||||
'device': device.get('device', ''),
|
||||
'meta': device.get('meta', ''),
|
||||
}
|
||||
try:
|
||||
module = template.format(**replacements)
|
||||
except KeyError as e:
|
||||
raise ValueError('Cannot interpolate rsync_module, invalid variable: '
|
||||
'%s' % e)
|
||||
return module
|
||||
|
||||
|
||||
def get_valid_utf8_str(str_or_unicode):
|
||||
"""
|
||||
Get valid parts of utf-8 str from str, unicode and even invalid utf-8 str
|
||||
|
|
|
@ -6,9 +6,9 @@
|
|||
#, fuzzy
|
||||
msgid ""
|
||||
msgstr ""
|
||||
"Project-Id-Version: swift 2.3.1.dev243\n"
|
||||
"Project-Id-Version: swift 2.4.1.dev19\n"
|
||||
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
|
||||
"POT-Creation-Date: 2015-08-04 06:29+0000\n"
|
||||
"POT-Creation-Date: 2015-09-05 06:17+0000\n"
|
||||
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
|
||||
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
|
||||
"Language-Team: LANGUAGE <LL@li.org>\n"
|
||||
|
@ -63,98 +63,104 @@ msgstr ""
|
|||
msgid "ERROR Could not get account info %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:134 swift/common/utils.py:2147
|
||||
#: swift/account/reaper.py:138 swift/common/utils.py:2147
|
||||
#: swift/obj/diskfile.py:296 swift/obj/updater.py:88 swift/obj/updater.py:131
|
||||
#, python-format
|
||||
msgid "Skipping %s as it is not mounted"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:138
|
||||
#: swift/account/reaper.py:142
|
||||
msgid "Exception in top-level account reaper loop"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:141
|
||||
#: swift/account/reaper.py:145
|
||||
#, python-format
|
||||
msgid "Devices pass completed: %.02fs"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:238
|
||||
#: swift/account/reaper.py:253
|
||||
#, python-format
|
||||
msgid "Beginning pass on account %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:255
|
||||
#: swift/account/reaper.py:278
|
||||
#, python-format
|
||||
msgid "Exception with containers for account %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:262
|
||||
#: swift/account/reaper.py:285
|
||||
#, python-format
|
||||
msgid "Exception with account %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:263
|
||||
#: swift/account/reaper.py:286
|
||||
#, python-format
|
||||
msgid "Incomplete pass on account %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:265
|
||||
#: swift/account/reaper.py:288
|
||||
#, python-format
|
||||
msgid ", %s containers deleted"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:267
|
||||
#: swift/account/reaper.py:290
|
||||
#, python-format
|
||||
msgid ", %s objects deleted"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:269
|
||||
#: swift/account/reaper.py:292
|
||||
#, python-format
|
||||
msgid ", %s containers remaining"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:272
|
||||
#: swift/account/reaper.py:295
|
||||
#, python-format
|
||||
msgid ", %s objects remaining"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:274
|
||||
#: swift/account/reaper.py:297
|
||||
#, python-format
|
||||
msgid ", %s containers possibly remaining"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:277
|
||||
#: swift/account/reaper.py:300
|
||||
#, python-format
|
||||
msgid ", %s objects possibly remaining"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:280
|
||||
#: swift/account/reaper.py:303
|
||||
msgid ", return codes: "
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:284
|
||||
#: swift/account/reaper.py:307
|
||||
#, python-format
|
||||
msgid ", elapsed: %.02fs"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:290
|
||||
#: swift/account/reaper.py:313
|
||||
#, python-format
|
||||
msgid "Account %s has not been reaped since %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:349 swift/account/reaper.py:399
|
||||
#: swift/account/reaper.py:469 swift/container/updater.py:307
|
||||
#: swift/account/reaper.py:372 swift/account/reaper.py:426
|
||||
#: swift/account/reaper.py:502 swift/container/updater.py:307
|
||||
#, python-format
|
||||
msgid "Exception with %(ip)s:%(port)s/%(device)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:369
|
||||
#: swift/account/reaper.py:379 swift/account/reaper.py:435
|
||||
#: swift/account/reaper.py:513
|
||||
#, python-format
|
||||
msgid "Timeout Exception with %(ip)s:%(port)s/%(device)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/reaper.py:396
|
||||
#, python-format
|
||||
msgid "Exception with objects for container %(container)s for account %(account)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/account/server.py:275 swift/container/server.py:586
|
||||
#: swift/obj/server.py:911
|
||||
#: swift/obj/server.py:944
|
||||
#, python-format
|
||||
msgid "ERROR __call__ error with %(method)s %(path)s "
|
||||
msgstr ""
|
||||
|
@ -194,74 +200,74 @@ msgstr ""
|
|||
msgid "ERROR reading HTTP response from %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:196
|
||||
#: swift/common/db_replicator.py:197
|
||||
#, python-format
|
||||
msgid "Attempted to replicate %(count)d dbs in %(time).5f seconds (%(rate).5f/s)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:202
|
||||
#: swift/common/db_replicator.py:203
|
||||
#, python-format
|
||||
msgid "Removed %(remove)d dbs"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:203
|
||||
#: swift/common/db_replicator.py:204
|
||||
#, python-format
|
||||
msgid "%(success)s successes, %(failure)s failures"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:243
|
||||
#: swift/common/db_replicator.py:251
|
||||
#, python-format
|
||||
msgid "ERROR rsync failed with %(code)s: %(args)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:312
|
||||
#: swift/common/db_replicator.py:320
|
||||
#, python-format
|
||||
msgid "ERROR Bad response %(status)s from %(host)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:478 swift/common/db_replicator.py:721
|
||||
#: swift/common/db_replicator.py:486 swift/common/db_replicator.py:750
|
||||
#, python-format
|
||||
msgid "Quarantining DB %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:481
|
||||
#: swift/common/db_replicator.py:489
|
||||
#, python-format
|
||||
msgid "ERROR reading db %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:530
|
||||
#: swift/common/db_replicator.py:542
|
||||
#, python-format
|
||||
msgid "ERROR Remote drive not mounted %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:532
|
||||
#: swift/common/db_replicator.py:544
|
||||
#, python-format
|
||||
msgid "ERROR syncing %(file)s with node %(node)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:560
|
||||
#: swift/common/db_replicator.py:583
|
||||
#, python-format
|
||||
msgid "ERROR while trying to clean up %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:586
|
||||
#: swift/common/db_replicator.py:611
|
||||
msgid "ERROR Failed to get my own IPs?"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:596
|
||||
#: swift/common/db_replicator.py:625
|
||||
#, python-format
|
||||
msgid "Skipping %(device)s as it is not mounted"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:605
|
||||
#: swift/common/db_replicator.py:634
|
||||
msgid "Beginning replication run"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:610
|
||||
#: swift/common/db_replicator.py:639
|
||||
msgid "Replication run OVER"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/db_replicator.py:623
|
||||
#: swift/common/db_replicator.py:652
|
||||
msgid "ERROR trying to replicate"
|
||||
msgstr ""
|
||||
|
||||
|
@ -456,42 +462,42 @@ msgstr ""
|
|||
msgid "Unable to find %s config section in %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/utils.py:2373
|
||||
#: swift/common/utils.py:2376
|
||||
#, python-format
|
||||
msgid "Invalid X-Container-Sync-To format %r"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/utils.py:2378
|
||||
#: swift/common/utils.py:2381
|
||||
#, python-format
|
||||
msgid "No realm key for %r"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/utils.py:2382
|
||||
#: swift/common/utils.py:2385
|
||||
#, python-format
|
||||
msgid "No cluster endpoint for %r %r"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/utils.py:2391
|
||||
#: swift/common/utils.py:2394
|
||||
#, python-format
|
||||
msgid ""
|
||||
"Invalid scheme %r in X-Container-Sync-To, must be \"//\", \"http\", or "
|
||||
"\"https\"."
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/utils.py:2395
|
||||
#: swift/common/utils.py:2398
|
||||
msgid "Path required in X-Container-Sync-To"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/utils.py:2398
|
||||
#: swift/common/utils.py:2401
|
||||
msgid "Params, queries, and fragments not allowed in X-Container-Sync-To"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/utils.py:2403
|
||||
#: swift/common/utils.py:2406
|
||||
#, python-format
|
||||
msgid "Invalid host %r in X-Container-Sync-To"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/utils.py:2595
|
||||
#: swift/common/utils.py:2598
|
||||
msgid "Exception dumping recon cache"
|
||||
msgstr ""
|
||||
|
||||
|
@ -541,27 +547,27 @@ msgstr ""
|
|||
msgid "Warning: Cannot ratelimit without a memcached client"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/middleware/recon.py:80
|
||||
#: swift/common/middleware/recon.py:81
|
||||
msgid "Error reading recon cache file"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/middleware/recon.py:82
|
||||
#: swift/common/middleware/recon.py:83
|
||||
msgid "Error parsing recon cache file"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/middleware/recon.py:84
|
||||
#: swift/common/middleware/recon.py:85
|
||||
msgid "Error retrieving recon data"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/middleware/recon.py:158
|
||||
#: swift/common/middleware/recon.py:159
|
||||
msgid "Error listing devices"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/middleware/recon.py:254
|
||||
#: swift/common/middleware/recon.py:255
|
||||
msgid "Error reading ringfile"
|
||||
msgstr ""
|
||||
|
||||
#: swift/common/middleware/recon.py:268
|
||||
#: swift/common/middleware/recon.py:269
|
||||
msgid "Error reading swift.conf"
|
||||
msgstr ""
|
||||
|
||||
|
@ -733,7 +739,7 @@ msgid "ERROR: Failed to get paths to drive partitions: %s"
|
|||
msgstr ""
|
||||
|
||||
#: swift/container/updater.py:92 swift/obj/reconstructor.py:812
|
||||
#: swift/obj/replicator.py:497 swift/obj/replicator.py:585
|
||||
#: swift/obj/replicator.py:580 swift/obj/replicator.py:692
|
||||
#, python-format
|
||||
msgid "%s is not mounted"
|
||||
msgstr ""
|
||||
|
@ -855,43 +861,43 @@ msgstr ""
|
|||
msgid "Quarantined %(hsh_path)s to %(quar_path)s because it is not a directory"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/diskfile.py:700
|
||||
#: swift/obj/diskfile.py:702
|
||||
msgid "Error hashing suffix"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/diskfile.py:821
|
||||
#: swift/obj/diskfile.py:823
|
||||
#, python-format
|
||||
msgid "Quarantined %(object_path)s to %(quar_path)s because it is not a directory"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/diskfile.py:1035
|
||||
#: swift/obj/diskfile.py:1037
|
||||
#, python-format
|
||||
msgid "Problem cleaning up %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/diskfile.py:1342
|
||||
#: swift/obj/diskfile.py:1344
|
||||
#, python-format
|
||||
msgid "ERROR DiskFile %(data_file)s close failure: %(exc)s : %(stack)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/diskfile.py:1612
|
||||
#: swift/obj/diskfile.py:1618
|
||||
#, python-format
|
||||
msgid ""
|
||||
"Client path %(client)s does not match path stored in object metadata "
|
||||
"%(meta)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/diskfile.py:2048
|
||||
#: swift/obj/diskfile.py:2054
|
||||
#, python-format
|
||||
msgid "No space left on device for %s (%s)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/diskfile.py:2057
|
||||
#: swift/obj/diskfile.py:2063
|
||||
#, python-format
|
||||
msgid "Problem cleaning up %s (%s)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/diskfile.py:2060
|
||||
#: swift/obj/diskfile.py:2066
|
||||
#, python-format
|
||||
msgid "Problem writing durable state file %s (%s)"
|
||||
msgstr ""
|
||||
|
@ -948,14 +954,14 @@ msgid ""
|
|||
"%(time).2fs (%(rate).2f/sec, %(remaining)s remaining)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/reconstructor.py:369 swift/obj/replicator.py:429
|
||||
#: swift/obj/reconstructor.py:369 swift/obj/replicator.py:504
|
||||
#, python-format
|
||||
msgid ""
|
||||
"%(checked)d suffixes checked - %(hashed).2f%% hashed, %(synced).2f%% "
|
||||
"synced"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/reconstructor.py:376 swift/obj/replicator.py:436
|
||||
#: swift/obj/reconstructor.py:376 swift/obj/replicator.py:511
|
||||
#, python-format
|
||||
msgid "Partition times: max %(max).4fs, min %(min).4fs, med %(med).4fs"
|
||||
msgstr ""
|
||||
|
@ -965,7 +971,7 @@ msgstr ""
|
|||
msgid "Nothing reconstructed for %s seconds."
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/reconstructor.py:413 swift/obj/replicator.py:473
|
||||
#: swift/obj/reconstructor.py:413 swift/obj/replicator.py:548
|
||||
msgid "Lockup detected.. killing live coros."
|
||||
msgstr ""
|
||||
|
||||
|
@ -979,7 +985,7 @@ msgstr ""
|
|||
msgid "%s responded as unmounted"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/reconstructor.py:883 swift/obj/replicator.py:305
|
||||
#: swift/obj/reconstructor.py:883 swift/obj/replicator.py:357
|
||||
#, python-format
|
||||
msgid "Removing partition: %s"
|
||||
msgstr ""
|
||||
|
@ -1014,106 +1020,106 @@ msgstr ""
|
|||
msgid "Object reconstruction complete. (%.02f minutes)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:144
|
||||
#: swift/obj/replicator.py:173
|
||||
#, python-format
|
||||
msgid "Killing long-running rsync: %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:158
|
||||
#: swift/obj/replicator.py:187
|
||||
#, python-format
|
||||
msgid "Bad rsync return code: %(ret)d <- %(args)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:165 swift/obj/replicator.py:169
|
||||
#: swift/obj/replicator.py:194 swift/obj/replicator.py:198
|
||||
#, python-format
|
||||
msgid "Successful rsync of %(src)s at %(dst)s (%(time).03f)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:291
|
||||
#: swift/obj/replicator.py:327
|
||||
#, python-format
|
||||
msgid "Removing %s objects"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:299
|
||||
#: swift/obj/replicator.py:346
|
||||
msgid "Error syncing handoff partition"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:361
|
||||
#: swift/obj/replicator.py:422
|
||||
#, python-format
|
||||
msgid "%(ip)s/%(device)s responded as unmounted"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:366
|
||||
#: swift/obj/replicator.py:429
|
||||
#, python-format
|
||||
msgid "Invalid response %(resp)s from %(ip)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:401
|
||||
#: swift/obj/replicator.py:473
|
||||
#, python-format
|
||||
msgid "Error syncing with node: %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:405
|
||||
#: swift/obj/replicator.py:478
|
||||
msgid "Error syncing partition"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:418
|
||||
#: swift/obj/replicator.py:493
|
||||
#, python-format
|
||||
msgid ""
|
||||
"%(replicated)d/%(total)d (%(percentage).2f%%) partitions replicated in "
|
||||
"%(time).2fs (%(rate).2f/sec, %(remaining)s remaining)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:444
|
||||
#: swift/obj/replicator.py:519
|
||||
#, python-format
|
||||
msgid "Nothing replicated for %s seconds."
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:588
|
||||
#: swift/obj/replicator.py:695
|
||||
msgid "Ring change detected. Aborting current replication pass."
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:609
|
||||
#: swift/obj/replicator.py:723
|
||||
msgid "Exception in top-level replication loop"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:618
|
||||
#: swift/obj/replicator.py:733
|
||||
msgid "Running object replicator in script mode."
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:636
|
||||
#: swift/obj/replicator.py:751
|
||||
#, python-format
|
||||
msgid "Object replication complete (once). (%.02f minutes)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:643
|
||||
#: swift/obj/replicator.py:762
|
||||
msgid "Starting object replicator in daemon mode."
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:647
|
||||
#: swift/obj/replicator.py:766
|
||||
msgid "Starting object replication pass."
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/replicator.py:652
|
||||
#: swift/obj/replicator.py:771
|
||||
#, python-format
|
||||
msgid "Object replication complete. (%.02f minutes)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/server.py:231
|
||||
#: swift/obj/server.py:240
|
||||
#, python-format
|
||||
msgid ""
|
||||
"ERROR Container update failed (saving for async update later): %(status)d"
|
||||
" response from %(ip)s:%(port)s/%(dev)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/server.py:238
|
||||
#: swift/obj/server.py:247
|
||||
#, python-format
|
||||
msgid ""
|
||||
"ERROR container update failed with %(ip)s:%(port)s/%(dev)s (saving for "
|
||||
"async update later)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/obj/server.py:273
|
||||
#: swift/obj/server.py:282
|
||||
#, python-format
|
||||
msgid ""
|
||||
"ERROR Container update failed: different numbers of hosts and devices in "
|
||||
|
@ -1167,21 +1173,21 @@ msgstr ""
|
|||
msgid "ERROR with remote server %(ip)s:%(port)s/%(device)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/server.py:405
|
||||
#: swift/proxy/server.py:414
|
||||
msgid "ERROR Unhandled exception in request"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/server.py:460
|
||||
#: swift/proxy/server.py:469
|
||||
#, python-format
|
||||
msgid "Node error limited %(ip)s:%(port)s (%(device)s)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/server.py:477 swift/proxy/server.py:495
|
||||
#: swift/proxy/server.py:486 swift/proxy/server.py:504
|
||||
#, python-format
|
||||
msgid "%(msg)s %(ip)s:%(port)s/%(device)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/server.py:571
|
||||
#: swift/proxy/server.py:527
|
||||
#, python-format
|
||||
msgid "ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: %(info)s"
|
||||
msgstr ""
|
||||
|
@ -1190,129 +1196,129 @@ msgstr ""
|
|||
msgid "Account"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:797 swift/proxy/controllers/base.py:836
|
||||
#: swift/proxy/controllers/base.py:928 swift/proxy/controllers/obj.py:361
|
||||
#: swift/proxy/controllers/obj.py:581 swift/proxy/controllers/obj.py:1021
|
||||
#: swift/proxy/controllers/obj.py:1068 swift/proxy/controllers/obj.py:1082
|
||||
#: swift/proxy/controllers/obj.py:1889 swift/proxy/controllers/obj.py:2126
|
||||
#: swift/proxy/controllers/obj.py:2254 swift/proxy/controllers/obj.py:2439
|
||||
#: swift/proxy/controllers/base.py:803 swift/proxy/controllers/base.py:842
|
||||
#: swift/proxy/controllers/base.py:935 swift/proxy/controllers/obj.py:320
|
||||
#: swift/proxy/controllers/obj.py:847 swift/proxy/controllers/obj.py:894
|
||||
#: swift/proxy/controllers/obj.py:908 swift/proxy/controllers/obj.py:1717
|
||||
#: swift/proxy/controllers/obj.py:1954 swift/proxy/controllers/obj.py:2079
|
||||
#: swift/proxy/controllers/obj.py:2264
|
||||
msgid "Object"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:798 swift/proxy/controllers/base.py:837
|
||||
#: swift/proxy/controllers/base.py:804 swift/proxy/controllers/base.py:843
|
||||
msgid "Trying to read during GET (retrying)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:929
|
||||
#: swift/proxy/controllers/base.py:936
|
||||
msgid "Trying to read during GET"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:933
|
||||
#: swift/proxy/controllers/base.py:940
|
||||
#, python-format
|
||||
msgid "Client did not read from proxy within %ss"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:938
|
||||
#: swift/proxy/controllers/base.py:945
|
||||
msgid "Client disconnected on read"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:940
|
||||
#: swift/proxy/controllers/base.py:947
|
||||
msgid "Trying to send to client"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:991 swift/proxy/controllers/base.py:1310
|
||||
#: swift/proxy/controllers/base.py:998 swift/proxy/controllers/base.py:1410
|
||||
#, python-format
|
||||
msgid "Trying to %(method)s %(path)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:1030 swift/proxy/controllers/base.py:1298
|
||||
#: swift/proxy/controllers/obj.py:384 swift/proxy/controllers/obj.py:1059
|
||||
#: swift/proxy/controllers/obj.py:2246 swift/proxy/controllers/obj.py:2484
|
||||
#: swift/proxy/controllers/base.py:1037 swift/proxy/controllers/base.py:1398
|
||||
#: swift/proxy/controllers/obj.py:343 swift/proxy/controllers/obj.py:885
|
||||
#: swift/proxy/controllers/obj.py:2071 swift/proxy/controllers/obj.py:2309
|
||||
msgid "ERROR Insufficient Storage"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:1033
|
||||
#: swift/proxy/controllers/base.py:1040
|
||||
#, python-format
|
||||
msgid "ERROR %(status)d %(body)s From %(type)s Server"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:1301
|
||||
#: swift/proxy/controllers/base.py:1401
|
||||
#, python-format
|
||||
msgid "ERROR %(status)d Trying to %(method)s %(path)sFrom Container Server"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/base.py:1431
|
||||
#: swift/proxy/controllers/base.py:1531
|
||||
#, python-format
|
||||
msgid "%(type)s returning 503 for %(statuses)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/container.py:98 swift/proxy/controllers/obj.py:163
|
||||
#: swift/proxy/controllers/container.py:98
|
||||
msgid "Container"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:362
|
||||
#: swift/proxy/controllers/obj.py:321
|
||||
#, python-format
|
||||
msgid "Trying to get final status of PUT to %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:388 swift/proxy/controllers/obj.py:2489
|
||||
#: swift/proxy/controllers/obj.py:347 swift/proxy/controllers/obj.py:2314
|
||||
#, python-format
|
||||
msgid "ERROR %(status)d %(body)s From Object Server re: %(path)s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:654
|
||||
#: swift/proxy/controllers/obj.py:548
|
||||
#, python-format
|
||||
msgid "Object PUT returning 412, %(statuses)r"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:663
|
||||
#: swift/proxy/controllers/obj.py:561
|
||||
#, python-format
|
||||
msgid "Object PUT returning 202 for 409: %(req_timestamp)s <= %(timestamps)r"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:1063 swift/proxy/controllers/obj.py:2249
|
||||
#: swift/proxy/controllers/obj.py:889 swift/proxy/controllers/obj.py:2074
|
||||
#, python-format
|
||||
msgid "ERROR %(status)d Expect: 100-continue From Object Server"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:1069 swift/proxy/controllers/obj.py:2255
|
||||
#: swift/proxy/controllers/obj.py:895 swift/proxy/controllers/obj.py:2080
|
||||
#, python-format
|
||||
msgid "Expect: 100-continue on %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:1083 swift/proxy/controllers/obj.py:1890
|
||||
#: swift/proxy/controllers/obj.py:909 swift/proxy/controllers/obj.py:1718
|
||||
#, python-format
|
||||
msgid "Trying to write to %s"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:1134 swift/proxy/controllers/obj.py:2394
|
||||
#: swift/proxy/controllers/obj.py:960 swift/proxy/controllers/obj.py:2219
|
||||
#, python-format
|
||||
msgid "ERROR Client read timeout (%ss)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:1141 swift/proxy/controllers/obj.py:2401
|
||||
#: swift/proxy/controllers/obj.py:967 swift/proxy/controllers/obj.py:2226
|
||||
msgid "ERROR Exception causing client disconnect"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:1146 swift/proxy/controllers/obj.py:2406
|
||||
#: swift/proxy/controllers/obj.py:972 swift/proxy/controllers/obj.py:2231
|
||||
msgid "Client disconnected without sending enough data"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:1192
|
||||
#: swift/proxy/controllers/obj.py:1018
|
||||
#, python-format
|
||||
msgid "Object servers returned %s mismatched etags"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:1196 swift/proxy/controllers/obj.py:2569
|
||||
#: swift/proxy/controllers/obj.py:1022 swift/proxy/controllers/obj.py:2393
|
||||
msgid "Object PUT"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:2381
|
||||
#: swift/proxy/controllers/obj.py:2206
|
||||
#, python-format
|
||||
msgid "Not enough object servers ack'ed (got %d)"
|
||||
msgstr ""
|
||||
|
||||
#: swift/proxy/controllers/obj.py:2440
|
||||
#: swift/proxy/controllers/obj.py:2265
|
||||
#, python-format
|
||||
msgid "Trying to get %s status of PUT to %s"
|
||||
msgstr ""
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -2166,9 +2166,14 @@ class ECDiskFile(BaseDiskFile):
|
|||
|
||||
:param timestamp: the object timestamp, an instance of
|
||||
:class:`~swift.common.utils.Timestamp`
|
||||
:param frag_index: a fragment archive index, must be a whole number.
|
||||
:param frag_index: fragment archive index, must be
|
||||
a whole number or None.
|
||||
"""
|
||||
for ext in ('.data', '.ts'):
|
||||
exts = ['.ts']
|
||||
# when frag_index is None it's not possible to build a data file name
|
||||
if frag_index is not None:
|
||||
exts.append('.data')
|
||||
for ext in exts:
|
||||
purge_file = self.manager.make_on_disk_filename(
|
||||
timestamp, ext=ext, frag_index=frag_index)
|
||||
remove_file(os.path.join(self._datadir, purge_file))
|
||||
|
|
|
@ -36,7 +36,8 @@ from swift.common.bufferedhttp import http_connect
|
|||
from swift.common.daemon import Daemon
|
||||
from swift.common.ring.utils import is_local_device
|
||||
from swift.obj.ssync_sender import Sender as ssync_sender
|
||||
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
|
||||
from swift.common.http import HTTP_OK, HTTP_NOT_FOUND, \
|
||||
HTTP_INSUFFICIENT_STORAGE
|
||||
from swift.obj.diskfile import DiskFileRouter, get_data_dir, \
|
||||
get_tmp_dir
|
||||
from swift.common.storage_policy import POLICIES, EC_POLICY
|
||||
|
@ -203,12 +204,14 @@ class ObjectReconstructor(Daemon):
|
|||
part, 'GET', path, headers=headers)
|
||||
with Timeout(self.node_timeout):
|
||||
resp = conn.getresponse()
|
||||
if resp.status != HTTP_OK:
|
||||
if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]:
|
||||
self.logger.warning(
|
||||
_("Invalid response %(resp)s from %(full_path)s"),
|
||||
{'resp': resp.status,
|
||||
'full_path': self._full_path(node, part, path, policy)})
|
||||
resp = None
|
||||
elif resp.status == HTTP_NOT_FOUND:
|
||||
resp = None
|
||||
except (Exception, Timeout):
|
||||
self.logger.exception(
|
||||
_("Trying to GET %(full_path)s"), {
|
||||
|
@ -238,9 +241,8 @@ class ObjectReconstructor(Daemon):
|
|||
fi_to_rebuild = node['index']
|
||||
|
||||
# KISS send out connection requests to all nodes, see what sticks
|
||||
headers = {
|
||||
'X-Backend-Storage-Policy-Index': int(job['policy']),
|
||||
}
|
||||
headers = self.headers.copy()
|
||||
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
|
||||
pile = GreenAsyncPile(len(part_nodes))
|
||||
path = metadata['name']
|
||||
for node in part_nodes:
|
||||
|
@ -542,6 +544,9 @@ class ObjectReconstructor(Daemon):
|
|||
frag_index=frag_index)
|
||||
df.purge(Timestamp(timestamp), frag_index)
|
||||
except DiskFileError:
|
||||
self.logger.exception(
|
||||
'Unable to purge DiskFile (%r %r %r)',
|
||||
object_hash, timestamp, frag_index)
|
||||
continue
|
||||
|
||||
def process_job(self, job):
|
||||
|
|
|
@ -31,8 +31,8 @@ from eventlet.support.greenlets import GreenletExit
|
|||
from swift.common.ring.utils import is_local_device
|
||||
from swift.common.utils import whataremyips, unlink_older_than, \
|
||||
compute_eta, get_logger, dump_recon_cache, ismount, \
|
||||
rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \
|
||||
tpool_reraise, config_auto_int_value, storage_directory
|
||||
rsync_module_interpolation, mkdirs, config_true_value, list_from_csv, \
|
||||
get_hub, tpool_reraise, config_auto_int_value, storage_directory
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
|
||||
|
@ -62,7 +62,6 @@ class ObjectReplicator(Daemon):
|
|||
self.logger = logger or get_logger(conf, log_route='object-replicator')
|
||||
self.devices_dir = conf.get('devices', '/srv/node')
|
||||
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
|
||||
self.vm_test_mode = config_true_value(conf.get('vm_test_mode', 'no'))
|
||||
self.swift_dir = conf.get('swift_dir', '/etc/swift')
|
||||
self.bind_ip = conf.get('bind_ip', '0.0.0.0')
|
||||
self.servers_per_port = int(conf.get('servers_per_port', '0') or 0)
|
||||
|
@ -81,6 +80,15 @@ class ObjectReplicator(Daemon):
|
|||
self.rsync_bwlimit = conf.get('rsync_bwlimit', '0')
|
||||
self.rsync_compress = config_true_value(
|
||||
conf.get('rsync_compress', 'no'))
|
||||
self.rsync_module = conf.get('rsync_module', '').rstrip('/')
|
||||
if not self.rsync_module:
|
||||
self.rsync_module = '{replication_ip}::object'
|
||||
if config_true_value(conf.get('vm_test_mode', 'no')):
|
||||
self.logger.warn('Option object-replicator/vm_test_mode is '
|
||||
'deprecated and will be removed in a future '
|
||||
'version. Update your configuration to use '
|
||||
'option object-replicator/rsync_module.')
|
||||
self.rsync_module += '{replication_port}'
|
||||
self.http_timeout = int(conf.get('http_timeout', 60))
|
||||
self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
|
||||
self.recon_cache_path = conf.get('recon_cache_path',
|
||||
|
@ -223,11 +231,7 @@ class ObjectReplicator(Daemon):
|
|||
# Allow for compression, but only if the remote node is in
|
||||
# a different region than the local one.
|
||||
args.append('--compress')
|
||||
node_ip = rsync_ip(node['replication_ip'])
|
||||
if self.vm_test_mode:
|
||||
rsync_module = '%s::object%s' % (node_ip, node['replication_port'])
|
||||
else:
|
||||
rsync_module = '%s::object' % node_ip
|
||||
rsync_module = rsync_module_interpolation(self.rsync_module, node)
|
||||
had_any = False
|
||||
for suffix in suffixes:
|
||||
spath = join(job['path'], suffix)
|
||||
|
|
|
@ -156,13 +156,24 @@ class Receiver(object):
|
|||
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
|
||||
self.device, self.partition, self.policy = \
|
||||
request_helpers.get_name_and_placement(self.request, 2, 2, False)
|
||||
|
||||
self.frag_index = self.node_index = None
|
||||
if self.request.headers.get('X-Backend-Ssync-Frag-Index'):
|
||||
self.frag_index = int(
|
||||
self.request.headers['X-Backend-Ssync-Frag-Index'])
|
||||
try:
|
||||
self.frag_index = int(
|
||||
self.request.headers['X-Backend-Ssync-Frag-Index'])
|
||||
except ValueError:
|
||||
raise swob.HTTPBadRequest(
|
||||
'Invalid X-Backend-Ssync-Frag-Index %r' %
|
||||
self.request.headers['X-Backend-Ssync-Frag-Index'])
|
||||
if self.request.headers.get('X-Backend-Ssync-Node-Index'):
|
||||
self.node_index = int(
|
||||
self.request.headers['X-Backend-Ssync-Node-Index'])
|
||||
try:
|
||||
self.node_index = int(
|
||||
self.request.headers['X-Backend-Ssync-Node-Index'])
|
||||
except ValueError:
|
||||
raise swob.HTTPBadRequest(
|
||||
'Invalid X-Backend-Ssync-Node-Index %r' %
|
||||
self.request.headers['X-Backend-Ssync-Node-Index'])
|
||||
if self.node_index != self.frag_index:
|
||||
# a primary node should only receive it's own fragments
|
||||
raise swob.HTTPBadRequest(
|
||||
|
|
|
@ -133,9 +133,16 @@ class Sender(object):
|
|||
# a sync job must use the node's index for the frag_index of the
|
||||
# rebuilt fragments instead of the frag_index from the job which
|
||||
# will be rebuilding them
|
||||
self.connection.putheader(
|
||||
'X-Backend-Ssync-Frag-Index', self.node.get(
|
||||
'index', self.job.get('frag_index', '')))
|
||||
frag_index = self.node.get('index', self.job.get('frag_index'))
|
||||
if frag_index is None:
|
||||
# replication jobs will not have a frag_index key;
|
||||
# reconstructor jobs with only tombstones will have a
|
||||
# frag_index key explicitly set to the value of None - in both
|
||||
# cases on the wire we write the empty string which
|
||||
# ssync_receiver will translate to None
|
||||
frag_index = ''
|
||||
self.connection.putheader('X-Backend-Ssync-Frag-Index',
|
||||
frag_index)
|
||||
# a revert job to a handoff will not have a node index
|
||||
self.connection.putheader('X-Backend-Ssync-Node-Index',
|
||||
self.node.get('index', ''))
|
||||
|
@ -144,10 +151,10 @@ class Sender(object):
|
|||
self.daemon.node_timeout, 'connect receive'):
|
||||
self.response = self.connection.getresponse()
|
||||
if self.response.status != http.HTTP_OK:
|
||||
self.response.read()
|
||||
err_msg = self.response.read()[:1024]
|
||||
raise exceptions.ReplicationException(
|
||||
'Expected status %s; got %s' %
|
||||
(http.HTTP_OK, self.response.status))
|
||||
'Expected status %s; got %s (%s)' %
|
||||
(http.HTTP_OK, self.response.status, err_msg))
|
||||
|
||||
def readline(self):
|
||||
"""
|
||||
|
|
|
@ -1018,3 +1018,26 @@ class File(Base):
|
|||
raise ResponseError(self.conn.response)
|
||||
self.md5 = self.compute_md5sum(six.StringIO(data))
|
||||
return resp
|
||||
|
||||
def post(self, hdrs=None, parms=None, cfg=None, return_resp=False):
|
||||
if hdrs is None:
|
||||
hdrs = {}
|
||||
if parms is None:
|
||||
parms = {}
|
||||
if cfg is None:
|
||||
cfg = {}
|
||||
|
||||
headers = self.make_headers(cfg=cfg)
|
||||
headers.update(hdrs)
|
||||
|
||||
self.conn.make_request('POST', self.path, hdrs=headers,
|
||||
parms=parms, cfg=cfg)
|
||||
|
||||
if self.conn.response.status not in (201, 202):
|
||||
raise ResponseError(self.conn.response, 'POST',
|
||||
self.conn.make_path(self.path))
|
||||
|
||||
if return_resp:
|
||||
return self.conn.response
|
||||
|
||||
return True
|
||||
|
|
|
@ -3197,6 +3197,22 @@ class TestTempurl(Base):
|
|||
else:
|
||||
self.fail('request did not error')
|
||||
|
||||
# try again using a tempurl POST to an already created object
|
||||
new_obj.write('', {}, parms=put_parms, cfg={'no_auth_token': True})
|
||||
expires = int(time.time()) + 86400
|
||||
sig = self.tempurl_sig(
|
||||
'POST', expires, self.env.conn.make_path(new_obj.path),
|
||||
self.env.tempurl_key)
|
||||
post_parms = {'temp_url_sig': sig,
|
||||
'temp_url_expires': str(expires)}
|
||||
try:
|
||||
new_obj.post({'x-object-manifest': '%s/foo' % other_container},
|
||||
parms=post_parms, cfg={'no_auth_token': True})
|
||||
except ResponseError as e:
|
||||
self.assertEqual(e.status, 400)
|
||||
else:
|
||||
self.fail('request did not error')
|
||||
|
||||
def test_HEAD(self):
|
||||
expires = int(time.time()) + 86400
|
||||
sig = self.tempurl_sig(
|
||||
|
|
|
@ -29,7 +29,8 @@ from six.moves.http_client import HTTPConnection
|
|||
from swiftclient import get_auth, head_account
|
||||
from swift.obj.diskfile import get_data_dir
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import readconf, renamer
|
||||
from swift.common.utils import readconf, renamer, \
|
||||
config_true_value, rsync_module_interpolation
|
||||
from swift.common.manager import Manager
|
||||
from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
|
||||
|
||||
|
@ -219,11 +220,12 @@ def get_ring(ring_name, required_replicas, required_devices,
|
|||
"unable to find ring device %s under %s's devices (%s)" % (
|
||||
dev['device'], server, conf['devices']))
|
||||
# verify server is exposing rsync device
|
||||
if conf.get('vm_test_mode', False):
|
||||
rsync_export = '%s%s' % (server, dev['replication_port'])
|
||||
else:
|
||||
rsync_export = server
|
||||
cmd = "rsync rsync://localhost/%s" % rsync_export
|
||||
rsync_export = conf.get('rsync_module', '').rstrip('/')
|
||||
if not rsync_export:
|
||||
rsync_export = '{replication_ip}::%s' % server
|
||||
if config_true_value(conf.get('vm_test_mode', 'no')):
|
||||
rsync_export += '{replication_port}'
|
||||
cmd = "rsync %s" % rsync_module_interpolation(rsync_export, dev)
|
||||
p = Popen(cmd, shell=True, stdout=PIPE)
|
||||
stdout, _stderr = p.communicate()
|
||||
if p.returncode:
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
from hashlib import md5
|
||||
import itertools
|
||||
import unittest
|
||||
import uuid
|
||||
import random
|
||||
|
@ -94,7 +95,7 @@ class TestReconstructorRevert(ECProbeTest):
|
|||
self.object_name, headers=headers_post)
|
||||
del headers_post['X-Auth-Token'] # WTF, where did this come from?
|
||||
|
||||
# these primaries can't servce the data any more, we expect 507
|
||||
# these primaries can't serve the data any more, we expect 507
|
||||
# here and not 404 because we're using mount_check to kill nodes
|
||||
for onode in (onodes[0], onodes[1]):
|
||||
try:
|
||||
|
@ -102,7 +103,7 @@ class TestReconstructorRevert(ECProbeTest):
|
|||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 507)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destoryed!' %
|
||||
self.fail('Node data on %r was not fully destroyed!' %
|
||||
(onode,))
|
||||
|
||||
# now take out another primary
|
||||
|
@ -115,7 +116,7 @@ class TestReconstructorRevert(ECProbeTest):
|
|||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 507)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destoryed!' %
|
||||
self.fail('Node data on %r was not fully destroyed!' %
|
||||
(onode,))
|
||||
|
||||
# make sure we can still GET the object and its correct
|
||||
|
@ -152,10 +153,10 @@ class TestReconstructorRevert(ECProbeTest):
|
|||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destoryed!' %
|
||||
self.fail('Node data on %r was not fully destroyed!' %
|
||||
(hnode,))
|
||||
|
||||
def test_delete_propogate(self):
|
||||
def test_delete_propagate(self):
|
||||
# create EC container
|
||||
headers = {'X-Storage-Policy': self.policy.name}
|
||||
client.put_container(self.url, self.token, self.container_name,
|
||||
|
@ -164,56 +165,95 @@ class TestReconstructorRevert(ECProbeTest):
|
|||
# get our node lists
|
||||
opart, onodes = self.object_ring.get_nodes(
|
||||
self.account, self.container_name, self.object_name)
|
||||
hnodes = self.object_ring.get_more_nodes(opart)
|
||||
p_dev2 = self.device_dir('object', onodes[1])
|
||||
hnodes = list(itertools.islice(
|
||||
self.object_ring.get_more_nodes(opart), 2))
|
||||
|
||||
# PUT object
|
||||
contents = Body()
|
||||
client.put_object(self.url, self.token, self.container_name,
|
||||
self.object_name, contents=contents)
|
||||
|
||||
# now lets shut one down
|
||||
self.kill_drive(p_dev2)
|
||||
# now lets shut down a couple primaries
|
||||
failed_nodes = random.sample(onodes, 2)
|
||||
for node in failed_nodes:
|
||||
self.kill_drive(self.device_dir('object', node))
|
||||
|
||||
# delete on the ones that are left
|
||||
# Write tombstones over the nodes that are still online
|
||||
client.delete_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name)
|
||||
|
||||
# spot check a node
|
||||
# spot check the primary nodes that are still online
|
||||
delete_timestamp = None
|
||||
for node in onodes:
|
||||
if node in failed_nodes:
|
||||
continue
|
||||
try:
|
||||
self.direct_get(node, opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
delete_timestamp = err.http_headers['X-Backend-Timestamp']
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destroyed!' %
|
||||
(node,))
|
||||
|
||||
# repair the first primary
|
||||
self.revive_drive(self.device_dir('object', failed_nodes[0]))
|
||||
|
||||
# run the reconstructor on the *second* handoff node
|
||||
self.reconstructor.once(number=self.config_number(hnodes[1]))
|
||||
|
||||
# make sure it's tombstone was pushed out
|
||||
try:
|
||||
self.direct_get(onodes[0], opart)
|
||||
self.direct_get(hnodes[1], opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
self.assertNotIn('X-Backend-Timestamp', err.http_headers)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destoryed!' %
|
||||
(onodes[0],))
|
||||
self.fail('Found obj data on %r' % hnodes[1])
|
||||
|
||||
# enable the first node again
|
||||
self.revive_drive(p_dev2)
|
||||
|
||||
# propagate the delete...
|
||||
# fire up reconstructor on handoff nodes only
|
||||
for hnode in hnodes:
|
||||
hnode_id = (hnode['port'] - 6000) / 10
|
||||
self.reconstructor.once(number=hnode_id)
|
||||
|
||||
# check the first node to make sure its gone
|
||||
# ... and it's on the first failed (now repaired) primary
|
||||
try:
|
||||
self.direct_get(onodes[1], opart)
|
||||
self.direct_get(failed_nodes[0], opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
self.assertEqual(err.http_headers['X-Backend-Timestamp'],
|
||||
delete_timestamp)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destoryed!' %
|
||||
(onodes[0]))
|
||||
self.fail('Found obj data on %r' % failed_nodes[0])
|
||||
|
||||
# make sure proxy get can't find it
|
||||
# repair the second primary
|
||||
self.revive_drive(self.device_dir('object', failed_nodes[1]))
|
||||
|
||||
# run the reconstructor on the *first* handoff node
|
||||
self.reconstructor.once(number=self.config_number(hnodes[0]))
|
||||
|
||||
# make sure it's tombstone was pushed out
|
||||
try:
|
||||
self.direct_get(hnodes[0], opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
self.assertNotIn('X-Backend-Timestamp', err.http_headers)
|
||||
else:
|
||||
self.fail('Found obj data on %r' % hnodes[0])
|
||||
|
||||
# ... and now it's on the second failed primary too!
|
||||
try:
|
||||
self.direct_get(failed_nodes[1], opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
self.assertEqual(err.http_headers['X-Backend-Timestamp'],
|
||||
delete_timestamp)
|
||||
else:
|
||||
self.fail('Found obj data on %r' % failed_nodes[1])
|
||||
|
||||
# sanity make sure proxy get can't find it
|
||||
try:
|
||||
self.proxy_get()
|
||||
except Exception as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destoryed!' %
|
||||
self.fail('Node data on %r was not fully destroyed!' %
|
||||
(onodes[0]))
|
||||
|
||||
def test_reconstruct_from_reverted_fragment_archive(self):
|
||||
|
|
|
@ -21,25 +21,39 @@ import mock
|
|||
from six import BytesIO
|
||||
|
||||
from test.unit import FakeLogger
|
||||
from swift.common.utils import get_logger
|
||||
from swift.common.utils import get_logger, split_path
|
||||
from swift.common.middleware import proxy_logging
|
||||
from swift.common.swob import Request, Response
|
||||
from swift.common import constraints
|
||||
from swift.common.storage_policy import StoragePolicy
|
||||
from test.unit import patch_policies
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
|
||||
def __init__(self, body=None, response_str='200 OK'):
|
||||
def __init__(self, body=None, response_str='200 OK', policy_idx='0'):
|
||||
if body is None:
|
||||
body = ['FAKE APP']
|
||||
|
||||
self.body = body
|
||||
self.response_str = response_str
|
||||
self.policy_idx = policy_idx
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
start_response(self.response_str,
|
||||
[('Content-Type', 'text/plain'),
|
||||
('Content-Length', str(sum(map(len, self.body))))])
|
||||
try:
|
||||
# /v1/a/c or /v1/a/c/o
|
||||
split_path(env['PATH_INFO'], 3, 4, True)
|
||||
is_container_or_object_req = True
|
||||
except ValueError:
|
||||
is_container_or_object_req = False
|
||||
|
||||
headers = [('Content-Type', 'text/plain'),
|
||||
('Content-Length', str(sum(map(len, self.body))))]
|
||||
if is_container_or_object_req and self.policy_idx is not None:
|
||||
headers.append(('X-Backend-Storage-Policy-Index',
|
||||
str(self.policy_idx)))
|
||||
|
||||
start_response(self.response_str, headers)
|
||||
while env['wsgi.input'].read(5):
|
||||
pass
|
||||
return self.body
|
||||
|
@ -91,8 +105,12 @@ def start_response(*args):
|
|||
pass
|
||||
|
||||
|
||||
@patch_policies([StoragePolicy(0, 'zero', False)])
|
||||
class TestProxyLogging(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def _log_parts(self, app, should_be_empty=False):
|
||||
info_calls = app.access_logger.log_dict['info']
|
||||
if should_be_empty:
|
||||
|
@ -136,11 +154,14 @@ class TestProxyLogging(unittest.TestCase):
|
|||
for timing_call in timing_calls:
|
||||
self.assertNotEqual(not_exp_metric, timing_call[0][0])
|
||||
|
||||
def assertUpdateStats(self, exp_metric, exp_bytes, app):
|
||||
update_stats_calls = app.access_logger.log_dict['update_stats']
|
||||
self.assertEquals(1, len(update_stats_calls))
|
||||
self.assertEquals({}, update_stats_calls[0][1])
|
||||
self.assertEquals((exp_metric, exp_bytes), update_stats_calls[0][0])
|
||||
def assertUpdateStats(self, exp_metrics_and_values, app):
|
||||
update_stats_calls = sorted(app.access_logger.log_dict['update_stats'])
|
||||
got_metrics_values_and_kwargs = [(usc[0][0], usc[0][1], usc[1])
|
||||
for usc in update_stats_calls]
|
||||
exp_metrics_values_and_kwargs = [(emv[0], emv[1], {})
|
||||
for emv in exp_metrics_and_values]
|
||||
self.assertEqual(got_metrics_values_and_kwargs,
|
||||
exp_metrics_values_and_kwargs)
|
||||
|
||||
def test_log_request_statsd_invalid_stats_types(self):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
|
||||
|
@ -198,14 +219,51 @@ class TestProxyLogging(unittest.TestCase):
|
|||
'wsgi.input': BytesIO(b'4321')})
|
||||
stub_times = [18.0, 20.71828182846]
|
||||
iter_response = app(req.environ, lambda *_: None)
|
||||
|
||||
self.assertEqual('7654321', ''.join(iter_response))
|
||||
self.assertTiming('%s.GET.321.timing' % exp_type, app,
|
||||
exp_timing=2.71828182846 * 1000)
|
||||
self.assertTimingSince(
|
||||
'%s.GET.321.first-byte.timing' % exp_type, app,
|
||||
exp_start=18.0)
|
||||
self.assertUpdateStats('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7, app)
|
||||
if exp_type == 'object':
|
||||
# Object operations also return stats by policy
|
||||
# In this case, the value needs to match the timing for GET
|
||||
self.assertTiming('%s.policy.0.GET.321.timing' % exp_type,
|
||||
app, exp_timing=2.71828182846 * 1000)
|
||||
self.assertUpdateStats([('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7),
|
||||
('object.policy.0.GET.321.xfer',
|
||||
4 + 7)],
|
||||
app)
|
||||
else:
|
||||
self.assertUpdateStats([('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7)],
|
||||
app)
|
||||
|
||||
# GET Repeat the test above, but with a non-existent policy
|
||||
# Do this only for object types
|
||||
if exp_type == 'object':
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
FakeApp(body='7654321', response_str='321 Fubar',
|
||||
policy_idx='-1'), {})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(path, environ={
|
||||
'REQUEST_METHOD': 'GET',
|
||||
'wsgi.input': BytesIO(b'4321')})
|
||||
stub_times = [18.0, 20.71828182846]
|
||||
iter_response = app(req.environ, lambda *_: None)
|
||||
|
||||
self.assertEqual('7654321', ''.join(iter_response))
|
||||
self.assertTiming('%s.GET.321.timing' % exp_type, app,
|
||||
exp_timing=2.71828182846 * 1000)
|
||||
self.assertTimingSince(
|
||||
'%s.GET.321.first-byte.timing' % exp_type, app,
|
||||
exp_start=18.0)
|
||||
# No results returned for the non-existent policy
|
||||
self.assertUpdateStats([('%s.GET.321.xfer' % exp_type,
|
||||
4 + 7)],
|
||||
app)
|
||||
|
||||
# GET with swift.proxy_access_log_made already set
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
|
@ -241,8 +299,42 @@ class TestProxyLogging(unittest.TestCase):
|
|||
'%s.GET.314.first-byte.timing' % exp_type, app)
|
||||
self.assertNotTiming(
|
||||
'%s.PUT.314.first-byte.timing' % exp_type, app)
|
||||
self.assertUpdateStats(
|
||||
'%s.PUT.314.xfer' % exp_type, 6 + 8, app)
|
||||
if exp_type == 'object':
|
||||
# Object operations also return stats by policy In this
|
||||
# case, the value needs to match the timing for PUT.
|
||||
self.assertTiming('%s.policy.0.PUT.314.timing' %
|
||||
exp_type, app,
|
||||
exp_timing=7.3321 * 1000)
|
||||
self.assertUpdateStats(
|
||||
[('object.PUT.314.xfer', 6 + 8),
|
||||
('object.policy.0.PUT.314.xfer', 6 + 8)], app)
|
||||
else:
|
||||
self.assertUpdateStats(
|
||||
[('%s.PUT.314.xfer' % exp_type, 6 + 8)], app)
|
||||
|
||||
# PUT Repeat the test above, but with a non-existent policy
|
||||
# Do this only for object types
|
||||
if exp_type == 'object':
|
||||
app = proxy_logging.ProxyLoggingMiddleware(
|
||||
FakeApp(body='87654321', response_str='314 PiTown',
|
||||
policy_idx='-1'), {})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(path, environ={
|
||||
'REQUEST_METHOD': 'PUT',
|
||||
'wsgi.input': BytesIO(b'654321')})
|
||||
# (it's not a GET, so time() doesn't have a 2nd call)
|
||||
stub_times = [58.2, 58.2 + 7.3321]
|
||||
iter_response = app(req.environ, lambda *_: None)
|
||||
self.assertEqual('87654321', ''.join(iter_response))
|
||||
self.assertTiming('%s.PUT.314.timing' % exp_type, app,
|
||||
exp_timing=7.3321 * 1000)
|
||||
self.assertNotTiming(
|
||||
'%s.GET.314.first-byte.timing' % exp_type, app)
|
||||
self.assertNotTiming(
|
||||
'%s.PUT.314.first-byte.timing' % exp_type, app)
|
||||
# No results returned for the non-existent policy
|
||||
self.assertUpdateStats([('object.PUT.314.xfer', 6 + 8)],
|
||||
app)
|
||||
|
||||
def test_log_request_stat_method_filtering_default(self):
|
||||
method_map = {
|
||||
|
@ -266,8 +358,8 @@ class TestProxyLogging(unittest.TestCase):
|
|||
app.log_request(req, 299, 11, 3, now, now + 1.17)
|
||||
self.assertTiming('account.%s.299.timing' % exp_method, app,
|
||||
exp_timing=1.17 * 1000)
|
||||
self.assertUpdateStats('account.%s.299.xfer' % exp_method,
|
||||
11 + 3, app)
|
||||
self.assertUpdateStats([('account.%s.299.xfer' % exp_method,
|
||||
11 + 3)], app)
|
||||
|
||||
def test_log_request_stat_method_filtering_custom(self):
|
||||
method_map = {
|
||||
|
@ -293,8 +385,8 @@ class TestProxyLogging(unittest.TestCase):
|
|||
app.log_request(req, 911, 4, 43, now, now + 1.01)
|
||||
self.assertTiming('container.%s.911.timing' % exp_method, app,
|
||||
exp_timing=1.01 * 1000)
|
||||
self.assertUpdateStats('container.%s.911.xfer' % exp_method,
|
||||
4 + 43, app)
|
||||
self.assertUpdateStats([('container.%s.911.xfer' % exp_method,
|
||||
4 + 43)], app)
|
||||
|
||||
def test_basic_req(self):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
|
||||
|
@ -336,7 +428,7 @@ class TestProxyLogging(unittest.TestCase):
|
|||
self.assertEquals(log_parts[6], '200')
|
||||
self.assertEquals(resp_body, 'somechunksof data')
|
||||
self.assertEquals(log_parts[11], str(len(resp_body)))
|
||||
self.assertUpdateStats('SOS.GET.200.xfer', len(resp_body), app)
|
||||
self.assertUpdateStats([('SOS.GET.200.xfer', len(resp_body))], app)
|
||||
|
||||
def test_log_headers(self):
|
||||
for conf_key in ['access_log_headers', 'log_headers']:
|
||||
|
@ -372,6 +464,7 @@ class TestProxyLogging(unittest.TestCase):
|
|||
self.assertTrue('Host: localhost:80' not in headers)
|
||||
|
||||
def test_upload_size(self):
|
||||
# Using default policy
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(),
|
||||
{'log_headers': 'yes'})
|
||||
app.access_logger = FakeLogger()
|
||||
|
@ -385,8 +478,46 @@ class TestProxyLogging(unittest.TestCase):
|
|||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff')))
|
||||
self.assertUpdateStats('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'),
|
||||
self.assertUpdateStats([('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP')),
|
||||
('object.policy.0.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
# Using a non-existent policy
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(policy_idx='-1'),
|
||||
{'log_headers': 'yes'})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o/foo',
|
||||
environ={'REQUEST_METHOD': 'PUT',
|
||||
'wsgi.input': BytesIO(b'some stuff')})
|
||||
resp = app(req.environ, start_response)
|
||||
# exhaust generator
|
||||
[x for x in resp]
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff')))
|
||||
self.assertUpdateStats([('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
def test_upload_size_no_policy(self):
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(policy_idx=None),
|
||||
{'log_headers': 'yes'})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank(
|
||||
'/v1/a/c/o/foo',
|
||||
environ={'REQUEST_METHOD': 'PUT',
|
||||
'wsgi.input': BytesIO(b'some stuff')})
|
||||
resp = app(req.environ, start_response)
|
||||
# exhaust generator
|
||||
[x for x in resp]
|
||||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff')))
|
||||
self.assertUpdateStats([('object.PUT.200.xfer',
|
||||
len('some stuff') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
def test_upload_line(self):
|
||||
|
@ -403,8 +534,8 @@ class TestProxyLogging(unittest.TestCase):
|
|||
log_parts = self._log_parts(app)
|
||||
self.assertEquals(log_parts[11], str(len('FAKE APP')))
|
||||
self.assertEquals(log_parts[10], str(len('some stuff\n')))
|
||||
self.assertUpdateStats('container.POST.200.xfer',
|
||||
len('some stuff\n') + len('FAKE APP'),
|
||||
self.assertUpdateStats([('container.POST.200.xfer',
|
||||
len('some stuff\n') + len('FAKE APP'))],
|
||||
app)
|
||||
|
||||
def test_log_query_string(self):
|
||||
|
@ -881,10 +1012,9 @@ class TestProxyLogging(unittest.TestCase):
|
|||
def test_policy_index(self):
|
||||
# Policy index can be specified by X-Backend-Storage-Policy-Index
|
||||
# in the request header for object API
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
|
||||
app = proxy_logging.ProxyLoggingMiddleware(FakeApp(policy_idx='1'), {})
|
||||
app.access_logger = FakeLogger()
|
||||
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Backend-Storage-Policy-Index': '1'})
|
||||
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'})
|
||||
resp = app(req.environ, start_response)
|
||||
''.join(resp)
|
||||
log_parts = self._log_parts(app)
|
||||
|
|
|
@ -13,19 +13,21 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import array
|
||||
from contextlib import contextmanager
|
||||
import mock
|
||||
import os
|
||||
from posix import stat_result, statvfs_result
|
||||
from shutil import rmtree
|
||||
import unittest
|
||||
from unittest import TestCase
|
||||
from contextlib import contextmanager
|
||||
from posix import stat_result, statvfs_result
|
||||
import array
|
||||
from swift.common import ring, utils
|
||||
from shutil import rmtree
|
||||
import os
|
||||
import mock
|
||||
|
||||
from swift import __version__ as swiftver
|
||||
from swift.common import ring, utils
|
||||
from swift.common.swob import Request
|
||||
from swift.common.middleware import recon
|
||||
from swift.common.storage_policy import StoragePolicy
|
||||
from test.unit import patch_policies
|
||||
|
||||
|
||||
def fake_check_mount(a, b):
|
||||
|
@ -191,6 +193,7 @@ class FakeRecon(object):
|
|||
raise Exception
|
||||
|
||||
|
||||
@patch_policies(legacy_only=True)
|
||||
class TestReconSuccess(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -198,7 +201,6 @@ class TestReconSuccess(TestCase):
|
|||
# which will cause ring md5 checks to fail
|
||||
self.tempdir = '/tmp/swift_recon_md5_test'
|
||||
utils.mkdirs(self.tempdir)
|
||||
self._create_rings()
|
||||
self.app = recon.ReconMiddleware(FakeApp(),
|
||||
{'swift_dir': self.tempdir})
|
||||
self.mockos = MockOS()
|
||||
|
@ -213,6 +215,22 @@ class TestReconSuccess(TestCase):
|
|||
self.app._from_recon_cache = self.fakecache.fake_from_recon_cache
|
||||
self.frecon = FakeRecon()
|
||||
|
||||
self.ring_part_shift = 5
|
||||
self.ring_devs = [{'id': 0, 'zone': 0, 'weight': 1.0,
|
||||
'ip': '10.1.1.1', 'port': 6000,
|
||||
'device': 'sda1'},
|
||||
{'id': 1, 'zone': 0, 'weight': 1.0,
|
||||
'ip': '10.1.1.1', 'port': 6000,
|
||||
'device': 'sdb1'},
|
||||
None,
|
||||
{'id': 3, 'zone': 2, 'weight': 1.0,
|
||||
'ip': '10.1.2.1', 'port': 6000,
|
||||
'device': 'sdc1'},
|
||||
{'id': 4, 'zone': 2, 'weight': 1.0,
|
||||
'ip': '10.1.2.2', 'port': 6000,
|
||||
'device': 'sdd1'}]
|
||||
self._create_rings()
|
||||
|
||||
def tearDown(self):
|
||||
os.listdir = self.real_listdir
|
||||
utils.ismount = self.real_ismount
|
||||
|
@ -222,8 +240,7 @@ class TestReconSuccess(TestCase):
|
|||
del self.fakecache
|
||||
rmtree(self.tempdir)
|
||||
|
||||
def _create_rings(self):
|
||||
|
||||
def _create_ring(self, ringpath, replica_map, devs, part_shift):
|
||||
def fake_time():
|
||||
return 0
|
||||
|
||||
|
@ -232,85 +249,203 @@ class TestReconSuccess(TestCase):
|
|||
# not use the .gz extension in the gzip header
|
||||
return fname[:-3]
|
||||
|
||||
accountgz = os.path.join(self.tempdir, 'account.ring.gz')
|
||||
containergz = os.path.join(self.tempdir, 'container.ring.gz')
|
||||
objectgz = os.path.join(self.tempdir, 'object.ring.gz')
|
||||
objectgz_1 = os.path.join(self.tempdir, 'object-1.ring.gz')
|
||||
objectgz_2 = os.path.join(self.tempdir, 'object-2.ring.gz')
|
||||
|
||||
# make the rings unique so they have different md5 sums
|
||||
intended_replica2part2dev_id_a = [
|
||||
array.array('H', [3, 1, 3, 1]),
|
||||
array.array('H', [0, 3, 1, 4]),
|
||||
array.array('H', [1, 4, 0, 3])]
|
||||
intended_replica2part2dev_id_c = [
|
||||
array.array('H', [4, 3, 0, 1]),
|
||||
array.array('H', [0, 1, 3, 4]),
|
||||
array.array('H', [3, 4, 0, 1])]
|
||||
intended_replica2part2dev_id_o = [
|
||||
array.array('H', [0, 1, 0, 1]),
|
||||
array.array('H', [0, 1, 0, 1]),
|
||||
array.array('H', [3, 4, 3, 4])]
|
||||
intended_replica2part2dev_id_o_1 = [
|
||||
array.array('H', [1, 0, 1, 0]),
|
||||
array.array('H', [1, 0, 1, 0]),
|
||||
array.array('H', [4, 3, 4, 3])]
|
||||
intended_replica2part2dev_id_o_2 = [
|
||||
array.array('H', [1, 1, 1, 0]),
|
||||
array.array('H', [1, 0, 1, 3]),
|
||||
array.array('H', [4, 2, 4, 3])]
|
||||
|
||||
intended_devs = [{'id': 0, 'zone': 0, 'weight': 1.0,
|
||||
'ip': '10.1.1.1', 'port': 6000,
|
||||
'device': 'sda1'},
|
||||
{'id': 1, 'zone': 0, 'weight': 1.0,
|
||||
'ip': '10.1.1.1', 'port': 6000,
|
||||
'device': 'sdb1'},
|
||||
None,
|
||||
{'id': 3, 'zone': 2, 'weight': 1.0,
|
||||
'ip': '10.1.2.1', 'port': 6000,
|
||||
'device': 'sdc1'},
|
||||
{'id': 4, 'zone': 2, 'weight': 1.0,
|
||||
'ip': '10.1.2.2', 'port': 6000,
|
||||
'device': 'sdd1'}]
|
||||
|
||||
# eliminate time from the equation as gzip 2.6 includes
|
||||
# it in the header resulting in md5 file mismatch, also
|
||||
# have to mock basename as one version uses it, one doesn't
|
||||
with mock.patch("time.time", fake_time):
|
||||
with mock.patch("os.path.basename", fake_base):
|
||||
ring.RingData(intended_replica2part2dev_id_a,
|
||||
intended_devs, 5).save(accountgz, mtime=None)
|
||||
ring.RingData(intended_replica2part2dev_id_c,
|
||||
intended_devs, 5).save(containergz, mtime=None)
|
||||
ring.RingData(intended_replica2part2dev_id_o,
|
||||
intended_devs, 5).save(objectgz, mtime=None)
|
||||
ring.RingData(intended_replica2part2dev_id_o_1,
|
||||
intended_devs, 5).save(objectgz_1, mtime=None)
|
||||
ring.RingData(intended_replica2part2dev_id_o_2,
|
||||
intended_devs, 5).save(objectgz_2, mtime=None)
|
||||
ring.RingData(replica_map, devs, part_shift).save(ringpath,
|
||||
mtime=None)
|
||||
|
||||
def _create_rings(self):
|
||||
# make the rings unique so they have different md5 sums
|
||||
rings = {
|
||||
'account.ring.gz': [
|
||||
array.array('H', [3, 1, 3, 1]),
|
||||
array.array('H', [0, 3, 1, 4]),
|
||||
array.array('H', [1, 4, 0, 3])],
|
||||
'container.ring.gz': [
|
||||
array.array('H', [4, 3, 0, 1]),
|
||||
array.array('H', [0, 1, 3, 4]),
|
||||
array.array('H', [3, 4, 0, 1])],
|
||||
'object.ring.gz': [
|
||||
array.array('H', [0, 1, 0, 1]),
|
||||
array.array('H', [0, 1, 0, 1]),
|
||||
array.array('H', [3, 4, 3, 4])],
|
||||
'object-1.ring.gz': [
|
||||
array.array('H', [1, 0, 1, 0]),
|
||||
array.array('H', [1, 0, 1, 0]),
|
||||
array.array('H', [4, 3, 4, 3])],
|
||||
'object-2.ring.gz': [
|
||||
array.array('H', [1, 1, 1, 0]),
|
||||
array.array('H', [1, 0, 1, 3]),
|
||||
array.array('H', [4, 2, 4, 3])]
|
||||
}
|
||||
|
||||
for ringfn, replica_map in rings.iteritems():
|
||||
ringpath = os.path.join(self.tempdir, ringfn)
|
||||
self._create_ring(ringpath, replica_map, self.ring_devs,
|
||||
self.ring_part_shift)
|
||||
|
||||
@patch_policies([
|
||||
StoragePolicy(0, 'stagecoach'),
|
||||
StoragePolicy(1, 'pinto', is_deprecated=True),
|
||||
StoragePolicy(2, 'toyota', is_default=True),
|
||||
])
|
||||
def test_get_ring_md5(self):
|
||||
def fake_open(self, f):
|
||||
raise IOError
|
||||
|
||||
# We should only see configured and present rings, so to handle the
|
||||
# "normal" case just patch the policies to match the existing rings.
|
||||
expt_out = {'%s/account.ring.gz' % self.tempdir:
|
||||
'd288bdf39610e90d4f0b67fa00eeec4f',
|
||||
'%s/container.ring.gz' % self.tempdir:
|
||||
'9a5a05a8a4fbbc61123de792dbe4592d',
|
||||
'%s/object.ring.gz' % self.tempdir:
|
||||
'da02bfbd0bf1e7d56faea15b6fe5ab1e',
|
||||
'%s/object-1.ring.gz' % self.tempdir:
|
||||
'3f1899b27abf5f2efcc67d6fae1e1c64',
|
||||
'%s/object-2.ring.gz' % self.tempdir:
|
||||
'8f0e57079b3c245d9b3d5a428e9312ee',
|
||||
'8f0e57079b3c245d9b3d5a428e9312ee'}
|
||||
|
||||
# We need to instantiate app after overriding the configured policies.
|
||||
# object-{1,2}.ring.gz should both appear as they are present on disk
|
||||
# and were configured as policies.
|
||||
app = recon.ReconMiddleware(FakeApp(), {'swift_dir': self.tempdir})
|
||||
self.assertEquals(sorted(app.get_ring_md5().items()),
|
||||
sorted(expt_out.items()))
|
||||
|
||||
def test_get_ring_md5_ioerror_produces_none_hash(self):
|
||||
# Ring files that are present but produce an IOError on read should
|
||||
# still produce a ringmd5 entry with a None for the hash. Note that
|
||||
# this is different than if an expected ring file simply doesn't exist,
|
||||
# in which case it is excluded altogether from the ringmd5 response.
|
||||
|
||||
def fake_open(fn, fmode):
|
||||
raise IOError
|
||||
|
||||
expt_out = {'%s/account.ring.gz' % self.tempdir: None,
|
||||
'%s/container.ring.gz' % self.tempdir: None,
|
||||
'%s/object.ring.gz' % self.tempdir: None}
|
||||
ringmd5 = self.app.get_ring_md5(openr=fake_open)
|
||||
self.assertEquals(sorted(ringmd5.items()),
|
||||
sorted(expt_out.items()))
|
||||
|
||||
def test_get_ring_md5_failed_ring_hash_recovers_without_restart(self):
|
||||
# Ring files that are present but produce an IOError on read will
|
||||
# show a None hash, but if they can be read later their hash
|
||||
# should become available in the ringmd5 response.
|
||||
|
||||
def fake_open(fn, fmode):
|
||||
raise IOError
|
||||
|
||||
expt_out = {'%s/account.ring.gz' % self.tempdir: None,
|
||||
'%s/container.ring.gz' % self.tempdir: None,
|
||||
'%s/object.ring.gz' % self.tempdir: None}
|
||||
ringmd5 = self.app.get_ring_md5(openr=fake_open)
|
||||
self.assertEquals(sorted(ringmd5.items()),
|
||||
sorted(expt_out.items()))
|
||||
|
||||
# If we fix a ring and it can be read again, its hash should then
|
||||
# appear using the same app instance
|
||||
def fake_open_objonly(fn, fmode):
|
||||
if 'object' not in fn:
|
||||
raise IOError
|
||||
return open(fn, fmode)
|
||||
|
||||
expt_out = {'%s/account.ring.gz' % self.tempdir: None,
|
||||
'%s/container.ring.gz' % self.tempdir: None,
|
||||
'%s/object.ring.gz' % self.tempdir:
|
||||
'da02bfbd0bf1e7d56faea15b6fe5ab1e'}
|
||||
ringmd5 = self.app.get_ring_md5(openr=fake_open_objonly)
|
||||
self.assertEquals(sorted(ringmd5.items()),
|
||||
sorted(expt_out.items()))
|
||||
|
||||
@patch_policies([
|
||||
StoragePolicy(0, 'stagecoach'),
|
||||
StoragePolicy(2, 'bike', is_default=True),
|
||||
StoragePolicy(3502, 'train')
|
||||
])
|
||||
def test_get_ring_md5_missing_ring_recovers_without_restart(self):
|
||||
# If a configured ring is missing when the app is instantiated, but is
|
||||
# later moved into place, we shouldn't need to restart object-server
|
||||
# for it to appear in recon.
|
||||
expt_out = {'%s/account.ring.gz' % self.tempdir:
|
||||
'd288bdf39610e90d4f0b67fa00eeec4f',
|
||||
'%s/container.ring.gz' % self.tempdir:
|
||||
'9a5a05a8a4fbbc61123de792dbe4592d',
|
||||
'%s/object.ring.gz' % self.tempdir:
|
||||
'da02bfbd0bf1e7d56faea15b6fe5ab1e',
|
||||
'%s/object-2.ring.gz' % self.tempdir:
|
||||
'8f0e57079b3c245d9b3d5a428e9312ee'}
|
||||
|
||||
# We need to instantiate app after overriding the configured policies.
|
||||
# object-1.ring.gz should not appear as it's present but unconfigured.
|
||||
# object-3502.ring.gz should not appear as it's configured but not
|
||||
# present.
|
||||
app = recon.ReconMiddleware(FakeApp(), {'swift_dir': self.tempdir})
|
||||
self.assertEquals(sorted(app.get_ring_md5().items()),
|
||||
sorted(expt_out.items()))
|
||||
|
||||
# Simulate the configured policy's missing ringfile being moved into
|
||||
# place during runtime
|
||||
ringfn = 'object-3502.ring.gz'
|
||||
ringpath = os.path.join(self.tempdir, ringfn)
|
||||
ringmap = [array.array('H', [1, 2, 1, 4]),
|
||||
array.array('H', [4, 0, 1, 3]),
|
||||
array.array('H', [1, 1, 0, 3])]
|
||||
self._create_ring(os.path.join(self.tempdir, ringfn),
|
||||
ringmap, self.ring_devs, self.ring_part_shift)
|
||||
expt_out[ringpath] = 'acfa4b85396d2a33f361ebc07d23031d'
|
||||
|
||||
# We should now see it in the ringmd5 response, without a restart
|
||||
# (using the same app instance)
|
||||
self.assertEquals(sorted(app.get_ring_md5().items()),
|
||||
sorted(expt_out.items()))
|
||||
|
||||
@patch_policies([
|
||||
StoragePolicy(0, 'stagecoach', is_default=True),
|
||||
StoragePolicy(2, 'bike'),
|
||||
StoragePolicy(2305, 'taxi')
|
||||
])
|
||||
def test_get_ring_md5_excludes_configured_missing_obj_rings(self):
|
||||
# Object rings that are configured but missing aren't meant to appear
|
||||
# in the ringmd5 response.
|
||||
expt_out = {'%s/account.ring.gz' % self.tempdir:
|
||||
'd288bdf39610e90d4f0b67fa00eeec4f',
|
||||
'%s/container.ring.gz' % self.tempdir:
|
||||
'9a5a05a8a4fbbc61123de792dbe4592d',
|
||||
'%s/object.ring.gz' % self.tempdir:
|
||||
'da02bfbd0bf1e7d56faea15b6fe5ab1e',
|
||||
'%s/object-2.ring.gz' % self.tempdir:
|
||||
'8f0e57079b3c245d9b3d5a428e9312ee'}
|
||||
|
||||
# We need to instantiate app after overriding the configured policies.
|
||||
# object-1.ring.gz should not appear as it's present but unconfigured.
|
||||
# object-2305.ring.gz should not appear as it's configured but not
|
||||
# present.
|
||||
app = recon.ReconMiddleware(FakeApp(), {'swift_dir': self.tempdir})
|
||||
self.assertEquals(sorted(app.get_ring_md5().items()),
|
||||
sorted(expt_out.items()))
|
||||
|
||||
@patch_policies([
|
||||
StoragePolicy(0, 'zero', is_default=True),
|
||||
])
|
||||
def test_get_ring_md5_excludes_unconfigured_present_obj_rings(self):
|
||||
# Object rings that are present but not configured in swift.conf
|
||||
# aren't meant to appear in the ringmd5 response.
|
||||
expt_out = {'%s/account.ring.gz' % self.tempdir:
|
||||
'd288bdf39610e90d4f0b67fa00eeec4f',
|
||||
'%s/container.ring.gz' % self.tempdir:
|
||||
'9a5a05a8a4fbbc61123de792dbe4592d',
|
||||
'%s/object.ring.gz' % self.tempdir:
|
||||
'da02bfbd0bf1e7d56faea15b6fe5ab1e'}
|
||||
|
||||
self.assertEquals(sorted(self.app.get_ring_md5().items()),
|
||||
# We need to instantiate app after overriding the configured policies.
|
||||
# object-{1,2}.ring.gz should not appear as they are present on disk
|
||||
# but were not configured as policies.
|
||||
app = recon.ReconMiddleware(FakeApp(), {'swift_dir': self.tempdir})
|
||||
self.assertEquals(sorted(app.get_ring_md5().items()),
|
||||
sorted(expt_out.items()))
|
||||
|
||||
# cover error path
|
||||
self.app.get_ring_md5(openr=fake_open)
|
||||
|
||||
def test_from_recon_cache(self):
|
||||
oart = OpenAndReadTester(['{"notneeded": 5, "testkey1": "canhazio"}'])
|
||||
self.app._from_recon_cache = self.real_from_cache
|
||||
|
|
|
@ -737,22 +737,22 @@ class TestTempURL(unittest.TestCase):
|
|||
|
||||
def test_disallowed_header_object_manifest(self):
|
||||
self.tempurl = tempurl.filter_factory({})(self.auth)
|
||||
method = 'PUT'
|
||||
expires = int(time() + 86400)
|
||||
path = '/v1/a/c/o'
|
||||
key = 'abc'
|
||||
hmac_body = '%s\n%s\n%s' % (method, expires, path)
|
||||
sig = hmac.new(key, hmac_body, sha1).hexdigest()
|
||||
req = self._make_request(
|
||||
path, method='PUT', keys=[key],
|
||||
headers={'x-object-manifest': 'private/secret'},
|
||||
environ={'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s' % (
|
||||
sig, expires)})
|
||||
resp = req.get_response(self.tempurl)
|
||||
self.assertEquals(resp.status_int, 400)
|
||||
self.assertTrue('header' in resp.body)
|
||||
self.assertTrue('not allowed' in resp.body)
|
||||
self.assertTrue('X-Object-Manifest' in resp.body)
|
||||
for method in ('PUT', 'POST'):
|
||||
hmac_body = '%s\n%s\n%s' % (method, expires, path)
|
||||
sig = hmac.new(key, hmac_body, sha1).hexdigest()
|
||||
req = self._make_request(
|
||||
path, method=method, keys=[key],
|
||||
headers={'x-object-manifest': 'private/secret'},
|
||||
environ={'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s'
|
||||
% (sig, expires)})
|
||||
resp = req.get_response(self.tempurl)
|
||||
self.assertEquals(resp.status_int, 400)
|
||||
self.assertTrue('header' in resp.body)
|
||||
self.assertTrue('not allowed' in resp.body)
|
||||
self.assertTrue('X-Object-Manifest' in resp.body)
|
||||
|
||||
def test_removed_incoming_header(self):
|
||||
self.tempurl = tempurl.filter_factory({
|
||||
|
|
|
@ -364,10 +364,6 @@ class TestDBReplicator(unittest.TestCase):
|
|||
'replication_ip': '127.0.0.1', 'replication_port': '0',
|
||||
'device': 'sda1'}
|
||||
|
||||
def mock_rsync_ip(ip):
|
||||
self.assertEquals(fake_device['ip'], ip)
|
||||
return 'rsync_ip(%s)' % ip
|
||||
|
||||
class MyTestReplicator(TestReplicator):
|
||||
def __init__(self, db_file, remote_file):
|
||||
super(MyTestReplicator, self).__init__({})
|
||||
|
@ -381,20 +377,11 @@ class TestDBReplicator(unittest.TestCase):
|
|||
self_._rsync_file_called = True
|
||||
return False
|
||||
|
||||
with patch('swift.common.db_replicator.rsync_ip', mock_rsync_ip):
|
||||
broker = FakeBroker()
|
||||
remote_file = 'rsync_ip(127.0.0.1)::container/sda1/tmp/abcd'
|
||||
replicator = MyTestReplicator(broker.db_file, remote_file)
|
||||
replicator._rsync_db(broker, fake_device, ReplHttp(), 'abcd')
|
||||
self.assertTrue(replicator._rsync_file_called)
|
||||
|
||||
with patch('swift.common.db_replicator.rsync_ip', mock_rsync_ip):
|
||||
broker = FakeBroker()
|
||||
remote_file = 'rsync_ip(127.0.0.1)::container0/sda1/tmp/abcd'
|
||||
replicator = MyTestReplicator(broker.db_file, remote_file)
|
||||
replicator.vm_test_mode = True
|
||||
replicator._rsync_db(broker, fake_device, ReplHttp(), 'abcd')
|
||||
self.assertTrue(replicator._rsync_file_called)
|
||||
broker = FakeBroker()
|
||||
remote_file = '127.0.0.1::container/sda1/tmp/abcd'
|
||||
replicator = MyTestReplicator(broker.db_file, remote_file)
|
||||
replicator._rsync_db(broker, fake_device, ReplHttp(), 'abcd')
|
||||
self.assertTrue(replicator._rsync_file_called)
|
||||
|
||||
def test_rsync_db_rsync_file_failure(self):
|
||||
class MyTestReplicator(TestReplicator):
|
||||
|
|
|
@ -2244,6 +2244,58 @@ cluster_dfw1 = http://dfw1.host/v1/
|
|||
self.assertEqual(
|
||||
utils.rsync_ip('::ffff:192.0.2.128'), '[::ffff:192.0.2.128]')
|
||||
|
||||
def test_rsync_module_interpolation(self):
|
||||
fake_device = {'ip': '127.0.0.1', 'port': 11,
|
||||
'replication_ip': '127.0.0.2', 'replication_port': 12,
|
||||
'region': '1', 'zone': '2', 'device': 'sda1',
|
||||
'meta': 'just_a_string'}
|
||||
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation('{ip}', fake_device),
|
||||
'127.0.0.1')
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation('{port}', fake_device),
|
||||
'11')
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation('{replication_ip}', fake_device),
|
||||
'127.0.0.2')
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation('{replication_port}',
|
||||
fake_device),
|
||||
'12')
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation('{region}', fake_device),
|
||||
'1')
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation('{zone}', fake_device),
|
||||
'2')
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation('{device}', fake_device),
|
||||
'sda1')
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation('{meta}', fake_device),
|
||||
'just_a_string')
|
||||
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation('{replication_ip}::object',
|
||||
fake_device),
|
||||
'127.0.0.2::object')
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation('{ip}::container{port}',
|
||||
fake_device),
|
||||
'127.0.0.1::container11')
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation(
|
||||
'{replication_ip}::object_{device}', fake_device),
|
||||
'127.0.0.2::object_sda1')
|
||||
self.assertEqual(
|
||||
utils.rsync_module_interpolation(
|
||||
'127.0.0.3::object_{replication_port}', fake_device),
|
||||
'127.0.0.3::object_12')
|
||||
|
||||
self.assertRaises(ValueError, utils.rsync_module_interpolation,
|
||||
'{replication_ip}::object_{deivce}', fake_device)
|
||||
|
||||
def test_fallocate_reserve(self):
|
||||
|
||||
class StatVFS(object):
|
||||
|
|
|
@ -74,6 +74,51 @@ class TestContainerUpdater(unittest.TestCase):
|
|||
self.assertEqual(cu.node_timeout, 5)
|
||||
self.assertTrue(cu.get_account_ring() is not None)
|
||||
|
||||
@mock.patch.object(container_updater, 'ismount')
|
||||
@mock.patch.object(container_updater.ContainerUpdater, 'container_sweep')
|
||||
def test_run_once_with_device_unmounted(self, mock_sweep, mock_ismount):
|
||||
|
||||
mock_ismount.return_value = False
|
||||
cu = container_updater.ContainerUpdater({
|
||||
'devices': self.devices_dir,
|
||||
'mount_check': 'false',
|
||||
'swift_dir': self.testdir,
|
||||
'interval': '1',
|
||||
'concurrency': '1',
|
||||
'node_timeout': '15',
|
||||
'account_suppression_time': 0
|
||||
})
|
||||
containers_dir = os.path.join(self.sda1, DATADIR)
|
||||
os.mkdir(containers_dir)
|
||||
partition_dir = os.path.join(containers_dir, "a")
|
||||
os.mkdir(partition_dir)
|
||||
|
||||
cu.run_once()
|
||||
self.assertTrue(os.path.exists(containers_dir)) # sanity check
|
||||
|
||||
# only called if a partition dir exists
|
||||
self.assertTrue(mock_sweep.called)
|
||||
|
||||
mock_sweep.reset_mock()
|
||||
|
||||
cu = container_updater.ContainerUpdater({
|
||||
'devices': self.devices_dir,
|
||||
'mount_check': 'true',
|
||||
'swift_dir': self.testdir,
|
||||
'interval': '1',
|
||||
'concurrency': '1',
|
||||
'node_timeout': '15',
|
||||
'account_suppression_time': 0
|
||||
})
|
||||
cu.logger = FakeLogger()
|
||||
cu.run_once()
|
||||
log_lines = cu.logger.get_lines_for_level('warning')
|
||||
self.assertTrue(len(log_lines) > 0)
|
||||
msg = 'sda1 is not mounted'
|
||||
self.assertEqual(log_lines[0], msg)
|
||||
# Ensure that the container_sweep did not run
|
||||
self.assertFalse(mock_sweep.called)
|
||||
|
||||
def test_run_once(self):
|
||||
cu = container_updater.ContainerUpdater({
|
||||
'devices': self.devices_dir,
|
||||
|
@ -255,6 +300,5 @@ class TestContainerUpdater(unittest.TestCase):
|
|||
self.assertEqual(info['reported_object_count'], 1)
|
||||
self.assertEqual(info['reported_bytes_used'], 3)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -3665,6 +3665,18 @@ class TestECDiskFile(DiskFileMixin, unittest.TestCase):
|
|||
df.purge(ts, 3)
|
||||
self.assertEqual(sorted(os.listdir(df._datadir)), [])
|
||||
|
||||
def test_purge_without_frag(self):
|
||||
ts = self.ts()
|
||||
df = self._simple_get_diskfile()
|
||||
df.delete(ts)
|
||||
|
||||
# sanity
|
||||
self.assertEqual(sorted(os.listdir(df._datadir)), [
|
||||
ts.internal + '.ts',
|
||||
])
|
||||
df.purge(ts, None)
|
||||
self.assertEqual(sorted(os.listdir(df._datadir)), [])
|
||||
|
||||
def test_purge_old_tombstone(self):
|
||||
old_ts = self.ts()
|
||||
ts = self.ts()
|
||||
|
|
|
@ -30,6 +30,7 @@ from contextlib import closing, nested, contextmanager
|
|||
from gzip import GzipFile
|
||||
from shutil import rmtree
|
||||
from swift.common import utils
|
||||
from swift.common.swob import HeaderKeyDict
|
||||
from swift.common.exceptions import DiskFileError
|
||||
from swift.obj import diskfile, reconstructor as object_reconstructor
|
||||
from swift.common import ring
|
||||
|
@ -683,6 +684,19 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||
self.assertEqual(
|
||||
len(self.reconstructor.logger.log_dict['warning']), 1)
|
||||
|
||||
def test_reconstructor_does_not_log_on_404(self):
|
||||
part = self.part_nums[0]
|
||||
node = POLICIES[0].object_ring.get_part_nodes(int(part))[0]
|
||||
with mocked_http_conn(404):
|
||||
self.reconstructor._get_response(node, part,
|
||||
path='some_path',
|
||||
headers={},
|
||||
policy=POLICIES[0])
|
||||
|
||||
# Make sure that no warnings are emitted for a 404
|
||||
len_warning_lines = len(self.logger.get_lines_for_level('warning'))
|
||||
self.assertEqual(len_warning_lines, 0)
|
||||
|
||||
def test_reconstructor_skips_bogus_partition_dirs(self):
|
||||
# A directory in the wrong place shouldn't crash the reconstructor
|
||||
self.reconstructor._reset_stats()
|
||||
|
@ -2415,11 +2429,8 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||
self.assertFalse(os.access(df._datadir, os.F_OK))
|
||||
|
||||
def test_process_job_revert_cleanup_tombstone(self):
|
||||
replicas = self.policy.object_ring.replicas
|
||||
frag_index = random.randint(0, replicas - 1)
|
||||
sync_to = [random.choice([n for n in self.policy.object_ring.devs
|
||||
if n != self.local_dev])]
|
||||
sync_to[0]['index'] = frag_index
|
||||
partition = 0
|
||||
|
||||
part_path = os.path.join(self.devices, self.local_dev['device'],
|
||||
|
@ -2437,7 +2448,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||
|
||||
job = {
|
||||
'job_type': object_reconstructor.REVERT,
|
||||
'frag_index': frag_index,
|
||||
'frag_index': None,
|
||||
'suffixes': [suffix],
|
||||
'sync_to': sync_to,
|
||||
'partition': partition,
|
||||
|
@ -2490,14 +2501,34 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
|
||||
responses.append((200, body, headers))
|
||||
|
||||
# make a hook point at
|
||||
# swift.obj.reconstructor.ObjectReconstructor._get_response
|
||||
called_headers = []
|
||||
orig_func = object_reconstructor.ObjectReconstructor._get_response
|
||||
|
||||
def _get_response_hook(self, node, part, path, headers, policy):
|
||||
called_headers.append(headers)
|
||||
return orig_func(self, node, part, path, headers, policy)
|
||||
|
||||
codes, body_iter, headers = zip(*responses)
|
||||
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
|
||||
df = self.reconstructor.reconstruct_fa(
|
||||
job, node, metadata)
|
||||
fixed_body = ''.join(df.reader())
|
||||
self.assertEqual(len(fixed_body), len(broken_body))
|
||||
self.assertEqual(md5(fixed_body).hexdigest(),
|
||||
md5(broken_body).hexdigest())
|
||||
get_response_path = \
|
||||
'swift.obj.reconstructor.ObjectReconstructor._get_response'
|
||||
with mock.patch(get_response_path, _get_response_hook):
|
||||
with mocked_http_conn(
|
||||
*codes, body_iter=body_iter, headers=headers):
|
||||
df = self.reconstructor.reconstruct_fa(
|
||||
job, node, metadata)
|
||||
fixed_body = ''.join(df.reader())
|
||||
self.assertEqual(len(fixed_body), len(broken_body))
|
||||
self.assertEqual(md5(fixed_body).hexdigest(),
|
||||
md5(broken_body).hexdigest())
|
||||
for called_header in called_headers:
|
||||
called_header = HeaderKeyDict(called_header)
|
||||
self.assertTrue('Content-Length' in called_header)
|
||||
self.assertEqual(called_header['Content-Length'], '0')
|
||||
self.assertTrue('User-Agent' in called_header)
|
||||
user_agent = called_header['User-Agent']
|
||||
self.assertTrue(user_agent.startswith('obj-reconstructor'))
|
||||
|
||||
def test_reconstruct_fa_errors_works(self):
|
||||
job = {
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import contextlib
|
||||
import hashlib
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
@ -26,7 +27,7 @@ import six
|
|||
from swift.common import bufferedhttp
|
||||
from swift.common import exceptions
|
||||
from swift.common import swob
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.storage_policy import POLICIES, REPL_POLICY
|
||||
from swift.common import utils
|
||||
from swift.common.swob import HTTPException
|
||||
from swift.obj import diskfile
|
||||
|
@ -255,6 +256,23 @@ class TestReceiver(unittest.TestCase):
|
|||
self.assertEqual(rcvr.frag_index, 7)
|
||||
self.assertEqual(rcvr.node_index, 7)
|
||||
|
||||
@unit.patch_policies()
|
||||
def test_Receiver_with_invalid_indexes(self):
|
||||
# update router post policy patch
|
||||
self.controller._diskfile_router = diskfile.DiskFileRouter(
|
||||
self.conf, self.controller.logger)
|
||||
req = swob.Request.blank(
|
||||
'/sda1/1',
|
||||
environ={'REQUEST_METHOD': 'SSYNC',
|
||||
'HTTP_X_BACKEND_SSYNC_NODE_INDEX': 'None',
|
||||
'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': 'None',
|
||||
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
|
||||
body=':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\n:UPDATES: END\r\n')
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
|
||||
@unit.patch_policies()
|
||||
def test_Receiver_with_mismatched_indexes(self):
|
||||
# update router post policy patch
|
||||
|
@ -1679,30 +1697,56 @@ class TestSsyncRxServer(unittest.TestCase):
|
|||
# server socket.
|
||||
|
||||
def setUp(self):
|
||||
self.ts = unit.make_timestamp_iter()
|
||||
self.rx_ip = '127.0.0.1'
|
||||
# dirs
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
self.tempdir = os.path.join(self.tmpdir, 'tmp_test_obj_server')
|
||||
|
||||
self.devices = os.path.join(self.tempdir, 'srv/node')
|
||||
self.rx_devices = os.path.join(self.tempdir, 'rx/node')
|
||||
self.tx_devices = os.path.join(self.tempdir, 'tx/node')
|
||||
for device in ('sda1', 'sdb1'):
|
||||
os.makedirs(os.path.join(self.devices, device))
|
||||
for root in (self.rx_devices, self.tx_devices):
|
||||
os.makedirs(os.path.join(root, device))
|
||||
|
||||
self.conf = {
|
||||
'devices': self.devices,
|
||||
'devices': self.rx_devices,
|
||||
'swift_dir': self.tempdir,
|
||||
'mount_check': False,
|
||||
}
|
||||
self.rx_logger = debug_logger('test-object-server')
|
||||
rx_server = server.ObjectController(self.conf, logger=self.rx_logger)
|
||||
sock = eventlet.listen((self.rx_ip, 0))
|
||||
self.rx_app = server.ObjectController(self.conf, logger=self.rx_logger)
|
||||
self.sock = eventlet.listen((self.rx_ip, 0))
|
||||
self.rx_server = eventlet.spawn(
|
||||
eventlet.wsgi.server, sock, rx_server, utils.NullLogger())
|
||||
self.rx_port = sock.getsockname()[1]
|
||||
self.tx_logger = debug_logger('test-reconstructor')
|
||||
eventlet.wsgi.server, self.sock, self.rx_app, utils.NullLogger())
|
||||
self.rx_port = self.sock.getsockname()[1]
|
||||
self.tx_logger = debug_logger('test-daemon')
|
||||
self.policy = POLICIES[0]
|
||||
self.conf['devices'] = self.tx_devices
|
||||
self.daemon = ObjectReconstructor(self.conf, self.tx_logger)
|
||||
self.daemon._diskfile_mgr = self.daemon._df_router[POLICIES[0]]
|
||||
self.daemon._diskfile_mgr = self.daemon._df_router[self.policy]
|
||||
|
||||
self.nodes = [
|
||||
{
|
||||
'device': 'sda1',
|
||||
'ip': '127.0.0.1',
|
||||
'replication_ip': '127.0.0.1',
|
||||
'port': self.rx_port,
|
||||
'replication_port': self.rx_port,
|
||||
},
|
||||
{
|
||||
'device': 'sdb1',
|
||||
'ip': '127.0.0.1',
|
||||
'replication_ip': '127.0.0.1',
|
||||
'port': self.rx_port,
|
||||
'replication_port': self.rx_port,
|
||||
},
|
||||
]
|
||||
|
||||
def tearDown(self):
|
||||
self.rx_server.kill()
|
||||
self.sock.close()
|
||||
eventlet.sleep(0)
|
||||
shutil.rmtree(self.tmpdir)
|
||||
|
||||
def test_SSYNC_disconnect(self):
|
||||
|
@ -1770,6 +1814,107 @@ class TestSsyncRxServer(unittest.TestCase):
|
|||
# sanity check that the receiver did not proceed to missing_check
|
||||
self.assertFalse(mock_missing_check.called)
|
||||
|
||||
def test_sender_job_missing_frag_node_indexes(self):
|
||||
# replication jobs don't send frag_index, so we'll use a REPL_POLICY
|
||||
repl_policy = POLICIES[1]
|
||||
self.assertEqual(repl_policy.policy_type, REPL_POLICY)
|
||||
repl_mgr = self.daemon._df_router[repl_policy]
|
||||
self.daemon._diskfile_mgr = repl_mgr
|
||||
device = self.nodes[0]['device']
|
||||
# create a replicated object, on sender
|
||||
df = repl_mgr.get_diskfile(device, '0', 'a', 'c', 'o',
|
||||
policy=repl_policy)
|
||||
now = next(self.ts)
|
||||
metadata = {
|
||||
'X-Timestamp': now.internal,
|
||||
'Content-Type': 'text/plain',
|
||||
'Content-Length': '0',
|
||||
'ETag': hashlib.md5('').hexdigest(),
|
||||
}
|
||||
with df.create() as writer:
|
||||
writer.write('')
|
||||
writer.put(metadata)
|
||||
# sanity the object is on the sender
|
||||
self.assertTrue(df._datadir.startswith(self.tx_devices))
|
||||
# setup a ssync job
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
job = {
|
||||
'partition': 0,
|
||||
'policy': repl_policy,
|
||||
'device': device,
|
||||
}
|
||||
sender = ssync_sender.Sender(
|
||||
self.daemon, self.nodes[0], job, [suffix])
|
||||
success, _ = sender()
|
||||
self.assertTrue(success)
|
||||
# sanity object is synced to receiver
|
||||
remote_df = self.rx_app._diskfile_router[repl_policy].get_diskfile(
|
||||
device, '0', 'a', 'c', 'o', policy=repl_policy)
|
||||
self.assertTrue(remote_df._datadir.startswith(self.rx_devices))
|
||||
self.assertEqual(remote_df.read_metadata(), metadata)
|
||||
|
||||
def test_send_frag_index_none(self):
|
||||
# create an ec fragment on the remote node
|
||||
device = self.nodes[1]['device']
|
||||
remote_df = self.rx_app._diskfile_router[self.policy].get_diskfile(
|
||||
device, '1', 'a', 'c', 'o', policy=self.policy)
|
||||
ts1 = next(self.ts)
|
||||
data = 'frag_archive'
|
||||
metadata = {
|
||||
'ETag': hashlib.md5(data).hexdigest(),
|
||||
'X-Timestamp': ts1.internal,
|
||||
'Content-Length': len(data),
|
||||
'X-Object-Sysmeta-Ec-Frag-Index': '3',
|
||||
}
|
||||
with remote_df.create() as writer:
|
||||
writer.write(data)
|
||||
writer.put(metadata)
|
||||
writer.commit(ts1)
|
||||
# create a tombstone on the local node
|
||||
df = self.daemon._df_router[self.policy].get_diskfile(
|
||||
device, '1', 'a', 'c', 'o', policy=self.policy)
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
ts2 = next(self.ts)
|
||||
df.delete(ts2)
|
||||
# a reconstructor revert job with only tombstones will have frag_index
|
||||
# explicitly set to None
|
||||
job = {
|
||||
'frag_index': None,
|
||||
'partition': 1,
|
||||
'policy': self.policy,
|
||||
'device': device,
|
||||
}
|
||||
sender = ssync_sender.Sender(
|
||||
self.daemon, self.nodes[1], job, [suffix])
|
||||
success, _ = sender()
|
||||
self.assertTrue(success)
|
||||
# diskfile tombstone synced to receiver's datadir with timestamp
|
||||
self.assertTrue(remote_df._datadir.startswith(self.rx_devices))
|
||||
try:
|
||||
remote_df.read_metadata()
|
||||
except exceptions.DiskFileDeleted as e:
|
||||
self.assertEqual(e.timestamp, ts2)
|
||||
else:
|
||||
self.fail('Successfully opened remote DiskFile')
|
||||
|
||||
def test_bad_request_invalid_frag_index(self):
|
||||
with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check')\
|
||||
as mock_missing_check:
|
||||
self.connection = bufferedhttp.BufferedHTTPConnection(
|
||||
'127.0.0.1:%s' % self.rx_port)
|
||||
self.connection.putrequest('SSYNC', '/sda1/0')
|
||||
self.connection.putheader('Transfer-Encoding', 'chunked')
|
||||
self.connection.putheader('X-Backend-Ssync-Frag-Index',
|
||||
'None')
|
||||
self.connection.endheaders()
|
||||
resp = self.connection.getresponse()
|
||||
self.assertEqual(400, resp.status)
|
||||
error_msg = resp.read()
|
||||
self.assertIn("Invalid X-Backend-Ssync-Frag-Index 'None'", error_msg)
|
||||
resp.close()
|
||||
# sanity check that the receiver did not proceed to missing_check
|
||||
self.assertFalse(mock_missing_check.called)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -312,6 +312,74 @@ class TestSender(BaseTestSender):
|
|||
method_name, mock_method.mock_calls,
|
||||
expected_calls))
|
||||
|
||||
def test_connect_handoff_no_frag(self):
|
||||
node = dict(replication_ip='1.2.3.4', replication_port=5678,
|
||||
device='sda1')
|
||||
job = dict(partition='9', policy=POLICIES[0])
|
||||
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
|
||||
self.sender.suffixes = ['abc']
|
||||
with mock.patch(
|
||||
'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection'
|
||||
) as mock_conn_class:
|
||||
mock_conn = mock_conn_class.return_value
|
||||
mock_resp = mock.MagicMock()
|
||||
mock_resp.status = 200
|
||||
mock_conn.getresponse.return_value = mock_resp
|
||||
self.sender.connect()
|
||||
mock_conn_class.assert_called_once_with('1.2.3.4:5678')
|
||||
expectations = {
|
||||
'putrequest': [
|
||||
mock.call('SSYNC', '/sda1/9'),
|
||||
],
|
||||
'putheader': [
|
||||
mock.call('Transfer-Encoding', 'chunked'),
|
||||
mock.call('X-Backend-Storage-Policy-Index', 0),
|
||||
mock.call('X-Backend-Ssync-Frag-Index', ''),
|
||||
mock.call('X-Backend-Ssync-Node-Index', ''),
|
||||
],
|
||||
'endheaders': [mock.call()],
|
||||
}
|
||||
for method_name, expected_calls in expectations.items():
|
||||
mock_method = getattr(mock_conn, method_name)
|
||||
self.assertEqual(expected_calls, mock_method.mock_calls,
|
||||
'connection method "%s" got %r not %r' % (
|
||||
method_name, mock_method.mock_calls,
|
||||
expected_calls))
|
||||
|
||||
def test_connect_handoff_none_frag(self):
|
||||
node = dict(replication_ip='1.2.3.4', replication_port=5678,
|
||||
device='sda1')
|
||||
job = dict(partition='9', policy=POLICIES[1], frag_index=None)
|
||||
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
|
||||
self.sender.suffixes = ['abc']
|
||||
with mock.patch(
|
||||
'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection'
|
||||
) as mock_conn_class:
|
||||
mock_conn = mock_conn_class.return_value
|
||||
mock_resp = mock.MagicMock()
|
||||
mock_resp.status = 200
|
||||
mock_conn.getresponse.return_value = mock_resp
|
||||
self.sender.connect()
|
||||
mock_conn_class.assert_called_once_with('1.2.3.4:5678')
|
||||
expectations = {
|
||||
'putrequest': [
|
||||
mock.call('SSYNC', '/sda1/9'),
|
||||
],
|
||||
'putheader': [
|
||||
mock.call('Transfer-Encoding', 'chunked'),
|
||||
mock.call('X-Backend-Storage-Policy-Index', 1),
|
||||
mock.call('X-Backend-Ssync-Frag-Index', ''),
|
||||
mock.call('X-Backend-Ssync-Node-Index', ''),
|
||||
],
|
||||
'endheaders': [mock.call()],
|
||||
}
|
||||
for method_name, expected_calls in expectations.items():
|
||||
mock_method = getattr(mock_conn, method_name)
|
||||
self.assertEqual(expected_calls, mock_method.mock_calls,
|
||||
'connection method "%s" got %r not %r' % (
|
||||
method_name, mock_method.mock_calls,
|
||||
expected_calls))
|
||||
|
||||
def test_connect_handoff_replicated(self):
|
||||
node = dict(replication_ip='1.2.3.4', replication_port=5678,
|
||||
device='sda1')
|
||||
|
@ -523,6 +591,7 @@ class TestSender(BaseTestSender):
|
|||
self.assertEqual(candidates, {})
|
||||
|
||||
def test_connect_send_timeout(self):
|
||||
self.daemon.node_timeout = 0.01 # make disconnect fail fast
|
||||
self.daemon.conn_timeout = 0.01
|
||||
node = dict(replication_ip='1.2.3.4', replication_port=5678,
|
||||
device='sda1')
|
||||
|
@ -578,6 +647,7 @@ class TestSender(BaseTestSender):
|
|||
def getresponse(*args, **kwargs):
|
||||
response = FakeResponse()
|
||||
response.status = 503
|
||||
response.read = lambda: 'an error message'
|
||||
return response
|
||||
|
||||
missing_check_fn = 'swift.obj.ssync_sender.Sender.missing_check'
|
||||
|
@ -594,6 +664,7 @@ class TestSender(BaseTestSender):
|
|||
for line in error_lines:
|
||||
self.assertTrue(line.startswith(
|
||||
'1.2.3.4:5678/sda1/9 Expected status 200; got 503'))
|
||||
self.assertIn('an error message', line)
|
||||
# sanity check that Sender did not proceed to missing_check exchange
|
||||
self.assertFalse(mock_missing_check.called)
|
||||
|
||||
|
|
Loading…
Reference in New Issue