diff --git a/bin/swift b/bin/swift index 5db2d7505a..3d9115780f 100755 --- a/bin/swift +++ b/bin/swift @@ -150,22 +150,27 @@ class ClientException(Exception): return b and '%s: %s' % (a, b) or a -def http_connection(url): +def http_connection(url, proxy=None): """ Make an HTTPConnection or HTTPSConnection :param url: url to connect to + :param proxy: proxy to connect through, if any; None by default; str of the + format 'http://127.0.0.1:8888' to set one :returns: tuple of (parsed url, connection object) :raises ClientException: Unable to handle protocol scheme """ parsed = urlparse(url) + proxy_parsed = urlparse(proxy) if proxy else None if parsed.scheme == 'http': - conn = HTTPConnection(parsed.netloc) + conn = HTTPConnection((proxy_parsed if proxy else parsed).netloc) elif parsed.scheme == 'https': - conn = HTTPSConnection(parsed.netloc) + conn = HTTPSConnection((proxy_parsed if proxy else parsed).netloc) else: raise ClientException('Cannot handle protocol scheme %s for url %s' % (parsed.scheme, repr(url))) + if proxy: + conn._set_tunnel(parsed.hostname, parsed.port) return parsed, conn @@ -578,40 +583,60 @@ def head_object(url, token, container, name, http_conn=None): return resp_headers -def put_object(url, token, container, name, contents, content_length=None, - etag=None, chunk_size=65536, content_type=None, headers=None, - http_conn=None): +def put_object(url, token=None, container=None, name=None, contents=None, + content_length=None, etag=None, chunk_size=65536, + content_type=None, headers=None, http_conn=None, proxy=None): """ Put an object :param url: storage URL - :param token: auth token - :param container: container name that the object is in - :param name: object name to put - :param contents: a string or a file like object to read object data from + :param token: auth token; if None, no token will be sent + :param container: container name that the object is in; if None, the + container name is expected to be part of the url + :param name: object 
name to put; if None, the object name is expected to be + part of the url + :param contents: a string or a file like object to read object data from; + if None, a zero-byte put will be done :param content_length: value to send as content-length header; also limits - the amount read from contents - :param etag: etag of contents - :param chunk_size: chunk size of data to write - :param content_type: value to send as content-type header - :param headers: additional headers to include in the request + the amount read from contents; if None, it will be + computed via the contents or chunked transfer + encoding will be used + :param etag: etag of contents; if None, no etag will be sent + :param chunk_size: chunk size of data to write; default 65536 + :param content_type: value to send as content-type header; if None, no + content-type will be set (remote end will likely try + to auto-detect it) + :param headers: additional headers to include in the request, if any :param http_conn: HTTP connection object (If None, it will create the conn object) + :param proxy: proxy to connect through, if any; None by default; str of the + format 'http://127.0.0.1:8888' to set one :returns: etag from server response :raises ClientException: HTTP PUT request failed """ if http_conn: parsed, conn = http_conn else: - parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - if not headers: + parsed, conn = http_connection(url, proxy=proxy) + path = parsed.path + if container: + path = '%s/%s' % (path.rstrip('/'), quote(container)) + if name: + path = '%s/%s' % (path.rstrip('/'), quote(name)) + if headers: + headers = dict(headers) + else: headers = {} - headers['X-Auth-Token'] = token + if token: + headers['X-Auth-Token'] = token if etag: headers['ETag'] = etag.strip('"') if content_length is not None: headers['Content-Length'] = str(content_length) + else: + for n, v in headers.iteritems(): + if n.lower() == 'content-length': + content_length 
= int(v) if content_type is not None: headers['Content-Type'] = content_type if not contents: @@ -646,7 +671,7 @@ def put_object(url, token, container, name, contents, content_length=None, raise ClientException('Object PUT failed', http_scheme=parsed.scheme, http_host=conn.host, http_port=conn.port, http_path=path, http_status=resp.status, http_reason=resp.reason) - return resp.getheader('etag').strip('"') + return resp.getheader('etag', '').strip('"') def post_object(url, token, container, name, headers, http_conn=None): @@ -677,24 +702,40 @@ def post_object(url, token, container, name, headers, http_conn=None): http_status=resp.status, http_reason=resp.reason) -def delete_object(url, token, container, name, http_conn=None): +def delete_object(url, token=None, container=None, name=None, http_conn=None, + headers=None, proxy=None): """ Delete object :param url: storage URL - :param token: auth token - :param container: container name that the object is in - :param name: object name to delete + :param token: auth token; if None, no token will be sent + :param container: container name that the object is in; if None, the + container name is expected to be part of the url + :param name: object name to delete; if None, the object name is expected to + be part of the url :param http_conn: HTTP connection object (If None, it will create the conn object) + :param headers: additional headers to include in the request + :param proxy: proxy to connect through, if any; None by default; str of the + format 'http://127.0.0.1:8888' to set one :raises ClientException: HTTP DELETE request failed """ if http_conn: parsed, conn = http_conn else: - parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - conn.request('DELETE', path, '', {'X-Auth-Token': token}) + parsed, conn = http_connection(url, proxy=proxy) + path = parsed.path + if container: + path = '%s/%s' % (path.rstrip('/'), quote(container)) + if name: + path = '%s/%s' % 
(path.rstrip('/'), quote(name)) + if headers: + headers = dict(headers) + else: + headers = {} + if token: + headers['X-Auth-Token'] = token + conn.request('DELETE', path, '', headers) resp = conn.getresponse() resp.read() if resp.status < 200 or resp.status >= 300: @@ -1363,10 +1404,14 @@ Container: %s Objects: %d Bytes: %d Read ACL: %s -Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], +Write ACL: %s + Sync To: %s + Sync Key: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], object_count, bytes_used, headers.get('x-container-read', ''), - headers.get('x-container-write', ''))) + headers.get('x-container-write', ''), + headers.get('x-container-sync-to', ''), + headers.get('x-container-sync-key', ''))) for key, value in headers.items(): if key.startswith('x-container-meta-'): print_queue.put('%9s: %s' % ('Meta %s' % @@ -1375,7 +1420,8 @@ Write ACL: %s'''.strip('\n') % (conn.url.rsplit('/', 1)[-1], args[0], if not key.startswith('x-container-meta-') and key not in ( 'content-length', 'date', 'x-container-object-count', 'x-container-bytes-used', 'x-container-read', - 'x-container-write'): + 'x-container-write', 'x-container-sync-to', + 'x-container-sync-key'): print_queue.put( '%9s: %s' % (key.title(), value)) except ClientException, err: @@ -1440,13 +1486,18 @@ def st_post(options, args, print_queue, error_queue): parser.add_option('-w', '--write-acl', dest='write_acl', help='Sets the ' 'Write ACL for containers. Quick summary of ACL syntax: account1, ' 'account2:user2') + parser.add_option('-t', '--sync-to', dest='sync_to', help='Sets the ' + 'Sync To for containers, for multi-cluster replication.') + parser.add_option('-k', '--sync-key', dest='sync_key', help='Sets the ' + 'Sync Key for containers, for multi-cluster replication.') parser.add_option('-m', '--meta', action='append', dest='meta', default=[], help='Sets a meta data item with the syntax name:value. This option ' 'may be repeated. 
Example: -m Color:Blue -m Size:Large') (options, args) = parse_args(parser, args) args = args[1:] - if (options.read_acl or options.write_acl) and not args: - exit('-r and -w options only allowed for containers') + if (options.read_acl or options.write_acl or options.sync_to or + options.sync_key) and not args: + exit('-r, -w, -t, and -k options only allowed for containers') conn = Connection(options.auth, options.user, options.key) if not args: headers = {} @@ -1474,6 +1525,10 @@ def st_post(options, args, print_queue, error_queue): headers['X-Container-Read'] = options.read_acl if options.write_acl is not None: headers['X-Container-Write'] = options.write_acl + if options.sync_to is not None: + headers['X-Container-Sync-To'] = options.sync_to + if options.sync_key is not None: + headers['X-Container-Sync-Key'] = options.sync_key try: conn.post_container(args[0], headers=headers) except ClientException, err: diff --git a/bin/swift-container-sync b/bin/swift-container-sync new file mode 100755 index 0000000000..8310faa802 --- /dev/null +++ b/bin/swift-container-sync @@ -0,0 +1,23 @@ +#!/usr/bin/python +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from swift.container.sync import ContainerSync +from swift.common.utils import parse_options +from swift.common.daemon import run_daemon + +if __name__ == '__main__': + conf_file, options = parse_options(once=True) + run_daemon(ContainerSync, conf_file, **options) diff --git a/doc/source/container.rst b/doc/source/container.rst index ca6d16c91c..d80adcaa32 100644 --- a/doc/source/container.rst +++ b/doc/source/container.rst @@ -34,3 +34,10 @@ Container Auditor :undoc-members: :show-inheritance: +Container Sync +============== + +.. automodule:: swift.container.sync + :members: + :undoc-members: + :show-inheritance: diff --git a/doc/source/development_saio.rst b/doc/source/development_saio.rst index c32c83bc1e..642bdcc670 100644 --- a/doc/source/development_saio.rst +++ b/doc/source/development_saio.rst @@ -400,6 +400,8 @@ Sample configuration files are provided with all defaults in line-by-line commen [container-auditor] + [container-sync] + #. Create `/etc/swift/container-server/2.conf`:: [DEFAULT] @@ -422,6 +424,8 @@ Sample configuration files are provided with all defaults in line-by-line commen [container-auditor] + [container-sync] + #. Create `/etc/swift/container-server/3.conf`:: [DEFAULT] @@ -444,6 +448,8 @@ Sample configuration files are provided with all defaults in line-by-line commen [container-auditor] + [container-sync] + #. Create `/etc/swift/container-server/4.conf`:: [DEFAULT] @@ -466,6 +472,8 @@ Sample configuration files are provided with all defaults in line-by-line commen [container-auditor] + [container-sync] + #. 
Create `/etc/swift/object-server/1.conf`:: diff --git a/doc/source/index.rst b/doc/source/index.rst index 7a3b742337..5a7cc228e5 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -44,6 +44,7 @@ Overview and Concepts overview_replication ratelimit overview_large_objects + overview_container_sync Developer Documentation ======================= diff --git a/doc/source/overview_auth.rst b/doc/source/overview_auth.rst index aa1cfbc9e9..ab637fc798 100644 --- a/doc/source/overview_auth.rst +++ b/doc/source/overview_auth.rst @@ -27,11 +27,17 @@ validation. Swift will make calls to the auth system, giving the auth token to be validated. For a valid token, the auth system responds with an overall expiration in seconds from now. Swift will cache the token up to the expiration -time. The included TempAuth also has the concept of admin and non-admin users -within an account. Admin users can do anything within the account. Non-admin -users can only perform operations per container based on the container's -X-Container-Read and X-Container-Write ACLs. For more information on ACLs, see -:mod:`swift.common.middleware.acl` +time. + +The included TempAuth also has the concept of admin and non-admin users within +an account. Admin users can do anything within the account. Non-admin users can +only perform operations per container based on the container's X-Container-Read +and X-Container-Write ACLs. For more information on ACLs, see +:mod:`swift.common.middleware.acl`. + +Additionally, if the auth system sets the request environ's swift_owner key to +True, the proxy will return additional header information in some requests, +such as the X-Container-Sync-Key for a container GET or HEAD. The user starts a session by sending a ReST request to the auth system to receive the auth token and a URL to the Swift system. 
diff --git a/doc/source/overview_container_sync.rst b/doc/source/overview_container_sync.rst new file mode 100644 index 0000000000..3fa7a47012 --- /dev/null +++ b/doc/source/overview_container_sync.rst @@ -0,0 +1,228 @@ +====================================== +Container to Container Synchronization +====================================== + +-------- +Overview +-------- + +Swift has a feature where all the contents of a container can be mirrored to +another container through background synchronization. Swift cluster operators +configure their cluster to allow/accept sync requests to/from other clusters, +and the user specifies where to sync their container to along with a secret +synchronization key. + +.. note:: + + Container sync will sync object POSTs only if the proxy server is set to + use "object_post_as_copy = true" which is the default. So-called fast + object posts, "object_post_as_copy = false" do not update the container + listings and therefore can't be detected for synchronization. + +.. note:: + + If you are using the large objects feature you will need to ensure both + your manifest file and your segment files are synced if they happen to be + in different containers. + +-------------------------------------------- +Configuring a Cluster's Allowable Sync Hosts +-------------------------------------------- + +The Swift cluster operator must allow synchronization with a set of hosts +before the user can enable container synchronization. First, the backend +container server needs to be given this list of hosts in the +container-server.conf file:: + + [DEFAULT] + # This is a comma separated list of hosts allowed in the + # X-Container-Sync-To field for containers. + # allowed_sync_hosts = 127.0.0.1 + allowed_sync_hosts = host1,host2,etc. + ... 
+ + [container-sync] + # You can override the default log routing for this app here (don't + # use set!): + # log_name = container-sync + # log_facility = LOG_LOCAL0 + # log_level = INFO + # Will sync, at most, each container once per interval + # interval = 300 + # Maximum amount of time to spend syncing each container + # container_time = 60 + +Tracking sync progress, problems, and just general activity can only be +achieved with log processing for this first release of container +synchronization. In that light, you may wish to set the above `log_` options to +direct the container-sync logs to a different file for easier monitoring. +Additionally, it should be noted there is no way for an end user to detect sync +progress or problems other than HEADing both containers and comparing the +overall information. + +The authentication system also needs to be configured to allow synchronization +requests. Here is an example with TempAuth:: + + [filter:tempauth] + # This is a comma separated list of hosts allowed to send + # X-Container-Sync-Key requests. + # allowed_sync_hosts = 127.0.0.1 + allowed_sync_hosts = host1,host2,etc. + +The default of 127.0.0.1 is just so no configuration is required for SAIO +setups -- for testing. + +---------------------------------------------------------- +Using the ``swift`` tool to set up synchronized containers +---------------------------------------------------------- + +.. note:: + + You must be the account admin on the account to set synchronization targets + and keys. + +You simply tell each container where to sync to and give it a secret +synchronization key. 
First, let's get the account details for our two cluster +accounts:: + + $ swift -A http://cluster1/auth/v1.0 -U test:tester -K testing stat -v + StorageURL: http://cluster1/v1/AUTH_208d1854-e475-4500-b315-81de645d060e + Auth Token: AUTH_tkd5359e46ff9e419fa193dbd367f3cd19 + Account: AUTH_208d1854-e475-4500-b315-81de645d060e + Containers: 0 + Objects: 0 + Bytes: 0 + + $ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 stat -v + StorageURL: http://cluster2/v1/AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c + Auth Token: AUTH_tk816a1aaf403c49adb92ecfca2f88e430 + Account: AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c + Containers: 0 + Objects: 0 + Bytes: 0 + +Now, let's make our first container and tell it to synchronize to a second +we'll make next:: + + $ swift -A http://cluster1/auth/v1.0 -U test:tester -K testing post \ + -t 'http://cluster2/v1/AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c/container2' \ + -k 'secret' container1 + +The ``-t`` indicates the URL to sync to, which is the ``StorageURL`` from +cluster2 we retrieved above plus the container name. The ``-k`` specifies the +secret key the two containers will share for synchronization. Now, we'll do +something similar for the second cluster's container:: + + $ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 post \ + -t 'http://cluster1/v1/AUTH_208d1854-e475-4500-b315-81de645d060e/container1' \ + -k 'secret' container2 + +That's it. Now we can upload a bunch of stuff to the first container and watch +as it gets synchronized over to the second:: + + $ swift -A http://cluster1/auth/v1.0 -U test:tester -K testing \ + upload container1 . + photo002.png + photo004.png + photo001.png + photo003.png + + $ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 \ + list container2 + + [Nothing there yet, so we wait a bit...] + [If you're an operator running SAIO and just testing, you may need to + run 'swift-init container-sync once' to perform a sync scan.] 
+ + $ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 \ + list container2 + photo001.png + photo002.png + photo003.png + photo004.png + +You can also set up a chain of synced containers if you want more than two. +You'd point 1 -> 2, then 2 -> 3, and finally 3 -> 1 for three containers. +They'd all need to share the same secret synchronization key. + +----------------------------------- +Using curl (or other tools) instead +----------------------------------- + +So what's ``swift`` doing behind the scenes? Nothing overly complicated. It +translates the ``-t `` option into an ``X-Container-Sync-To: `` +header and the ``-k `` option into an ``X-Container-Sync-Key: `` +header. + +For instance, when we created the first container above and told it to +synchronize to the second, we could have used this curl command:: + + $ curl -i -X POST -H 'X-Auth-Token: AUTH_tkd5359e46ff9e419fa193dbd367f3cd19' \ + -H 'X-Container-Sync-To: http://cluster2/v1/AUTH_33cdcad8-09fb-4940-90da-0f00cbf21c7c/container2' \ + -H 'X-Container-Sync-Key: secret' \ + 'http://cluster1/v1/AUTH_208d1854-e475-4500-b315-81de645d060e/container1' + HTTP/1.1 204 No Content + Content-Length: 0 + Content-Type: text/plain; charset=UTF-8 + Date: Thu, 24 Feb 2011 22:39:14 GMT + +-------------------------------------------------- +What's going on behind the scenes, in the cluster? +-------------------------------------------------- + +The swift-container-sync does the job of sending updates to the remote +container. + +This is done by scanning the local devices for container databases and +checking for x-container-sync-to and x-container-sync-key metadata values. +If they exist, newer rows since the last sync will trigger PUTs or DELETEs +to the other container. + +.. note:: + + Container sync will sync object POSTs only if the proxy server is set to + use "object_post_as_copy = true" which is the default. 
So-called fast + object posts, "object_post_as_copy = false" do not update the container + listings and therefore can't be detected for synchronization. + +The actual syncing is slightly more complicated to make use of the three +(or number-of-replicas) main nodes for a container without each trying to +do the exact same work but also without missing work if one node happens to +be down. + +Two sync points are kept per container database. All rows between the two +sync points trigger updates. Any rows newer than both sync points cause +updates depending on the node's position for the container (primary nodes +do one third, etc. depending on the replica count of course). After a sync +run, the first sync point is set to the newest ROWID known and the second +sync point is set to newest ROWID for which all updates have been sent. + +An example may help. Assume replica count is 3 and perfectly matching +ROWIDs starting at 1. + + First sync run, database has 6 rows: + + * SyncPoint1 starts as -1. + * SyncPoint2 starts as -1. + * No rows between points, so no "all updates" rows. + * Six rows newer than SyncPoint1, so a third of the rows are sent + by node 1, another third by node 2, remaining third by node 3. + * SyncPoint1 is set as 6 (the newest ROWID known). + * SyncPoint2 is left as -1 since no "all updates" rows were synced. + + Next sync run, database has 12 rows: + + * SyncPoint1 starts as 6. + * SyncPoint2 starts as -1. + * The rows between -1 and 6 all trigger updates (most of which + should short-circuit on the remote end as having already been + done). + * Six more rows newer than SyncPoint1, so a third of the rows are + sent by node 1, another third by node 2, remaining third by node + 3. + * SyncPoint1 is set as 12 (the newest ROWID known). + * SyncPoint2 is set as 6 (the newest "all updates" ROWID). 
+ +In this way, under normal circumstances each node sends its share of +updates each run and just sends a batch of older updates to ensure nothing +was missed. diff --git a/doc/source/overview_large_objects.rst b/doc/source/overview_large_objects.rst index 923afb872a..01477497d9 100644 --- a/doc/source/overview_large_objects.rst +++ b/doc/source/overview_large_objects.rst @@ -13,9 +13,9 @@ special manifest file is created that, when downloaded, sends all the segments concatenated as a single object. This also offers much greater upload speed with the possibility of parallel uploads of the segments. ----------------------------------- +------------------------------------- Using ``swift`` for Segmented Objects ----------------------------------- +------------------------------------- The quickest way to try out this feature is use the included ``swift`` Swift Tool. You can use the ``-S`` option to specify the segment size to use when splitting @@ -120,6 +120,13 @@ Additional Notes for the manifest itself, so this method was chosen to at least offer change detection. + +.. note:: + + If you are using the container sync feature you will need to ensure both + your manifest file and your segment files are synced if they happen to be + in different containers. + ------- History ------- diff --git a/doc/source/ratelimit.rst b/doc/source/ratelimit.rst index 4924b71153..3b5f95bd03 100644 --- a/doc/source/ratelimit.rst +++ b/doc/source/ratelimit.rst @@ -35,20 +35,17 @@ rate_buffer_seconds 5 Number of seconds the rate counter can faster than listed rate). A larger number will result in larger spikes in rate but better average accuracy. -account_ratelimit 0 If set, will limit all requests to - /account_name and PUTs to - /account_name/container_name. Number is in - requests per second +account_ratelimit 0 If set, will limit PUT and DELETE requests + to /account_name/container_name. + Number is in requests per second. 
account_whitelist '' Comma separated lists of account names that will not be rate limited. account_blacklist '' Comma separated lists of account names that will not be allowed. Returns a 497 response. container_ratelimit_size '' When set with container_limit_x = r: for containers of size x, limit requests - per second to r. Will limit GET and HEAD - requests to /account_name/container_name - and PUTs and DELETEs to - /account_name/container_name/object_name + per second to r. Will limit PUT, DELETE, + and POST requests to /a/c/o. ======================== ========= =========================================== The container rate limits are linearly interpolated from the values given. A diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index fb3a47835e..4c488c25c1 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -7,6 +7,9 @@ # swift_dir = /etc/swift # devices = /srv/node # mount_check = true +# This is a comma separated list of hosts allowed in the X-Container-Sync-To +# field for containers. +# allowed_sync_hosts = 127.0.0.1 # You can specify default log routing here if you want: # log_name = swift # log_facility = LOG_LOCAL0 @@ -60,3 +63,15 @@ use = egg:swift#container # log_level = INFO # Will audit, at most, 1 container per device per interval # interval = 1800 + +[container-sync] +# You can override the default log routing for this app here (don't use set!): +# log_name = container-sync +# log_facility = LOG_LOCAL0 +# log_level = INFO +# If you need to use an HTTP Proxy, set it here; defaults to no proxy. 
+# sync_proxy = http://127.0.0.1:8888 +# Will sync, at most, each container once per interval +# interval = 300 +# Maximum amount of time to spend syncing each container per pass +# container_time = 60 diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index ae170adfad..85f6c5b87e 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -65,6 +65,9 @@ use = egg:swift#tempauth # to the auth subsystem, for granting tokens, etc. # auth_prefix = /auth/ # token_life = 86400 +# This is a comma separated list of hosts allowed to send X-Container-Sync-Key +# requests. +# allowed_sync_hosts = 127.0.0.1 # Lastly, you need to list all the accounts/users you want here. The format is: # user__ = [group] [group] [...] [storage_url] # There are special groups of: diff --git a/setup.py b/setup.py index 1cf325f13e..bb40f81563 100644 --- a/setup.py +++ b/setup.py @@ -80,7 +80,7 @@ setup( 'bin/swift-account-audit', 'bin/swift-account-reaper', 'bin/swift-account-replicator', 'bin/swift-account-server', 'bin/swift-container-auditor', - 'bin/swift-container-replicator', + 'bin/swift-container-replicator', 'bin/swift-container-sync', 'bin/swift-container-server', 'bin/swift-container-updater', 'bin/swift-drive-audit', 'bin/swift-get-nodes', 'bin/swift-init', 'bin/swift-object-auditor', diff --git a/swift/common/client.py b/swift/common/client.py index 2e0727f737..ca75eb1630 100644 --- a/swift/common/client.py +++ b/swift/common/client.py @@ -137,22 +137,27 @@ class ClientException(Exception): return b and '%s: %s' % (a, b) or a -def http_connection(url): +def http_connection(url, proxy=None): """ Make an HTTPConnection or HTTPSConnection :param url: url to connect to + :param proxy: proxy to connect through, if any; None by default; str of the + format 'http://127.0.0.1:8888' to set one :returns: tuple of (parsed url, connection object) :raises ClientException: Unable to handle protocol scheme """ parsed = urlparse(url) + proxy_parsed = 
urlparse(proxy) if proxy else None if parsed.scheme == 'http': - conn = HTTPConnection(parsed.netloc) + conn = HTTPConnection((proxy_parsed if proxy else parsed).netloc) elif parsed.scheme == 'https': - conn = HTTPSConnection(parsed.netloc) + conn = HTTPSConnection((proxy_parsed if proxy else parsed).netloc) else: raise ClientException('Cannot handle protocol scheme %s for url %s' % (parsed.scheme, repr(url))) + if proxy: + conn._set_tunnel(parsed.hostname, parsed.port) return parsed, conn @@ -565,40 +570,60 @@ def head_object(url, token, container, name, http_conn=None): return resp_headers -def put_object(url, token, container, name, contents, content_length=None, - etag=None, chunk_size=65536, content_type=None, headers=None, - http_conn=None): +def put_object(url, token=None, container=None, name=None, contents=None, + content_length=None, etag=None, chunk_size=65536, + content_type=None, headers=None, http_conn=None, proxy=None): """ Put an object :param url: storage URL - :param token: auth token - :param container: container name that the object is in - :param name: object name to put - :param contents: a string or a file like object to read object data from + :param token: auth token; if None, no token will be sent + :param container: container name that the object is in; if None, the + container name is expected to be part of the url + :param name: object name to put; if None, the object name is expected to be + part of the url + :param contents: a string or a file like object to read object data from; + if None, a zero-byte put will be done :param content_length: value to send as content-length header; also limits - the amount read from contents - :param etag: etag of contents - :param chunk_size: chunk size of data to write - :param content_type: value to send as content-type header - :param headers: additional headers to include in the request + the amount read from contents; if None, it will be + computed via the contents or chunked transfer + encoding 
will be used + :param etag: etag of contents; if None, no etag will be sent + :param chunk_size: chunk size of data to write; default 65536 + :param content_type: value to send as content-type header; if None, no + content-type will be set (remote end will likely try + to auto-detect it) + :param headers: additional headers to include in the request, if any :param http_conn: HTTP connection object (If None, it will create the conn object) + :param proxy: proxy to connect through, if any; None by default; str of the + format 'http://127.0.0.1:8888' to set one :returns: etag from server response :raises ClientException: HTTP PUT request failed """ if http_conn: parsed, conn = http_conn else: - parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - if not headers: + parsed, conn = http_connection(url, proxy=proxy) + path = parsed.path + if container: + path = '%s/%s' % (path.rstrip('/'), quote(container)) + if name: + path = '%s/%s' % (path.rstrip('/'), quote(name)) + if headers: + headers = dict(headers) + else: headers = {} - headers['X-Auth-Token'] = token + if token: + headers['X-Auth-Token'] = token if etag: headers['ETag'] = etag.strip('"') if content_length is not None: headers['Content-Length'] = str(content_length) + else: + for n, v in headers.iteritems(): + if n.lower() == 'content-length': + content_length = int(v) if content_type is not None: headers['Content-Type'] = content_type if not contents: @@ -633,7 +658,7 @@ def put_object(url, token, container, name, contents, content_length=None, raise ClientException('Object PUT failed', http_scheme=parsed.scheme, http_host=conn.host, http_port=conn.port, http_path=path, http_status=resp.status, http_reason=resp.reason) - return resp.getheader('etag').strip('"') + return resp.getheader('etag', '').strip('"') def post_object(url, token, container, name, headers, http_conn=None): @@ -664,24 +689,40 @@ def post_object(url, token, container, name, headers, 
http_conn=None): http_status=resp.status, http_reason=resp.reason) -def delete_object(url, token, container, name, http_conn=None): +def delete_object(url, token=None, container=None, name=None, http_conn=None, + headers=None, proxy=None): """ Delete object :param url: storage URL - :param token: auth token - :param container: container name that the object is in - :param name: object name to delete + :param token: auth token; if None, no token will be sent + :param container: container name that the object is in; if None, the + container name is expected to be part of the url + :param name: object name to delete; if None, the object name is expected to + be part of the url :param http_conn: HTTP connection object (If None, it will create the conn object) + :param headers: additional headers to include in the request + :param proxy: proxy to connect through, if any; None by default; str of the + format 'http://127.0.0.1:8888' to set one :raises ClientException: HTTP DELETE request failed """ if http_conn: parsed, conn = http_conn else: - parsed, conn = http_connection(url) - path = '%s/%s/%s' % (parsed.path, quote(container), quote(name)) - conn.request('DELETE', path, '', {'X-Auth-Token': token}) + parsed, conn = http_connection(url, proxy=proxy) + path = parsed.path + if container: + path = '%s/%s' % (path.rstrip('/'), quote(container)) + if name: + path = '%s/%s' % (path.rstrip('/'), quote(name)) + if headers: + headers = dict(headers) + else: + headers = {} + if token: + headers['X-Auth-Token'] = token + conn.request('DELETE', path, '', headers) resp = conn.getresponse() resp.read() if resp.status < 200 or resp.status >= 300: diff --git a/swift/common/db.py b/swift/common/db.py index 8683d4e088..4a4c029cf7 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -666,7 +666,9 @@ class ContainerBroker(DatabaseBroker): id TEXT, status TEXT DEFAULT '', status_changed_at TEXT DEFAULT '0', - metadata TEXT DEFAULT '' + metadata TEXT DEFAULT '', + 
x_container_sync_point1 INTEGER DEFAULT -1, + x_container_sync_point2 INTEGER DEFAULT -1 ); INSERT INTO container_stat (object_count, bytes_used) @@ -886,7 +888,8 @@ class ContainerBroker(DatabaseBroker): :returns: dict with keys: account, container, created_at, put_timestamp, delete_timestamp, object_count, bytes_used, reported_put_timestamp, reported_delete_timestamp, - reported_object_count, reported_bytes_used, hash, id + reported_object_count, reported_bytes_used, hash, id, + x_container_sync_point1, and x_container_sync_point2. If include_metadata is set, metadata is included as a key pointing to a dict of tuples of the metadata """ @@ -896,35 +899,83 @@ class ContainerBroker(DatabaseBroker): if not self.stale_reads_ok: raise with self.get() as conn: - metadata = '' - if include_metadata: - metadata = ', metadata' - try: - data = conn.execute(''' - SELECT account, container, created_at, put_timestamp, - delete_timestamp, object_count, bytes_used, - reported_put_timestamp, reported_delete_timestamp, - reported_object_count, reported_bytes_used, hash, id - %s - FROM container_stat - ''' % metadata).fetchone() - except sqlite3.OperationalError, err: - if 'no such column: metadata' not in str(err): - raise - data = conn.execute(''' - SELECT account, container, created_at, put_timestamp, - delete_timestamp, object_count, bytes_used, - reported_put_timestamp, reported_delete_timestamp, - reported_object_count, reported_bytes_used, hash, id - FROM container_stat''').fetchone() + data = None + trailing1 = 'metadata' + trailing2 = 'x_container_sync_point1, x_container_sync_point2' + while not data: + try: + data = conn.execute(''' + SELECT account, container, created_at, put_timestamp, + delete_timestamp, object_count, bytes_used, + reported_put_timestamp, reported_delete_timestamp, + reported_object_count, reported_bytes_used, hash, + id, %s, %s + FROM container_stat + ''' % (trailing1, trailing2)).fetchone() + except sqlite3.OperationalError, err: + if 'no such 
column: metadata' in str(err): + trailing1 = "'' as metadata" + elif 'no such column: x_container_sync_point' in str(err): + trailing2 = '-1 AS x_container_sync_point1, ' \ + '-1 AS x_container_sync_point2' + else: + raise data = dict(data) if include_metadata: try: data['metadata'] = json.loads(data.get('metadata', '')) except ValueError: data['metadata'] = {} + elif 'metadata' in data: + del data['metadata'] return data + def set_x_container_sync_points(self, sync_point1, sync_point2): + with self.get() as conn: + orig_isolation_level = conn.isolation_level + try: + # We turn off auto-transactions to ensure the alter table + # commands are part of the transaction. + conn.isolation_level = None + conn.execute('BEGIN') + try: + self._set_x_container_sync_points(conn, sync_point1, + sync_point2) + except sqlite3.OperationalError, err: + if 'no such column: x_container_sync_point' not in str(err): + raise + conn.execute(''' + ALTER TABLE container_stat + ADD COLUMN x_container_sync_point1 INTEGER DEFAULT -1 + ''') + conn.execute(''' + ALTER TABLE container_stat + ADD COLUMN x_container_sync_point2 INTEGER DEFAULT -1 + ''') + self._set_x_container_sync_points(conn, sync_point1, + sync_point2) + conn.execute('COMMIT') + finally: + conn.isolation_level = orig_isolation_level + + def _set_x_container_sync_points(self, conn, sync_point1, sync_point2): + if sync_point1 is not None and sync_point2 is not None: + conn.execute(''' + UPDATE container_stat + SET x_container_sync_point1 = ?, + x_container_sync_point2 = ? + ''', (sync_point1, sync_point2)) + elif sync_point1 is not None: + conn.execute(''' + UPDATE container_stat + SET x_container_sync_point1 = ? + ''', (sync_point1,)) + elif sync_point2 is not None: + conn.execute(''' + UPDATE container_stat + SET x_container_sync_point2 = ? 
+ ''', (sync_point2,)) + def reported(self, put_timestamp, delete_timestamp, object_count, bytes_used): """ diff --git a/swift/common/manager.py b/swift/common/manager.py index 902d188054..8ecd9b3801 100644 --- a/swift/common/manager.py +++ b/swift/common/manager.py @@ -31,9 +31,10 @@ RUN_DIR = '/var/run/swift' # auth-server has been removed from ALL_SERVERS, start it explicitly ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor', - 'container-replicator', 'container-server', 'container-updater', - 'object-auditor', 'object-server', 'object-replicator', 'object-updater', - 'proxy-server', 'account-replicator', 'account-reaper'] + 'container-replicator', 'container-server', 'container-sync', + 'container-updater', 'object-auditor', 'object-server', + 'object-replicator', 'object-updater', 'proxy-server', + 'account-replicator', 'account-reaper'] MAIN_SERVERS = ['proxy-server', 'account-server', 'container-server', 'object-server'] REST_SERVERS = [s for s in ALL_SERVERS if s not in MAIN_SERVERS] diff --git a/swift/common/middleware/domain_remap.py b/swift/common/middleware/domain_remap.py index a6ed943bb2..7846a05f29 100644 --- a/swift/common/middleware/domain_remap.py +++ b/swift/common/middleware/domain_remap.py @@ -45,6 +45,10 @@ class DomainRemapMiddleware(object): derive account and container names from elements in the domain name and put those derived values into the URL path (leaving the Host header unchanged). + + Also note that using container sync with remapped domain names is not + advised. With container sync, you should use the true storage end points as + sync destinations. 
""" def __init__(self, app, conf): diff --git a/swift/common/middleware/ratelimit.py b/swift/common/middleware/ratelimit.py index 836cb51bb2..18921f42d5 100644 --- a/swift/common/middleware/ratelimit.py +++ b/swift/common/middleware/ratelimit.py @@ -105,16 +105,15 @@ class RateLimitMiddleware(object): :param obj_name: object name from path """ keys = [] - if self.account_ratelimit and account_name and ( - not (container_name or obj_name) or - (container_name and not obj_name and - req_method in ('PUT', 'DELETE'))): + # COPYs are not limited + if self.account_ratelimit and \ + account_name and container_name and not obj_name and \ + req_method in ('PUT', 'DELETE'): keys.append(("ratelimit/%s" % account_name, self.account_ratelimit)) - if account_name and container_name and ( - (not obj_name and req_method in ('GET', 'HEAD')) or - (obj_name and req_method in ('PUT', 'DELETE'))): + if account_name and container_name and obj_name and \ + req_method in ('PUT', 'DELETE', 'POST'): container_size = None memcache_key = get_container_memcache_key(account_name, container_name) diff --git a/swift/common/middleware/swift3.py b/swift/common/middleware/swift3.py index df1ca3f755..95952bc3eb 100644 --- a/swift/common/middleware/swift3.py +++ b/swift/common/middleware/swift3.py @@ -128,13 +128,23 @@ def canonical_string(req): """ Canonicalize a request to a token that can be signed. 
""" + amz_headers = {} + buf = "%s\n%s\n%s\n" % (req.method, req.headers.get('Content-MD5', ''), req.headers.get('Content-Type') or '') - if 'Date' in req.headers: - buf += "%s\n" % req.headers['Date'] + for amz_header in sorted((key.lower() for key in req.headers if key.lower().startswith('x-amz-'))): - buf += "%s:%s\n" % (amz_header, req.headers[amz_header]) + amz_headers[amz_header] = req.headers[amz_header] + + if 'x-amz-date' in amz_headers: + buf += "\n" + elif 'Date' in req.headers: + buf += "%s\n" % req.headers['Date'] + + for k in sorted(key.lower() for key in amz_headers): + buf += "%s:%s\n" % (k, amz_headers[k]) + path = req.path_qs if '?' in path: path, args = path.split('?', 1) diff --git a/swift/common/middleware/tempauth.py b/swift/common/middleware/tempauth.py index f6d2fe8d26..4977ed5ac2 100644 --- a/swift/common/middleware/tempauth.py +++ b/swift/common/middleware/tempauth.py @@ -27,7 +27,8 @@ from webob.exc import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \ HTTPUnauthorized from swift.common.middleware.acl import clean_acl, parse_acl, referrer_allowed -from swift.common.utils import cache_from_env, get_logger, split_path +from swift.common.utils import cache_from_env, get_logger, get_remote_client, \ + split_path class TempAuth(object): @@ -70,6 +71,9 @@ class TempAuth(object): if self.auth_prefix[-1] != '/': self.auth_prefix += '/' self.token_life = int(conf.get('token_life', 86400)) + self.allowed_sync_hosts = [h.strip() + for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',') + if h.strip()] self.users = {} for conf_key in conf: if conf_key.startswith('user_'): @@ -245,11 +249,20 @@ class TempAuth(object): if '.reseller_admin' in user_groups and \ account != self.reseller_prefix and \ account[len(self.reseller_prefix)] != '.': + req.environ['swift_owner'] = True return None if account in user_groups and \ (req.method not in ('DELETE', 'PUT') or container): # If the user is admin for the account and is not trying to do an # account 
DELETE or PUT... + req.environ['swift_owner'] = True + return None + if (req.environ.get('swift_sync_key') and + req.environ['swift_sync_key'] == + req.headers.get('x-container-sync-key', None) and + 'x-timestamp' in req.headers and + (req.remote_addr in self.allowed_sync_hosts or + get_remote_client(req) in self.allowed_sync_hosts)): return None referrers, groups = parse_acl(getattr(req, 'acl', None)) if referrer_allowed(req.referer, referrers): diff --git a/swift/common/utils.py b/swift/common/utils.py index f95fa4aa96..03c430d73f 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -972,6 +972,32 @@ def urlparse(url): return ModifiedParseResult(*stdlib_urlparse(url)) +def validate_sync_to(value, allowed_sync_hosts): + p = urlparse(value) + if p.scheme not in ('http', 'https'): + return _('Invalid scheme %r in X-Container-Sync-To, must be "http" ' + 'or "https".') % p.scheme + if not p.path: + return _('Path required in X-Container-Sync-To') + if p.params or p.query or p.fragment: + return _('Params, queries, and fragments not allowed in ' + 'X-Container-Sync-To') + if p.hostname not in allowed_sync_hosts: + return _('Invalid host %r in X-Container-Sync-To') % p.hostname + return None + + +def get_remote_client(req): + # remote host for zeus + client = req.headers.get('x-cluster-client-ip') + if not client and 'x-forwarded-for' in req.headers: + # remote host for other lbs + client = req.headers['x-forwarded-for'].split(',')[0].strip() + if not client: + client = req.remote_addr + return client + + def human_readable(value): """ Returns the number in a human readable format; for example 1048576 = "1Mi". 
diff --git a/swift/container/server.py b/swift/container/server.py index bc3856d18e..494e5f777f 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -32,7 +32,7 @@ from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPConflict, \ from swift.common.db import ContainerBroker from swift.common.utils import get_logger, get_param, hash_path, \ - normalize_timestamp, storage_directory, split_path + normalize_timestamp, storage_directory, split_path, validate_sync_to from swift.common.constraints import CONTAINER_LISTING_LIMIT, \ check_mount, check_float, check_utf8 from swift.common.bufferedhttp import http_connect @@ -46,7 +46,8 @@ class ContainerController(object): """WSGI Controller for the container server.""" # Ensure these are all lowercase - save_headers = ['x-container-read', 'x-container-write'] + save_headers = ['x-container-read', 'x-container-write', + 'x-container-sync-key', 'x-container-sync-to'] def __init__(self, conf): self.logger = get_logger(conf, log_route='container-server') @@ -55,6 +56,9 @@ class ContainerController(object): ('true', 't', '1', 'on', 'yes', 'y') self.node_timeout = int(conf.get('node_timeout', 3)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) + self.allowed_sync_hosts = [h.strip() + for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',') + if h.strip()] self.replicator_rpc = ReplicatorRpc(self.root, DATADIR, ContainerBroker, self.mount_check, logger=self.logger) @@ -174,6 +178,11 @@ class ContainerController(object): not check_float(req.headers['x-timestamp']): return HTTPBadRequest(body='Missing timestamp', request=req, content_type='text/plain') + if 'x-container-sync-to' in req.headers: + err = validate_sync_to(req.headers['x-container-sync-to'], + self.allowed_sync_hosts) + if err: + return HTTPBadRequest(err) if self.mount_check and not check_mount(self.root, drive): return Response(status='507 %s is not mounted' % drive) timestamp = normalize_timestamp(req.headers['x-timestamp']) @@ 
-199,6 +208,11 @@ class ContainerController(object): if key.lower() in self.save_headers or key.lower().startswith('x-container-meta-')) if metadata: + if 'X-Container-Sync-To' in metadata: + if 'X-Container-Sync-To' not in broker.metadata or \ + metadata['X-Container-Sync-To'][0] != \ + broker.metadata['X-Container-Sync-To'][0]: + broker.set_x_container_sync_points(-1, -1) broker.update_metadata(metadata) resp = self.account_update(req, account, container, broker) if resp: @@ -232,7 +246,8 @@ class ContainerController(object): } headers.update((key, value) for key, (value, timestamp) in broker.metadata.iteritems() - if value != '') + if value != '' and (key.lower() in self.save_headers or + key.lower().startswith('x-container-meta-'))) return HTTPNoContent(request=req, headers=headers) def GET(self, req): @@ -259,7 +274,8 @@ class ContainerController(object): } resp_headers.update((key, value) for key, (value, timestamp) in broker.metadata.iteritems() - if value != '') + if value != '' and (key.lower() in self.save_headers or + key.lower().startswith('x-container-meta-'))) try: path = get_param(req, 'path') prefix = get_param(req, 'prefix') @@ -368,6 +384,11 @@ class ContainerController(object): not check_float(req.headers['x-timestamp']): return HTTPBadRequest(body='Missing or bad timestamp', request=req, content_type='text/plain') + if 'x-container-sync-to' in req.headers: + err = validate_sync_to(req.headers['x-container-sync-to'], + self.allowed_sync_hosts) + if err: + return HTTPBadRequest(err) if self.mount_check and not check_mount(self.root, drive): return Response(status='507 %s is not mounted' % drive) broker = self._get_container_broker(drive, part, account, container) @@ -380,6 +401,11 @@ class ContainerController(object): if key.lower() in self.save_headers or key.lower().startswith('x-container-meta-')) if metadata: + if 'X-Container-Sync-To' in metadata: + if 'X-Container-Sync-To' not in broker.metadata or \ + metadata['X-Container-Sync-To'][0] != \ 
+ broker.metadata['X-Container-Sync-To'][0]: + broker.set_x_container_sync_points(-1, -1) broker.update_metadata(metadata) return HTTPNoContent(request=req) diff --git a/swift/container/sync.py b/swift/container/sync.py new file mode 100644 index 0000000000..22630ed75b --- /dev/null +++ b/swift/container/sync.py @@ -0,0 +1,424 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from time import ctime, time +from random import random, shuffle +from struct import unpack_from + +from eventlet import sleep + +from swift.container import server as container_server +from swift.common.client import ClientException, delete_object, put_object, \ + quote +from swift.common.direct_client import direct_get_object +from swift.common.ring import Ring +from swift.common.db import ContainerBroker +from swift.common.utils import audit_location_generator, get_logger, \ + hash_path, normalize_timestamp, TRUE_VALUES, validate_sync_to, whataremyips +from swift.common.daemon import Daemon + + +class _Iter2FileLikeObject(object): + """ + Returns an iterator's contents via :func:`read`, making it look like a file + object. + """ + + def __init__(self, iterator): + self.iterator = iterator + self._chunk = '' + + def read(self, size=-1): + """ + read([size]) -> read at most size bytes, returned as a string. + + If the size argument is negative or omitted, read until EOF is reached. 
+ Notice that when in non-blocking mode, less data than what was + requested may be returned, even if no size parameter was given. + """ + if size < 0: + chunk = self._chunk + self._chunk = '' + return chunk + ''.join(self.iterator) + chunk = self._chunk + self._chunk = '' + if chunk and len(chunk) <= size: + return chunk + try: + chunk += self.iterator.next() + except StopIteration: + pass + if len(chunk) <= size: + return chunk + self._chunk = chunk[size:] + return chunk[:size] + + +class ContainerSync(Daemon): + """ + Daemon to sync syncable containers. + + This is done by scanning the local devices for container databases and + checking for x-container-sync-to and x-container-sync-key metadata values. + If they exist, newer rows since the last sync will trigger PUTs or DELETEs + to the other container. + + .. note:: + + Container sync will sync object POSTs only if the proxy server is set + to use "object_post_as_copy = true" which is the default. So-called + fast object posts, "object_post_as_copy = false" do not update the + container listings and therefore can't be detected for synchronization. + + The actual syncing is slightly more complicated to make use of the three + (or number-of-replicas) main nodes for a container without each trying to + do the exact same work but also without missing work if one node happens to + be down. + + Two sync points are kept per container database. All rows between the two + sync points trigger updates. Any rows newer than both sync points cause + updates depending on the node's position for the container (primary nodes + do one third, etc. depending on the replica count of course). After a sync + run, the first sync point is set to the newest ROWID known and the second + sync point is set to newest ROWID for which all updates have been sent. + + An example may help. Assume replica count is 3 and perfectly matching + ROWIDs starting at 1. + + First sync run, database has 6 rows: + + * SyncPoint1 starts as -1. 
+ * SyncPoint2 starts as -1. + * No rows between points, so no "all updates" rows. + * Six rows newer than SyncPoint1, so a third of the rows are sent + by node 1, another third by node 2, remaining third by node 3. + * SyncPoint1 is set as 6 (the newest ROWID known). + * SyncPoint2 is left as -1 since no "all updates" rows were synced. + + Next sync run, database has 12 rows: + + * SyncPoint1 starts as 6. + * SyncPoint2 starts as -1. + * The rows between -1 and 6 all trigger updates (most of which + should short-circuit on the remote end as having already been + done). + * Six more rows newer than SyncPoint1, so a third of the rows are + sent by node 1, another third by node 2, remaining third by node + 3. + * SyncPoint1 is set as 12 (the newest ROWID known). + * SyncPoint2 is set as 6 (the newest "all updates" ROWID). + + In this way, under normal circumstances each node sends its share of + updates each run and just sends a batch of older updates to ensure nothing + was missed. + + :param conf: The dict of configuration values from the [container-sync] + section of the container-server.conf + :param container_ring: If None, the /container.ring.gz will be + loaded. This is overridden by unit tests. + :param object_ring: If None, the /object.ring.gz will be loaded. + This is overridden by unit tests. + """ + + def __init__(self, conf, container_ring=None, object_ring=None): + #: The dict of configuration values from the [container-sync] section + #: of the container-server.conf. + self.conf = conf + #: Logger to use for container-sync log lines. + self.logger = get_logger(conf, log_route='container-sync') + #: Path to the local device mount points. + self.devices = conf.get('devices', '/srv/node') + #: Indicates whether mount points should be verified as actual mount + #: points (normally true, false for tests and SAIO). + self.mount_check = \ + conf.get('mount_check', 'true').lower() in TRUE_VALUES + #: Minimum time between full scans. 
This is to keep the daemon from + #: running wild on near empty systems. + self.interval = int(conf.get('interval', 300)) + #: Maximum amount of time to spend syncing a container before moving on + #: to the next one. If a container sync hasn't finished in this time, + #: it'll just be resumed next scan. + self.container_time = int(conf.get('container_time', 60)) + #: The list of hosts we're allowed to send syncs to. + self.allowed_sync_hosts = [h.strip() + for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',') + if h.strip()] + self.proxy = conf.get('sync_proxy') + #: Number of containers with sync turned on that were successfully + #: synced. + self.container_syncs = 0 + #: Number of successful DELETEs triggered. + self.container_deletes = 0 + #: Number of successful PUTs triggered. + self.container_puts = 0 + #: Number of containers that didn't have sync turned on. + self.container_skips = 0 + #: Number of containers that had a failure of some type. + self.container_failures = 0 + #: Time of last stats report. + self.reported = time() + swift_dir = conf.get('swift_dir', '/etc/swift') + #: swift.common.ring.Ring for locating containers. + self.container_ring = container_ring or \ + Ring(os.path.join(swift_dir, 'container.ring.gz')) + #: swift.common.ring.Ring for locating objects. + self.object_ring = object_ring or \ + Ring(os.path.join(swift_dir, 'object.ring.gz')) + self._myips = whataremyips() + self._myport = int(conf.get('bind_port', 6001)) + + def run_forever(self): + """ + Runs container sync scans until stopped. 
+ """ + sleep(random() * self.interval) + while True: + begin = time() + all_locs = audit_location_generator(self.devices, + container_server.DATADIR, + mount_check=self.mount_check, + logger=self.logger) + for path, device, partition in all_locs: + self.container_sync(path) + if time() - self.reported >= 3600: # once an hour + self.report() + elapsed = time() - begin + if elapsed < self.interval: + sleep(self.interval - elapsed) + + def run_once(self): + """ + Runs a single container sync scan. + """ + self.logger.info(_('Begin container sync "once" mode')) + begin = time() + all_locs = audit_location_generator(self.devices, + container_server.DATADIR, + mount_check=self.mount_check, + logger=self.logger) + for path, device, partition in all_locs: + self.container_sync(path) + if time() - self.reported >= 3600: # once an hour + self.report() + self.report() + elapsed = time() - begin + self.logger.info( + _('Container sync "once" mode completed: %.02fs'), elapsed) + + def report(self): + """ + Writes a report of the stats to the logger and resets the stats for the + next report. + """ + self.logger.info( + _('Since %(time)s: %(sync)s synced [%(delete)s deletes, %(put)s ' + 'puts], %(skip)s skipped, %(fail)s failed'), + {'time': ctime(self.reported), + 'sync': self.container_syncs, + 'delete': self.container_deletes, + 'put': self.container_puts, + 'skip': self.container_skips, + 'fail': self.container_failures}) + self.reported = time() + self.container_syncs = 0 + self.container_deletes = 0 + self.container_puts = 0 + self.container_skips = 0 + self.container_failures = 0 + + def container_sync(self, path): + """ + Checks the given path for a container database, determines if syncing + is turned on for that database and, if so, sends any updates to the + other container. 
+ + :param path: the path to a container db + """ + try: + if not path.endswith('.db'): + return + broker = ContainerBroker(path) + info = broker.get_info() + x, nodes = self.container_ring.get_nodes(info['account'], + info['container']) + for ordinal, node in enumerate(nodes): + if node['ip'] in self._myips and node['port'] == self._myport: + break + else: + return + if not broker.is_deleted(): + sync_to = None + sync_key = None + sync_point1 = info['x_container_sync_point1'] + sync_point2 = info['x_container_sync_point2'] + for key, (value, timestamp) in broker.metadata.iteritems(): + if key.lower() == 'x-container-sync-to': + sync_to = value + elif key.lower() == 'x-container-sync-key': + sync_key = value + if not sync_to or not sync_key: + self.container_skips += 1 + return + sync_to = sync_to.rstrip('/') + err = validate_sync_to(sync_to, self.allowed_sync_hosts) + if err: + self.logger.info( + _('ERROR %(db_file)s: %(validate_sync_to_err)s'), + {'db_file': broker.db_file, + 'validate_sync_to_err': err}) + self.container_failures += 1 + return + stop_at = time() + self.container_time + while time() < stop_at and sync_point2 < sync_point1: + rows = broker.get_items_since(sync_point2, 1) + if not rows: + break + row = rows[0] + if row['ROWID'] >= sync_point1: + break + key = hash_path(info['account'], info['container'], + row['name'], raw_digest=True) + # This node will only initially sync out one third of the + # objects (if 3 replicas, 1/4 if 4, etc.). This section + # will attempt to sync previously skipped rows in case the + # other nodes didn't succeed. 
+ if unpack_from('>I', key)[0] % \ + self.container_ring.replica_count != ordinal: + if not self.container_sync_row(row, sync_to, sync_key, + broker, info): + return + sync_point2 = row['ROWID'] + broker.set_x_container_sync_points(None, sync_point2) + while time() < stop_at: + rows = broker.get_items_since(sync_point1, 1) + if not rows: + break + row = rows[0] + key = hash_path(info['account'], info['container'], + row['name'], raw_digest=True) + # This node will only initially sync out one third of the + # objects (if 3 replicas, 1/4 if 4, etc.). It'll come back + # around to the section above and attempt to sync + # previously skipped rows in case the other nodes didn't + # succeed. + if unpack_from('>I', key)[0] % \ + self.container_ring.replica_count == ordinal: + if not self.container_sync_row(row, sync_to, sync_key, + broker, info): + return + sync_point1 = row['ROWID'] + broker.set_x_container_sync_points(sync_point1, None) + self.container_syncs += 1 + except Exception, err: + self.container_failures += 1 + self.logger.exception(_('ERROR Syncing %s'), (broker.db_file)) + + def container_sync_row(self, row, sync_to, sync_key, broker, info): + """ + Sends the update the row indicates to the sync_to container. + + :param row: The updated row in the local database triggering the sync + update. + :param sync_to: The URL to the remote container. + :param sync_key: The X-Container-Sync-Key to use when sending requests + to the other container. + :param broker: The local container database broker. + :param info: The get_info result from the local container database + broker. 
+ :returns: True on success + """ + try: + if row['deleted']: + try: + delete_object(sync_to, name=row['name'], + headers={'x-timestamp': row['created_at'], + 'x-container-sync-key': sync_key}, + proxy=self.proxy) + except ClientException, err: + if err.http_status != 404: + raise + self.container_deletes += 1 + else: + part, nodes = self.object_ring.get_nodes( + info['account'], info['container'], + row['name']) + shuffle(nodes) + exc = None + looking_for_timestamp = float(row['created_at']) + timestamp = -1 + headers = body = None + for node in nodes: + try: + these_headers, this_body = direct_get_object(node, + part, info['account'], info['container'], + row['name'], resp_chunk_size=65536) + this_timestamp = float(these_headers['x-timestamp']) + if this_timestamp > timestamp: + timestamp = this_timestamp + headers = these_headers + body = this_body + except ClientException, err: + # If any errors are not 404, make sure we report the + # non-404 one. We don't want to mistakenly assume the + # object no longer exists just because one says so and + # the others errored for some other reason. 
+ if not exc or exc.http_status == 404: + exc = err + if timestamp < looking_for_timestamp: + if exc: + raise exc + raise Exception(_('Unknown exception trying to GET: ' + '%(node)r %(account)r %(container)r %(object)r'), + {'node': node, 'part': part, + 'account': info['account'], + 'container': info['container'], + 'object': row['name']}) + for key in ('date', 'last-modified'): + if key in headers: + del headers[key] + if 'etag' in headers: + headers['etag'] = headers['etag'].strip('"') + headers['x-timestamp'] = row['created_at'] + headers['x-container-sync-key'] = sync_key + put_object(sync_to, name=row['name'], headers=headers, + contents=_Iter2FileLikeObject(body), proxy=self.proxy) + self.container_puts += 1 + except ClientException, err: + if err.http_status == 401: + self.logger.info(_('Unauth %(sync_from)r ' + '=> %(sync_to)r'), + {'sync_from': '%s/%s' % + (quote(info['account']), quote(info['container'])), + 'sync_to': sync_to}) + elif err.http_status == 404: + self.logger.info(_('Not found %(sync_from)r ' + '=> %(sync_to)r'), + {'sync_from': '%s/%s' % + (quote(info['account']), quote(info['container'])), + 'sync_to': sync_to}) + else: + self.logger.exception( + _('ERROR Syncing %(db_file)s %(row)s'), + {'db_file': broker.db_file, 'row': row}) + self.container_failures += 1 + return False + except Exception, err: + self.logger.exception( + _('ERROR Syncing %(db_file)s %(row)s'), + {'db_file': broker.db_file, 'row': row}) + self.container_failures += 1 + return False + return True diff --git a/swift/obj/server.py b/swift/obj/server.py index 0592f24d2e..470ef4e8cc 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -500,6 +500,7 @@ class ObjectController(object): return error_response file = DiskFile(self.devices, device, partition, account, container, obj, self.logger, disk_chunk_size=self.disk_chunk_size) + orig_timestamp = file.metadata.get('X-Timestamp') upload_expiration = time.time() + self.max_upload_time etag = md5() upload_size = 0 @@ 
-544,13 +545,16 @@ class ObjectController(object): metadata[header_caps] = request.headers[header_key] file.put(fd, tmppath, metadata) file.unlinkold(metadata['X-Timestamp']) - self.container_update('PUT', account, container, obj, request.headers, - {'x-size': file.metadata['Content-Length'], - 'x-content-type': file.metadata['Content-Type'], - 'x-timestamp': file.metadata['X-Timestamp'], - 'x-etag': file.metadata['ETag'], - 'x-trans-id': request.headers.get('x-trans-id', '-')}, - device) + if not orig_timestamp or \ + orig_timestamp < request.headers['x-timestamp']: + self.container_update('PUT', account, container, obj, + request.headers, + {'x-size': file.metadata['Content-Length'], + 'x-content-type': file.metadata['Content-Type'], + 'x-timestamp': file.metadata['X-Timestamp'], + 'x-etag': file.metadata['ETag'], + 'x-trans-id': request.headers.get('x-trans-id', '-')}, + device) resp = HTTPCreated(request=request, etag=etag) return resp @@ -657,6 +661,8 @@ class ObjectController(object): response.headers[key] = value response.etag = file.metadata['ETag'] response.last_modified = float(file.metadata['X-Timestamp']) + # Needed for container sync feature + response.headers['X-Timestamp'] = file.metadata['X-Timestamp'] response.content_length = file_size if 'Content-Encoding' in file.metadata: response.content_encoding = file.metadata['Content-Encoding'] @@ -680,6 +686,7 @@ class ObjectController(object): response_class = HTTPNoContent file = DiskFile(self.devices, device, partition, account, container, obj, self.logger, disk_chunk_size=self.disk_chunk_size) + orig_timestamp = file.metadata.get('X-Timestamp') if file.is_deleted(): response_class = HTTPNotFound metadata = { @@ -688,10 +695,12 @@ class ObjectController(object): with file.mkstemp() as (fd, tmppath): file.put(fd, tmppath, metadata, extension='.ts') file.unlinkold(metadata['X-Timestamp']) - self.container_update('DELETE', account, container, obj, - request.headers, {'x-timestamp': 
metadata['X-Timestamp'], - 'x-trans-id': request.headers.get('x-trans-id', '-')}, - device) + if not orig_timestamp or \ + orig_timestamp < request.headers['x-timestamp']: + self.container_update('DELETE', account, container, obj, + request.headers, {'x-timestamp': metadata['X-Timestamp'], + 'x-trans-id': request.headers.get('x-trans-id', '-')}, + device) resp = response_class(request=request) return resp diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 507ab9213f..44e8b8e6bb 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -42,7 +42,7 @@ from webob import Request, Response from swift.common.ring import Ring from swift.common.utils import cache_from_env, ContextPool, get_logger, \ - normalize_timestamp, split_path, TRUE_VALUES + get_remote_client, normalize_timestamp, split_path, TRUE_VALUES from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \ @@ -418,8 +418,8 @@ class Controller(object): :param account: account name for the container :param container: container name to look up :returns: tuple of (container partition, container nodes, container - read acl, container write acl) or (None, None, None, None) if - the container does not exist + read acl, container write acl, container sync key) or (None, + None, None, None, None) if the container does not exist """ partition, nodes = self.app.container_ring.get_nodes( account, container) @@ -431,15 +431,17 @@ class Controller(object): status = cache_value['status'] read_acl = cache_value['read_acl'] write_acl = cache_value['write_acl'] + sync_key = cache_value.get('sync_key') if status == 200: - return partition, nodes, read_acl, write_acl + return partition, nodes, read_acl, write_acl, sync_key elif status == 404: - return None, None, None, None + return None, None, None, None, None if not self.account_info(account, 
autocreate=account_autocreate)[1]: - return None, None, None, None + return None, None, None, None, None result_code = 0 read_acl = None write_acl = None + sync_key = None container_size = None attempts_left = self.app.container_ring.replica_count headers = {'x-trans-id': self.trans_id} @@ -455,6 +457,7 @@ class Controller(object): result_code = 200 read_acl = resp.getheader('x-container-read') write_acl = resp.getheader('x-container-write') + sync_key = resp.getheader('x-container-sync-key') container_size = \ resp.getheader('X-Container-Object-Count') break @@ -483,11 +486,12 @@ class Controller(object): {'status': result_code, 'read_acl': read_acl, 'write_acl': write_acl, + 'sync_key': sync_key, 'container_size': container_size}, timeout=cache_timeout) if result_code == 200: - return partition, nodes, read_acl, write_acl - return None, None, None, None + return partition, nodes, read_acl, write_acl, sync_key + return None, None, None, None, None def iter_nodes(self, partition, nodes, ring): """ @@ -684,6 +688,9 @@ class Controller(object): raise res.app_iter = file_iter() update_headers(res, source.getheaders()) + # Used by container sync feature + res.environ['swift_x_timestamp'] = \ + source.getheader('x-timestamp') update_headers(res, {'accept-ranges': 'bytes'}) res.status = source.status res.content_length = source.getheader('Content-Length') @@ -694,6 +701,9 @@ class Controller(object): elif 200 <= source.status <= 399: res = status_map[source.status](request=req) update_headers(res, source.getheaders()) + # Used by container sync feature + res.environ['swift_x_timestamp'] = \ + source.getheader('x-timestamp') update_headers(res, {'accept-ranges': 'bytes'}) if req.method == 'HEAD': res.content_length = source.getheader('Content-Length') @@ -902,7 +912,7 @@ class ObjectController(Controller): error_response = check_metadata(req, 'object') if error_response: return error_response - container_partition, containers, _junk, req.acl = \ + container_partition, 
containers, _junk, req.acl, _junk = \ self.container_info(self.account_name, self.container_name, account_autocreate=self.app.account_autocreate) if 'swift.authorize' in req.environ: @@ -960,7 +970,8 @@ class ObjectController(Controller): @delay_denial def PUT(self, req): """HTTP PUT request handler.""" - container_partition, containers, _junk, req.acl = \ + (container_partition, containers, _junk, req.acl, + req.environ['swift_sync_key']) = \ self.container_info(self.account_name, self.container_name, account_autocreate=self.app.account_autocreate) if 'swift.authorize' in req.environ: @@ -971,7 +982,27 @@ class ObjectController(Controller): return HTTPNotFound(request=req) partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) - req.headers['X-Timestamp'] = normalize_timestamp(time.time()) + # Used by container sync feature + if 'x-timestamp' in req.headers: + try: + req.headers['X-Timestamp'] = \ + normalize_timestamp(float(req.headers['x-timestamp'])) + # For container sync PUTs, do a HEAD to see if we can + # shortcircuit + hreq = Request.blank(req.path_info, + environ={'REQUEST_METHOD': 'HEAD'}) + self.GETorHEAD_base(hreq, _('Object'), partition, nodes, + hreq.path_info, self.app.object_ring.replica_count) + if 'swift_x_timestamp' in hreq.environ and \ + float(hreq.environ['swift_x_timestamp']) >= \ + float(req.headers['x-timestamp']): + return HTTPAccepted(request=req) + except ValueError: + return HTTPBadRequest(request=req, content_type='text/plain', + body='X-Timestamp should be a UNIX timestamp float value; ' + 'was %r' % req.headers['x-timestamp']) + else: + req.headers['X-Timestamp'] = normalize_timestamp(time.time()) # Sometimes the 'content-type' header exists, but is set to None. 
content_type_manually_set = True if not req.headers.get('content-type'): @@ -1151,7 +1182,8 @@ class ObjectController(Controller): @delay_denial def DELETE(self, req): """HTTP DELETE request handler.""" - container_partition, containers, _junk, req.acl = \ + (container_partition, containers, _junk, req.acl, + req.environ['swift_sync_key']) = \ self.container_info(self.account_name, self.container_name) if 'swift.authorize' in req.environ: aresp = req.environ['swift.authorize'](req) @@ -1161,7 +1193,17 @@ class ObjectController(Controller): return HTTPNotFound(request=req) partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) - req.headers['X-Timestamp'] = normalize_timestamp(time.time()) + # Used by container sync feature + if 'x-timestamp' in req.headers: + try: + req.headers['X-Timestamp'] = \ + normalize_timestamp(float(req.headers['x-timestamp'])) + except ValueError: + return HTTPBadRequest(request=req, content_type='text/plain', + body='X-Timestamp should be a UNIX timestamp float value; ' + 'was %r' % req.headers['x-timestamp']) + else: + req.headers['X-Timestamp'] = normalize_timestamp(time.time()) headers = [] for container in containers: nheaders = dict(req.headers.iteritems()) @@ -1207,7 +1249,8 @@ class ContainerController(Controller): server_type = _('Container') # Ensure these are all lowercase - pass_through_headers = ['x-container-read', 'x-container-write'] + pass_through_headers = ['x-container-read', 'x-container-write', + 'x-container-sync-key', 'x-container-sync-to'] def __init__(self, app, account_name, container_name, **kwargs): Controller.__init__(self, app) @@ -1244,6 +1287,7 @@ class ContainerController(Controller): {'status': resp.status_int, 'read_acl': resp.headers.get('x-container-read'), 'write_acl': resp.headers.get('x-container-write'), + 'sync_key': resp.headers.get('x-container-sync-key'), 'container_size': resp.headers.get('x-container-object-count')}, 
timeout=self.app.recheck_container_existence) @@ -1252,6 +1296,11 @@ class ContainerController(Controller): aresp = req.environ['swift.authorize'](req) if aresp: return aresp + if not req.environ.get('swift_owner', False): + for key in ('x-container-read', 'x-container-write', + 'x-container-sync-key', 'x-container-sync-to'): + if key in resp.headers: + del resp.headers[key] return resp @public @@ -1646,13 +1695,7 @@ class Application(BaseApplication): the_request = quote(unquote(req.path)) if req.query_string: the_request = the_request + '?' + req.query_string - # remote user for zeus - client = req.headers.get('x-cluster-client-ip') - if not client and 'x-forwarded-for' in req.headers: - # remote user for other lbs - client = req.headers['x-forwarded-for'].split(',')[0].strip() - if not client: - client = req.remote_addr + client = get_remote_client(req) logged_headers = None if self.log_headers: logged_headers = '\n'.join('%s: %s' % (k, v) diff --git a/test/probe/common.py b/test/probe/common.py index b7a86287b4..1d01007c72 100644 --- a/test/probe/common.py +++ b/test/probe/common.py @@ -61,8 +61,8 @@ def reset_environment(): print 'Giving up after %s retries.' % attempt raise err print err - print 'Retrying in 1 second...' - sleep(1) + print 'Retrying in 2 seconds...' 
+ sleep(2) except BaseException, err: kill_pids(pids) raise err diff --git a/test/unit/common/middleware/test_ratelimit.py b/test/unit/common/middleware/test_ratelimit.py index 97d9c92525..940c229a56 100644 --- a/test/unit/common/middleware/test_ratelimit.py +++ b/test/unit/common/middleware/test_ratelimit.py @@ -139,13 +139,16 @@ def mock_time(): class TestRateLimit(unittest.TestCase): - def setUp(self): + def _reset_time(self): global time_ticker time_ticker = 0 + + def setUp(self): self.was_sleep = eventlet.sleep eventlet.sleep = mock_sleep self.was_time = time.time time.time = mock_time + self._reset_time() def tearDown(self): eventlet.sleep = self.was_sleep @@ -186,31 +189,34 @@ class TestRateLimit(unittest.TestCase): logger=FakeLogger()) the_app.memcache_client = fake_memcache self.assertEquals(len(the_app.get_ratelimitable_key_tuples( - 'GET', 'a', None, None)), 1) - self.assertEquals(len(the_app.get_ratelimitable_key_tuples( - 'POST', 'a', 'c', None)), 0) + 'DELETE', 'a', None, None)), 0) self.assertEquals(len(the_app.get_ratelimitable_key_tuples( 'PUT', 'a', 'c', None)), 1) self.assertEquals(len(the_app.get_ratelimitable_key_tuples( - 'GET', 'a', 'c', None)), 1) + 'DELETE', 'a', 'c', None)), 1) self.assertEquals(len(the_app.get_ratelimitable_key_tuples( 'GET', 'a', 'c', 'o')), 0) self.assertEquals(len(the_app.get_ratelimitable_key_tuples( 'PUT', 'a', 'c', 'o')), 1) - def test_ratelimit(self): + def test_account_ratelimit(self): current_rate = 5 num_calls = 50 conf_dict = {'account_ratelimit': current_rate} self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp()) ratelimit.http_connect = mock_http_connect(204) - req = Request.blank('/v/a') - req.environ['swift.cache'] = FakeMemcache() - make_app_call = lambda: self.test_ratelimit(req.environ, - start_response) - begin = time.time() - self._run(make_app_call, num_calls, current_rate) - self.assertEquals(round(time.time() - begin, 1), 9.8) + for meth, exp_time in [('DELETE', 9.8), ('GET', 0), + 
('POST', 0), ('PUT', 9.8)]: + req = Request.blank('/v/a%s/c' % meth) + req.method = meth + req.environ['swift.cache'] = FakeMemcache() + make_app_call = lambda: self.test_ratelimit(req.environ, + start_response) + begin = time.time() + self._run(make_app_call, num_calls, current_rate, + check_time=bool(exp_time)) + self.assertEquals(round(time.time() - begin, 1), exp_time) + self._reset_time() def test_ratelimit_set_incr(self): current_rate = 5 @@ -218,7 +224,8 @@ class TestRateLimit(unittest.TestCase): conf_dict = {'account_ratelimit': current_rate} self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp()) ratelimit.http_connect = mock_http_connect(204) - req = Request.blank('/v/a') + req = Request.blank('/v/a/c') + req.method = 'PUT' req.environ['swift.cache'] = FakeMemcache() req.environ['swift.cache'].init_incr_return_neg = True make_app_call = lambda: self.test_ratelimit(req.environ, @@ -306,7 +313,8 @@ class TestRateLimit(unittest.TestCase): self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp()) ratelimit.http_connect = mock_http_connect(204) self.test_ratelimit.log_sleep_time_seconds = .00001 - req = Request.blank('/v/a') + req = Request.blank('/v/a/c') + req.method = 'PUT' req.environ['swift.cache'] = FakeMemcache() time_override = [0, 0, 0, 0, None] @@ -335,7 +343,7 @@ class TestRateLimit(unittest.TestCase): logger=FakeLogger()) the_app.memcache_client = fake_memcache req = lambda: None - req.method = 'GET' + req.method = 'PUT' class rate_caller(Thread): @@ -346,7 +354,7 @@ class TestRateLimit(unittest.TestCase): def run(self): for j in range(num_calls): self.result = the_app.handle_ratelimit(req, self.myname, - None, None) + 'c', None) nt = 15 begin = time.time() @@ -361,45 +369,6 @@ class TestRateLimit(unittest.TestCase): time_took = time.time() - begin self.assertEquals(1.5, round(time_took, 1)) - def test_ratelimit_acc_vrs_container(self): - conf_dict = {'clock_accuracy': 1000, - 'account_ratelimit': 10, - 'max_sleep_time_seconds': 
4, - 'container_ratelimit_10': 6, - 'container_ratelimit_50': 2, - 'container_ratelimit_75': 1} - self.test_ratelimit = dummy_filter_factory(conf_dict)(FakeApp()) - ratelimit.http_connect = mock_http_connect(204) - req = Request.blank('/v/a/c') - req.environ['swift.cache'] = FakeMemcache() - cont_key = get_container_memcache_key('a', 'c') - - class rate_caller(Thread): - - def __init__(self, parent, name): - Thread.__init__(self) - self.parent = parent - self.name = name - - def run(self): - self.result = self.parent.test_ratelimit(req.environ, - start_response) - - def runthreads(threads, nt): - for i in range(nt): - rc = rate_caller(self, "thread %s" % i) - rc.start() - threads.append(rc) - for thread in threads: - thread.join() - begin = time.time() - req.environ['swift.cache'].set(cont_key, {'container_size': 20}) - begin = time.time() - threads = [] - runthreads(threads, 3) - time_took = time.time() - begin - self.assertEquals(round(time_took, 1), .4) - def test_call_invalid_path(self): env = {'REQUEST_METHOD': 'GET', 'SCRIPT_NAME': '', @@ -441,7 +410,8 @@ class TestRateLimit(unittest.TestCase): conf_dict = {'account_ratelimit': current_rate} self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp()) ratelimit.http_connect = mock_http_connect(204) - req = Request.blank('/v/a') + req = Request.blank('/v/a/c') + req.method = 'PUT' req.environ['swift.cache'] = FakeMemcache() req.environ['swift.cache'].error_on_incr = True make_app_call = lambda: self.test_ratelimit(req.environ, diff --git a/test/unit/common/middleware/test_swift3.py b/test/unit/common/middleware/test_swift3.py index bca9badacb..2579cce83d 100644 --- a/test/unit/common/middleware/test_swift3.py +++ b/test/unit/common/middleware/test_swift3.py @@ -573,6 +573,15 @@ class TestSwift3(unittest.TestCase): verify('7506d97002c7d2de922cc0ec34af8846', '/bucket/object', {'Content-Type': None, 'X-Amz-Something': 'test'}) + verify('28f76d6162444a193b612cd6cb20e0be', '/bucket/object', + 
{'Content-Type': None, + 'X-Amz-Date': 'Mon, 11 Jul 2011 10:52:57 +0000', + 'Date': 'Tue, 12 Jul 2011 10:52:57 +0000'}) + + verify('ed6971e3eca5af4ee361f05d7c272e49', '/bucket/object', + {'Content-Type': None, + 'Date': 'Tue, 12 Jul 2011 10:52:57 +0000'}) + req1 = Request.blank('/', headers= {'Content-Type': None, 'X-Amz-Something': 'test'}) req2 = Request.blank('/', headers= diff --git a/test/unit/common/middleware/test_tempauth.py b/test/unit/common/middleware/test_tempauth.py index 02519c7ce2..df9a51cf8a 100644 --- a/test/unit/common/middleware/test_tempauth.py +++ b/test/unit/common/middleware/test_tempauth.py @@ -56,15 +56,21 @@ class FakeMemcache(object): class FakeApp(object): - def __init__(self, status_headers_body_iter=None): + def __init__(self, status_headers_body_iter=None, acl=None, sync_key=None): self.calls = 0 self.status_headers_body_iter = status_headers_body_iter if not self.status_headers_body_iter: self.status_headers_body_iter = iter([('404 Not Found', {}, '')]) + self.acl = acl + self.sync_key = sync_key def __call__(self, env, start_response): self.calls += 1 self.request = Request.blank('', environ=env) + if self.acl: + self.request.acl = self.acl + if self.sync_key: + self.request.environ['swift_sync_key'] = self.sync_key if 'swift.authorize' in env: resp = env['swift.authorize'](self.request) if resp: @@ -138,7 +144,8 @@ class TestAuth(unittest.TestCase): self.assertEquals(resp.status_int, 404) def test_anon(self): - resp = self._make_request('/v1/AUTH_account').get_response(self.test_auth) + resp = \ + self._make_request('/v1/AUTH_account').get_response(self.test_auth) self.assertEquals(resp.status_int, 401) self.assertEquals(resp.environ['swift.authorize'], self.test_auth.authorize) @@ -289,30 +296,35 @@ class TestAuth(unittest.TestCase): self.assertEquals(self.test_auth.authorize(req), None) def test_account_put_permissions(self): - req = self._make_request('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) + req = 
self._make_request('/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) - req = self._make_request('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) + req = self._make_request('/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,AUTH_other' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) # Even PUTs to your own account as account admin should fail - req = self._make_request('/v1/AUTH_old', environ={'REQUEST_METHOD': 'PUT'}) + req = self._make_request('/v1/AUTH_old', + environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,AUTH_old' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) - req = self._make_request('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) + req = self._make_request('/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,.reseller_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp, None) # .super_admin is not something the middleware should ever see or care # about - req = self._make_request('/v1/AUTH_new', environ={'REQUEST_METHOD': 'PUT'}) + req = self._make_request('/v1/AUTH_new', + environ={'REQUEST_METHOD': 'PUT'}) req.remote_user = 'act:usr,act,.super_admin' resp = self.test_auth.authorize(req) self.assertEquals(resp.status_int, 403) @@ -383,6 +395,152 @@ class TestAuth(unittest.TestCase): headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth) self.assertEquals(resp.status_int, 401) + def test_allowed_sync_hosts(self): + a = auth.filter_factory({'super_admin_key': 'supertest'})(FakeApp()) + self.assertEquals(a.allowed_sync_hosts, ['127.0.0.1']) + a = auth.filter_factory({'super_admin_key': 'supertest', + 'allowed_sync_hosts': + '1.1.1.1,2.1.1.1, 3.1.1.1 , 4.1.1.1,, , 5.1.1.1'})(FakeApp()) + self.assertEquals(a.allowed_sync_hosts, + ['1.1.1.1', '2.1.1.1', '3.1.1.1', '4.1.1.1', 
'5.1.1.1']) + + def test_reseller_admin_is_owner(self): + orig_authorize = self.test_auth.authorize + owner_values = [] + + def mitm_authorize(req): + rv = orig_authorize(req) + owner_values.append(req.environ.get('swift_owner', False)) + return rv + + self.test_auth.authorize = mitm_authorize + + req = self._make_request('/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}) + req.remote_user = '.reseller_admin' + self.test_auth.authorize(req) + self.assertEquals(owner_values, [True]) + + def test_admin_is_owner(self): + orig_authorize = self.test_auth.authorize + owner_values = [] + + def mitm_authorize(req): + rv = orig_authorize(req) + owner_values.append(req.environ.get('swift_owner', False)) + return rv + + self.test_auth.authorize = mitm_authorize + + req = self._make_request('/v1/AUTH_cfa', + headers={'X-Auth-Token': 'AUTH_t'}) + req.remote_user = 'AUTH_cfa' + self.test_auth.authorize(req) + self.assertEquals(owner_values, [True]) + + def test_regular_is_not_owner(self): + orig_authorize = self.test_auth.authorize + owner_values = [] + + def mitm_authorize(req): + rv = orig_authorize(req) + owner_values.append(req.environ.get('swift_owner', False)) + return rv + + self.test_auth.authorize = mitm_authorize + + req = self._make_request('/v1/AUTH_cfa/c', + headers={'X-Auth-Token': 'AUTH_t'}) + req.remote_user = 'act:usr' + self.test_auth.authorize(req) + self.assertEquals(owner_values, [False]) + + def test_sync_request_success(self): + self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), + sync_key='secret') + req = self._make_request('/v1/AUTH_cfa/c/o', + environ={'REQUEST_METHOD': 'DELETE'}, + headers={'x-container-sync-key': 'secret', + 'x-timestamp': '123.456'}) + req.remote_addr = '127.0.0.1' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + + def test_sync_request_fail_key(self): + self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), + sync_key='secret') + req = 
self._make_request('/v1/AUTH_cfa/c/o', + environ={'REQUEST_METHOD': 'DELETE'}, + headers={'x-container-sync-key': 'wrongsecret', + 'x-timestamp': '123.456'}) + req.remote_addr = '127.0.0.1' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), + sync_key='othersecret') + req = self._make_request('/v1/AUTH_cfa/c/o', + environ={'REQUEST_METHOD': 'DELETE'}, + headers={'x-container-sync-key': 'secret', + 'x-timestamp': '123.456'}) + req.remote_addr = '127.0.0.1' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), + sync_key=None) + req = self._make_request('/v1/AUTH_cfa/c/o', + environ={'REQUEST_METHOD': 'DELETE'}, + headers={'x-container-sync-key': 'secret', + 'x-timestamp': '123.456'}) + req.remote_addr = '127.0.0.1' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_sync_request_fail_no_timestamp(self): + self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), + sync_key='secret') + req = self._make_request('/v1/AUTH_cfa/c/o', + environ={'REQUEST_METHOD': 'DELETE'}, + headers={'x-container-sync-key': 'secret'}) + req.remote_addr = '127.0.0.1' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_sync_request_fail_sync_host(self): + self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), + sync_key='secret') + req = self._make_request('/v1/AUTH_cfa/c/o', + environ={'REQUEST_METHOD': 'DELETE'}, + headers={'x-container-sync-key': 'secret', + 'x-timestamp': '123.456'}) + req.remote_addr = '127.0.0.2' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 401) + + def test_sync_request_success_lb_sync_host(self): + self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), + sync_key='secret') + req = self._make_request('/v1/AUTH_cfa/c/o', 
+ environ={'REQUEST_METHOD': 'DELETE'}, + headers={'x-container-sync-key': 'secret', + 'x-timestamp': '123.456', + 'x-forwarded-for': '127.0.0.1'}) + req.remote_addr = '127.0.0.2' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + + self.test_auth.app = FakeApp(iter([('204 No Content', {}, '')]), + sync_key='secret') + req = self._make_request('/v1/AUTH_cfa/c/o', + environ={'REQUEST_METHOD': 'DELETE'}, + headers={'x-container-sync-key': 'secret', + 'x-timestamp': '123.456', + 'x-cluster-client-ip': '127.0.0.1'}) + req.remote_addr = '127.0.0.2' + resp = req.get_response(self.test_auth) + self.assertEquals(resp.status_int, 204) + class TestParseUserCreation(unittest.TestCase): def test_parse_user_creation(self): diff --git a/test/unit/common/test_client.py b/test/unit/common/test_client.py index e6e1abb1dc..29d1e5ff5d 100644 --- a/test/unit/common/test_client.py +++ b/test/unit/common/test_client.py @@ -155,8 +155,8 @@ class MockHttpTest(unittest.TestCase): def fake_http_connection(*args, **kwargs): _orig_http_connection = c.http_connection - def wrapper(url): - parsed, _conn = _orig_http_connection(url) + def wrapper(url, proxy=None): + parsed, _conn = _orig_http_connection(url, proxy=proxy) conn = fake_http_connect(*args, **kwargs)() def request(*args, **kwargs): @@ -430,7 +430,7 @@ class TestConnection(MockHttpTest): def read(self, *args, **kwargs): return '' - def local_http_connection(url): + def local_http_connection(url, proxy=None): parsed = urlparse(url) return parsed, LocalConnection() diff --git a/test/unit/common/test_db.py b/test/unit/common/test_db.py index e63397c954..1c5bf2e65a 100644 --- a/test/unit/common/test_db.py +++ b/test/unit/common/test_db.py @@ -893,6 +893,23 @@ class TestContainerBroker(unittest.TestCase): self.assertEquals(info['object_count'], 0) self.assertEquals(info['bytes_used'], 0) + info = broker.get_info() + self.assertEquals(info['x_container_sync_point1'], -1) + 
self.assertEquals(info['x_container_sync_point2'], -1)
+
+    def test_set_x_syncs(self):
+        broker = ContainerBroker(':memory:', account='test1', container='test2')
+        broker.initialize(normalize_timestamp('1'))
+
+        info = broker.get_info()
+        self.assertEquals(info['x_container_sync_point1'], -1)
+        self.assertEquals(info['x_container_sync_point2'], -1)
+
+        broker.set_x_container_sync_points(1, 2)
+        info = broker.get_info()
+        self.assertEquals(info['x_container_sync_point1'], 1)
+        self.assertEquals(info['x_container_sync_point2'], 2)
+
     def test_get_report_info(self):
         broker = ContainerBroker(':memory:', account='test1', container='test2')
         broker.initialize(normalize_timestamp('1'))
@@ -1352,6 +1369,81 @@ class TestContainerBrokerBeforeMetadata(TestContainerBroker):
             conn.execute('SELECT metadata FROM container_stat')


+def prexsync_create_container_stat_table(self, conn, put_timestamp=None):
+    """
+    Copied from swift.common.db.ContainerBroker before the
+    x_container_sync_point[12] columns were added; used for testing with
+    TestContainerBrokerBeforeXSync.
+
+    Create the container_stat table which is specific to the container DB.
+ + :param conn: DB connection object + :param put_timestamp: put timestamp + """ + if put_timestamp is None: + put_timestamp = normalize_timestamp(0) + conn.executescript(""" + CREATE TABLE container_stat ( + account TEXT, + container TEXT, + created_at TEXT, + put_timestamp TEXT DEFAULT '0', + delete_timestamp TEXT DEFAULT '0', + object_count INTEGER, + bytes_used INTEGER, + reported_put_timestamp TEXT DEFAULT '0', + reported_delete_timestamp TEXT DEFAULT '0', + reported_object_count INTEGER DEFAULT 0, + reported_bytes_used INTEGER DEFAULT 0, + hash TEXT default '00000000000000000000000000000000', + id TEXT, + status TEXT DEFAULT '', + status_changed_at TEXT DEFAULT '0', + metadata TEXT DEFAULT '' + ); + + INSERT INTO container_stat (object_count, bytes_used) + VALUES (0, 0); + """) + conn.execute(''' + UPDATE container_stat + SET account = ?, container = ?, created_at = ?, id = ?, + put_timestamp = ? + ''', (self.account, self.container, normalize_timestamp(time()), + str(uuid4()), put_timestamp)) + + +class TestContainerBrokerBeforeXSync(TestContainerBroker): + """ + Tests for swift.common.db.ContainerBroker against databases created before + the x_container_sync_point[12] columns were added. 
+ """ + + def setUp(self): + self._imported_create_container_stat_table = \ + ContainerBroker.create_container_stat_table + ContainerBroker.create_container_stat_table = \ + prexsync_create_container_stat_table + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(normalize_timestamp('1')) + exc = None + with broker.get() as conn: + try: + conn.execute('''SELECT x_container_sync_point1 + FROM container_stat''') + except BaseException, err: + exc = err + self.assert_('no such column: x_container_sync_point1' in str(exc)) + + def tearDown(self): + ContainerBroker.create_container_stat_table = \ + self._imported_create_container_stat_table + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(normalize_timestamp('1')) + with broker.get() as conn: + conn.execute('SELECT x_container_sync_point1 FROM container_stat') + + class TestAccountBroker(unittest.TestCase): """ Tests for swift.common.db.AccountBroker """ diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 67a6a1fbf2..aeaf49092e 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -768,6 +768,26 @@ log_name = yarr''' self.assertEquals(utils.human_readable(1237940039285380274899124224), '1024Yi') + def test_validate_sync_to(self): + for goodurl in ('http://1.1.1.1/v1/a/c/o', + 'http://1.1.1.1:8080/a/c/o', + 'http://2.2.2.2/a/c/o', + 'https://1.1.1.1/v1/a/c/o'): + self.assertEquals(utils.validate_sync_to(goodurl, + ['1.1.1.1', '2.2.2.2']), + None) + for badurl in ('http://1.1.1.1', + 'httpq://1.1.1.1/v1/a/c/o', + 'http://1.1.1.1/v1/a/c/o?query', + 'http://1.1.1.1/v1/a/c/o#frag', + 'http://1.1.1.1/v1/a/c/o?query#frag', + 'http://1.1.1.1/v1/a/c/o?query=param', + 'http://1.1.1.1/v1/a/c/o?query=param#frag', + 'http://1.1.1.2/v1/a/c/o'): + self.assertNotEquals(utils.validate_sync_to(badurl, + ['1.1.1.1', '2.2.2.2']), + None) + def test_TRUE_VALUES(self): for v in utils.TRUE_VALUES: 
self.assertEquals(v, v.lower()) diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py index 117a0dccd5..d9856b3adc 100644 --- a/test/unit/container/test_server.py +++ b/test/unit/container/test_server.py @@ -324,6 +324,76 @@ class TestContainerController(unittest.TestCase): raise Exception(err) self.assert_(not got_exc) + def test_PUT_reset_container_sync(self): + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'x-timestamp': '1', + 'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 201) + db = self.controller._get_container_broker('sda1', 'p', 'a', 'c') + info = db.get_info() + self.assertEquals(info['x_container_sync_point1'], -1) + self.assertEquals(info['x_container_sync_point2'], -1) + db.set_x_container_sync_points(123, 456) + info = db.get_info() + self.assertEquals(info['x_container_sync_point1'], 123) + self.assertEquals(info['x_container_sync_point2'], 456) + # Set to same value + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'x-timestamp': '1', + 'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 202) + db = self.controller._get_container_broker('sda1', 'p', 'a', 'c') + info = db.get_info() + self.assertEquals(info['x_container_sync_point1'], 123) + self.assertEquals(info['x_container_sync_point2'], 456) + # Set to new value + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'x-timestamp': '1', + 'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c2'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 202) + db = self.controller._get_container_broker('sda1', 'p', 'a', 'c') + info = db.get_info() + self.assertEquals(info['x_container_sync_point1'], -1) + self.assertEquals(info['x_container_sync_point2'], -1) + + def test_POST_reset_container_sync(self): 
+ req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'x-timestamp': '1', + 'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c'}) + resp = self.controller.PUT(req) + self.assertEquals(resp.status_int, 201) + db = self.controller._get_container_broker('sda1', 'p', 'a', 'c') + info = db.get_info() + self.assertEquals(info['x_container_sync_point1'], -1) + self.assertEquals(info['x_container_sync_point2'], -1) + db.set_x_container_sync_points(123, 456) + info = db.get_info() + self.assertEquals(info['x_container_sync_point1'], 123) + self.assertEquals(info['x_container_sync_point2'], 456) + # Set to same value + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'}, + headers={'x-timestamp': '1', + 'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c'}) + resp = self.controller.POST(req) + self.assertEquals(resp.status_int, 204) + db = self.controller._get_container_broker('sda1', 'p', 'a', 'c') + info = db.get_info() + self.assertEquals(info['x_container_sync_point1'], 123) + self.assertEquals(info['x_container_sync_point2'], 456) + # Set to new value + req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'POST'}, + headers={'x-timestamp': '1', + 'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c2'}) + resp = self.controller.POST(req) + self.assertEquals(resp.status_int, 204) + db = self.controller._get_container_broker('sda1', 'p', 'a', 'c') + info = db.get_info() + self.assertEquals(info['x_container_sync_point1'], -1) + self.assertEquals(info['x_container_sync_point2'], -1) + def test_DELETE(self): req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, headers={'X-Timestamp': '1'}) diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py new file mode 100644 index 0000000000..78489b9570 --- /dev/null +++ b/test/unit/container/test_sync.py @@ -0,0 +1,824 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from swift.container import sync +from swift.common import utils +from swift.common.client import ClientException + + +utils.HASH_PATH_SUFFIX = 'endcap' + + +class FakeRing(object): + + def __init__(self): + self.replica_count = 3 + self.devs = [{'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'} + for x in xrange(3)] + + def get_nodes(self, account, container=None, obj=None): + return 1, list(self.devs) + + +class FakeContainerBroker(object): + + def __init__(self, path, metadata=None, info=None, deleted=False, + items_since=None): + self.db_file = path + self.metadata = metadata if metadata else {} + self.info = info if info else {} + self.deleted = deleted + self.items_since = items_since if items_since else [] + self.sync_point1 = -1 + self.sync_point2 = -1 + + def get_info(self): + return self.info + + def is_deleted(self): + return self.deleted + + def get_items_since(self, sync_point, limit): + if sync_point < 0: + sync_point = 0 + return self.items_since[sync_point:sync_point + limit] + + def set_x_container_sync_points(self, sync_point1, sync_point2): + self.sync_point1 = sync_point1 + self.sync_point2 = sync_point2 + + +class TestContainerSync(unittest.TestCase): + + def test_Iter2FileLikeObject(self): + flo = sync._Iter2FileLikeObject(iter(['123', '4567', '89', '0'])) + expect = '1234567890' + + got = flo.read(2) + self.assertTrue(len(got) <= 2) + self.assertEquals(got, 
expect[:len(got)]) + expect = expect[len(got):] + + got = flo.read(5) + self.assertTrue(len(got) <= 5) + self.assertEquals(got, expect[:len(got)]) + expect = expect[len(got):] + + self.assertEquals(flo.read(), expect) + self.assertEquals(flo.read(), '') + self.assertEquals(flo.read(2), '') + + flo = sync._Iter2FileLikeObject(iter(['123', '4567', '89', '0'])) + self.assertEquals(flo.read(), '1234567890') + self.assertEquals(flo.read(), '') + self.assertEquals(flo.read(2), '') + + def test_init(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + self.assertTrue(cs.container_ring is cring) + self.assertTrue(cs.object_ring is oring) + + def test_run_forever(self): + # This runs run_forever with fakes to succeed for two loops, the first + # causing a report but no interval sleep, the second no report but an + # interval sleep. + time_calls = [0] + sleep_calls = [] + audit_location_generator_calls = [0] + + def fake_time(): + time_calls[0] += 1 + returns = [1, # Initialized reported time + 1, # Start time + 3602, # Is it report time (yes) + 3602, # Report time + 3602, # Elapsed time for "under interval" (no) + 3602, # Start time + 3603, # Is it report time (no) + 3603, # Elapsed time for "under interval" (yes) + ] + if time_calls[0] == len(returns) + 1: + raise Exception('we are now done') + return returns[time_calls[0] - 1] + + def fake_sleep(amount): + sleep_calls.append(amount) + + def fake_audit_location_generator(*args, **kwargs): + audit_location_generator_calls[0] += 1 + # Makes .container_sync() short-circuit because 'path' doesn't end + # with .db + return [('path', 'device', 'partition')] + + orig_time = sync.time + orig_sleep = sync.sleep + orig_audit_location_generator = sync.audit_location_generator + try: + sync.time = fake_time + sync.sleep = fake_sleep + sync.audit_location_generator = fake_audit_location_generator + cs = sync.ContainerSync({}, container_ring=FakeRing(), +
object_ring=FakeRing()) + cs.run_forever() + except Exception, err: + if str(err) != 'we are now done': + raise + finally: + sync.time = orig_time + sync.sleep = orig_sleep + sync.audit_location_generator = orig_audit_location_generator + + self.assertEquals(time_calls, [9]) + self.assertEquals(len(sleep_calls), 2) + self.assertTrue(sleep_calls[0] <= cs.interval) + self.assertTrue(sleep_calls[1] == cs.interval - 1) + self.assertEquals(audit_location_generator_calls, [2]) + self.assertEquals(cs.reported, 3602) + + def test_run_once(self): + # This runs run_once with fakes twice, the first causing an interim + # report, the second with no interim report. + time_calls = [0] + audit_location_generator_calls = [0] + + def fake_time(): + time_calls[0] += 1 + returns = [1, # Initialized reported time + 1, # Start time + 3602, # Is it report time (yes) + 3602, # Report time + 3602, # End report time + 3602, # For elapsed + 3602, # Start time + 3603, # Is it report time (no) + 3604, # End report time + 3605, # For elapsed + ] + if time_calls[0] == len(returns) + 1: + raise Exception('we are now done') + return returns[time_calls[0] - 1] + + def fake_audit_location_generator(*args, **kwargs): + audit_location_generator_calls[0] += 1 + # Makes .container_sync() short-circuit because 'path' doesn't end + # with .db + return [('path', 'device', 'partition')] + + orig_time = sync.time + orig_audit_location_generator = sync.audit_location_generator + try: + sync.time = fake_time + sync.audit_location_generator = fake_audit_location_generator + cs = sync.ContainerSync({}, container_ring=FakeRing(), + object_ring=FakeRing()) + cs.run_once() + self.assertEquals(time_calls, [6]) + self.assertEquals(audit_location_generator_calls, [1]) + self.assertEquals(cs.reported, 3602) + cs.run_once() + except Exception, err: + if str(err) != 'we are now done': + raise + finally: + sync.time = orig_time + sync.audit_location_generator = orig_audit_location_generator + +
self.assertEquals(time_calls, [10]) + self.assertEquals(audit_location_generator_calls, [2]) + self.assertEquals(cs.reported, 3604) + + def test_container_sync_not_db(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + self.assertEquals(cs.container_failures, 0) + + def test_container_sync_missing_db(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + + def test_container_sync_not_my_db(self): + # Db could be there due to handoff replication so test that we ignore + # those. + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + try: + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c'}) + cs._myips = ['127.0.0.1'] # No match + cs._myport = 1 # No match + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1 # No match + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + + cs._myips = ['127.0.0.1'] # No match + cs._myport = 1000 # Match + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + # This complete match will cause the 1 container failure since the + # broker's info doesn't contain sync point keys + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + finally: + sync.ContainerBroker = orig_ContainerBroker + + def test_container_sync_deleted(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + try: + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 
'container': 'c'}, deleted=False) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + # This complete match will cause the 1 container failure since the + # broker's info doesn't contain sync point keys + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c'}, deleted=True) + # This complete match will not cause any more container failures + # since the broker indicates deletion + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + finally: + sync.ContainerBroker = orig_ContainerBroker + + def test_container_sync_no_to_or_key(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + try: + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + # This complete match will be skipped since the broker's metadata + # has no x-container-sync-to or x-container-sync-key + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 1) + + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1)}) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + # This complete match will be skipped since the broker's metadata + # has no x-container-sync-key + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 2) + + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 
'x_container_sync_point2': -1}, + metadata={'x-container-sync-key': ('key', 1)}) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + # This complete match will be skipped since the broker's metadata + # has no x-container-sync-to + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 3) + + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = [] + # This complete match will cause a container failure since the + # sync-to won't validate as allowed. + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 3) + + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + # This complete match will succeed completely since the broker + # get_items_since will return no new rows. 
+ cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 3) + finally: + sync.ContainerBroker = orig_ContainerBroker + + def test_container_stop_at(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + orig_time = sync.time + try: + sync.ContainerBroker = lambda p: FakeContainerBroker(p, + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=['erroneous data']) + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + # This sync will fail since the items_since data is bad. + cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 0) + + # Set up fake times to make the sync short-circuit as having taken + # too long + fake_times = [ + 1.0, # Compute the time to move on + 100000.0, # Compute if it's time to move on from first loop + 100000.0] # Compute if it's time to move on from second loop + + def fake_time(): + return fake_times.pop(0) + + sync.time = fake_time + # This same sync won't fail since it will look like it took so long + # as to be time to move on (before it ever actually tries to do + # anything). 
+ cs.container_sync('isa.db') + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 0) + finally: + sync.ContainerBroker = orig_ContainerBroker + sync.time = orig_time + + def test_container_first_loop(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + orig_hash_path = sync.hash_path + orig_delete_object = sync.delete_object + try: + + def fake_hash_path(account, container, obj, raw_digest=False): + # Ensures that no rows match for full syncing, ordinal is 0 and + # all hashes are 0 + return '\x00' * 16 + + sync.hash_path = fake_hash_path + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': 2, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o'}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Succeeds because no rows match + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, None) + self.assertEquals(fcb.sync_point2, 1) + + def fake_hash_path(account, container, obj, raw_digest=False): + # Ensures that all rows match for full syncing, ordinal is 0 + # and all hashes are 1 + return '\x01' * 16 + + sync.hash_path = fake_hash_path + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': 1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o'}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = 
['127.0.0.1'] + cs.container_sync('isa.db') + # Succeeds because the two sync points haven't deviated enough yet + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, -1) + self.assertEquals(fcb.sync_point2, -1) + + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': 2, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o'}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Fails because container_sync_row will fail since the row has no + # 'deleted' key + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, -1) + self.assertEquals(fcb.sync_point2, -1) + + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': 2, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2', + 'deleted': True}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Fails because delete_object fails + self.assertEquals(cs.container_failures, 2) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, -1) + self.assertEquals(fcb.sync_point2, -1) + + def fake_delete_object(*args, **kwargs): + pass + + sync.delete_object = fake_delete_object + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': 2, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': 
('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2', + 'deleted': True}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Succeeds because delete_object succeeds + self.assertEquals(cs.container_failures, 2) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, None) + self.assertEquals(fcb.sync_point2, 1) + finally: + sync.ContainerBroker = orig_ContainerBroker + sync.hash_path = orig_hash_path + sync.delete_object = orig_delete_object + + def test_container_second_loop(self): + cring = FakeRing() + oring = FakeRing() + cs = sync.ContainerSync({}, container_ring=cring, object_ring=oring) + orig_ContainerBroker = sync.ContainerBroker + orig_hash_path = sync.hash_path + orig_delete_object = sync.delete_object + try: + # We'll ensure the first loop is always skipped by keeping the two + # sync points equal + + def fake_hash_path(account, container, obj, raw_digest=False): + # Ensures that no rows match for second loop, ordinal is 0 and + # all hashes are 1 + return '\x01' * 16 + + sync.hash_path = fake_hash_path + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o'}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Succeeds because no rows match + self.assertEquals(cs.container_failures, 0) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, 1) + self.assertEquals(fcb.sync_point2, None) + + def fake_hash_path(account, container, obj, raw_digest=False): + # 
Ensures that all rows match for second loop, ordinal is 0 and + # all hashes are 0 + return '\x00' * 16 + + def fake_delete_object(*args, **kwargs): + pass + + sync.hash_path = fake_hash_path + sync.delete_object = fake_delete_object + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o'}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Fails because row is missing 'deleted' key + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, -1) + self.assertEquals(fcb.sync_point2, -1) + + fcb = FakeContainerBroker('path', + info={'account': 'a', 'container': 'c', + 'x_container_sync_point1': -1, + 'x_container_sync_point2': -1}, + metadata={'x-container-sync-to': ('http://127.0.0.1/a/c', 1), + 'x-container-sync-key': ('key', 1)}, + items_since=[{'ROWID': 1, 'name': 'o', 'created_at': '1.2', + 'deleted': True}]) + sync.ContainerBroker = lambda p: fcb + cs._myips = ['10.0.0.0'] # Match + cs._myport = 1000 # Match + cs.allowed_sync_hosts = ['127.0.0.1'] + cs.container_sync('isa.db') + # Succeeds because row now has 'deleted' key and delete_object + # succeeds + self.assertEquals(cs.container_failures, 1) + self.assertEquals(cs.container_skips, 0) + self.assertEquals(fcb.sync_point1, 1) + self.assertEquals(fcb.sync_point2, None) + finally: + sync.ContainerBroker = orig_ContainerBroker + sync.hash_path = orig_hash_path + sync.delete_object = orig_delete_object + + def test_container_sync_row_delete(self): + orig_delete_object = sync.delete_object + try: + + def fake_delete_object(path, name=None, headers=None, proxy=None): + self.assertEquals(path, 
'http://sync/to/path') + self.assertEquals(name, 'object') + self.assertEquals(headers, + {'x-container-sync-key': 'key', 'x-timestamp': '1.2'}) + self.assertEquals(proxy, 'http://proxy') + + sync.delete_object = fake_delete_object + cs = sync.ContainerSync({}, container_ring=FakeRing(), + object_ring=FakeRing()) + cs.proxy = 'http://proxy' + # Success + self.assertTrue(cs.container_sync_row({'deleted': True, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), 'info')) + self.assertEquals(cs.container_deletes, 1) + + exc = [] + + def fake_delete_object(path, name=None, headers=None, proxy=None): + exc.append(Exception('test exception')) + raise exc[-1] + + sync.delete_object = fake_delete_object + # Failure because of delete_object exception + self.assertFalse(cs.container_sync_row({'deleted': True, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), 'info')) + self.assertEquals(cs.container_deletes, 1) + self.assertEquals(len(exc), 1) + self.assertEquals(str(exc[-1]), 'test exception') + + def fake_delete_object(path, name=None, headers=None, proxy=None): + exc.append(ClientException('test client exception')) + raise exc[-1] + + sync.delete_object = fake_delete_object + # Failure because of delete_object exception + self.assertFalse(cs.container_sync_row({'deleted': True, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), 'info')) + self.assertEquals(cs.container_deletes, 1) + self.assertEquals(len(exc), 2) + self.assertEquals(str(exc[-1]), 'test client exception') + + def fake_delete_object(path, name=None, headers=None, proxy=None): + exc.append(ClientException('test client exception', + http_status=404)) + raise exc[-1] + + sync.delete_object = fake_delete_object + # Success because the object wasn't even found + self.assertTrue(cs.container_sync_row({'deleted': True, + 'name': 'object', 'created_at': 
'1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), 'info')) + self.assertEquals(cs.container_deletes, 2) + self.assertEquals(len(exc), 3) + self.assertEquals(str(exc[-1]), 'test client exception: 404') + finally: + sync.delete_object = orig_delete_object + + def test_container_sync_row_put(self): + orig_shuffle = sync.shuffle + orig_put_object = sync.put_object + orig_direct_get_object = sync.direct_get_object + try: + sync.shuffle = lambda x: x + + def fake_put_object(sync_to, name=None, headers=None, + contents=None, proxy=None): + self.assertEquals(sync_to, 'http://sync/to/path') + self.assertEquals(name, 'object') + self.assertEquals(headers, {'x-container-sync-key': 'key', + 'x-timestamp': '1.2', + 'other-header': 'other header value', + 'etag': 'etagvalue'}) + self.assertEquals(contents.read(), 'contents') + self.assertEquals(proxy, 'http://proxy') + + sync.put_object = fake_put_object + + cs = sync.ContainerSync({}, container_ring=FakeRing(), + object_ring=FakeRing()) + cs.proxy = 'http://proxy' + + def fake_direct_get_object(node, part, account, container, obj, + resp_chunk_size=1): + return ({'other-header': 'other header value', + 'etag': '"etagvalue"', 'x-timestamp': '1.2'}, + iter('contents')) + + sync.direct_get_object = fake_direct_get_object + # Success as everything says it worked + self.assertTrue(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 1) + + def fake_direct_get_object(node, part, account, container, obj, + resp_chunk_size=1): + return ({'date': 'date value', + 'last-modified': 'last modified value', + 'x-timestamp': '1.2', + 'other-header': 'other header value', + 'etag': '"etagvalue"'}, + iter('contents')) + + sync.direct_get_object = fake_direct_get_object + # Success as everything says it worked, also checks 'date' and + # 'last-modified' 
headers are removed and that 'etag' header is + # stripped of double quotes. + self.assertTrue(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + + exc = [] + + def fake_direct_get_object(node, part, account, container, obj, + resp_chunk_size=1): + exc.append(Exception('test exception')) + raise exc[-1] + + sync.direct_get_object = fake_direct_get_object + # Fail due to completely unexpected exception + self.assertFalse(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + self.assertEquals(len(exc), 1) + self.assertEquals(str(exc[-1]), 'test exception') + + def fake_direct_get_object(node, part, account, container, obj, + resp_chunk_size=1): + exc.append(ClientException('test client exception')) + raise exc[-1] + + sync.direct_get_object = fake_direct_get_object + # Fail due to all direct_get_object calls failing + self.assertFalse(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + self.assertEquals(len(exc), 4) + self.assertEquals(str(exc[-1]), 'test client exception') + + def fake_direct_get_object(node, part, account, container, obj, + resp_chunk_size=1): + return ({'other-header': 'other header value', + 'x-timestamp': '1.2', 'etag': '"etagvalue"'}, + iter('contents')) + + def fake_put_object(sync_to, name=None, headers=None, + contents=None, proxy=None): + raise ClientException('test client exception', http_status=401) + + class FakeLogger(object): + + def __init__(self): + self.err = '' + self.exc = '' + + def info(self, err, *args, 
**kwargs): + self.err = err + + def exception(self, exc, *args, **kwargs): + self.exc = exc + + sync.direct_get_object = fake_direct_get_object + sync.put_object = fake_put_object + cs.logger = FakeLogger() + # Fail due to 401 + self.assertFalse(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + self.assertTrue(cs.logger.err.startswith('Unauth ')) + + def fake_put_object(sync_to, name=None, headers=None, + contents=None, proxy=None): + raise ClientException('test client exception', http_status=404) + + sync.put_object = fake_put_object + # Fail due to 404 + self.assertFalse(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + self.assertTrue(cs.logger.err.startswith('Not found ')) + + def fake_put_object(sync_to, name=None, headers=None, + contents=None, proxy=None): + raise ClientException('test client exception', http_status=503) + + sync.put_object = fake_put_object + # Fail due to 503 + self.assertFalse(cs.container_sync_row({'deleted': False, + 'name': 'object', 'created_at': '1.2'}, 'http://sync/to/path', + 'key', FakeContainerBroker('broker'), {'account': 'a', + 'container': 'c'})) + self.assertEquals(cs.container_puts, 2) + self.assertTrue(cs.logger.exc.startswith('ERROR Syncing ')) + finally: + sync.shuffle = orig_shuffle + sync.put_object = orig_put_object + sync.direct_get_object = orig_direct_get_object + + +if __name__ == '__main__': + unittest.main()