merge up to trunk, rollback xml

David Goetz 2011-07-22 12:58:22 -07:00
commit d234461344
33 changed files with 2357 additions and 217 deletions

bin/swift (119 changed lines)

@ -150,22 +150,27 @@ class ClientException(Exception):
return b and '%s: %s' % (a, b) or a
def http_connection(url):
def http_connection(url, proxy=None):
"""
Make an HTTPConnection or HTTPSConnection
:param url: url to connect to
:param proxy: proxy to connect through, if any; None by default; str of the
format 'http://127.0.0.1:8888' to set one
:returns: tuple of (parsed url, connection object)
:raises ClientException: Unable to handle protocol scheme
"""
parsed = urlparse(url)
proxy_parsed = urlparse(proxy) if proxy else None
if parsed.scheme == 'http':
conn = HTTPConnection(parsed.netloc)
conn = HTTPConnection((proxy_parsed if proxy else parsed).netloc)
elif parsed.scheme == 'https':
conn = HTTPSConnection(parsed.netloc)
conn = HTTPSConnection((proxy_parsed if proxy else parsed).netloc)
else:
raise ClientException('Cannot handle protocol scheme %s for url %s' %
(parsed.scheme, repr(url)))
if proxy:
conn._set_tunnel(parsed.hostname, parsed.port)
return parsed, conn
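
For illustration, a small usage sketch of the proxy support added here (the
addresses are hypothetical): when a proxy is given, the connection is opened
to the proxy's netloc and then tunneled to the target URL's host and port::

    from swift.common.client import http_connection

    parsed, conn = http_connection(
        'https://cluster1/v1/AUTH_208d1854',  # target storage URL
        proxy='http://127.0.0.1:8888')        # tunnel through this proxy
    conn.request('HEAD', parsed.path, '', {})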
@ -578,40 +583,60 @@ def head_object(url, token, container, name, http_conn=None):
return resp_headers
def put_object(url, token, container, name, contents, content_length=None,
etag=None, chunk_size=65536, content_type=None, headers=None,
http_conn=None):
def put_object(url, token=None, container=None, name=None, contents=None,
content_length=None, etag=None, chunk_size=65536,
content_type=None, headers=None, http_conn=None, proxy=None):
"""
Put an object
:param url: storage URL
:param token: auth token
:param container: container name that the object is in
:param name: object name to put
:param contents: a string or a file like object to read object data from
:param token: auth token; if None, no token will be sent
:param container: container name that the object is in; if None, the
container name is expected to be part of the url
:param name: object name to put; if None, the object name is expected to be
part of the url
:param contents: a string or a file like object to read object data from;
if None, a zero-byte put will be done
:param content_length: value to send as content-length header; also limits
the amount read from contents
:param etag: etag of contents
:param chunk_size: chunk size of data to write
:param content_type: value to send as content-type header
:param headers: additional headers to include in the request
the amount read from contents; if None, it will be
computed from the contents, or chunked transfer
encoding will be used
:param etag: etag of contents; if None, no etag will be sent
:param chunk_size: chunk size of data to write; default 65536
:param content_type: value to send as content-type header; if None, no
content-type will be set (remote end will likely try
to auto-detect it)
:param headers: additional headers to include in the request, if any
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:param proxy: proxy to connect through, if any; None by default; str of the
format 'http://127.0.0.1:8888' to set one
:returns: etag from server response
:raises ClientException: HTTP PUT request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
if not headers:
parsed, conn = http_connection(url, proxy=proxy)
path = parsed.path
if container:
path = '%s/%s' % (path.rstrip('/'), quote(container))
if name:
path = '%s/%s' % (path.rstrip('/'), quote(name))
if headers:
headers = dict(headers)
else:
headers = {}
headers['X-Auth-Token'] = token
if token:
headers['X-Auth-Token'] = token
if etag:
headers['ETag'] = etag.strip('"')
if content_length is not None:
headers['Content-Length'] = str(content_length)
else:
for n, v in headers.iteritems():
if n.lower() == 'content-length':
content_length = int(v)
if content_type is not None:
headers['Content-Type'] = content_type
if not contents:
@ -646,7 +671,7 @@ def put_object(url, token, container, name, contents, content_length=None,
raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port, http_path=path,
http_status=resp.status, http_reason=resp.reason)
return resp.getheader('etag').strip('"')
return resp.getheader('etag', '').strip('"')
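
A hedged sketch of the new calling convention (URL, key, and data are
hypothetical): with container and name left as None, the object path is
taken entirely from the url, and the sync key header stands in for a token::

    from swift.common.client import put_object

    etag = put_object(
        'http://cluster2/v1/AUTH_33cdcad8/container2/photo001.png',
        contents='object data',
        headers={'x-timestamp': '1311369502.52',
                 'x-container-sync-key': 'secret'},
        proxy='http://127.0.0.1:8888')  # optional, per the new proxy param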
def post_object(url, token, container, name, headers, http_conn=None):
@ -677,24 +702,40 @@ def post_object(url, token, container, name, headers, http_conn=None):
http_status=resp.status, http_reason=resp.reason)
def delete_object(url, token, container, name, http_conn=None):
def delete_object(url, token=None, container=None, name=None, http_conn=None,
headers=None, proxy=None):
"""
Delete object
:param url: storage URL
:param token: auth token
:param container: container name that the object is in
:param name: object name to delete
:param token: auth token; if None, no token will be sent
:param container: container name that the object is in; if None, the
container name is expected to be part of the url
:param name: object name to delete; if None, the object name is expected to
be part of the url
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:param headers: additional headers to include in the request
:param proxy: proxy to connect through, if any; None by default; str of the
format 'http://127.0.0.1:8888' to set one
:raises ClientException: HTTP DELETE request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
conn.request('DELETE', path, '', {'X-Auth-Token': token})
parsed, conn = http_connection(url, proxy=proxy)
path = parsed.path
if container:
path = '%s/%s' % (path.rstrip('/'), quote(container))
if name:
path = '%s/%s' % (path.rstrip('/'), quote(name))
if headers:
headers = dict(headers)
else:
headers = {}
if token:
headers['X-Auth-Token'] = token
conn.request('DELETE', path, '', headers)
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:
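
Similarly, a sketch of the new delete_object signature (values hypothetical),
mirroring how the container-sync daemon added in this commit invokes it::

    from swift.common.client import delete_object

    # No token; the remote cluster authorizes via the shared sync key and
    # timestamp instead.
    delete_object('http://cluster2/v1/AUTH_33cdcad8/container2',
                  name='photo001.png',
                  headers={'x-timestamp': '1311369502.52',
                           'x-container-sync-key': 'secret'},
                  proxy='http://127.0.0.1:8888')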
@ -1363,10 +1404,14 @@ Container: %s
Objects: %d
Bytes: %d
Read ACL: %s
Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
Write ACL: %s
Sync To: %s
Sync Key: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
object_count, bytes_used,
headers.get('x-container-read', ''),
headers.get('x-container-write', '')))
headers.get('x-container-write', ''),
headers.get('x-container-sync-to', ''),
headers.get('x-container-sync-key', '')))
for key, value in headers.items():
if key.startswith('x-container-meta-'):
print_queue.put('%9s: %s' % ('Meta %s' %
@ -1375,7 +1420,8 @@ Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0],
if not key.startswith('x-container-meta-') and key not in (
'content-length', 'date', 'x-container-object-count',
'x-container-bytes-used', 'x-container-read',
'x-container-write'):
'x-container-write', 'x-container-sync-to',
'x-container-sync-key'):
print_queue.put(
'%9s: %s' % (key.title(), value))
except ClientException, err:
@ -1440,13 +1486,18 @@ def st_post(options, args, print_queue, error_queue):
parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the '
'Write ACL for containers. Quick summary of ACL syntax: account1, '
'account2:user2')
parser.add_option('-t', '--sync-to', dest='sync_to', help='Sets the '
'Sync To for containers, for multi-cluster replication.')
parser.add_option('-k', '--sync-key', dest='sync_key', help='Sets the '
'Sync Key for containers, for multi-cluster replication.')
parser.add_option('-m', '--meta', action='append', dest='meta', default=[],
help='Sets a meta data item with the syntax name:value. This option '
'may be repeated. Example: -m Color:Blue -m Size:Large')
(options, args) = parse_args(parser, args)
args = args[1:]
if (options.read_acl or options.write_acl) and not args:
exit('-r and -w options only allowed for containers')
if (options.read_acl or options.write_acl or options.sync_to or
options.sync_key) and not args:
exit('-r, -w, -t, and -k options only allowed for containers')
conn = Connection(options.auth, options.user, options.key)
if not args:
headers = {}
@ -1474,6 +1525,10 @@ def st_post(options, args, print_queue, error_queue):
headers['X-Container-Read'] = options.read_acl
if options.write_acl is not None:
headers['X-Container-Write'] = options.write_acl
if options.sync_to is not None:
headers['X-Container-Sync-To'] = options.sync_to
if options.sync_key is not None:
headers['X-Container-Sync-Key'] = options.sync_key
try:
conn.post_container(args[0], headers=headers)
except ClientException, err:

bin/swift-container-sync (new executable file, 23 lines)

@ -0,0 +1,23 @@
#!/usr/bin/python
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from swift.container.sync import ContainerSync
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
if __name__ == '__main__':
conf_file, options = parse_options(once=True)
run_daemon(ContainerSync, conf_file, **options)


@ -34,3 +34,10 @@ Container Auditor
:undoc-members:
:show-inheritance:
Container Sync
==============
.. automodule:: swift.container.sync
:members:
:undoc-members:
:show-inheritance:


@ -400,6 +400,8 @@ Sample configuration files are provided with all defaults in line-by-line commen
[container-auditor]
[container-sync]
#. Create `/etc/swift/container-server/2.conf`::
[DEFAULT]
@ -422,6 +424,8 @@ Sample configuration files are provided with all defaults in line-by-line commen
[container-auditor]
[container-sync]
#. Create `/etc/swift/container-server/3.conf`::
[DEFAULT]
@ -444,6 +448,8 @@ Sample configuration files are provided with all defaults in line-by-line commen
[container-auditor]
[container-sync]
#. Create `/etc/swift/container-server/4.conf`::
[DEFAULT]
@ -466,6 +472,8 @@ Sample configuration files are provided with all defaults in line-by-line commen
[container-auditor]
[container-sync]
#. Create `/etc/swift/object-server/1.conf`::


@ -44,6 +44,7 @@ Overview and Concepts
overview_replication
ratelimit
overview_large_objects
overview_container_sync
Developer Documentation
=======================


@ -27,11 +27,17 @@ validation.
Swift will make calls to the auth system, giving the auth token to be
validated. For a valid token, the auth system responds with an overall
expiration in seconds from now. Swift will cache the token up to the expiration
time. The included TempAuth also has the concept of admin and non-admin users
within an account. Admin users can do anything within the account. Non-admin
users can only perform operations per container based on the container's
X-Container-Read and X-Container-Write ACLs. For more information on ACLs, see
:mod:`swift.common.middleware.acl`
time.
The included TempAuth also has the concept of admin and non-admin users within
an account. Admin users can do anything within the account. Non-admin users can
only perform operations per container based on the container's X-Container-Read
and X-Container-Write ACLs. For more information on ACLs, see
:mod:`swift.common.middleware.acl`.
Additionally, if the auth system sets the request environ's swift_owner key to
True, the proxy will return additional header information in some requests,
such as the X-Container-Sync-Key for a container GET or HEAD.
The user starts a session by sending a ReST request to the auth system to
receive the auth token and a URL to the Swift system.


@ -0,0 +1,228 @@
======================================
Container to Container Synchronization
======================================
--------
Overview
--------
Swift has a feature where all the contents of a container can be mirrored to
another container through background synchronization. Swift cluster operators
configure their cluster to allow/accept sync requests to/from other clusters,
and the user specifies where to sync their container to along with a secret
synchronization key.
.. note::
Container sync will sync object POSTs only if the proxy server is set to
use "object_post_as_copy = true", which is the default. So-called fast
object posts, with "object_post_as_copy = false", do not update the
container listings and therefore can't be detected for synchronization.
.. note::
If you are using the large objects feature you will need to ensure both
your manifest file and your segment files are synced if they happen to be
in different containers.
--------------------------------------------
Configuring a Cluster's Allowable Sync Hosts
--------------------------------------------
The Swift cluster operator must allow synchronization with a set of hosts
before the user can enable container synchronization. First, the backend
container server needs to be given this list of hosts in the
container-server.conf file::
[DEFAULT]
# This is a comma separated list of hosts allowed in the
# X-Container-Sync-To field for containers.
# allowed_sync_hosts = 127.0.0.1
allowed_sync_hosts = host1,host2,etc.
...
[container-sync]
# You can override the default log routing for this app here (don't
# use set!):
# log_name = container-sync
# log_facility = LOG_LOCAL0
# log_level = INFO
# Will sync, at most, each container once per interval
# interval = 300
# Maximum amount of time to spend syncing each container
# container_time = 60
For this first release of container synchronization, tracking sync progress,
problems, and general activity can only be achieved through log processing.
In that light, you may wish to set the above `log_` options to direct the
container-sync logs to a different file for easier monitoring. Additionally,
note that there is no way for an end user to detect sync progress or problems
other than HEADing both containers and comparing the overall information.
The authentication system also needs to be configured to allow synchronization
requests. Here is an example with TempAuth::
[filter:tempauth]
# This is a comma separated list of hosts allowed to send
# X-Container-Sync-Key requests.
# allowed_sync_hosts = 127.0.0.1
allowed_sync_hosts = host1,host2,etc.
The default of 127.0.0.1 is just so that no configuration is required for
SAIO setups, i.e. for testing.
----------------------------------------------------------
Using the ``swift`` tool to set up synchronized containers
----------------------------------------------------------
.. note::
You must be the account admin on the account to set synchronization targets
and keys.
You simply tell each container where to sync to and give it a secret
synchronization key. First, let's get the account details for our two cluster
accounts::
$ swift -A http://cluster1/auth/v1.0 -U test:tester -K testing stat -v
StorageURL: http://cluster1/v1/AUTH_208d1854-e475-4500-b315-81de645d060e
Auth Token: AUTH_tkd5359e46ff9e419fa193dbd367f3cd19
Account: AUTH_208d1854-e475-4500-b315-81de645d060e
Containers: 0
Objects: 0
Bytes: 0
$ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 stat -v
StorageURL: http://cluster2/v1/AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c
Auth Token: AUTH_tk816a1aaf403c49adb92ecfca2f88e430
Account: AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c
Containers: 0
Objects: 0
Bytes: 0
Now, let's make our first container and tell it to synchronize to a second
we'll make next::
$ swift -A http://cluster1/auth/v1.0 -U test:tester -K testing post \
-t 'http://cluster2/v1/AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c/container2' \
-k 'secret' container1
The ``-t`` indicates the URL to sync to, which is the ``StorageURL`` from
cluster2 we retrieved above plus the container name. The ``-k`` specifies the
secret key the two containers will share for synchronization. Now, we'll do
something similar for the second cluster's container::
$ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 post \
-t 'http://cluster1/v1/AUTH_208d1854-e475-4500-b315-81de645d060e/container1' \
-k 'secret' container2
That's it. Now we can upload a bunch of stuff to the first container and watch
as it gets synchronized over to the second::
$ swift -A http://cluster1/auth/v1.0 -U test:tester -K testing \
upload container1 .
photo002.png
photo004.png
photo001.png
photo003.png
$ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 \
list container2
[Nothing there yet, so we wait a bit...]
[If you're an operator running SAIO and just testing, you may need to
run 'swift-init container-sync once' to perform a sync scan.]
$ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 \
list container2
photo001.png
photo002.png
photo003.png
photo004.png
You can also set up a chain of synced containers if you want more than two.
You'd point 1 -> 2, then 2 -> 3, and finally 3 -> 1 for three containers.
They'd all need to share the same secret synchronization key.
-----------------------------------
Using curl (or other tools) instead
-----------------------------------
So what's ``swift`` doing behind the scenes? Nothing overly complicated. It
translates the ``-t <value>`` option into an ``X-Container-Sync-To: <value>``
header and the ``-k <value>`` option into an ``X-Container-Sync-Key: <value>``
header.
For instance, when we created the first container above and told it to
synchronize to the second, we could have used this curl command::
$ curl -i -X POST -H 'X-Auth-Token: AUTH_tkd5359e46ff9e419fa193dbd367f3cd19' \
-H 'X-Container-Sync-To: http://cluster2/v1/AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c/container2' \
-H 'X-Container-Sync-Key: secret' \
'http://cluster1/v1/AUTH_208d1854-e475-4500-b315-81de645d060e/container1'
HTTP/1.1 204 No Content
Content-Length: 0
Content-Type: text/plain; charset=UTF-8
Date: Thu, 24 Feb 2011 22:39:14 GMT
--------------------------------------------------
What's going on behind the scenes, in the cluster?
--------------------------------------------------
The swift-container-sync daemon does the work of sending updates to the
remote container.
This is done by scanning the local devices for container databases and
checking for x-container-sync-to and x-container-sync-key metadata values.
If they exist, newer rows since the last sync will trigger PUTs or DELETEs
to the other container.
.. note::
Container sync will sync object POSTs only if the proxy server is set to
use "object_post_as_copy = true", which is the default. So-called fast
object posts, with "object_post_as_copy = false", do not update the
container listings and therefore can't be detected for synchronization.
The actual syncing is slightly more complicated, in order to make use of
the three (or number-of-replicas) main nodes for a container without each
trying to do the exact same work, and without missing work if one node
happens to be down.
Two sync points are kept per container database. All rows between the two
sync points trigger updates. Any rows newer than both sync points cause
updates depending on the node's position for the container (primary nodes
do one third, etc. depending on the replica count of course). After a sync
run, the first sync point is set to the newest ROWID known and the second
sync point is set to the newest ROWID for which all updates have been sent.
An example may help. Assume replica count is 3 and perfectly matching
ROWIDs starting at 1.
First sync run, database has 6 rows:
* SyncPoint1 starts as -1.
* SyncPoint2 starts as -1.
* No rows between points, so no "all updates" rows.
* Six rows newer than SyncPoint1, so a third of the rows are sent
by node 1, another third by node 2, remaining third by node 3.
* SyncPoint1 is set as 6 (the newest ROWID known).
* SyncPoint2 is left as -1 since no "all updates" rows were synced.
Next sync run, database has 12 rows:
* SyncPoint1 starts as 6.
* SyncPoint2 starts as -1.
* The rows between -1 and 6 all trigger updates (most of which
should short-circuit on the remote end as having already been
done).
* Six more rows newer than SyncPoint1, so a third of the rows are
sent by node 1, another third by node 2, remaining third by node
3.
* SyncPoint1 is set as 12 (the newest ROWID known).
* SyncPoint2 is set as 6 (the newest "all updates" ROWID).
In this way, under normal circumstances each node sends its share of
updates each run and just sends a batch of older updates to ensure nothing
was missed.
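
The ordinal assignment described above can be sketched in a few lines of
Python (this mirrors the description, not the exact Swift code; the real
hash_path also mixes in a cluster-wide hash suffix)::

    from hashlib import md5
    from struct import unpack_from

    def initial_sender(account, container, obj, replica_count):
        # Stand-in for Swift's hash_path(..., raw_digest=True).
        digest = md5('/%s/%s/%s' % (account, container, obj)).digest()
        return unpack_from('>I', digest)[0] % replica_count

    # The node whose ordinal matches sends this row on the first pass; the
    # other nodes retry it later via the SyncPoint2..SyncPoint1 window.
    print initial_sender('AUTH_test', 'container1', 'photo001.png', 3)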


@ -13,9 +13,9 @@ special manifest file is created that, when downloaded, sends all the segments
concatenated as a single object. This also offers much greater upload speed
with the possibility of parallel uploads of the segments.
----------------------------------
-------------------------------------
Using ``swift`` for Segmented Objects
----------------------------------
-------------------------------------
The quickest way to try out this feature is to use the included ``swift`` Swift Tool.
You can use the ``-S`` option to specify the segment size to use when splitting
@ -120,6 +120,13 @@ Additional Notes
for the manifest itself, so this method was chosen to at least offer change
detection.
.. note::
If you are using the container sync feature you will need to ensure both
your manifest file and your segment files are synced if they happen to be
in different containers.
-------
History
-------


@ -35,20 +35,17 @@ rate_buffer_seconds 5 Number of seconds the rate counter can
faster than listed rate). A larger number
will result in larger spikes in rate but
better average accuracy.
account_ratelimit 0 If set, will limit all requests to
/account_name and PUTs to
/account_name/container_name. Number is in
requests per second
account_ratelimit 0 If set, will limit PUT and DELETE requests
to /account_name/container_name.
Number is in requests per second.
account_whitelist '' Comma separated lists of account names that
will not be rate limited.
account_blacklist '' Comma separated lists of account names that
will not be allowed. Returns a 497 response.
container_ratelimit_size '' When set with container_limit_x = r:
for containers of size x, limit requests
per second to r. Will limit GET and HEAD
requests to /account_name/container_name
and PUTs and DELETEs to
/account_name/container_name/object_name
per second to r. Will limit PUT, DELETE,
and POST requests to /a/c/o.
======================== ========= ===========================================
The container rate limits are linearly interpolated from the values given.
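
For example, a minimal sketch of such interpolation, assuming hypothetical
settings container_ratelimit_100 = 100 and container_ratelimit_200 = 50
(container size mapped to max requests per second)::

    import bisect

    ratelimits = [(100, 100.0), (200, 50.0)]  # sorted (size, rate) points

    def interpolated_rate(container_size):
        sizes = [s for s, r in ratelimits]
        i = bisect.bisect_left(sizes, container_size)
        if i == 0:
            return ratelimits[0][1]
        if i == len(ratelimits):
            return ratelimits[-1][1]
        (s1, r1), (s2, r2) = ratelimits[i - 1], ratelimits[i]
        return r1 + (r2 - r1) * (container_size - s1) / (s2 - s1)

    print interpolated_rate(150)  # -> 75.0, halfway between the points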


@ -7,6 +7,9 @@
# swift_dir = /etc/swift
# devices = /srv/node
# mount_check = true
# This is a comma separated list of hosts allowed in the X-Container-Sync-To
# field for containers.
# allowed_sync_hosts = 127.0.0.1
# You can specify default log routing here if you want:
# log_name = swift
# log_facility = LOG_LOCAL0
@ -60,3 +63,15 @@ use = egg:swift#container
# log_level = INFO
# Will audit, at most, 1 container per device per interval
# interval = 1800
[container-sync]
# You can override the default log routing for this app here (don't use set!):
# log_name = container-sync
# log_facility = LOG_LOCAL0
# log_level = INFO
# If you need to use an HTTP Proxy, set it here; defaults to no proxy.
# sync_proxy = http://127.0.0.1:8888
# Will sync, at most, each container once per interval
# interval = 300
# Maximum amount of time to spend syncing each container per pass
# container_time = 60


@ -65,6 +65,9 @@ use = egg:swift#tempauth
# to the auth subsystem, for granting tokens, etc.
# auth_prefix = /auth/
# token_life = 86400
# This is a comma separated list of hosts allowed to send X-Container-Sync-Key
# requests.
# allowed_sync_hosts = 127.0.0.1
# Lastly, you need to list all the accounts/users you want here. The format is:
# user_<account>_<user> = <key> [group] [group] [...] [storage_url]
# There are special groups of:


@ -80,7 +80,7 @@ setup(
'bin/swift-account-audit', 'bin/swift-account-reaper',
'bin/swift-account-replicator', 'bin/swift-account-server',
'bin/swift-container-auditor',
'bin/swift-container-replicator',
'bin/swift-container-replicator', 'bin/swift-container-sync',
'bin/swift-container-server', 'bin/swift-container-updater',
'bin/swift-drive-audit', 'bin/swift-get-nodes',
'bin/swift-init', 'bin/swift-object-auditor',


@ -137,22 +137,27 @@ class ClientException(Exception):
return b and '%s: %s' % (a, b) or a
def http_connection(url):
def http_connection(url, proxy=None):
"""
Make an HTTPConnection or HTTPSConnection
:param url: url to connect to
:param proxy: proxy to connect through, if any; None by default; str of the
format 'http://127.0.0.1:8888' to set one
:returns: tuple of (parsed url, connection object)
:raises ClientException: Unable to handle protocol scheme
"""
parsed = urlparse(url)
proxy_parsed = urlparse(proxy) if proxy else None
if parsed.scheme == 'http':
conn = HTTPConnection(parsed.netloc)
conn = HTTPConnection((proxy_parsed if proxy else parsed).netloc)
elif parsed.scheme == 'https':
conn = HTTPSConnection(parsed.netloc)
conn = HTTPSConnection((proxy_parsed if proxy else parsed).netloc)
else:
raise ClientException('Cannot handle protocol scheme %s for url %s' %
(parsed.scheme, repr(url)))
if proxy:
conn._set_tunnel(parsed.hostname, parsed.port)
return parsed, conn
@ -565,40 +570,60 @@ def head_object(url, token, container, name, http_conn=None):
return resp_headers
def put_object(url, token, container, name, contents, content_length=None,
etag=None, chunk_size=65536, content_type=None, headers=None,
http_conn=None):
def put_object(url, token=None, container=None, name=None, contents=None,
content_length=None, etag=None, chunk_size=65536,
content_type=None, headers=None, http_conn=None, proxy=None):
"""
Put an object
:param url: storage URL
:param token: auth token
:param container: container name that the object is in
:param name: object name to put
:param contents: a string or a file like object to read object data from
:param token: auth token; if None, no token will be sent
:param container: container name that the object is in; if None, the
container name is expected to be part of the url
:param name: object name to put; if None, the object name is expected to be
part of the url
:param contents: a string or a file like object to read object data from;
if None, a zero-byte put will be done
:param content_length: value to send as content-length header; also limits
the amount read from contents
:param etag: etag of contents
:param chunk_size: chunk size of data to write
:param content_type: value to send as content-type header
:param headers: additional headers to include in the request
the amount read from contents; if None, it will be
computed from the contents, or chunked transfer
encoding will be used
:param etag: etag of contents; if None, no etag will be sent
:param chunk_size: chunk size of data to write; default 65536
:param content_type: value to send as content-type header; if None, no
content-type will be set (remote end will likely try
to auto-detect it)
:param headers: additional headers to include in the request, if any
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:param proxy: proxy to connect through, if any; None by default; str of the
format 'http://127.0.0.1:8888' to set one
:returns: etag from server response
:raises ClientException: HTTP PUT request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
if not headers:
parsed, conn = http_connection(url, proxy=proxy)
path = parsed.path
if container:
path = '%s/%s' % (path.rstrip('/'), quote(container))
if name:
path = '%s/%s' % (path.rstrip('/'), quote(name))
if headers:
headers = dict(headers)
else:
headers = {}
headers['X-Auth-Token'] = token
if token:
headers['X-Auth-Token'] = token
if etag:
headers['ETag'] = etag.strip('"')
if content_length is not None:
headers['Content-Length'] = str(content_length)
else:
for n, v in headers.iteritems():
if n.lower() == 'content-length':
content_length = int(v)
if content_type is not None:
headers['Content-Type'] = content_type
if not contents:
@ -633,7 +658,7 @@ def put_object(url, token, container, name, contents, content_length=None,
raise ClientException('Object PUT failed', http_scheme=parsed.scheme,
http_host=conn.host, http_port=conn.port, http_path=path,
http_status=resp.status, http_reason=resp.reason)
return resp.getheader('etag').strip('"')
return resp.getheader('etag', '').strip('"')
def post_object(url, token, container, name, headers, http_conn=None):
@ -664,24 +689,40 @@ def post_object(url, token, container, name, headers, http_conn=None):
http_status=resp.status, http_reason=resp.reason)
def delete_object(url, token, container, name, http_conn=None):
def delete_object(url, token=None, container=None, name=None, http_conn=None,
headers=None, proxy=None):
"""
Delete object
:param url: storage URL
:param token: auth token
:param container: container name that the object is in
:param name: object name to delete
:param token: auth token; if None, no token will be sent
:param container: container name that the object is in; if None, the
container name is expected to be part of the url
:param name: object name to delete; if None, the object name is expected to
be part of the url
:param http_conn: HTTP connection object (If None, it will create the
conn object)
:param headers: additional headers to include in the request
:param proxy: proxy to connect through, if any; None by default; str of the
format 'http://127.0.0.1:8888' to set one
:raises ClientException: HTTP DELETE request failed
"""
if http_conn:
parsed, conn = http_conn
else:
parsed, conn = http_connection(url)
path = '%s/%s/%s' % (parsed.path, quote(container), quote(name))
conn.request('DELETE', path, '', {'X-Auth-Token': token})
parsed, conn = http_connection(url, proxy=proxy)
path = parsed.path
if container:
path = '%s/%s' % (path.rstrip('/'), quote(container))
if name:
path = '%s/%s' % (path.rstrip('/'), quote(name))
if headers:
headers = dict(headers)
else:
headers = {}
if token:
headers['X-Auth-Token'] = token
conn.request('DELETE', path, '', headers)
resp = conn.getresponse()
resp.read()
if resp.status < 200 or resp.status >= 300:


@ -666,7 +666,9 @@ class ContainerBroker(DatabaseBroker):
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT ''
metadata TEXT DEFAULT '',
x_container_sync_point1 INTEGER DEFAULT -1,
x_container_sync_point2 INTEGER DEFAULT -1
);
INSERT INTO container_stat (object_count, bytes_used)
@ -886,7 +888,8 @@ class ContainerBroker(DatabaseBroker):
:returns: dict with keys: account, container, created_at,
put_timestamp, delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id
reported_object_count, reported_bytes_used, hash, id,
x_container_sync_point1, and x_container_sync_point2.
If include_metadata is set, metadata is included as a key
pointing to a dict of tuples of the metadata
"""
@ -896,35 +899,83 @@ class ContainerBroker(DatabaseBroker):
if not self.stale_reads_ok:
raise
with self.get() as conn:
metadata = ''
if include_metadata:
metadata = ', metadata'
try:
data = conn.execute('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id
%s
FROM container_stat
''' % metadata).fetchone()
except sqlite3.OperationalError, err:
if 'no such column: metadata' not in str(err):
raise
data = conn.execute('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash, id
FROM container_stat''').fetchone()
data = None
trailing1 = 'metadata'
trailing2 = 'x_container_sync_point1, x_container_sync_point2'
while not data:
try:
data = conn.execute('''
SELECT account, container, created_at, put_timestamp,
delete_timestamp, object_count, bytes_used,
reported_put_timestamp, reported_delete_timestamp,
reported_object_count, reported_bytes_used, hash,
id, %s, %s
FROM container_stat
''' % (trailing1, trailing2)).fetchone()
except sqlite3.OperationalError, err:
if 'no such column: metadata' in str(err):
trailing1 = "'' as metadata"
elif 'no such column: x_container_sync_point' in str(err):
trailing2 = '-1 AS x_container_sync_point1, ' \
'-1 AS x_container_sync_point2'
else:
raise
data = dict(data)
if include_metadata:
try:
data['metadata'] = json.loads(data.get('metadata', ''))
except ValueError:
data['metadata'] = {}
elif 'metadata' in data:
del data['metadata']
return data
def set_x_container_sync_points(self, sync_point1, sync_point2):
with self.get() as conn:
orig_isolation_level = conn.isolation_level
try:
# We turn off auto-transactions to ensure the alter table
# commands are part of the transaction.
conn.isolation_level = None
conn.execute('BEGIN')
try:
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
except sqlite3.OperationalError, err:
if 'no such column: x_container_sync_point' not in str(err):
raise
conn.execute('''
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1
''')
conn.execute('''
ALTER TABLE container_stat
ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1
''')
self._set_x_container_sync_points(conn, sync_point1,
sync_point2)
conn.execute('COMMIT')
finally:
conn.isolation_level = orig_isolation_level
def _set_x_container_sync_points(self, conn, sync_point1, sync_point2):
if sync_point1 is not None and sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?,
x_container_sync_point2 = ?
''', (sync_point1, sync_point2))
elif sync_point1 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point1 = ?
''', (sync_point1,))
elif sync_point2 is not None:
conn.execute('''
UPDATE container_stat
SET x_container_sync_point2 = ?
''', (sync_point2,))
def reported(self, put_timestamp, delete_timestamp, object_count,
bytes_used):
"""


@ -31,9 +31,10 @@ RUN_DIR = '/var/run/swift'
# auth-server has been removed from ALL_SERVERS, start it explicitly
ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
'container-replicator', 'container-server', 'container-updater',
'object-auditor', 'object-server', 'object-replicator', 'object-updater',
'proxy-server', 'account-replicator', 'account-reaper']
'container-replicator', 'container-server', 'container-sync',
'container-updater', 'object-auditor', 'object-server',
'object-replicator', 'object-updater', 'proxy-server',
'account-replicator', 'account-reaper']
MAIN_SERVERS = ['proxy-server', 'account-server', 'container-server',
'object-server']
REST_SERVERS = [s for s in ALL_SERVERS if s not in MAIN_SERVERS]


@ -45,6 +45,10 @@ class DomainRemapMiddleware(object):
derive account and container names from elements in the domain name and
put those derived values into the URL path (leaving the Host header
unchanged).
Also note that using container sync with remapped domain names is not
advised. With container sync, you should use the true storage end points as
sync destinations.
"""
def __init__(self, app, conf):


@ -105,16 +105,15 @@ class RateLimitMiddleware(object):
:param obj_name: object name from path
"""
keys = []
if self.account_ratelimit and account_name and (
not (container_name or obj_name) or
(container_name and not obj_name and
req_method in ('PUT', 'DELETE'))):
# COPYs are not limited
if self.account_ratelimit and \
account_name and container_name and not obj_name and \
req_method in ('PUT', 'DELETE'):
keys.append(("ratelimit/%s" % account_name,
self.account_ratelimit))
if account_name and container_name and (
(not obj_name and req_method in ('GET', 'HEAD')) or
(obj_name and req_method in ('PUT', 'DELETE'))):
if account_name and container_name and obj_name and \
req_method in ('PUT', 'DELETE', 'POST'):
container_size = None
memcache_key = get_container_memcache_key(account_name,
container_name)


@ -128,13 +128,23 @@ def canonical_string(req):
"""
Canonicalize a request to a token that can be signed.
"""
amz_headers = {}
buf = "%s\n%s\n%s\n" % (req.method, req.headers.get('Content-MD5', ''),
req.headers.get('Content-Type') or '')
if 'Date' in req.headers:
buf += "%s\n" % req.headers['Date']
for amz_header in sorted((key.lower() for key in req.headers
if key.lower().startswith('x-amz-'))):
buf += "%s:%s\n" % (amz_header, req.headers[amz_header])
amz_headers[amz_header] = req.headers[amz_header]
if 'x-amz-date' in amz_headers:
buf += "\n"
elif 'Date' in req.headers:
buf += "%s\n" % req.headers['Date']
for k in sorted(key.lower() for key in amz_headers):
buf += "%s:%s\n" % (k, amz_headers[k])
path = req.path_qs
if '?' in path:
path, args = path.split('?', 1)


@ -27,7 +27,8 @@ from webob.exc import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \
HTTPUnauthorized
from swift.common.middleware.acl import clean_acl, parse_acl, referrer_allowed
from swift.common.utils import cache_from_env, get_logger, split_path
from swift.common.utils import cache_from_env, get_logger, get_remote_client, \
split_path
class TempAuth(object):
@ -70,6 +71,9 @@ class TempAuth(object):
if self.auth_prefix[-1] != '/':
self.auth_prefix += '/'
self.token_life = int(conf.get('token_life', 86400))
self.allowed_sync_hosts = [h.strip()
for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
if h.strip()]
self.users = {}
for conf_key in conf:
if conf_key.startswith('user_'):
@ -245,11 +249,20 @@ class TempAuth(object):
if '.reseller_admin' in user_groups and \
account != self.reseller_prefix and \
account[len(self.reseller_prefix)] != '.':
req.environ['swift_owner'] = True
return None
if account in user_groups and \
(req.method not in ('DELETE', 'PUT') or container):
# If the user is admin for the account and is not trying to do an
# account DELETE or PUT...
req.environ['swift_owner'] = True
return None
if (req.environ.get('swift_sync_key') and
req.environ['swift_sync_key'] ==
req.headers.get('x-container-sync-key', None) and
'x-timestamp' in req.headers and
(req.remote_addr in self.allowed_sync_hosts or
get_remote_client(req) in self.allowed_sync_hosts)):
return None
referrers, groups = parse_acl(getattr(req, 'acl', None))
if referrer_allowed(req.referer, referrers):


@ -972,6 +972,32 @@ def urlparse(url):
return ModifiedParseResult(*stdlib_urlparse(url))
def validate_sync_to(value, allowed_sync_hosts):
p = urlparse(value)
if p.scheme not in ('http', 'https'):
return _('Invalid scheme %r in X-Container-Sync-To, must be "http" '
'or "https".') % p.scheme
if not p.path:
return _('Path required in X-Container-Sync-To')
if p.params or p.query or p.fragment:
return _('Params, queries, and fragments not allowed in '
'X-Container-Sync-To')
if p.hostname not in allowed_sync_hosts:
return _('Invalid host %r in X-Container-Sync-To') % p.hostname
return None
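# Illustrative usage sketch (hypothetical values): validate_sync_to returns
# None when the value is acceptable, otherwise a translated error message
# string, e.g.:
#     validate_sync_to('http://host1/v1/AUTH_a/c', ['host1'])  # -> None
#     validate_sync_to('ftp://host1/v1/AUTH_a/c', ['host1'])   # -> error str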
def get_remote_client(req):
# remote host for zeus
client = req.headers.get('x-cluster-client-ip')
if not client and 'x-forwarded-for' in req.headers:
# remote host for other lbs
client = req.headers['x-forwarded-for'].split(',')[0].strip()
if not client:
client = req.remote_addr
return client
def human_readable(value):
"""
Returns the number in a human readable format; for example 1048576 = "1Mi".


@ -32,7 +32,7 @@ from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
from swift.common.db import ContainerBroker
from swift.common.utils import get_logger, get_param, hash_path, \
normalize_timestamp, storage_directory, split_path
normalize_timestamp, storage_directory, split_path, validate_sync_to
from swift.common.constraints import CONTAINER_LISTING_LIMIT, \
check_mount, check_float, check_utf8
from swift.common.bufferedhttp import http_connect
@ -46,7 +46,8 @@ class ContainerController(object):
"""WSGI Controller for the container server."""
# Ensure these are all lowercase
save_headers = ['x-container-read', 'x-container-write']
save_headers = ['x-container-read', 'x-container-write',
'x-container-sync-key', 'x-container-sync-to']
def __init__(self, conf):
self.logger = get_logger(conf, log_route='container-server')
@ -55,6 +56,9 @@ class ContainerController(object):
('true', 't', '1', 'on', 'yes', 'y')
self.node_timeout = int(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.allowed_sync_hosts = [h.strip()
for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
if h.strip()]
self.replicator_rpc = ReplicatorRpc(self.root, DATADIR,
ContainerBroker, self.mount_check, logger=self.logger)
@ -174,6 +178,11 @@ class ContainerController(object):
not check_float(req.headers['x-timestamp']):
return HTTPBadRequest(body='Missing timestamp', request=req,
content_type='text/plain')
if 'x-container-sync-to' in req.headers:
err = validate_sync_to(req.headers['x-container-sync-to'],
self.allowed_sync_hosts)
if err:
return HTTPBadRequest(err)
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
timestamp = normalize_timestamp(req.headers['x-timestamp'])
@ -199,6 +208,11 @@ class ContainerController(object):
if key.lower() in self.save_headers or
key.lower().startswith('x-container-meta-'))
if metadata:
if 'X-Container-Sync-To' in metadata:
if 'X-Container-Sync-To' not in broker.metadata or \
metadata['X-Container-Sync-To'][0] != \
broker.metadata['X-Container-Sync-To'][0]:
broker.set_x_container_sync_points(-1, -1)
broker.update_metadata(metadata)
resp = self.account_update(req, account, container, broker)
if resp:
@ -232,7 +246,8 @@ class ContainerController(object):
}
headers.update((key, value)
for key, (value, timestamp) in broker.metadata.iteritems()
if value != '')
if value != '' and (key.lower() in self.save_headers or
key.lower().startswith('x-container-meta-')))
return HTTPNoContent(request=req, headers=headers)
def GET(self, req):
@ -259,7 +274,8 @@ class ContainerController(object):
}
resp_headers.update((key, value)
for key, (value, timestamp) in broker.metadata.iteritems()
if value != '')
if value != '' and (key.lower() in self.save_headers or
key.lower().startswith('x-container-meta-')))
try:
path = get_param(req, 'path')
prefix = get_param(req, 'prefix')
@ -368,6 +384,11 @@ class ContainerController(object):
not check_float(req.headers['x-timestamp']):
return HTTPBadRequest(body='Missing or bad timestamp',
request=req, content_type='text/plain')
if 'x-container-sync-to' in req.headers:
err = validate_sync_to(req.headers['x-container-sync-to'],
self.allowed_sync_hosts)
if err:
return HTTPBadRequest(err)
if self.mount_check and not check_mount(self.root, drive):
return Response(status='507 %s is not mounted' % drive)
broker = self._get_container_broker(drive, part, account, container)
@ -380,6 +401,11 @@ class ContainerController(object):
if key.lower() in self.save_headers or
key.lower().startswith('x-container-meta-'))
if metadata:
if 'X-Container-Sync-To' in metadata:
if 'X-Container-Sync-To' not in broker.metadata or \
metadata['X-Container-Sync-To'][0] != \
broker.metadata['X-Container-Sync-To'][0]:
broker.set_x_container_sync_points(-1, -1)
broker.update_metadata(metadata)
return HTTPNoContent(request=req)

swift/container/sync.py (new file, 424 lines)

@ -0,0 +1,424 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from time import ctime, time
from random import random, shuffle
from struct import unpack_from
from eventlet import sleep
from swift.container import server as container_server
from swift.common.client import ClientException, delete_object, put_object, \
quote
from swift.common.direct_client import direct_get_object
from swift.common.ring import Ring
from swift.common.db import ContainerBroker
from swift.common.utils import audit_location_generator, get_logger, \
hash_path, normalize_timestamp, TRUE_VALUES, validate_sync_to, whataremyips
from swift.common.daemon import Daemon
class _Iter2FileLikeObject(object):
"""
Returns an iterator's contents via :func:`read`, making it look like a file
object.
"""
def __init__(self, iterator):
self.iterator = iterator
self._chunk = ''
def read(self, size=-1):
"""
read([size]) -> read at most size bytes, returned as a string.
If the size argument is negative or omitted, read until EOF is reached.
Notice that when in non-blocking mode, less data than what was
requested may be returned, even if no size parameter was given.
"""
if size < 0:
chunk = self._chunk
self._chunk = ''
return chunk + ''.join(self.iterator)
chunk = self._chunk
self._chunk = ''
if chunk and len(chunk) <= size:
return chunk
try:
chunk += self.iterator.next()
except StopIteration:
pass
if len(chunk) <= size:
return chunk
self._chunk = chunk[size:]
return chunk[:size]
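# A minimal usage sketch (hypothetical data): wrapping a chunk iterator so
# it can be handed to put_object(..., contents=...), which expects a
# file-like object:
#
#     flo = _Iter2FileLikeObject(iter(['first chunk, ', 'second chunk']))
#     flo.read(5)   # -> 'first'
#     flo.read()    # -> ' chunk, second chunk'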
class ContainerSync(Daemon):
"""
Daemon to sync syncable containers.
This is done by scanning the local devices for container databases and
checking for x-container-sync-to and x-container-sync-key metadata values.
If they exist, newer rows since the last sync will trigger PUTs or DELETEs
to the other container.
.. note::
Container sync will sync object POSTs only if the proxy server is set
to use "object_post_as_copy = true", which is the default. So-called
fast object posts, with "object_post_as_copy = false", do not update
the container listings and therefore can't be detected for
synchronization.
The actual syncing is slightly more complicated, in order to make use of
the three (or number-of-replicas) main nodes for a container without each
trying to do the exact same work, and without missing work if one node
happens to be down.
Two sync points are kept per container database. All rows between the two
sync points trigger updates. Any rows newer than both sync points cause
updates depending on the node's position for the container (primary nodes
do one third, etc. depending on the replica count of course). After a sync
run, the first sync point is set to the newest ROWID known and the second
sync point is set to the newest ROWID for which all updates have been sent.
An example may help. Assume replica count is 3 and perfectly matching
ROWIDs starting at 1.
First sync run, database has 6 rows:
* SyncPoint1 starts as -1.
* SyncPoint2 starts as -1.
* No rows between points, so no "all updates" rows.
* Six rows newer than SyncPoint1, so a third of the rows are sent
by node 1, another third by node 2, remaining third by node 3.
* SyncPoint1 is set as 6 (the newest ROWID known).
* SyncPoint2 is left as -1 since no "all updates" rows were synced.
Next sync run, database has 12 rows:
* SyncPoint1 starts as 6.
* SyncPoint2 starts as -1.
* The rows between -1 and 6 all trigger updates (most of which
should short-circuit on the remote end as having already been
done).
* Six more rows newer than SyncPoint1, so a third of the rows are
sent by node 1, another third by node 2, remaining third by node
3.
* SyncPoint1 is set as 12 (the newest ROWID known).
* SyncPoint2 is set as 6 (the newest "all updates" ROWID).
In this way, under normal circumstances each node sends its share of
updates each run and just sends a batch of older updates to ensure nothing
was missed.
:param conf: The dict of configuration values from the [container-sync]
section of the container-server.conf
:param container_ring: If None, the <swift_dir>/container.ring.gz will be
loaded. This is overridden by unit tests.
:param object_ring: If None, the <swift_dir>/object.ring.gz will be loaded.
This is overridden by unit tests.
"""
def __init__(self, conf, container_ring=None, object_ring=None):
#: The dict of configuration values from the [container-sync] section
#: of the container-server.conf.
self.conf = conf
#: Logger to use for container-sync log lines.
self.logger = get_logger(conf, log_route='container-sync')
#: Path to the local device mount points.
self.devices = conf.get('devices', '/srv/node')
#: Indicates whether mount points should be verified as actual mount
#: points (normally true, false for tests and SAIO).
self.mount_check = \
conf.get('mount_check', 'true').lower() in TRUE_VALUES
#: Minimum time between full scans. This is to keep the daemon from
#: running wild on near empty systems.
self.interval = int(conf.get('interval', 300))
#: Maximum amount of time to spend syncing a container before moving on
to the next one. If a container sync hasn't finished in this time,
#: it'll just be resumed next scan.
self.container_time = int(conf.get('container_time', 60))
#: The list of hosts we're allowed to send syncs to.
self.allowed_sync_hosts = [h.strip()
for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
if h.strip()]
self.proxy = conf.get('sync_proxy')
#: Number of containers with sync turned on that were successfully
#: synced.
self.container_syncs = 0
#: Number of successful DELETEs triggered.
self.container_deletes = 0
#: Number of successful PUTs triggered.
self.container_puts = 0
#: Number of containers that didn't have sync turned on.
self.container_skips = 0
#: Number of containers that had a failure of some type.
self.container_failures = 0
#: Time of last stats report.
self.reported = time()
swift_dir = conf.get('swift_dir', '/etc/swift')
#: swift.common.ring.Ring for locating containers.
self.container_ring = container_ring or \
Ring(os.path.join(swift_dir, 'container.ring.gz'))
#: swift.common.ring.Ring for locating objects.
self.object_ring = object_ring or \
Ring(os.path.join(swift_dir, 'object.ring.gz'))
self._myips = whataremyips()
self._myport = int(conf.get('bind_port', 6001))
def run_forever(self):
"""
Runs container sync scans until stopped.
"""
sleep(random() * self.interval)
while True:
begin = time()
all_locs = audit_location_generator(self.devices,
container_server.DATADIR,
mount_check=self.mount_check,
logger=self.logger)
for path, device, partition in all_locs:
self.container_sync(path)
if time() - self.reported >= 3600: # once an hour
self.report()
elapsed = time() - begin
if elapsed < self.interval:
sleep(self.interval - elapsed)
def run_once(self):
"""
Runs a single container sync scan.
"""
self.logger.info(_('Begin container sync "once" mode'))
begin = time()
all_locs = audit_location_generator(self.devices,
container_server.DATADIR,
mount_check=self.mount_check,
logger=self.logger)
for path, device, partition in all_locs:
self.container_sync(path)
if time() - self.reported >= 3600: # once an hour
self.report()
self.report()
elapsed = time() - begin
self.logger.info(
_('Container sync "once" mode completed: %.02fs'), elapsed)
def report(self):
"""
Writes a report of the stats to the logger and resets the stats for the
next report.
"""
self.logger.info(
_('Since %(time)s: %(sync)s synced [%(delete)s deletes, %(put)s '
'puts], %(skip)s skipped, %(fail)s failed'),
{'time': ctime(self.reported),
'sync': self.container_syncs,
'delete': self.container_deletes,
'put': self.container_puts,
'skip': self.container_skips,
'fail': self.container_failures})
self.reported = time()
self.container_syncs = 0
self.container_deletes = 0
self.container_puts = 0
self.container_skips = 0
self.container_failures = 0
def container_sync(self, path):
"""
Checks the given path for a container database, determines if syncing
is turned on for that database and, if so, sends any updates to the
other container.
:param path: the path to a container db
"""
try:
if not path.endswith('.db'):
return
broker = ContainerBroker(path)
info = broker.get_info()
x, nodes = self.container_ring.get_nodes(info['account'],
info['container'])
for ordinal, node in enumerate(nodes):
if node['ip'] in self._myips and node['port'] == self._myport:
break
else:
return
if not broker.is_deleted():
sync_to = None
sync_key = None
sync_point1 = info['x_container_sync_point1']
sync_point2 = info['x_container_sync_point2']
for key, (value, timestamp) in broker.metadata.iteritems():
if key.lower() == 'x-container-sync-to':
sync_to = value
elif key.lower() == 'x-container-sync-key':
sync_key = value
if not sync_to or not sync_key:
self.container_skips += 1
return
sync_to = sync_to.rstrip('/')
err = validate_sync_to(sync_to, self.allowed_sync_hosts)
if err:
self.logger.info(
_('ERROR %(db_file)s: %(validate_sync_to_err)s'),
{'db_file': broker.db_file,
'validate_sync_to_err': err})
self.container_failures += 1
return
stop_at = time() + self.container_time
while time() < stop_at and sync_point2 < sync_point1:
rows = broker.get_items_since(sync_point2, 1)
if not rows:
break
row = rows[0]
if row['ROWID'] >= sync_point1:
break
key = hash_path(info['account'], info['container'],
row['name'], raw_digest=True)
# This node will only initially sync out one third of the
# objects (if 3 replicas, 1/4 if 4, etc.). This section
# will attempt to sync previously skipped rows in case the
# other nodes didn't succeed.
if unpack_from('>I', key)[0] % \
self.container_ring.replica_count != ordinal:
if not self.container_sync_row(row, sync_to, sync_key,
broker, info):
return
sync_point2 = row['ROWID']
broker.set_x_container_sync_points(None, sync_point2)
while time() < stop_at:
rows = broker.get_items_since(sync_point1, 1)
if not rows:
break
row = rows[0]
key = hash_path(info['account'], info['container'],
row['name'], raw_digest=True)
# This node will only initially sync out one third of the
# objects (if 3 replicas, 1/4 if 4, etc.). It'll come back
# around to the section above and attempt to sync
# previously skipped rows in case the other nodes didn't
# succeed.
if unpack_from('>I', key)[0] % \
self.container_ring.replica_count == ordinal:
if not self.container_sync_row(row, sync_to, sync_key,
broker, info):
return
sync_point1 = row['ROWID']
broker.set_x_container_sync_points(sync_point1, None)
self.container_syncs += 1
except Exception, err:
self.container_failures += 1
self.logger.exception(_('ERROR Syncing %s'), (broker.db_file))
def container_sync_row(self, row, sync_to, sync_key, broker, info):
"""
Sends the update the row indicates to the sync_to container.
:param row: The updated row in the local database triggering the sync
update.
:param sync_to: The URL to the remote container.
:param sync_key: The X-Container-Sync-Key to use when sending requests
to the other container.
:param broker: The local container database broker.
:param info: The get_info result from the local container database
broker.
:returns: True on success
"""
try:
if row['deleted']:
try:
delete_object(sync_to, name=row['name'],
headers={'x-timestamp': row['created_at'],
'x-container-sync-key': sync_key},
proxy=self.proxy)
except ClientException, err:
if err.http_status != 404:
raise
self.container_deletes += 1
else:
part, nodes = self.object_ring.get_nodes(
info['account'], info['container'],
row['name'])
shuffle(nodes)
exc = None
looking_for_timestamp = float(row['created_at'])
timestamp = -1
headers = body = None
for node in nodes:
try:
these_headers, this_body = direct_get_object(node,
part, info['account'], info['container'],
row['name'], resp_chunk_size=65536)
this_timestamp = float(these_headers['x-timestamp'])
if this_timestamp > timestamp:
timestamp = this_timestamp
headers = these_headers
body = this_body
except ClientException, err:
# If any errors are not 404, make sure we report the
# non-404 one. We don't want to mistakenly assume the
# object no longer exists just because one says so and
# the others errored for some other reason.
if not exc or exc.http_status == 404:
exc = err
if timestamp < looking_for_timestamp:
if exc:
raise exc
raise Exception(_('Unknown exception trying to GET: '
'%(node)r %(account)r %(container)r %(object)r'),
{'node': node, 'part': part,
'account': info['account'],
'container': info['container'],
'object': row['name']})
for key in ('date', 'last-modified'):
if key in headers:
del headers[key]
if 'etag' in headers:
headers['etag'] = headers['etag'].strip('"')
headers['x-timestamp'] = row['created_at']
headers['x-container-sync-key'] = sync_key
put_object(sync_to, name=row['name'], headers=headers,
contents=_Iter2FileLikeObject(body), proxy=self.proxy)
self.container_puts += 1
except ClientException, err:
if err.http_status == 401:
self.logger.info(_('Unauth %(sync_from)r '
'=> %(sync_to)r'),
{'sync_from': '%s/%s' %
(quote(info['account']), quote(info['container'])),
'sync_to': sync_to})
elif err.http_status == 404:
self.logger.info(_('Not found %(sync_from)r '
'=> %(sync_to)r'),
{'sync_from': '%s/%s' %
(quote(info['account']), quote(info['container'])),
'sync_to': sync_to})
else:
self.logger.exception(
_('ERROR Syncing %(db_file)s %(row)s'),
{'db_file': broker.db_file, 'row': row})
self.container_failures += 1
return False
except Exception, err:
self.logger.exception(
_('ERROR Syncing %(db_file)s %(row)s'),
{'db_file': broker.db_file, 'row': row})
self.container_failures += 1
return False
return True
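The GET fan-out in container_sync_row boils down to "newest replica wins": every object node is polled and the copy with the highest x-timestamp is the one propagated to the remote container. A distilled sketch of that selection, with made-up replies:

def newest(replies):
    # Track the best (headers, body) seen so far by numeric x-timestamp
    best_headers = best_body = None
    best_ts = -1
    for headers, body in replies:
        ts = float(headers['x-timestamp'])
        if ts > best_ts:
            best_headers, best_body, best_ts = headers, body, ts
    return best_headers, best_body

headers, body = newest([({'x-timestamp': '1.1'}, 'older contents'),
                        ({'x-timestamp': '2.2'}, 'newer contents')])
assert body == 'newer contents'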

View File

@ -500,6 +500,7 @@ class ObjectController(object):
return error_response
file = DiskFile(self.devices, device, partition, account, container,
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
orig_timestamp = file.metadata.get('X-Timestamp')
upload_expiration = time.time() + self.max_upload_time
etag = md5()
upload_size = 0
@ -544,13 +545,16 @@ class ObjectController(object):
metadata[header_caps] = request.headers[header_key]
file.put(fd, tmppath, metadata)
file.unlinkold(metadata['X-Timestamp'])
self.container_update('PUT', account, container, obj, request.headers,
{'x-size': file.metadata['Content-Length'],
'x-content-type': file.metadata['Content-Type'],
'x-timestamp': file.metadata['X-Timestamp'],
'x-etag': file.metadata['ETag'],
'x-trans-id': request.headers.get('x-trans-id', '-')},
device)
if not orig_timestamp or \
orig_timestamp < request.headers['x-timestamp']:
self.container_update('PUT', account, container, obj,
request.headers,
{'x-size': file.metadata['Content-Length'],
'x-content-type': file.metadata['Content-Type'],
'x-timestamp': file.metadata['X-Timestamp'],
'x-etag': file.metadata['ETag'],
'x-trans-id': request.headers.get('x-trans-id', '-')},
device)
resp = HTTPCreated(request=request, etag=etag)
return resp
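A note on the orig_timestamp comparison above: it is a plain string comparison, which is safe only because both values have been through normalize_timestamp(), whose fixed-width zero-padded form makes lexicographic order agree with numeric order. A sketch of that property (the format shown is assumed to match swift.common.utils):

def normalize_timestamp(ts):
    # Zero-pad to a fixed 16-character width with 5 decimal places
    return '%016.05f' % float(ts)

assert normalize_timestamp('2') < normalize_timestamp('10')
assert sorted(['2', '10']) == ['10', '2']  # raw strings would order wrongly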
@ -657,6 +661,8 @@ class ObjectController(object):
response.headers[key] = value
response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp'])
# Needed for container sync feature
response.headers['X-Timestamp'] = file.metadata['X-Timestamp']
response.content_length = file_size
if 'Content-Encoding' in file.metadata:
response.content_encoding = file.metadata['Content-Encoding']
@ -680,6 +686,7 @@ class ObjectController(object):
response_class = HTTPNoContent
file = DiskFile(self.devices, device, partition, account, container,
obj, self.logger, disk_chunk_size=self.disk_chunk_size)
orig_timestamp = file.metadata.get('X-Timestamp')
if file.is_deleted():
response_class = HTTPNotFound
metadata = {
@ -688,10 +695,12 @@ class ObjectController(object):
with file.mkstemp() as (fd, tmppath):
file.put(fd, tmppath, metadata, extension='.ts')
file.unlinkold(metadata['X-Timestamp'])
self.container_update('DELETE', account, container, obj,
request.headers, {'x-timestamp': metadata['X-Timestamp'],
'x-trans-id': request.headers.get('x-trans-id', '-')},
device)
if not orig_timestamp or \
orig_timestamp < request.headers['x-timestamp']:
self.container_update('DELETE', account, container, obj,
request.headers, {'x-timestamp': metadata['X-Timestamp'],
'x-trans-id': request.headers.get('x-trans-id', '-')},
device)
resp = response_class(request=request)
return resp

View File

@ -42,7 +42,7 @@ from webob import Request, Response
from swift.common.ring import Ring
from swift.common.utils import cache_from_env, ContextPool, get_logger, \
normalize_timestamp, split_path, TRUE_VALUES
get_remote_client, normalize_timestamp, split_path, TRUE_VALUES
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
@ -418,8 +418,8 @@ class Controller(object):
:param account: account name for the container
:param container: container name to look up
:returns: tuple of (container partition, container nodes, container
read acl, container write acl) or (None, None, None, None) if
the container does not exist
read acl, container write acl, container sync key) or (None,
None, None, None, None) if the container does not exist
"""
partition, nodes = self.app.container_ring.get_nodes(
account, container)
@ -431,15 +431,17 @@ class Controller(object):
status = cache_value['status']
read_acl = cache_value['read_acl']
write_acl = cache_value['write_acl']
sync_key = cache_value.get('sync_key')
if status == 200:
return partition, nodes, read_acl, write_acl
return partition, nodes, read_acl, write_acl, sync_key
elif status == 404:
return None, None, None, None
return None, None, None, None, None
if not self.account_info(account, autocreate=account_autocreate)[1]:
return None, None, None, None
return None, None, None, None, None
result_code = 0
read_acl = None
write_acl = None
sync_key = None
container_size = None
attempts_left = self.app.container_ring.replica_count
headers = {'x-trans-id': self.trans_id}
@ -455,6 +457,7 @@ class Controller(object):
result_code = 200
read_acl = resp.getheader('x-container-read')
write_acl = resp.getheader('x-container-write')
sync_key = resp.getheader('x-container-sync-key')
container_size = \
resp.getheader('X-Container-Object-Count')
break
@ -483,11 +486,12 @@ class Controller(object):
{'status': result_code,
'read_acl': read_acl,
'write_acl': write_acl,
'sync_key': sync_key,
'container_size': container_size},
timeout=cache_timeout)
if result_code == 200:
return partition, nodes, read_acl, write_acl
return None, None, None, None
return partition, nodes, read_acl, write_acl, sync_key
return None, None, None, None, None
def iter_nodes(self, partition, nodes, ring):
"""
@ -684,6 +688,9 @@ class Controller(object):
raise
res.app_iter = file_iter()
update_headers(res, source.getheaders())
# Used by container sync feature
res.environ['swift_x_timestamp'] = \
source.getheader('x-timestamp')
update_headers(res, {'accept-ranges': 'bytes'})
res.status = source.status
res.content_length = source.getheader('Content-Length')
@ -694,6 +701,9 @@ class Controller(object):
elif 200 <= source.status <= 399:
res = status_map[source.status](request=req)
update_headers(res, source.getheaders())
# Used by container sync feature
res.environ['swift_x_timestamp'] = \
source.getheader('x-timestamp')
update_headers(res, {'accept-ranges': 'bytes'})
if req.method == 'HEAD':
res.content_length = source.getheader('Content-Length')
@ -902,7 +912,7 @@ class ObjectController(Controller):
error_response = check_metadata(req, 'object')
if error_response:
return error_response
container_partition, containers, _junk, req.acl = \
container_partition, containers, _junk, req.acl, _junk = \
self.container_info(self.account_name, self.container_name,
account_autocreate=self.app.account_autocreate)
if 'swift.authorize' in req.environ:
@ -960,7 +970,8 @@ class ObjectController(Controller):
@delay_denial
def PUT(self, req):
"""HTTP PUT request handler."""
container_partition, containers, _junk, req.acl = \
(container_partition, containers, _junk, req.acl,
req.environ['swift_sync_key']) = \
self.container_info(self.account_name, self.container_name,
account_autocreate=self.app.account_autocreate)
if 'swift.authorize' in req.environ:
@ -971,7 +982,27 @@ class ObjectController(Controller):
return HTTPNotFound(request=req)
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
# Used by container sync feature
if 'x-timestamp' in req.headers:
try:
req.headers['X-Timestamp'] = \
normalize_timestamp(float(req.headers['x-timestamp']))
# For container sync PUTs, do a HEAD to see if we can
# short-circuit
hreq = Request.blank(req.path_info,
environ={'REQUEST_METHOD': 'HEAD'})
self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
hreq.path_info, self.app.object_ring.replica_count)
if 'swift_x_timestamp' in hreq.environ and \
float(hreq.environ['swift_x_timestamp']) >= \
float(req.headers['x-timestamp']):
return HTTPAccepted(request=req)
except ValueError:
return HTTPBadRequest(request=req, content_type='text/plain',
body='X-Timestamp should be a UNIX timestamp float value; '
'was %r' % req.headers['x-timestamp'])
else:
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
# Sometimes the 'content-type' header exists, but is set to None.
content_type_manually_set = True
if not req.headers.get('content-type'):
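The x-timestamp branch added above lets container sync replay an object's original timestamp, rejecting anything unparseable with a 400 before the PUT ever fans out to the object servers. A standalone rendition of just that decision (helper name and padding format are assumptions):

import time

def effective_timestamp(headers):
    if 'x-timestamp' in headers:
        try:
            # Container sync replays the source object's timestamp
            return '%016.05f' % float(headers['x-timestamp'])
        except ValueError:
            raise ValueError('X-Timestamp should be a UNIX timestamp '
                             'float value; was %r' % headers['x-timestamp'])
    # Ordinary client PUTs get the current time
    return '%016.05f' % time.time()

assert effective_timestamp({'x-timestamp': '123.456'}) == '0000000123.45600'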
@ -1151,7 +1182,8 @@ class ObjectController(Controller):
@delay_denial
def DELETE(self, req):
"""HTTP DELETE request handler."""
container_partition, containers, _junk, req.acl = \
(container_partition, containers, _junk, req.acl,
req.environ['swift_sync_key']) = \
self.container_info(self.account_name, self.container_name)
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
@ -1161,7 +1193,17 @@ class ObjectController(Controller):
return HTTPNotFound(request=req)
partition, nodes = self.app.object_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
# Used by container sync feature
if 'x-timestamp' in req.headers:
try:
req.headers['X-Timestamp'] = \
normalize_timestamp(float(req.headers['x-timestamp']))
except ValueError:
return HTTPBadRequest(request=req, content_type='text/plain',
body='X-Timestamp should be a UNIX timestamp float value; '
'was %r' % req.headers['x-timestamp'])
else:
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
headers = []
for container in containers:
nheaders = dict(req.headers.iteritems())
@ -1207,7 +1249,8 @@ class ContainerController(Controller):
server_type = _('Container')
# Ensure these are all lowercase
pass_through_headers = ['x-container-read', 'x-container-write']
pass_through_headers = ['x-container-read', 'x-container-write',
'x-container-sync-key', 'x-container-sync-to']
def __init__(self, app, account_name, container_name, **kwargs):
Controller.__init__(self, app)
@ -1244,6 +1287,7 @@ class ContainerController(Controller):
{'status': resp.status_int,
'read_acl': resp.headers.get('x-container-read'),
'write_acl': resp.headers.get('x-container-write'),
'sync_key': resp.headers.get('x-container-sync-key'),
'container_size': resp.headers.get('x-container-object-count')},
timeout=self.app.recheck_container_existence)
@ -1252,6 +1296,11 @@ class ContainerController(Controller):
aresp = req.environ['swift.authorize'](req)
if aresp:
return aresp
if not req.environ.get('swift_owner', False):
for key in ('x-container-read', 'x-container-write',
'x-container-sync-key', 'x-container-sync-to'):
if key in resp.headers:
del resp.headers[key]
return resp
@public
@ -1646,13 +1695,7 @@ class Application(BaseApplication):
the_request = quote(unquote(req.path))
if req.query_string:
the_request = the_request + '?' + req.query_string
# remote user for zeus
client = req.headers.get('x-cluster-client-ip')
if not client and 'x-forwarded-for' in req.headers:
# remote user for other lbs
client = req.headers['x-forwarded-for'].split(',')[0].strip()
if not client:
client = req.remote_addr
client = get_remote_client(req)
logged_headers = None
if self.log_headers:
logged_headers = '\n'.join('%s: %s' % (k, v)

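For reference, the inline client-address logic removed above now lives in swift.common.utils.get_remote_client; reconstructed from the deleted lines, its behavior is: prefer the Zeus header, then the first X-Forwarded-For hop, then the socket address. A sketch under that assumption:

def get_remote_client(req):
    # remote client for zeus
    client = req.headers.get('x-cluster-client-ip')
    if not client and 'x-forwarded-for' in req.headers:
        # remote client for other load balancers
        client = req.headers['x-forwarded-for'].split(',')[0].strip()
    if not client:
        client = req.remote_addr
    return client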
View File

@ -61,8 +61,8 @@ def reset_environment():
print 'Giving up after %s retries.' % attempt
raise err
print err
print 'Retrying in 1 second...'
sleep(1)
print 'Retrying in 2 seconds...'
sleep(2)
except BaseException, err:
kill_pids(pids)
raise err

View File

@ -139,13 +139,16 @@ def mock_time():
class TestRateLimit(unittest.TestCase):
def setUp(self):
def _reset_time(self):
global time_ticker
time_ticker = 0
def setUp(self):
self.was_sleep = eventlet.sleep
eventlet.sleep = mock_sleep
self.was_time = time.time
time.time = mock_time
self._reset_time()
def tearDown(self):
eventlet.sleep = self.was_sleep
@ -186,31 +189,34 @@ class TestRateLimit(unittest.TestCase):
logger=FakeLogger())
the_app.memcache_client = fake_memcache
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'GET', 'a', None, None)), 1)
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'POST', 'a', 'c', None)), 0)
'DELETE', 'a', None, None)), 0)
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'PUT', 'a', 'c', None)), 1)
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'GET', 'a', 'c', None)), 1)
'DELETE', 'a', 'c', None)), 1)
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'GET', 'a', 'c', 'o')), 0)
self.assertEquals(len(the_app.get_ratelimitable_key_tuples(
'PUT', 'a', 'c', 'o')), 1)
def test_ratelimit(self):
def test_account_ratelimit(self):
current_rate = 5
num_calls = 50
conf_dict = {'account_ratelimit': current_rate}
self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/a')
req.environ['swift.cache'] = FakeMemcache()
make_app_call = lambda: self.test_ratelimit(req.environ,
start_response)
begin = time.time()
self._run(make_app_call, num_calls, current_rate)
self.assertEquals(round(time.time() - begin, 1), 9.8)
for meth, exp_time in [('DELETE', 9.8), ('GET', 0),
('POST', 0), ('PUT', 9.8)]:
req = Request.blank('/v/a%s/c' % meth)
req.method = meth
req.environ['swift.cache'] = FakeMemcache()
make_app_call = lambda: self.test_ratelimit(req.environ,
start_response)
begin = time.time()
self._run(make_app_call, num_calls, current_rate,
check_time=bool(exp_time))
self.assertEquals(round(time.time() - begin, 1), exp_time)
self._reset_time()
def test_ratelimit_set_incr(self):
current_rate = 5
@ -218,7 +224,8 @@ class TestRateLimit(unittest.TestCase):
conf_dict = {'account_ratelimit': current_rate}
self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/a')
req = Request.blank('/v/a/c')
req.method = 'PUT'
req.environ['swift.cache'] = FakeMemcache()
req.environ['swift.cache'].init_incr_return_neg = True
make_app_call = lambda: self.test_ratelimit(req.environ,
@ -306,7 +313,8 @@ class TestRateLimit(unittest.TestCase):
self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
self.test_ratelimit.log_sleep_time_seconds = .00001
req = Request.blank('/v/a')
req = Request.blank('/v/a/c')
req.method = 'PUT'
req.environ['swift.cache'] = FakeMemcache()
time_override = [0, 0, 0, 0, None]
@ -335,7 +343,7 @@ class TestRateLimit(unittest.TestCase):
logger=FakeLogger())
the_app.memcache_client = fake_memcache
req = lambda: None
req.method = 'GET'
req.method = 'PUT'
class rate_caller(Thread):
@ -346,7 +354,7 @@ class TestRateLimit(unittest.TestCase):
def run(self):
for j in range(num_calls):
self.result = the_app.handle_ratelimit(req, self.myname,
None, None)
'c', None)
nt = 15
begin = time.time()
@ -361,45 +369,6 @@ class TestRateLimit(unittest.TestCase):
time_took = time.time() - begin
self.assertEquals(1.5, round(time_took, 1))
def test_ratelimit_acc_vrs_container(self):
conf_dict = {'clock_accuracy': 1000,
'account_ratelimit': 10,
'max_sleep_time_seconds': 4,
'container_ratelimit_10': 6,
'container_ratelimit_50': 2,
'container_ratelimit_75': 1}
self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/a/c')
req.environ['swift.cache'] = FakeMemcache()
cont_key = get_container_memcache_key('a', 'c')
class rate_caller(Thread):
def __init__(self, parent, name):
Thread.__init__(self)
self.parent = parent
self.name = name
def run(self):
self.result = self.parent.test_ratelimit(req.environ,
start_response)
def runthreads(threads, nt):
for i in range(nt):
rc = rate_caller(self, "thread %s" % i)
rc.start()
threads.append(rc)
for thread in threads:
thread.join()
begin = time.time()
req.environ['swift.cache'].set(cont_key, {'container_size': 20})
begin = time.time()
threads = []
runthreads(threads, 3)
time_took = time.time() - begin
self.assertEquals(round(time_took, 1), .4)
def test_call_invalid_path(self):
env = {'REQUEST_METHOD': 'GET',
'SCRIPT_NAME': '',
@ -441,7 +410,8 @@ class TestRateLimit(unittest.TestCase):
conf_dict = {'account_ratelimit': current_rate}
self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/a')
req = Request.blank('/v/a/c')
req.method = 'PUT'
req.environ['swift.cache'] = FakeMemcache()
req.environ['swift.cache'].error_on_incr = True
make_app_call = lambda: self.test_ratelimit(req.environ,

View File

@ -573,6 +573,15 @@ class TestSwift3(unittest.TestCase):
verify('7506d97002c7d2de922cc0ec34af8846', '/bucket/object',
{'Content-Type': None, 'X-Amz-Something': 'test'})
verify('28f76d6162444a193b612cd6cb20e0be', '/bucket/object',
{'Content-Type': None,
'X-Amz-Date': 'Mon, 11 Jul 2011 10:52:57 +0000',
'Date': 'Tue, 12 Jul 2011 10:52:57 +0000'})
verify('ed6971e3eca5af4ee361f05d7c272e49', '/bucket/object',
{'Content-Type': None,
'Date': 'Tue, 12 Jul 2011 10:52:57 +0000'})
req1 = Request.blank('/', headers=
{'Content-Type': None, 'X-Amz-Something': 'test'})
req2 = Request.blank('/', headers=

View File

@ -56,15 +56,21 @@ class FakeMemcache(object):
class FakeApp(object):
def __init__(self, status_headers_body_iter=None):
def __init__(self, status_headers_body_iter=None, acl=None, sync_key=None):
self.calls = 0
self.status_headers_body_iter = status_headers_body_iter
if not self.status_headers_body_iter:
self.status_headers_body_iter = iter([('404 Not Found', {}, '')])
self.acl = acl
self.sync_key = sync_key
def __call__(self, env, start_response):
self.calls += 1
self.request = Request.blank('', environ=env)
if self.acl:
self.request.acl = self.acl
if self.sync_key:
self.request.environ['swift_sync_key'] = self.sync_key
if 'swift.authorize' in env:
resp = env['swift.authorize'](self.request)
if resp:
@ -138,7 +144,8 @@ class TestAuth(unittest.TestCase):
self.assertEquals(resp.status_int, 404)
def test_anon(self):
resp = self._make_request('/v1/AUTH_account').get_response(self.test_auth)
resp = \
self._make_request('/v1/AUTH_account').get_response(self.test_auth)
self.assertEquals(resp.status_int, 401)
self.assertEquals(resp.environ['swift.authorize'],
self.test_auth.authorize)
@ -289,30 +296,35 @@ class TestAuth(unittest.TestCase):
self.assertEquals(self.test_auth.authorize(req), None)
def test_account_put_permissions(self):
req = self._make_request('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'})
req = self._make_request('/v1/AUTH_new',
environ={'REQUEST_METHOD': 'PUT'})
req.remote_user = 'act:usr,act'
resp = self.test_auth.authorize(req)
self.assertEquals(resp.status_int, 403)
req = self._make_request('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'})
req = self._make_request('/v1/AUTH_new',
environ={'REQUEST_METHOD': 'PUT'})
req.remote_user = 'act:usr,act,AUTH_other'
resp = self.test_auth.authorize(req)
self.assertEquals(resp.status_int, 403)
# Even PUTs to your own account as account admin should fail
req = self._make_request('/v1/AUTH_old', environ={'REQUEST_METHOD': 'PUT'})
req = self._make_request('/v1/AUTH_old',
environ={'REQUEST_METHOD': 'PUT'})
req.remote_user = 'act:usr,act,AUTH_old'
resp = self.test_auth.authorize(req)
self.assertEquals(resp.status_int, 403)
req = self._make_request('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'})
req = self._make_request('/v1/AUTH_new',
environ={'REQUEST_METHOD': 'PUT'})
req.remote_user = 'act:usr,act,.reseller_admin'
resp = self.test_auth.authorize(req)
self.assertEquals(resp, None)
# .super_admin is not something the middleware should ever see or care
# about
req = self._make_request('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'})
req = self._make_request('/v1/AUTH_new',
environ={'REQUEST_METHOD': 'PUT'})
req.remote_user = 'act:usr,act,.super_admin'
resp = self.test_auth.authorize(req)
self.assertEquals(resp.status_int, 403)
@ -383,6 +395,152 @@ class TestAuth(unittest.TestCase):
headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth)
self.assertEquals(resp.status_int, 401)
def test_allowed_sync_hosts(self):
a = auth.filter_factory({'super_admin_key': 'supertest'})(FakeApp())
self.assertEquals(a.allowed_sync_hosts, ['127.0.0.1'])
a = auth.filter_factory({'super_admin_key': 'supertest',
'allowed_sync_hosts':
'1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp())
self.assertEquals(a.allowed_sync_hosts,
['1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', '5.1.1.1'])
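The expected list pins down the conf parsing: split on commas, strip whitespace, drop empties. A sketch of a parser that satisfies the assertion (the middleware's own handling is assumed equivalent):

def parse_sync_hosts(value):
    # Commas separate hosts; surrounding whitespace and empties are ignored
    return [h.strip() for h in value.split(',') if h.strip()]

assert parse_sync_hosts(
    '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1') == [
    '1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', '5.1.1.1']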
def test_reseller_admin_is_owner(self):
orig_authorize = self.test_auth.authorize
owner_values = []
def mitm_authorize(req):
rv = orig_authorize(req)
owner_values.append(req.environ.get('swift_owner', False))
return rv
self.test_auth.authorize = mitm_authorize
req = self._make_request('/v1/AUTH_cfa',
headers={'X-Auth-Token': 'AUTH_t'})
req.remote_user = '.reseller_admin'
self.test_auth.authorize(req)
self.assertEquals(owner_values, [True])
def test_admin_is_owner(self):
orig_authorize = self.test_auth.authorize
owner_values = []
def mitm_authorize(req):
rv = orig_authorize(req)
owner_values.append(req.environ.get('swift_owner', False))
return rv
self.test_auth.authorize = mitm_authorize
req = self._make_request('/v1/AUTH_cfa',
headers={'X-Auth-Token': 'AUTH_t'})
req.remote_user = 'AUTH_cfa'
self.test_auth.authorize(req)
self.assertEquals(owner_values, [True])
def test_regular_is_not_owner(self):
orig_authorize = self.test_auth.authorize
owner_values = []
def mitm_authorize(req):
rv = orig_authorize(req)
owner_values.append(req.environ.get('swift_owner', False))
return rv
self.test_auth.authorize = mitm_authorize
req = self._make_request('/v1/AUTH_cfa/c',
headers={'X-Auth-Token': 'AUTH_t'})
req.remote_user = 'act:usr'
self.test_auth.authorize(req)
self.assertEquals(owner_values, [False])
def test_sync_request_success(self):
self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),
sync_key='secret')
req = self._make_request('/v1/AUTH_cfa/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'x-container-sync-key': 'secret',
'x-timestamp': '123.456'})
req.remote_addr = '127.0.0.1'
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 204)
def test_sync_request_fail_key(self):
self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),
sync_key='secret')
req = self._make_request('/v1/AUTH_cfa/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'x-container-sync-key': 'wrongsecret',
'x-timestamp': '123.456'})
req.remote_addr = '127.0.0.1'
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 401)
self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),
sync_key='othersecret')
req = self._make_request('/v1/AUTH_cfa/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'x-container-sync-key': 'secret',
'x-timestamp': '123.456'})
req.remote_addr = '127.0.0.1'
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 401)
self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),
sync_key=None)
req = self._make_request('/v1/AUTH_cfa/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'x-container-sync-key': 'secret',
'x-timestamp': '123.456'})
req.remote_addr = '127.0.0.1'
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 401)
def test_sync_request_fail_no_timestamp(self):
self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),
sync_key='secret')
req = self._make_request('/v1/AUTH_cfa/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'x-container-sync-key': 'secret'})
req.remote_addr = '127.0.0.1'
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 401)
def test_sync_request_fail_sync_host(self):
self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),
sync_key='secret')
req = self._make_request('/v1/AUTH_cfa/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'x-container-sync-key': 'secret',
'x-timestamp': '123.456'})
req.remote_addr = '127.0.0.2'
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 401)
def test_sync_request_success_lb_sync_host(self):
self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),
sync_key='secret')
req = self._make_request('/v1/AUTH_cfa/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'x-container-sync-key': 'secret',
'x-timestamp': '123.456',
'x-forwarded-for': '127.0.0.1'})
req.remote_addr = '127.0.0.2'
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 204)
self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]),
sync_key='secret')
req = self._make_request('/v1/AUTH_cfa/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'x-container-sync-key': 'secret',
'x-timestamp': '123.456',
'x-cluster-client-ip': '127.0.0.1'})
req.remote_addr = '127.0.0.2'
resp = req.get_response(self.test_auth)
self.assertEquals(resp.status_int, 204)
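Taken together, the sync-request tests above pin down the authorization rule: the request must carry the container's exact sync key and an x-timestamp, and the effective client address (x-cluster-client-ip, else the first x-forwarded-for hop, else remote_addr) must be an allowed sync host. A condensed sketch of that predicate, reconstructed from the tests rather than copied from the middleware:

def sync_allowed(headers, remote_addr, sync_key, allowed_sync_hosts):
    if not sync_key or headers.get('x-container-sync-key') != sync_key:
        return False
    if 'x-timestamp' not in headers:
        return False
    # Same fallback chain the tests exercise via the lb headers
    client = headers.get('x-cluster-client-ip') or \
        headers.get('x-forwarded-for', '').split(',')[0].strip() or \
        remote_addr
    return client in allowed_sync_hosts

assert sync_allowed({'x-container-sync-key': 'secret',
                     'x-timestamp': '123.456'},
                    '127.0.0.1', 'secret', ['127.0.0.1'])
assert not sync_allowed({'x-container-sync-key': 'wrongsecret',
                         'x-timestamp': '123.456'},
                        '127.0.0.1', 'secret', ['127.0.0.1'])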
class TestParseUserCreation(unittest.TestCase):
def test_parse_user_creation(self):

View File

@ -155,8 +155,8 @@ class MockHttpTest(unittest.TestCase):
def fake_http_connection(*args, **kwargs):
_orig_http_connection = c.http_connection
def wrapper(url):
parsed, _conn = _orig_http_connection(url)
def wrapper(url, proxy=None):
parsed, _conn = _orig_http_connection(url, proxy=proxy)
conn = fake_http_connect(*args, **kwargs)()
def request(*args, **kwargs):
@ -430,7 +430,7 @@ class TestConnection(MockHttpTest):
def read(self, *args, **kwargs):
return ''
def local_http_connection(url):
def local_http_connection(url, proxy=None):
parsed = urlparse(url)
return parsed, LocalConnection()

View File

@ -893,6 +893,23 @@ class TestContainerBroker(unittest.TestCase):
self.assertEquals(info['object_count'], 0)
self.assertEquals(info['bytes_used'], 0)
info = broker.get_info()
self.assertEquals(info['x_container_sync_point1'], -1)
self.assertEquals(info['x_container_sync_point2'], -1)
def test_set_x_syncs(self):
broker = ContainerBroker(':memory:', account='test1', container='test2')
broker.initialize(normalize_timestamp('1'))
info = broker.get_info()
self.assertEquals(info['x_container_sync_point1'], -1)
self.assertEquals(info['x_container_sync_point2'], -1)
broker.set_x_container_sync_points(1, 2)
info = broker.get_info()
self.assertEquals(info['x_container_sync_point1'], 1)
self.assertEquals(info['x_container_sync_point2'], 2)
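One detail worth noting for set_x_container_sync_points: the sync daemon calls it with None for whichever point it is not advancing, so None must mean "leave unchanged". A sketch of that contract, inferred from the calls in container_sync rather than from the broker source:

class SyncPoints(object):
    def __init__(self):
        self.point1 = self.point2 = -1

    def set_x_container_sync_points(self, point1, point2):
        # None preserves the stored value; only non-None values update
        if point1 is not None:
            self.point1 = point1
        if point2 is not None:
            self.point2 = point2

sp = SyncPoints()
sp.set_x_container_sync_points(None, 5)
assert (sp.point1, sp.point2) == (-1, 5)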
def test_get_report_info(self):
broker = ContainerBroker(':memory:', account='test1', container='test2')
broker.initialize(normalize_timestamp('1'))
@ -1352,6 +1369,81 @@ class TestContainerBrokerBeforeMetadata(TestContainerBroker):
conn.execute('SELECT metadata FROM container_stat')
def prexsync_create_container_stat_table(self, conn, put_timestamp=None):
"""
Copied from swift.common.db.ContainerBroker before the
x_container_sync_point[12] columns were added; used for testing with
TestContainerBrokerBeforeXSync.
Create the container_stat table which is specific to the container DB.
:param conn: DB connection object
:param put_timestamp: put timestamp
"""
if put_timestamp is None:
put_timestamp = normalize_timestamp(0)
conn.executescript("""
CREATE TABLE container_stat (
account TEXT,
container TEXT,
created_at TEXT,
put_timestamp TEXT DEFAULT '0',
delete_timestamp TEXT DEFAULT '0',
object_count INTEGER,
bytes_used INTEGER,
reported_put_timestamp TEXT DEFAULT '0',
reported_delete_timestamp TEXT DEFAULT '0',
reported_object_count INTEGER DEFAULT 0,
reported_bytes_used INTEGER DEFAULT 0,
hash TEXT default '00000000000000000000000000000000',
id TEXT,
status TEXT DEFAULT '',
status_changed_at TEXT DEFAULT '0',
metadata TEXT DEFAULT ''
);
INSERT INTO container_stat (object_count, bytes_used)
VALUES (0, 0);
""")
conn.execute('''
UPDATE container_stat
SET account = ?, container = ?, created_at = ?, id = ?,
put_timestamp = ?
''', (self.account, self.container, normalize_timestamp(time()),
str(uuid4()), put_timestamp))
class TestContainerBrokerBeforeXSync(TestContainerBroker):
"""
Tests for swift.common.db.ContainerBroker against databases created before
the x_container_sync_point[12] columns were added.
"""
def setUp(self):
self._imported_create_container_stat_table = \
ContainerBroker.create_container_stat_table
ContainerBroker.create_container_stat_table = \
prexsync_create_container_stat_table
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
exc = None
with broker.get() as conn:
try:
conn.execute('''SELECT x_container_sync_point1
FROM container_stat''')
except BaseException, err:
exc = err
self.assert_('no such column: x_container_sync_point1' in str(exc))
def tearDown(self):
ContainerBroker.create_container_stat_table = \
self._imported_create_container_stat_table
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(normalize_timestamp('1'))
with broker.get() as conn:
conn.execute('SELECT x_container_sync_point1 FROM container_stat')
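The pre-xsync fixture implies an on-demand schema migration: opening an old database must add the missing columns rather than fail. A hypothetical sketch of such a step (Swift's actual migration path may differ):

import sqlite3

def migrate_x_sync(conn):
    try:
        conn.execute('SELECT x_container_sync_point1 FROM container_stat')
    except sqlite3.OperationalError as err:
        if 'no such column: x_container_sync_point1' not in str(err):
            raise
        # Old schema: add the sync point columns with the -1 sentinel
        conn.executescript('''
            ALTER TABLE container_stat
                ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1;
            ALTER TABLE container_stat
                ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1;
        ''')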
class TestAccountBroker(unittest.TestCase):
""" Tests for swift.common.db.AccountBroker """

View File

@ -768,6 +768,26 @@ log_name = yarr'''
self.assertEquals(utils.human_readable(1237940039285380274899124224),
'1024Yi')
def test_validate_sync_to(self):
for goodurl in ('http://1.1.1.1/v1/a/c/o',
'http://1.1.1.1:8080/a/c/o',
'http://2.2.2.2/a/c/o',
'https://1.1.1.1/v1/a/c/o'):
self.assertEquals(utils.validate_sync_to(goodurl,
['1.1.1.1', '2.2.2.2']),
None)
for badurl in ('http://1.1.1.1',
'httpq://1.1.1.1/v1/a/c/o',
'http://1.1.1.1/v1/a/c/o?query',
'http://1.1.1.1/v1/a/c/o#frag',
'http://1.1.1.1/v1/a/c/o?query#frag',
'http://1.1.1.1/v1/a/c/o?query=param',
'http://1.1.1.1/v1/a/c/o?query=param#frag',
'http://1.1.1.2/v1/a/c/o'):
self.assertNotEquals(utils.validate_sync_to(badurl,
['1.1.1.1', '2.2.2.2']),
None)
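From the good and bad URL lists, validate_sync_to must require an http or https scheme, a non-empty path, no query or fragment, and a host on the allowed list, returning None when valid and an error string otherwise. A sketch reconstructed from those cases, not copied from swift.common.utils:

from urlparse import urlparse  # urllib.parse on Python 3

def validate_sync_to(value, allowed_sync_hosts):
    parsed = urlparse(value)
    if parsed.scheme not in ('http', 'https'):
        return 'Invalid scheme %r in X-Container-Sync-To' % parsed.scheme
    if not parsed.path:
        return 'Path required in X-Container-Sync-To'
    if parsed.query or parsed.fragment:
        return 'Params, queries, and fragments not allowed in ' \
            'X-Container-Sync-To'
    if parsed.hostname not in allowed_sync_hosts:
        return 'Invalid host %r in X-Container-Sync-To' % parsed.hostname
    return None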
def test_TRUE_VALUES(self):
for v in utils.TRUE_VALUES:
self.assertEquals(v, v.lower())

View File

@ -324,6 +324,76 @@ class TestContainerController(unittest.TestCase):
raise Exception(err)
self.assert_(not got_exc)
def test_PUT_reset_container_sync(self):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'x-timestamp': '1',
'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c'})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 201)
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
info = db.get_info()
self.assertEquals(info['x_container_sync_point1'], -1)
self.assertEquals(info['x_container_sync_point2'], -1)
db.set_x_container_sync_points(123, 456)
info = db.get_info()
self.assertEquals(info['x_container_sync_point1'], 123)
self.assertEquals(info['x_container_sync_point2'], 456)
# Set to same value
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'x-timestamp': '1',
'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c'})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 202)
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
info = db.get_info()
self.assertEquals(info['x_container_sync_point1'], 123)
self.assertEquals(info['x_container_sync_point2'], 456)
# Set to new value
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'x-timestamp': '1',
'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c2'})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 202)
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
info = db.get_info()
self.assertEquals(info['x_container_sync_point1'], -1)
self.assertEquals(info['x_container_sync_point2'], -1)
def test_POST_reset_container_sync(self):
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'},
headers={'x-timestamp': '1',
'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c'})
resp = self.controller.PUT(req)
self.assertEquals(resp.status_int, 201)
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
info = db.get_info()
self.assertEquals(info['x_container_sync_point1'], -1)
self.assertEquals(info['x_container_sync_point2'], -1)
db.set_x_container_sync_points(123, 456)
info = db.get_info()
self.assertEquals(info['x_container_sync_point1'], 123)
self.assertEquals(info['x_container_sync_point2'], 456)
# Set to same value
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'x-timestamp': '1',
'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c'})
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 204)
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
info = db.get_info()
self.assertEquals(info['x_container_sync_point1'], 123)
self.assertEquals(info['x_container_sync_point2'], 456)
# Set to new value
req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'},
headers={'x-timestamp': '1',
'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c2'})
resp = self.controller.POST(req)
self.assertEquals(resp.status_int, 204)
db = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
info = db.get_info()
self.assertEquals(info['x_container_sync_point1'], -1)
self.assertEquals(info['x_container_sync_point2'], -1)
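Both tests above encode the same reset rule: whenever x-container-sync-to changes to a new value, both sync points are cleared to -1 so syncing restarts from the top of the database, while re-sending the same value leaves them alone. A sketch of that rule in isolation:

def maybe_reset_sync_points(old_sync_to, new_sync_to, set_points):
    # Only a changed destination resets the sync points
    if new_sync_to and new_sync_to != old_sync_to:
        set_points(-1, -1)

calls = []
maybe_reset_sync_points('http://127.0.0.1:12345/v1/a/c',
                        'http://127.0.0.1:12345/v1/a/c2',
                        lambda p1, p2: calls.append((p1, p2)))
maybe_reset_sync_points('http://127.0.0.1:12345/v1/a/c',
                        'http://127.0.0.1:12345/v1/a/c',
                        lambda p1, p2: calls.append((p1, p2)))
assert calls == [(-1, -1)]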
def test_DELETE(self):
req = Request.blank('/sda1/p/a/c',
environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': '1'})

View File

@ -0,0 +1,824 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from swift.container import sync
from swift.common import utils
from swift.common.client import ClientException
utils.HASH_PATH_SUFFIX = 'endcap'
class FakeRing(object):
def __init__(self):
self.replica_count = 3
self.devs = [{'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'}
for x in xrange(3)]
def get_nodes(self, account, container=None, obj=None):
return 1, list(self.devs)
class FakeContainerBroker(object):
def __init__(self, path, metadata=None, info=None, deleted=False,
items_since=None):
self.db_file = path
self.metadata = metadata if metadata else {}
self.info = info if info else {}
self.deleted = deleted
self.items_since = items_since if items_since else []
self.sync_point1 = -1
self.sync_point2 = -1
def get_info(self):
return self.info
def is_deleted(self):
return self.deleted
def get_items_since(self, sync_point, limit):
if sync_point < 0:
sync_point = 0
return self.items_since[sync_point:sync_point + limit]
def set_x_container_sync_points(self, sync_point1, sync_point2):
self.sync_point1 = sync_point1
self.sync_point2 = sync_point2
class TestContainerSync(unittest.TestCase):
def test_Iter2FileLikeObject(self):
flo = sync._Iter2FileLikeObject(iter(['123', '4567', '89', '0']))
expect = '1234567890'
got = flo.read(2)
self.assertTrue(len(got) <= 2)
self.assertEquals(got, expect[:len(got)])
expect = expect[len(got):]
got = flo.read(5)
self.assertTrue(len(got) <= 5)
self.assertEquals(got, expect[:len(got)])
expect = expect[len(got):]
self.assertEquals(flo.read(), expect)
self.assertEquals(flo.read(), '')
self.assertEquals(flo.read(2), '')
flo = sync._Iter2FileLikeObject(iter(['123', '4567', '89', '0']))
self.assertEquals(flo.read(), '1234567890')
self.assertEquals(flo.read(), '')
self.assertEquals(flo.read(2), '')
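The behavior being checked is a small adapter: wrap a chunk iterator so it can be handed to put_object as a file. A minimal sketch matching the assertions (read(n) may return fewer than n bytes, read() drains the rest, '' signals exhaustion); this is an illustration, not the _Iter2FileLikeObject class itself:

class Iter2FileLikeObject(object):
    def __init__(self, iterator):
        self._iter = iterator
        self._buf = ''

    def read(self, size=-1):
        if size < 0:
            # Drain the buffer and the rest of the iterator in one go
            data, self._buf = self._buf + ''.join(self._iter), ''
            return data
        if not self._buf:
            self._buf = next(self._iter, '')
        # May return fewer than size bytes if the chunk was short
        data, self._buf = self._buf[:size], self._buf[size:]
        return data

flo = Iter2FileLikeObject(iter(['123', '4567', '89', '0']))
assert flo.read(2) == '12'
assert flo.read() == '34567890'
assert flo.read() == ''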
def test_init(self):
cring = FakeRing()
oring = FakeRing()
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
self.assertTrue(cs.container_ring is cring)
self.assertTrue(cs.object_ring is oring)
def test_run_forever(self):
# This runs run_forever with fakes to succeed for two loops, the first
# causing a report but no interval sleep, the second no report but an
# interval sleep.
time_calls = [0]
sleep_calls = []
audit_location_generator_calls = [0]
def fake_time():
time_calls[0] += 1
returns = [1, # Initialized reported time
1, # Start time
3602, # Is it report time (yes)
3602, # Report time
3602, # Elapsed time for "under interval" (no)
3602, # Start time
3603, # Is it report time (no)
3603, # Elapsed time for "under interval" (yes)
]
if time_calls[0] == len(returns) + 1:
raise Exception('we are now done')
return returns[time_calls[0] - 1]
def fake_sleep(amount):
sleep_calls.append(amount)
def fake_audit_location_generator(*args, **kwargs):
audit_location_generator_calls[0] += 1
# Makes .container_sync() short-circuit because 'path' doesn't end
# with .db
return [('path', 'device', 'partition')]
orig_time = sync.time
orig_sleep = sync.sleep
orig_audit_location_generator = sync.audit_location_generator
try:
sync.time = fake_time
sync.sleep = fake_sleep
sync.audit_location_generator = fake_audit_location_generator
cs = sync.ContainerSync({}, container_ring=FakeRing(),
object_ring=FakeRing())
cs.run_forever()
except Exception, err:
if str(err) != 'we are now done':
raise
finally:
sync.time = orig_time
sync.sleep = orig_sleep
sync.audit_location_generator = orig_audit_location_generator
self.assertEquals(time_calls, [9])
self.assertEquals(len(sleep_calls), 2)
self.assertTrue(sleep_calls[0] <= cs.interval)
self.assertTrue(sleep_calls[1] == cs.interval - 1)
self.assertEquals(audit_location_generator_calls, [2])
self.assertEquals(cs.reported, 3602)
def test_run_once(self):
# This runs run_once with fakes twice, the first causing an interim
# report, the second with no interim report.
time_calls = [0]
audit_location_generator_calls = [0]
def fake_time():
time_calls[0] += 1
returns = [1, # Initialized reported time
1, # Start time
3602, # Is it report time (yes)
3602, # Report time
3602, # End report time
3602, # For elapsed
3602, # Start time
3603, # Is it report time (no)
3604, # End report time
3605, # For elapsed
]
if time_calls[0] == len(returns) + 1:
raise Exception('we are now done')
return returns[time_calls[0] - 1]
def fake_audit_location_generator(*args, **kwargs):
audit_location_generator_calls[0] += 1
# Makes .container_sync() short-circuit because 'path' doesn't end
# with .db
return [('path', 'device', 'partition')]
orig_time = sync.time
orig_audit_location_generator = sync.audit_location_generator
try:
sync.time = fake_time
sync.audit_location_generator = fake_audit_location_generator
cs = sync.ContainerSync({}, container_ring=FakeRing(),
object_ring=FakeRing())
cs.run_once()
self.assertEquals(time_calls, [6])
self.assertEquals(audit_location_generator_calls, [1])
self.assertEquals(cs.reported, 3602)
cs.run_once()
except Exception, err:
if str(err) != 'we are now done':
raise
finally:
sync.time = orig_time
sync.audit_location_generator = orig_audit_location_generator
self.assertEquals(time_calls, [10])
self.assertEquals(audit_location_generator_calls, [2])
self.assertEquals(cs.reported, 3604)
def test_container_sync_not_db(self):
cring = FakeRing()
oring = FakeRing()
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
self.assertEquals(cs.container_failures, 0)
def test_container_sync_missing_db(self):
cring = FakeRing()
oring = FakeRing()
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 1)
def test_container_sync_not_my_db(self):
# Db could be there due to handoff replication so test that we ignore
# those.
cring = FakeRing()
oring = FakeRing()
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
orig_ContainerBroker = sync.ContainerBroker
try:
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
info={'account': 'a', 'container': 'c'})
cs._myips = ['127.0.0.1'] # No match
cs._myport = 1 # No match
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 0)
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1 # No match
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 0)
cs._myips = ['127.0.0.1'] # No match
cs._myport = 1000 # Match
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 0)
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
# This complete match will cause the 1 container failure since the
# broker's info doesn't contain sync point keys
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 1)
finally:
sync.ContainerBroker = orig_ContainerBroker
def test_container_sync_deleted(self):
cring = FakeRing()
oring = FakeRing()
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
orig_ContainerBroker = sync.ContainerBroker
try:
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
info={'account': 'a', 'container': 'c'}, deleted=False)
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
# This complete match will cause the 1 container failure since the
# broker's info doesn't contain sync point keys
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 1)
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
info={'account': 'a', 'container': 'c'}, deleted=True)
# This complete match will not cause any more container failures
# since the broker indicates deletion
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 1)
finally:
sync.ContainerBroker = orig_ContainerBroker
def test_container_sync_no_to_or_key(self):
cring = FakeRing()
oring = FakeRing()
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
orig_ContainerBroker = sync.ContainerBroker
try:
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': -1,
'x_container_sync_point2': -1})
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
# This complete match will be skipped since the broker's metadata
# has no x-container-sync-to or x-container-sync-key
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 0)
self.assertEquals(cs.container_skips, 1)
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1)})
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
# This complete match will be skipped since the broker's metadata
# has no x-container-sync-key
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 0)
self.assertEquals(cs.container_skips, 2)
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-key': ('key', 1)})
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
# This complete match will be skipped since the broker's metadata
# has no x-container-sync-to
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 0)
self.assertEquals(cs.container_skips, 3)
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)})
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = []
# This complete match will cause a container failure since the
# sync-to won't validate as allowed.
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 1)
self.assertEquals(cs.container_skips, 3)
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)})
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
# This complete match will succeed completely since the broker
# get_items_since will return no new rows.
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 1)
self.assertEquals(cs.container_skips, 3)
finally:
sync.ContainerBroker = orig_ContainerBroker
def test_container_stop_at(self):
cring = FakeRing()
oring = FakeRing()
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
orig_ContainerBroker = sync.ContainerBroker
orig_time = sync.time
try:
sync.ContainerBroker = lambda p: FakeContainerBroker(p,
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)},
items_since=['erroneous data'])
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
# This sync will fail since the items_since data is bad.
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 1)
self.assertEquals(cs.container_skips, 0)
# Set up fake times to make the sync short-circuit as having taken
# too long
fake_times = [
1.0, # Compute the time to move on
100000.0, # Compute if it's time to move on from first loop
100000.0] # Compute if it's time to move on from second loop
def fake_time():
return fake_times.pop(0)
sync.time = fake_time
# This same sync won't fail since it will look like it took so long
# as to be time to move on (before it ever actually tries to do
# anything).
cs.container_sync('isa.db')
self.assertEquals(cs.container_failures, 1)
self.assertEquals(cs.container_skips, 0)
finally:
sync.ContainerBroker = orig_ContainerBroker
sync.time = orig_time
def test_container_first_loop(self):
cring = FakeRing()
oring = FakeRing()
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
orig_ContainerBroker = sync.ContainerBroker
orig_hash_path = sync.hash_path
orig_delete_object = sync.delete_object
try:
def fake_hash_path(account, container, obj, raw_digest=False):
# Ensures that no rows match for full syncing, ordinal is 0 and
# all hashes are 0
return '\x00' * 16
sync.hash_path = fake_hash_path
fcb = FakeContainerBroker('path',
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': 2,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
sync.ContainerBroker = lambda p: fcb
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
cs.container_sync('isa.db')
# Succeeds because no rows match
self.assertEquals(cs.container_failures, 0)
self.assertEquals(cs.container_skips, 0)
self.assertEquals(fcb.sync_point1, None)
self.assertEquals(fcb.sync_point2, 1)
def fake_hash_path(account, container, obj, raw_digest=False):
# Ensures that all rows match for full syncing, ordinal is 0
# and all hashes are 1
return '\x01' * 16
sync.hash_path = fake_hash_path
fcb = FakeContainerBroker('path',
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': 1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
sync.ContainerBroker = lambda p: fcb
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
cs.container_sync('isa.db')
# Succeeds because the two sync points haven't deviated enough yet
self.assertEquals(cs.container_failures, 0)
self.assertEquals(cs.container_skips, 0)
self.assertEquals(fcb.sync_point1, -1)
self.assertEquals(fcb.sync_point2, -1)
fcb = FakeContainerBroker('path',
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': 2,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
sync.ContainerBroker = lambda p: fcb
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
cs.container_sync('isa.db')
# Fails because container_sync_row will fail since the row has no
# 'deleted' key
self.assertEquals(cs.container_failures, 1)
self.assertEquals(cs.container_skips, 0)
self.assertEquals(fcb.sync_point1, -1)
self.assertEquals(fcb.sync_point2, -1)
fcb = FakeContainerBroker('path',
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': 2,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
'deleted': True}])
sync.ContainerBroker = lambda p: fcb
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
cs.container_sync('isa.db')
# Fails because delete_object fails
self.assertEquals(cs.container_failures, 2)
self.assertEquals(cs.container_skips, 0)
self.assertEquals(fcb.sync_point1, -1)
self.assertEquals(fcb.sync_point2, -1)
def fake_delete_object(*args, **kwargs):
pass
sync.delete_object = fake_delete_object
fcb = FakeContainerBroker('path',
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': 2,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
'deleted': True}])
sync.ContainerBroker = lambda p: fcb
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
cs.container_sync('isa.db')
# Succeeds because delete_object succeeds
self.assertEquals(cs.container_failures, 2)
self.assertEquals(cs.container_skips, 0)
self.assertEquals(fcb.sync_point1, None)
self.assertEquals(fcb.sync_point2, 1)
finally:
sync.ContainerBroker = orig_ContainerBroker
sync.hash_path = orig_hash_path
sync.delete_object = orig_delete_object
def test_container_second_loop(self):
cring = FakeRing()
oring = FakeRing()
cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring)
orig_ContainerBroker = sync.ContainerBroker
orig_hash_path = sync.hash_path
orig_delete_object = sync.delete_object
try:
# We'll ensure the first loop is always skipped by keeping the two
# sync points equal
def fake_hash_path(account, container, obj, raw_digest=False):
# Ensures that no rows match for second loop, ordinal is 0 and
# all hashes are 1
return '\x01' * 16
sync.hash_path = fake_hash_path
fcb = FakeContainerBroker('path',
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
sync.ContainerBroker = lambda p: fcb
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
cs.container_sync('isa.db')
# Succeeds because no rows match
self.assertEquals(cs.container_failures, 0)
self.assertEquals(cs.container_skips, 0)
self.assertEquals(fcb.sync_point1, 1)
self.assertEquals(fcb.sync_point2, None)
def fake_hash_path(account, container, obj, raw_digest=False):
# Ensures that all rows match for second loop, ordinal is 0 and
# all hashes are 0
return '\x00' * 16
def fake_delete_object(*args, **kwargs):
pass
sync.hash_path = fake_hash_path
sync.delete_object = fake_delete_object
fcb = FakeContainerBroker('path',
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o'}])
sync.ContainerBroker = lambda p: fcb
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
cs.container_sync('isa.db')
# Fails because row is missing 'deleted' key
self.assertEquals(cs.container_failures, 1)
self.assertEquals(cs.container_skips, 0)
self.assertEquals(fcb.sync_point1, -1)
self.assertEquals(fcb.sync_point2, -1)
fcb = FakeContainerBroker('path',
info={'account': 'a', 'container': 'c',
'x_container_sync_point1': -1,
'x_container_sync_point2': -1},
metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1),
'x-container-sync-key': ('key', 1)},
items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2',
'deleted': True}])
sync.ContainerBroker = lambda p: fcb
cs._myips = ['10.0.0.0'] # Match
cs._myport = 1000 # Match
cs.allowed_sync_hosts = ['127.0.0.1']
cs.container_sync('isa.db')
# Succeeds because row now has 'deleted' key and delete_object
# succeeds
self.assertEquals(cs.container_failures, 1)
self.assertEquals(cs.container_skips, 0)
self.assertEquals(fcb.sync_point1, 1)
self.assertEquals(fcb.sync_point2, None)
finally:
sync.ContainerBroker = orig_ContainerBroker
sync.hash_path = orig_hash_path
sync.delete_object = orig_delete_object
def test_container_sync_row_delete(self):
orig_delete_object = sync.delete_object
try:
def fake_delete_object(path, name=None, headers=None, proxy=None):
self.assertEquals(path, 'http://sync/to/path')
self.assertEquals(name, 'object')
self.assertEquals(headers,
{'x-container-sync-key': 'key', 'x-timestamp': '1.2'})
self.assertEquals(proxy, 'http://proxy')
sync.delete_object = fake_delete_object
cs = sync.ContainerSync({}, container_ring=FakeRing(),
object_ring=FakeRing())
cs.proxy = 'http://proxy'
# Success
self.assertTrue(cs.container_sync_row({'deleted': True,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), 'info'))
self.assertEquals(cs.container_deletes, 1)
exc = []
def fake_delete_object(path, name=None, headers=None, proxy=None):
exc.append(Exception('test exception'))
raise exc[-1]
sync.delete_object = fake_delete_object
# Failure because of delete_object exception
self.assertFalse(cs.container_sync_row({'deleted': True,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), 'info'))
self.assertEquals(cs.container_deletes, 1)
self.assertEquals(len(exc), 1)
self.assertEquals(str(exc[-1]), 'test exception')
def fake_delete_object(path, name=None, headers=None, proxy=None):
exc.append(ClientException('test client exception'))
raise exc[-1]
sync.delete_object = fake_delete_object
# Failure because of delete_object exception
self.assertFalse(cs.container_sync_row({'deleted': True,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), 'info'))
self.assertEquals(cs.container_deletes, 1)
self.assertEquals(len(exc), 2)
self.assertEquals(str(exc[-1]), 'test client exception')
def fake_delete_object(path, name=None, headers=None, proxy=None):
exc.append(ClientException('test client exception',
http_status=404))
raise exc[-1]
sync.delete_object = fake_delete_object
# Success because the object wasn't even found
self.assertTrue(cs.container_sync_row({'deleted': True,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), 'info'))
self.assertEquals(cs.container_deletes, 2)
self.assertEquals(len(exc), 3)
self.assertEquals(str(exc[-1]), 'test client exception: 404')
finally:
sync.delete_object = orig_delete_object
def test_container_sync_row_put(self):
orig_shuffle = sync.shuffle
orig_put_object = sync.put_object
orig_direct_get_object = sync.direct_get_object
try:
sync.shuffle = lambda x: x
def fake_put_object(sync_to, name=None, headers=None,
contents=None, proxy=None):
self.assertEquals(sync_to, 'http://sync/to/path')
self.assertEquals(name, 'object')
self.assertEquals(headers, {'x-container-sync-key': 'key',
'x-timestamp': '1.2',
'other-header': 'other header value',
'etag': 'etagvalue'})
self.assertEquals(contents.read(), 'contents')
self.assertEquals(proxy, 'http://proxy')
sync.put_object = fake_put_object
cs = sync.ContainerSync({}, container_ring=FakeRing(),
object_ring=FakeRing())
cs.proxy = 'http://proxy'
def fake_direct_get_object(node, part, account, container, obj,
resp_chunk_size=1):
return ({'other-header': 'other header value',
'etag': '"etagvalue"', 'x-timestamp': '1.2'},
iter('contents'))
sync.direct_get_object = fake_direct_get_object
# Success as everything says it worked
self.assertTrue(cs.container_sync_row({'deleted': False,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), {'account': 'a',
'container': 'c'}))
self.assertEquals(cs.container_puts, 1)
def fake_direct_get_object(node, part, account, container, obj,
resp_chunk_size=1):
return ({'date': 'date value',
'last-modified': 'last modified value',
'x-timestamp': '1.2',
'other-header': 'other header value',
'etag': '"etagvalue"'},
iter('contents'))
sync.direct_get_object = fake_direct_get_object
# Success as everything says it worked, also checks 'date' and
# 'last-modified' headers are removed and that 'etag' header is
# stripped of double quotes.
self.assertTrue(cs.container_sync_row({'deleted': False,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), {'account': 'a',
'container': 'c'}))
self.assertEquals(cs.container_puts, 2)
exc = []
def fake_direct_get_object(node, part, account, container, obj,
resp_chunk_size=1):
exc.append(Exception('test exception'))
raise exc[-1]
sync.direct_get_object = fake_direct_get_object
# Fail due to completely unexpected exception
self.assertFalse(cs.container_sync_row({'deleted': False,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), {'account': 'a',
'container': 'c'}))
self.assertEquals(cs.container_puts, 2)
self.assertEquals(len(exc), 1)
self.assertEquals(str(exc[-1]), 'test exception')
def fake_direct_get_object(node, part, account, container, obj,
resp_chunk_size=1):
exc.append(ClientException('test client exception'))
raise exc[-1]
sync.direct_get_object = fake_direct_get_object
# Fail due to all direct_get_object calls failing
self.assertFalse(cs.container_sync_row({'deleted': False,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), {'account': 'a',
'container': 'c'}))
self.assertEquals(cs.container_puts, 2)
self.assertEquals(len(exc), 4)
self.assertEquals(str(exc[-1]), 'test client exception')
def fake_direct_get_object(node, part, account, container, obj,
resp_chunk_size=1):
return ({'other-header': 'other header value',
'x-timestamp': '1.2', 'etag': '"etagvalue"'},
iter('contents'))
def fake_put_object(sync_to, name=None, headers=None,
contents=None, proxy=None):
raise ClientException('test client exception', http_status=401)
class FakeLogger(object):
def __init__(self):
self.err = ''
self.exc = ''
def info(self, err, *args, **kwargs):
self.err = err
def exception(self, exc, *args, **kwargs):
self.exc = exc
sync.direct_get_object = fake_direct_get_object
sync.put_object = fake_put_object
cs.logger = FakeLogger()
# Fail due to 401
self.assertFalse(cs.container_sync_row({'deleted': False,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), {'account': 'a',
'container': 'c'}))
self.assertEquals(cs.container_puts, 2)
self.assertTrue(cs.logger.err.startswith('Unauth '))
def fake_put_object(sync_to, name=None, headers=None,
contents=None, proxy=None):
raise ClientException('test client exception', http_status=404)
sync.put_object = fake_put_object
# Fail due to 404
self.assertFalse(cs.container_sync_row({'deleted': False,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), {'account': 'a',
'container': 'c'}))
self.assertEquals(cs.container_puts, 2)
self.assertTrue(cs.logger.err.startswith('Not found '))
def fake_put_object(sync_to, name=None, headers=None,
contents=None, proxy=None):
raise ClientException('test client exception', http_status=503)
sync.put_object = fake_put_object
# Fail due to 503
self.assertFalse(cs.container_sync_row({'deleted': False,
'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path',
'key', FakeContainerBroker('broker'), {'account': 'a',
'container': 'c'}))
self.assertEquals(cs.container_puts, 2)
self.assertTrue(cs.logger.exc.startswith('ERROR Syncing '))
finally:
sync.shuffle = orig_shuffle
sync.put_object = orig_put_object
sync.direct_get_object = orig_direct_get_object
if __name__ == '__main__':
unittest.main()