Merging master into feature/ec branch

Change-Id: Ifed60e050629c674a34ede78feacffd7e6ae243e
John Dickinson 2014-08-04 17:02:12 -07:00
commit 9ace546b81
46 changed files with 1962 additions and 338 deletions

View File

@ -486,7 +486,7 @@ handoff_delete auto By default handoff partitions will be
the partition if it is successfully
replicated to n nodes. The default
setting should not be changed, except
for extremem situations.
for extreme situations.
node_timeout DEFAULT or 10 Request timeout to external services.
This uses what's set here, or what's set
in the DEFAULT section, or 10 (though

View File

@ -98,6 +98,7 @@ CQ :ref:`container-quotas`
CS :ref:`container-sync`
TA :ref:`common_tempauth`
DLO :ref:`dynamic-large-objects`
LE :ref:`list_endpoints`
======================= =============================
@ -126,9 +127,11 @@ status_int The response code for the request.
content_length The value of the Content-Length header in the response.
referer The value of the HTTP Referer header.
transaction_id The transaction id of the request.
user_agent The value of the HTTP User-Agent header. Swift's proxy
server sets its user-agent to
``"proxy-server <pid of the proxy>".``
user_agent The value of the HTTP User-Agent header. Swift services
report a user-agent string of the service name followed by
the process ID, such as ``"proxy-server <pid of the
proxy>"`` or ``"object-updater <pid of the object
updater>"``.
request_time The duration of the request.
additional_info Additional useful information.
server_pid The process id of the server

View File

@ -113,11 +113,11 @@ Swift is able to authenticate against OpenStack keystone via the
:mod:`swift.common.middleware.keystoneauth` middleware.
In order to use the ``keystoneauth`` middleware the ``authtoken``
middleware from python-keystoneclient will need to be configured.
middleware from keystonemiddleware will need to be configured.
The ``authtoken`` middleware performs the authentication token
validation and retrieves actual user authentication information. It
can be found in the python-keystoneclient distribution.
can be found in the keystonemiddleware distribution.
The ``keystoneauth`` middleware performs authorization and mapping the
``keystone`` roles to Swift's ACLs.
@ -149,7 +149,7 @@ and add auth_token and keystoneauth in your
add the configuration for the authtoken middleware::
[filter:authtoken]
paste.filter_factory = keystoneclient.middleware.auth_token:filter_factory
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
auth_host = keystonehost
auth_port = 35357
auth_protocol = http
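A minimal ``keystoneauth`` filter section would sit alongside the block above. The entry point and ``operator_roles`` values shown here follow the commonly documented defaults and are a sketch for context, not part of this change::

[filter:keystoneauth]
use = egg:swift#keystoneauth
operator_roles = admin, swiftoperator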

View File

@ -238,7 +238,7 @@ were determined by "walking" the ring until finding additional devices in other
zones. This was discarded as control was lost as to how many replicas for a
given partition moved at once. Keeping each replica independent allows for
moving only one partition replica within a given time window (except due to
device failures). Using the additional memory was deemed a good tradeoff for
device failures). Using the additional memory was deemed a good trade-off for
moving data around the cluster much less often.
Another ring design was tried where the partition to device assignments weren't

View File

@ -52,7 +52,7 @@
# reclaim_age = 604800
[pipeline:main]
pipeline = catch_errors cache proxy-server
pipeline = catch_errors proxy-logging cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
@ -65,3 +65,44 @@ use = egg:swift#memcache
[filter:catch_errors]
use = egg:swift#catch_errors
# See proxy-server.conf-sample for options
[filter:proxy-logging]
use = egg:swift#proxy_logging
# If not set, logging directives from [DEFAULT] without "access_" will be used
# access_log_name = swift
# access_log_facility = LOG_LOCAL0
# access_log_level = INFO
# access_log_address = /dev/log
#
# If set, access_log_udp_host will override access_log_address
# access_log_udp_host =
# access_log_udp_port = 514
#
# You can use log_statsd_* from [DEFAULT] or override them here:
# access_log_statsd_host = localhost
# access_log_statsd_port = 8125
# access_log_statsd_default_sample_rate = 1.0
# access_log_statsd_sample_rate_factor = 1.0
# access_log_statsd_metric_prefix =
# access_log_headers = false
#
# If access_log_headers is True and access_log_headers_only is set, only
# these headers are logged. Multiple headers can be defined as a comma-separated
# list like this: access_log_headers_only = Host, X-Object-Meta-Mtime
# access_log_headers_only =
#
# By default, the X-Auth-Token is logged. To obscure the value,
# set reveal_sensitive_prefix to the number of characters to log.
# For example, if set to 12, only the first 12 characters of the
# token appear in the log. An unauthorized access of the log file
# won't allow unauthorized usage of the token. However, the first
# 12 or so characters are unique enough that you can trace/debug
# token usage. Set to 0 to suppress the token completely (replaced
# by '...' in the log).
# Note: reveal_sensitive_prefix will not affect the value
# logged with access_log_headers=True.
# reveal_sensitive_prefix = 16
#
# What HTTP methods are allowed for StatsD logging (comma-sep); request methods
# not in this list will have "BAD_METHOD" for the <verb> portion of the metric.
# log_statsd_valid_http_methods = GET,HEAD,POST,PUT,DELETE,COPY,OPTIONS
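
The reveal_sensitive_prefix option described above amounts to keeping only a short, traceable prefix of the token. A hypothetical Python sketch of that rule, for illustration only (this is not the middleware's actual implementation):

def obscure_token(value, reveal_sensitive_prefix=16):
    # 0 suppresses the token completely, as described in the comments above
    if reveal_sensitive_prefix <= 0:
        return '...'
    # otherwise keep only the first N characters for trace/debug purposes
    return value[:reveal_sensitive_prefix]

# e.g. obscure_token('AUTH_tk0123456789abcdef', 12) -> 'AUTH_tk01234'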

View File

@ -266,7 +266,7 @@ user_test_tester3 = testing3
# there you can change it to: authtoken keystoneauth
#
# [filter:authtoken]
# paste.filter_factory = keystoneclient.middleware.auth_token:filter_factory
# paste.filter_factory = keystonemiddleware.auth_token:filter_factory
# auth_host = keystonehost
# auth_port = 35357
# auth_protocol = http

View File

@ -453,6 +453,7 @@ class AccountBroker(DatabaseBroker):
"""
def _really_merge_items(conn):
max_rowid = -1
curs = conn.cursor()
for rec in item_list:
record = [rec['name'], rec['put_timestamp'],
rec['delete_timestamp'], rec['object_count'],
@ -466,9 +467,9 @@ class AccountBroker(DatabaseBroker):
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
curs = conn.execute(query, (rec['name'],))
curs.row_factory = None
row = curs.fetchone()
curs_row = curs.execute(query, (rec['name'],))
curs_row.row_factory = None
row = curs_row.fetchone()
if row:
row = list(row)
for i in xrange(5):
@ -484,11 +485,11 @@ class AccountBroker(DatabaseBroker):
record[5] = 1
else:
record[5] = 0
conn.execute('''
curs.execute('''
DELETE FROM container WHERE name = ? AND
deleted IN (0, 1)
''', (record[0],))
conn.execute('''
curs.execute('''
INSERT INTO container (name, put_timestamp,
delete_timestamp, object_count, bytes_used,
deleted, storage_policy_index)
@ -498,12 +499,12 @@ class AccountBroker(DatabaseBroker):
max_rowid = max(max_rowid, rec['ROWID'])
if source:
try:
conn.execute('''
curs.execute('''
INSERT INTO incoming_sync (sync_point, remote_id)
VALUES (?, ?)
''', (max_rowid, source))
except sqlite3.IntegrityError:
conn.execute('''
curs.execute('''
UPDATE incoming_sync
SET sync_point=max(?, sync_point)
WHERE remote_id=?

View File

@ -927,7 +927,7 @@ class SwiftRecon(object):
self.auditor_check(hosts)
self.umount_check(hosts)
self.load_check(hosts)
self.disk_usage(hosts)
self.disk_usage(hosts, options.top, options.human_readable)
self.get_ringmd5(hosts, swift_dir)
self.quarantine_check(hosts)
self.socket_usage(hosts)

View File

@ -728,6 +728,7 @@ class SimpleClient(object):
max_backoff=5, retries=5):
self.url = url
self.token = token
self.attempts = 0 # needed in swift-dispersion-populate
self.starting_backoff = starting_backoff
self.max_backoff = max_backoff
self.retries = retries
@ -796,14 +797,14 @@ class SimpleClient(object):
def retry_request(self, method, **kwargs):
retries = kwargs.pop('retries', self.retries)
attempts = 0
self.attempts = 0
backoff = self.starting_backoff
while attempts <= retries:
attempts += 1
while self.attempts <= retries:
self.attempts += 1
try:
return self.base_request(method, **kwargs)
except (socket.error, httplib.HTTPException, urllib2.URLError):
if attempts > retries:
if self.attempts > retries:
raise
sleep(backoff)
backoff = min(backoff * 2, self.max_backoff)

View File

@ -31,7 +31,14 @@ The format of the form is::
<input type="submit" />
</form>
The <swift-url> is the URL to the Swift desination, such as::
Optionally, if you want the uploaded files to be temporary, you can set
x-delete-at or x-delete-after attributes by adding one of these as a
form input::
<input type="hidden" name="x_delete_at" value="<unix-timestamp>" />
<input type="hidden" name="x_delete_after" value="<seconds>" />
The <swift-url> is the URL of the Swift destination, such as::
https://swift-cluster.example.com/v1/AUTH_account/container/object_prefix
@ -83,10 +90,12 @@ sample code for computing the signature::
max_file_size, max_file_count, expires)
signature = hmac.new(key, hmac_body, sha1).hexdigest()
The key is the value of the X-Account-Meta-Temp-URL-Key header on the
account.
The key is the value of either the X-Account-Meta-Temp-URL-Key or the
X-Account-Meta-Temp-Url-Key-2 header on the account.
Be certain to use the full path, from the /v1/ onward.
Note that x_delete_at and x_delete_after are not used in signature generation
as they are both optional attributes.
The command line tool ``swift-form-signature`` may be used (mostly
just when testing) to compute expires and signature.
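
For reference, here is a fuller sketch of the signature computation whose tail appears in the fragment above. The values are placeholders, and the newline-separated hmac_body layout of path, redirect, max_file_size, max_file_count and expires is inferred from that fragment::

import hmac
from hashlib import sha1
from time import time

path = '/v1/AUTH_account/container/object_prefix'
redirect = 'https://srv.example.com/some-page'  # '' if the form has no redirect
max_file_size = 104857600
max_file_count = 10
expires = int(time() + 600)
key = 'mykey'  # X-Account-Meta-Temp-URL-Key (or -Key-2) value on the account

hmac_body = '%s\n%s\n%s\n%s\n%s' % (path, redirect, max_file_size,
                                    max_file_count, expires)
signature = hmac.new(key, hmac_body, sha1).hexdigest()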
@ -441,6 +450,19 @@ class FormPost(object):
subenv['PATH_INFO'].count('/') < 4:
subenv['PATH_INFO'] += '/'
subenv['PATH_INFO'] += attributes['filename'] or 'filename'
if 'x_delete_at' in attributes:
try:
subenv['HTTP_X_DELETE_AT'] = int(attributes['x_delete_at'])
except ValueError:
raise FormInvalid('x_delete_at not an integer: '
'Unix timestamp required.')
if 'x_delete_after' in attributes:
try:
subenv['HTTP_X_DELETE_AFTER'] = int(
attributes['x_delete_after'])
except ValueError:
raise FormInvalid('x_delete_after not an integer: '
'Number of seconds required.')
if 'content-type' in attributes:
subenv['CONTENT_TYPE'] = \
attributes['content-type'] or 'application/octet-stream'

View File

@ -20,11 +20,14 @@ This middleware makes it possible to integrate swift with software
that relies on data locality information to avoid network overhead,
such as Hadoop.
Answers requests of the form::
Using the original API, answers requests of the form::
/endpoints/{account}/{container}/{object}
/endpoints/{account}/{container}
/endpoints/{account}
/endpoints/v1/{account}/{container}/{object}
/endpoints/v1/{account}/{container}
/endpoints/v1/{account}
with a JSON-encoded list of endpoints of the form::
@ -38,6 +41,26 @@ correspondingly, e.g.::
http://10.1.1.1:6000/sda1/2/a/c2
http://10.1.1.1:6000/sda1/2/a
Using the v2 API, answers requests of the form::
/endpoints/v2/{account}/{container}/{object}
/endpoints/v2/{account}/{container}
/endpoints/v2/{account}
with a JSON-encoded dictionary containing a key 'endpoints' that maps to a list
of endpoints having the same form as described above, and a key 'headers' that
maps to a dictionary of headers that should be sent with a request made to
the endpoints, e.g.::
{ "endpoints": {"http://10.1.1.1:6010/sda1/2/a/c3/o1",
"http://10.1.1.1:6030/sda3/2/a/c3/o1",
"http://10.1.1.1:6040/sda4/2/a/c3/o1"},
"headers": {"X-Backend-Storage-Policy-Index": "1"}}
In this example, the 'headers' dictionary indicates that requests to the
endpoint URLs should include the header 'X-Backend-Storage-Policy-Index: 1'
because the object's container is using storage policy index 1.
The '/endpoints/' path is customizable ('list_endpoints_path'
configuration parameter).
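
As a usage sketch, the v2 response described above could be fetched and inspected as below. The proxy address, account, container and object names are placeholders, and /endpoints/ is the default list_endpoints_path::

import json
import urllib2

url = 'http://127.0.0.1:8080/endpoints/v2/AUTH_test/c3/o1'
resp = json.loads(urllib2.urlopen(url).read())
for endpoint in resp['endpoints']:
    print endpoint
print resp['headers'].get('X-Backend-Storage-Policy-Index')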
@ -64,6 +87,8 @@ from swift.common.swob import HTTPBadRequest, HTTPMethodNotAllowed
from swift.common.storage_policy import POLICIES
from swift.proxy.controllers.base import get_container_info
RESPONSE_VERSIONS = (1.0, 2.0)
class ListEndpointsMiddleware(object):
"""
@ -87,6 +112,11 @@ class ListEndpointsMiddleware(object):
self.endpoints_path = conf.get('list_endpoints_path', '/endpoints/')
if not self.endpoints_path.endswith('/'):
self.endpoints_path += '/'
self.default_response_version = 1.0
self.response_map = {
1.0: self.v1_format_response,
2.0: self.v2_format_response,
}
def get_object_ring(self, policy_idx):
"""
@ -97,6 +127,71 @@ class ListEndpointsMiddleware(object):
"""
return POLICIES.get_object_ring(policy_idx, self.swift_dir)
def _parse_version(self, raw_version):
err_msg = 'Unsupported version %r' % raw_version
try:
version = float(raw_version.lstrip('v'))
except ValueError:
raise ValueError(err_msg)
if not any(version == v for v in RESPONSE_VERSIONS):
raise ValueError(err_msg)
return version
def _parse_path(self, request):
"""
Parse path parts of request into a tuple of version, account,
container, obj. Unspecified path parts are filled in as None,
except version which is always returned as a float using the
configured default response version if not specified in the
request.
:param request: the swob request
:returns: parsed path parts as a tuple with version filled in as
configured default response version if not specified.
:raises: ValueError if path is invalid, message will say why.
"""
clean_path = request.path[len(self.endpoints_path) - 1:]
# try to peel off version
try:
raw_version, rest = split_path(clean_path, 1, 2, True)
except ValueError:
raise ValueError('No account specified')
try:
version = self._parse_version(raw_version)
except ValueError:
if raw_version.startswith('v') and '_' not in raw_version:
# looks more like an invalid version than an account
raise
# probably no version specified, but if the client really
# said /endpoints/v_3/account they'll probably be sorta
# confused by the useless response and lack of error.
version = self.default_response_version
rest = clean_path
else:
rest = '/' + rest if rest else '/'
try:
account, container, obj = split_path(rest, 1, 3, True)
except ValueError:
raise ValueError('No account specified')
return version, account, container, obj
def v1_format_response(self, req, endpoints, **kwargs):
return Response(json.dumps(endpoints),
content_type='application/json')
def v2_format_response(self, req, endpoints, storage_policy_index,
**kwargs):
resp = {
'endpoints': endpoints,
'headers': {},
}
if storage_policy_index is not None:
resp['headers'][
'X-Backend-Storage-Policy-Index'] = str(storage_policy_index)
return Response(json.dumps(resp),
content_type='application/json')
def __call__(self, env, start_response):
request = Request(env)
if not request.path.startswith(self.endpoints_path):
@ -107,11 +202,9 @@ class ListEndpointsMiddleware(object):
req=request, headers={"Allow": "GET"})(env, start_response)
try:
clean_path = request.path[len(self.endpoints_path) - 1:]
account, container, obj = \
split_path(clean_path, 1, 3, True)
except ValueError:
return HTTPBadRequest('No account specified')(env, start_response)
version, account, container, obj = self._parse_path(request)
except ValueError as err:
return HTTPBadRequest(str(err))(env, start_response)
if account is not None:
account = unquote(account)
@ -120,16 +213,13 @@ class ListEndpointsMiddleware(object):
if obj is not None:
obj = unquote(obj)
storage_policy_index = None
if obj is not None:
# remove 'endpoints' from call to get_container_info
stripped = request.environ
if stripped['PATH_INFO'][:len(self.endpoints_path)] == \
self.endpoints_path:
stripped['PATH_INFO'] = "/v1/" + \
stripped['PATH_INFO'][len(self.endpoints_path):]
container_info = get_container_info(
stripped, self.app, swift_source='LE')
obj_ring = self.get_object_ring(container_info['storage_policy'])
{'PATH_INFO': '/v1/%s/%s' % (account, container)},
self.app, swift_source='LE')
storage_policy_index = container_info['storage_policy']
obj_ring = self.get_object_ring(storage_policy_index)
partition, nodes = obj_ring.get_nodes(
account, container, obj)
endpoint_template = 'http://{ip}:{port}/{device}/{partition}/' + \
@ -157,8 +247,10 @@ class ListEndpointsMiddleware(object):
obj=quote(obj or ''))
endpoints.append(endpoint)
return Response(json.dumps(endpoints),
content_type='application/json')(env, start_response)
resp = self.response_map[version](
request, endpoints=endpoints,
storage_policy_index=storage_policy_index)
return resp(env, start_response)
def filter_factory(global_conf, **local_conf):

View File

@ -253,10 +253,10 @@ class ProxyLoggingMiddleware(object):
break
else:
if not chunk:
start_response_args[0][1].append(('content-length', '0'))
start_response_args[0][1].append(('Content-Length', '0'))
elif isinstance(iterable, list):
start_response_args[0][1].append(
('content-length', str(sum(len(i) for i in iterable))))
('Content-Length', str(sum(len(i) for i in iterable))))
start_response(*start_response_args[0])
req = Request(env)

View File

@ -46,9 +46,9 @@ limited to the expiration time set when the website created the link.
To create such temporary URLs, first an X-Account-Meta-Temp-URL-Key
header must be set on the Swift account. Then, an HMAC-SHA1 (RFC 2104)
signature is generated using the HTTP method to allow (GET or PUT),
the Unix timestamp the access should be allowed until, the full path
to the object, and the key set on the account.
signature is generated using the HTTP method to allow (GET, PUT,
DELETE, etc.), the Unix timestamp the access should be allowed until,
the full path to the object, and the key set on the account.
For example, here is code generating the signature for a GET for 60
seconds on /v1/AUTH_account/container/object::
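# A sketch consistent with the description above; the module's actual sample
# (outside this diff hunk) may differ in detail.
import hmac
from hashlib import sha1
from time import time
method = 'GET'
expires = int(time() + 60)
path = '/v1/AUTH_account/container/object'
key = 'mykey'  # value of X-Account-Meta-Temp-URL-Key on the account
hmac_body = '%s\n%s\n%s' % (method, expires, path)
sig = hmac.new(key, hmac_body, sha1).hexdigest()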
@ -75,7 +75,7 @@ da39a3ee5e6b4b0d3255bfef95601890afd80709 and expires ends up
Any alteration of the resource path or query arguments would result
in 401 Unauthorized. Similarly, a PUT where GET was the allowed method
would 401. HEAD is allowed if GET or PUT is allowed.
would 401. HEAD is allowed if GET, PUT, or POST is allowed.
Using this in combination with browser form post translation
middleware could also allow direct-from-browser uploads to specific
@ -300,6 +300,8 @@ class TempURL(object):
self._get_hmacs(env, temp_url_expires, keys) +
self._get_hmacs(env, temp_url_expires, keys,
request_method='GET') +
self._get_hmacs(env, temp_url_expires, keys,
request_method='POST') +
self._get_hmacs(env, temp_url_expires, keys,
request_method='PUT'))
else:

View File

@ -225,6 +225,21 @@ def remove_items(headers, condition):
return removed
def copy_header_subset(from_r, to_r, condition):
"""
Will copy desired subset of headers from from_r to to_r.
:param from_r: a swob Request or Response
:param to_r: a swob Request or Response
:param condition: a function that will be passed the header key as a
single argument and should return True if the header
is to be copied.
"""
for k, v in from_r.headers.items():
if condition(k):
to_r.headers[k] = v
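# Usage sketch (not part of this function): later in this commit the proxy
# object controller uses copy_header_subset to carry object sysmeta over from
# a copy source, roughly:
#   condition = lambda k: is_sys_meta('object', k)
#   remove_items(sink_req.headers, condition)
#   copy_header_subset(source_resp, sink_req, condition)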
def close_if_possible(maybe_closable):
close_method = getattr(maybe_closable, 'close', None)
if callable(close_method):

View File

@ -280,7 +280,6 @@ def parse_builder_ring_filename_args(argvish):
ring_file = first_arg
else:
ring_file = builder_file[:-len('.builder')]
if not first_arg.endswith('.ring.gz'):
ring_file += '.ring.gz'
return builder_file, ring_file

View File

@ -632,7 +632,7 @@ class Timestamp(object):
return INTERNAL_FORMAT % (self.timestamp, self.offset)
def __str__(self):
raise TypeError('You must specificy which string format is required')
raise TypeError('You must specify which string format is required')
def __float__(self):
return self.timestamp
@ -1629,26 +1629,32 @@ def lock_file(filename, timeout=10, append=False, unlink=True):
mode = 'a+'
else:
mode = 'r+'
fd = os.open(filename, flags)
file_obj = os.fdopen(fd, mode)
try:
with swift.common.exceptions.LockTimeout(timeout, filename):
while True:
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except IOError as err:
if err.errno != errno.EAGAIN:
raise
sleep(0.01)
yield file_obj
finally:
while True:
fd = os.open(filename, flags)
file_obj = os.fdopen(fd, mode)
try:
with swift.common.exceptions.LockTimeout(timeout, filename):
while True:
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except IOError as err:
if err.errno != errno.EAGAIN:
raise
sleep(0.01)
try:
if os.stat(filename).st_ino != os.fstat(fd).st_ino:
continue
except OSError as err:
if err.errno == errno.ENOENT:
continue
raise
yield file_obj
if unlink:
os.unlink(filename)
break
finally:
file_obj.close()
except UnboundLocalError:
pass # may have not actually opened the file
if unlink:
os.unlink(filename)
def lock_parent_directory(filename, timeout=10):

View File

@ -16,6 +16,7 @@
"""WSGI tools for use with swift."""
import errno
import inspect
import os
import signal
import time
@ -386,7 +387,14 @@ def run_server(conf, logger, sock, global_conf=None):
max_clients = int(conf.get('max_clients', '1024'))
pool = RestrictedGreenPool(size=max_clients)
try:
wsgi.server(sock, app, NullLogger(), custom_pool=pool)
# Disable capitalizing headers in Eventlet if possible. This is
# necessary for the AWS SDK to work with swift3 middleware.
argspec = inspect.getargspec(wsgi.server)
if 'capitalize_response_headers' in argspec.args:
wsgi.server(sock, app, NullLogger(), custom_pool=pool,
capitalize_response_headers=False)
else:
wsgi.server(sock, app, NullLogger(), custom_pool=pool)
except socket.error as err:
if err[0] != errno.EINVAL:
raise

View File

@ -328,7 +328,7 @@ class ContainerBroker(DatabaseBroker):
:param content_type: object content-type
:param etag: object etag
:param deleted: if True, marks the object as deleted and sets the
deteleted_at timestamp to timestamp
deleted_at timestamp to timestamp
:param storage_policy_index: the storage policy index for the object
"""
record = {'name': name, 'created_at': timestamp, 'size': size,
@ -582,7 +582,7 @@ class ContainerBroker(DatabaseBroker):
:param end_marker: end marker query
:param prefix: prefix query
:param delimiter: delimiter for query
:param path: if defined, will set the prefix and delimter based on
:param path: if defined, will set the prefix and delimiter based on
the path
:returns: list of tuples of (name, created_at, size, content_type,
@ -679,7 +679,7 @@ class ContainerBroker(DatabaseBroker):
break
elif end > 0:
marker = name[:end] + chr(ord(delimiter) + 1)
# we want result to be inclusinve of delim+1
# we want result to be inclusive of delim+1
delim_force_gte = True
dir_name = name[:end + 1]
if dir_name != orig_marker:
@ -696,11 +696,13 @@ class ContainerBroker(DatabaseBroker):
Merge items into the object table.
:param item_list: list of dictionaries of {'name', 'created_at',
'size', 'content_type', 'etag', 'deleted'}
'size', 'content_type', 'etag', 'deleted',
'storage_policy_index'}
:param source: if defined, update incoming_sync with the source
"""
def _really_merge_items(conn):
max_rowid = -1
curs = conn.cursor()
for rec in item_list:
rec.setdefault('storage_policy_index', 0) # legacy
query = '''
@ -710,7 +712,7 @@ class ContainerBroker(DatabaseBroker):
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
conn.execute(query, (rec['name'], rec['created_at'],
curs.execute(query, (rec['name'], rec['created_at'],
rec['storage_policy_index']))
query = '''
SELECT 1 FROM object WHERE name = ?
@ -718,9 +720,9 @@ class ContainerBroker(DatabaseBroker):
'''
if self.get_db_version(conn) >= 1:
query += ' AND deleted IN (0, 1)'
if not conn.execute(query, (
if not curs.execute(query, (
rec['name'], rec['storage_policy_index'])).fetchall():
conn.execute('''
curs.execute('''
INSERT INTO object (name, created_at, size,
content_type, etag, deleted, storage_policy_index)
VALUES (?, ?, ?, ?, ?, ?, ?)

View File

@ -49,6 +49,7 @@ from eventlet import Timeout
from swift import gettext_ as _
from swift.common.constraints import check_mount
from swift.common.request_helpers import is_sys_meta
from swift.common.utils import mkdirs, Timestamp, \
storage_directory, hash_path, renamer, fallocate, fsync, \
fdatasync, drop_buffer_cache, ThreadPool, lock_path, write_pickle, \
@ -1315,7 +1316,8 @@ class DiskFile(object):
self._metadata = self._failsafe_read_metadata(meta_file, meta_file)
sys_metadata = dict(
[(key, val) for key, val in datafile_metadata.iteritems()
if key.lower() in DATAFILE_SYSTEM_META])
if key.lower() in DATAFILE_SYSTEM_META
or is_sys_meta('object', key)])
self._metadata.update(sys_metadata)
else:
self._metadata = datafile_metadata

View File

@ -118,7 +118,7 @@ class ObjectExpirer(Daemon):
obj = o['name'].encode('utf8')
if processes > 0:
obj_process = int(
hashlib.md5('%s/%s' % (container, obj)).
hashlib.md5('%s/%s' % (str(container), obj)).
hexdigest(), 16)
if obj_process % processes != process:
continue

View File

@ -70,7 +70,7 @@ class ObjectController(server.ObjectController):
:param objdevice: device name that the object is in
:param policy_idx: the associated storage policy index
"""
headers_out['user-agent'] = 'obj-server %s' % os.getpid()
headers_out['user-agent'] = 'object-server %s' % os.getpid()
full_path = '/%s/%s/%s' % (account, container, obj)
if all([host, partition, contdevice]):
try:

View File

@ -86,7 +86,7 @@ class ObjectReplicator(Daemon):
self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
self.headers = {
'Content-Length': '0',
'user-agent': 'obj-replicator %s' % os.getpid()}
'user-agent': 'object-replicator %s' % os.getpid()}
self.rsync_error_log_line_length = \
int(conf.get('rsync_error_log_line_length', 0))
self.handoffs_first = config_true_value(conf.get('handoffs_first',

View File

@ -38,7 +38,8 @@ from swift.common.exceptions import ConnectionTimeout, DiskFileQuarantined, \
DiskFileDeviceUnavailable, DiskFileExpired, ChunkReadTimeout
from swift.obj import ssync_receiver
from swift.common.http import is_success
from swift.common.request_helpers import get_name_and_placement, is_user_meta
from swift.common.request_helpers import get_name_and_placement, \
is_user_meta, is_sys_or_user_meta
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
@ -169,7 +170,7 @@ class ObjectController(object):
:param objdevice: device name that the object is in
:param policy_index: the associated storage policy index
"""
headers_out['user-agent'] = 'obj-server %s' % os.getpid()
headers_out['user-agent'] = 'object-server %s' % os.getpid()
full_path = '/%s/%s/%s' % (account, container, obj)
if all([host, partition, contdevice]):
try:
@ -342,7 +343,9 @@ class ObjectController(object):
return HTTPNotFound(request=request)
orig_timestamp = Timestamp(orig_metadata.get('X-Timestamp', 0))
if orig_timestamp >= req_timestamp:
return HTTPConflict(request=request)
return HTTPConflict(
request=request,
headers={'X-Backend-Timestamp': orig_timestamp.internal})
metadata = {'X-Timestamp': req_timestamp.internal}
metadata.update(val for val in request.headers.iteritems()
if is_user_meta('object', val[0]))
@ -402,8 +405,10 @@ class ObjectController(object):
return HTTPPreconditionFailed(request=request)
orig_timestamp = Timestamp(orig_metadata.get('X-Timestamp', 0))
if orig_timestamp and orig_timestamp >= req_timestamp:
return HTTPConflict(request=request)
if orig_timestamp >= req_timestamp:
return HTTPConflict(
request=request,
headers={'X-Backend-Timestamp': orig_timestamp.internal})
orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
upload_expiration = time.time() + self.max_upload_time
etag = md5()
@ -445,7 +450,7 @@ class ObjectController(object):
'Content-Length': str(upload_size),
}
metadata.update(val for val in request.headers.iteritems()
if is_user_meta('object', val[0]))
if is_sys_or_user_meta('object', val[0]))
for header_key in (
request.headers.get('X-Backend-Replication-Headers') or
self.allowed_headers):
@ -503,7 +508,7 @@ class ObjectController(object):
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
if is_user_meta('object', key) or \
if is_sys_or_user_meta('object', key) or \
key.lower() in self.allowed_headers:
response.headers[key] = value
response.etag = metadata['ETag']
@ -549,7 +554,7 @@ class ObjectController(object):
response.headers['Content-Type'] = metadata.get(
'Content-Type', 'application/octet-stream')
for key, value in metadata.iteritems():
if is_user_meta('object', key) or \
if is_sys_or_user_meta('object', key) or \
key.lower() in self.allowed_headers:
response.headers[key] = value
response.etag = metadata['ETag']
@ -598,6 +603,7 @@ class ObjectController(object):
response_class = HTTPNoContent
else:
response_class = HTTPConflict
response_timestamp = max(orig_timestamp, req_timestamp)
orig_delete_at = int(orig_metadata.get('X-Delete-At') or 0)
try:
req_if_delete_at_val = request.headers['x-if-delete-at']
@ -631,7 +637,9 @@ class ObjectController(object):
'DELETE', account, container, obj, request,
HeaderKeyDict({'x-timestamp': req_timestamp.internal}),
device, policy_idx)
return response_class(request=request)
return response_class(
request=request,
headers={'X-Backend-Timestamp': response_timestamp.internal})
@public
@replication

View File

@ -258,7 +258,7 @@ class ObjectUpdater(Daemon):
:param headers: headers to send with the update
"""
headers_out = headers.copy()
headers_out['user-agent'] = 'obj-updater %s' % os.getpid()
headers_out['user-agent'] = 'object-updater %s' % os.getpid()
try:
with ConnectionTimeout(self.conn_timeout):
conn = http_connect(node['ip'], node['port'], node['device'],

View File

@ -20,7 +20,7 @@ import time
from swift.common.utils import public, csv_append, Timestamp
from swift.common.constraints import check_metadata
from swift.common import constraints
from swift.common.http import HTTP_ACCEPTED
from swift.common.http import HTTP_ACCEPTED, is_success
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation, clear_info_cache
from swift.common.storage_policy import POLICIES
@ -144,10 +144,14 @@ class ContainerController(Controller):
if self.app.max_containers_per_account > 0 and \
container_count >= self.app.max_containers_per_account and \
self.account_name not in self.app.max_containers_whitelist:
resp = HTTPForbidden(request=req)
resp.body = 'Reached container limit of %s' % \
self.app.max_containers_per_account
return resp
container_info = \
self.container_info(self.account_name, self.container_name,
req)
if not is_success(container_info.get('status')):
resp = HTTPForbidden(request=req)
resp.body = 'Reached container limit of %s' % \
self.app.max_containers_per_account
return resp
container_partition, containers = self.app.container_ring.get_nodes(
self.account_name, self.container_name)
headers = self._backend_requests(req, len(containers),

View File

@ -56,7 +56,8 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, Request, \
HTTPClientDisconnect, HTTPNotImplemented
from swift.common.request_helpers import is_user_meta
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
remove_items, copy_header_subset
def copy_headers_into(from_r, to_r):
@ -67,7 +68,7 @@ def copy_headers_into(from_r, to_r):
"""
pass_headers = ['x-delete-at']
for k, v in from_r.headers.items():
if is_user_meta('object', k) or k.lower() in pass_headers:
if is_sys_or_user_meta('object', k) or k.lower() in pass_headers:
to_r.headers[k] = v
@ -624,8 +625,14 @@ class ObjectController(Controller):
if not content_type_manually_set:
sink_req.headers['Content-Type'] = \
source_resp.headers['Content-Type']
if not config_true_value(
if config_true_value(
sink_req.headers.get('x-fresh-metadata', 'false')):
# post-as-copy: ignore new sysmeta, copy existing sysmeta
condition = lambda k: is_sys_meta('object', k)
remove_items(sink_req.headers, condition)
copy_header_subset(source_resp, sink_req, condition)
else:
# copy/update existing sysmeta and user meta
copy_headers_into(source_resp, sink_req)
copy_headers_into(req, sink_req)
# copy over x-static-large-object for POSTs and manifest copies

View File

@ -490,6 +490,7 @@ class Application(object):
handoff_nodes = node_iter
nodes_left = self.request_node_count(len(primary_nodes))
log_handoffs_threshold = nodes_left - len(primary_nodes)
for node in primary_nodes:
if not self.error_limited(node):
yield node
@ -501,11 +502,11 @@ class Application(object):
for node in handoff_nodes:
if not self.error_limited(node):
handoffs += 1
if self.log_handoffs:
if self.log_handoffs and handoffs > log_handoffs_threshold:
self.logger.increment('handoff_count')
self.logger.warning(
'Handoff requested (%d)' % handoffs)
if handoffs == len(primary_nodes):
if handoffs - log_handoffs_threshold == len(primary_nodes):
self.logger.increment('handoff_all_count')
yield node
if not self.error_limited(node):

View File

@ -32,7 +32,8 @@ from shutil import rmtree
from tempfile import mkdtemp
from test import get_config
from test.functional.swift_test_client import Connection, ResponseError
from test.functional.swift_test_client import Account, Connection, \
ResponseError
# This has the side effect of mocking out the xattr module so that unit tests
# (and in this case, when in-process functional tests are called for) can run
# on file systems that don't support extended attributes.
@ -513,6 +514,12 @@ def teardown_package():
global orig_collate
locale.setlocale(locale.LC_COLLATE, orig_collate)
# clean up containers and objects left behind after running tests
conn = Connection(config)
conn.authenticate()
account = Account(conn, config.get('account', config['username']))
account.delete_containers()
global in_process
if in_process:
try:

test/probe/brain.py (new file, 206 lines)
View File

@ -0,0 +1,206 @@
#!/usr/bin/python -u
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import itertools
import uuid
from optparse import OptionParser
from urlparse import urlparse
import random
from swift.common.manager import Manager
from swift.common import utils, ring
from swift.common.storage_policy import POLICIES
from swift.common.http import HTTP_NOT_FOUND
from swiftclient import client, get_auth, ClientException
TIMEOUT = 60
def meta_command(name, bases, attrs):
"""
Look for attrs with a truthy attribute __command__ and add them to an
attribute __commands__ on the type that maps names to decorated methods.
The decorated methods' doc strings also get mapped in __docs__.
Also adds a method run(command_name, *args, **kwargs) that will
execute the method mapped to the name in __commands__.
"""
commands = {}
docs = {}
for attr, value in attrs.items():
if getattr(value, '__command__', False):
commands[attr] = value
# methods always have a __doc__ attribute, sometimes empty
docs[attr] = (getattr(value, '__doc__', None) or
'perform the %s command' % attr).strip()
attrs['__commands__'] = commands
attrs['__docs__'] = docs
def run(self, command, *args, **kwargs):
return self.__commands__[command](self, *args, **kwargs)
attrs.setdefault('run', run)
return type(name, bases, attrs)
def command(f):
f.__command__ = True
return f
class BrainSplitter(object):
__metaclass__ = meta_command
def __init__(self, url, token, container_name='test', object_name='test',
server_type='container'):
self.url = url
self.token = token
self.account = utils.split_path(urlparse(url).path, 2, 2)[1]
self.container_name = container_name
self.object_name = object_name
server_list = ['%s-server' % server_type] if server_type else ['all']
self.servers = Manager(server_list)
policies = list(POLICIES)
random.shuffle(policies)
self.policies = itertools.cycle(policies)
o = object_name if server_type == 'object' else None
c = container_name if server_type in ('object', 'container') else None
part, nodes = ring.Ring(
'/etc/swift/%s.ring.gz' % server_type).get_nodes(
self.account, c, o)
node_ids = [n['id'] for n in nodes]
if all(n_id in node_ids for n_id in (0, 1)):
self.primary_numbers = (1, 2)
self.handoff_numbers = (3, 4)
else:
self.primary_numbers = (3, 4)
self.handoff_numbers = (1, 2)
@command
def start_primary_half(self):
"""
start servers 1 & 2
"""
tuple(self.servers.start(number=n) for n in self.primary_numbers)
@command
def stop_primary_half(self):
"""
stop servers 1 & 2
"""
tuple(self.servers.stop(number=n) for n in self.primary_numbers)
@command
def start_handoff_half(self):
"""
start servers 3 & 4
"""
tuple(self.servers.start(number=n) for n in self.handoff_numbers)
@command
def stop_handoff_half(self):
"""
stop servers 3 & 4
"""
tuple(self.servers.stop(number=n) for n in self.handoff_numbers)
@command
def put_container(self, policy_index=None):
"""
put container with next storage policy
"""
policy = self.policies.next()
if policy_index is not None:
policy = POLICIES.get_by_index(int(policy_index))
if not policy:
raise ValueError('Unknown policy with index %s' % policy)
headers = {'X-Storage-Policy': policy.name}
client.put_container(self.url, self.token, self.container_name,
headers=headers)
@command
def delete_container(self):
"""
delete container
"""
client.delete_container(self.url, self.token, self.container_name)
@command
def put_object(self, headers=None):
"""
issue put for zero byte test object
"""
client.put_object(self.url, self.token, self.container_name,
self.object_name, headers=headers)
@command
def delete_object(self):
"""
issue delete for test object
"""
try:
client.delete_object(self.url, self.token, self.container_name,
self.object_name)
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
parser = OptionParser('%prog [options] '
'<command>[:<args>[,<args>...]] [<command>...]')
parser.usage += '\n\nCommands:\n\t' + \
'\n\t'.join("%s - %s" % (name, doc) for name, doc in
BrainSplitter.__docs__.items())
parser.add_option('-c', '--container', default='container-%s' % uuid.uuid4(),
help='set container name')
parser.add_option('-o', '--object', default='object-%s' % uuid.uuid4(),
help='set object name')
parser.add_option('-s', '--server_type', default='container',
help='set server type')
def main():
options, commands = parser.parse_args()
if not commands:
parser.print_help()
return 'ERROR: must specify at least one command'
for cmd_args in commands:
cmd = cmd_args.split(':', 1)[0]
if cmd not in BrainSplitter.__commands__:
parser.print_help()
return 'ERROR: unknown command %s' % cmd
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0',
'test:tester', 'testing')
brain = BrainSplitter(url, token, options.container, options.object,
options.server_type)
for cmd_args in commands:
parts = cmd_args.split(':', 1)
command = parts[0]
if len(parts) > 1:
args = utils.list_from_csv(parts[1])
else:
args = ()
try:
brain.run(command, *args)
except ClientException as e:
print '**WARNING**: %s raised %s' % (command, e)
print 'STATUS'.join(['*' * 25] * 2)
brain.servers.status()
sys.exit()
if __name__ == "__main__":
sys.exit(main())

View File

@ -13,166 +13,26 @@
# limitations under the License.
from hashlib import md5
import sys
import itertools
import time
import unittest
import uuid
from optparse import OptionParser
from urlparse import urlparse
import random
from nose import SkipTest
from swift.common.manager import Manager
from swift.common.internal_client import InternalClient
from swift.common import utils, direct_client, ring
from swift.common import utils, direct_client
from swift.common.storage_policy import POLICIES
from swift.common.http import HTTP_NOT_FOUND
from test.probe.brain import BrainSplitter
from test.probe.common import reset_environment, get_to_final_state
from swiftclient import client, get_auth, ClientException
from swiftclient import client, ClientException
TIMEOUT = 60
def meta_command(name, bases, attrs):
"""
Look for attrs with a truthy attribute __command__ and add them to an
attribute __commands__ on the type that maps names to decorated methods.
The decorated methods' doc strings also get mapped in __docs__.
Also adds a method run(command_name, *args, **kwargs) that will
execute the method mapped to the name in __commands__.
"""
commands = {}
docs = {}
for attr, value in attrs.items():
if getattr(value, '__command__', False):
commands[attr] = value
# methods have always have a __doc__ attribute, sometimes empty
docs[attr] = (getattr(value, '__doc__', None) or
'perform the %s command' % attr).strip()
attrs['__commands__'] = commands
attrs['__docs__'] = docs
def run(self, command, *args, **kwargs):
return self.__commands__[command](self, *args, **kwargs)
attrs.setdefault('run', run)
return type(name, bases, attrs)
def command(f):
f.__command__ = True
return f
class BrainSplitter(object):
__metaclass__ = meta_command
def __init__(self, url, token, container_name='test', object_name='test'):
self.url = url
self.token = token
self.account = utils.split_path(urlparse(url).path, 2, 2)[1]
self.container_name = container_name
self.object_name = object_name
self.servers = Manager(['container-server'])
policies = list(POLICIES)
random.shuffle(policies)
self.policies = itertools.cycle(policies)
container_part, container_nodes = ring.Ring(
'/etc/swift/container.ring.gz').get_nodes(
self.account, self.container_name)
container_node_ids = [n['id'] for n in container_nodes]
if all(n_id in container_node_ids for n_id in (0, 1)):
self.primary_numbers = (1, 2)
self.handoff_numbers = (3, 4)
else:
self.primary_numbers = (3, 4)
self.handoff_numbers = (1, 2)
@command
def start_primary_half(self):
"""
start container servers 1 & 2
"""
tuple(self.servers.start(number=n) for n in self.primary_numbers)
@command
def stop_primary_half(self):
"""
stop container servers 1 & 2
"""
tuple(self.servers.stop(number=n) for n in self.primary_numbers)
@command
def start_handoff_half(self):
"""
start container servers 3 & 4
"""
tuple(self.servers.start(number=n) for n in self.handoff_numbers)
@command
def stop_handoff_half(self):
"""
stop container servers 3 & 4
"""
tuple(self.servers.stop(number=n) for n in self.handoff_numbers)
@command
def put_container(self, policy_index=None):
"""
put container with next storage policy
"""
policy = self.policies.next()
if policy_index is not None:
policy = POLICIES.get_by_index(int(policy_index))
if not policy:
raise ValueError('Unknown policy with index %s' % policy)
headers = {'X-Storage-Policy': policy.name}
client.put_container(self.url, self.token, self.container_name,
headers=headers)
@command
def delete_container(self):
"""
delete container
"""
client.delete_container(self.url, self.token, self.container_name)
@command
def put_object(self, headers=None):
"""
issue put for zero byte test object
"""
client.put_object(self.url, self.token, self.container_name,
self.object_name, headers=headers)
@command
def delete_object(self):
"""
issue delete for test object
"""
try:
client.delete_object(self.url, self.token, self.container_name,
self.object_name)
except ClientException as err:
if err.http_status != HTTP_NOT_FOUND:
raise
parser = OptionParser('%prog split-brain [options] '
'<command>[:<args>[,<args>...]] [<command>...]')
parser.usage += '\n\nCommands:\n\t' + \
'\n\t'.join("%s - %s" % (name, doc) for name, doc in
BrainSplitter.__docs__.items())
parser.add_option('-c', '--container', default='container-%s' % uuid.uuid4(),
help='set container name')
parser.add_option('-o', '--object', default='object-%s' % uuid.uuid4(),
help='set object name')
class TestContainerMergePolicyIndex(unittest.TestCase):
def setUp(self):
@ -184,7 +44,7 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
self.container_name = 'container-%s' % uuid.uuid4()
self.object_name = 'object-%s' % uuid.uuid4()
self.brain = BrainSplitter(self.url, self.token, self.container_name,
self.object_name)
self.object_name, 'container')
def test_merge_storage_policy_index(self):
# generic split brain
@ -594,37 +454,5 @@ class TestContainerMergePolicyIndex(unittest.TestCase):
self.fail('Found unexpected object %r in the queue' % obj)
def main():
options, commands = parser.parse_args()
commands.remove('split-brain')
if not commands:
parser.print_help()
return 'ERROR: must specify at least one command'
for cmd_args in commands:
cmd = cmd_args.split(':', 1)[0]
if cmd not in BrainSplitter.__commands__:
parser.print_help()
return 'ERROR: unknown command %s' % cmd
url, token = get_auth('http://127.0.0.1:8080/auth/v1.0',
'test:tester', 'testing')
brain = BrainSplitter(url, token, options.container, options.object)
for cmd_args in commands:
parts = cmd_args.split(':', 1)
command = parts[0]
if len(parts) > 1:
args = utils.list_from_csv(parts[1])
else:
args = ()
try:
brain.run(command, *args)
except ClientException as e:
print '**WARNING**: %s raised %s' % (command, e)
print 'STATUS'.join(['*' * 25] * 2)
brain.servers.status()
sys.exit()
if __name__ == "__main__":
if any('split-brain' in arg for arg in sys.argv):
sys.exit(main())
unittest.main()

View File

@ -0,0 +1,186 @@
#!/usr/bin/python -u
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from io import StringIO
from tempfile import mkdtemp
from textwrap import dedent
import os
import shutil
import unittest
import uuid
from swift.common import internal_client
from test.probe.brain import BrainSplitter
from test.probe.common import kill_servers, reset_environment, \
get_to_final_state
class Test(unittest.TestCase):
def setUp(self):
"""
Reset all environment and start all servers.
"""
(self.pids, self.port2server, self.account_ring, self.container_ring,
self.object_ring, self.policy, self.url, self.token,
self.account, self.configs) = reset_environment()
self.container_name = 'container-%s' % uuid.uuid4()
self.object_name = 'object-%s' % uuid.uuid4()
self.brain = BrainSplitter(self.url, self.token, self.container_name,
self.object_name, 'object')
self.tempdir = mkdtemp()
conf_path = os.path.join(self.tempdir, 'internal_client.conf')
conf_body = """
[DEFAULT]
swift_dir = /etc/swift
[pipeline:main]
pipeline = catch_errors cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
object_post_as_copy = false
[filter:cache]
use = egg:swift#memcache
[filter:catch_errors]
use = egg:swift#catch_errors
"""
with open(conf_path, 'w') as f:
f.write(dedent(conf_body))
self.int_client = internal_client.InternalClient(conf_path, 'test', 1)
def tearDown(self):
"""
Stop all servers.
"""
kill_servers(self.port2server, self.pids)
shutil.rmtree(self.tempdir)
def _put_object(self, headers=None):
headers = headers or {}
self.int_client.upload_object(StringIO(u'stuff'), self.account,
self.container_name,
self.object_name, headers)
def _post_object(self, headers):
self.int_client.set_object_metadata(self.account, self.container_name,
self.object_name, headers)
def _get_object_metadata(self):
return self.int_client.get_object_metadata(self.account,
self.container_name,
self.object_name)
def test_sysmeta_after_replication_with_subsequent_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
usermeta = {'x-object-meta-bar': 'meta-bar'}
self.brain.put_container(policy_index=0)
# put object
self._put_object()
# put newer object with sysmeta to first server subset
self.brain.stop_primary_half()
self._put_object(headers=sysmeta)
metadata = self._get_object_metadata()
for key in sysmeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta[key])
self.brain.start_primary_half()
# post some user meta to second server subset
self.brain.stop_handoff_half()
self._post_object(usermeta)
metadata = self._get_object_metadata()
for key in usermeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], usermeta[key])
for key in sysmeta:
self.assertFalse(key in metadata)
self.brain.start_handoff_half()
# run replicator
get_to_final_state()
# check user metadata has been replicated to first server subset
# and sysmeta is unchanged
self.brain.stop_primary_half()
metadata = self._get_object_metadata()
expected = dict(sysmeta)
expected.update(usermeta)
for key in expected.keys():
self.assertTrue(key in metadata, key)
self.assertEqual(metadata[key], expected[key])
self.brain.start_primary_half()
# check user metadata and sysmeta both on second server subset
self.brain.stop_handoff_half()
metadata = self._get_object_metadata()
for key in expected.keys():
self.assertTrue(key in metadata, key)
self.assertEqual(metadata[key], expected[key])
def test_sysmeta_after_replication_with_prior_post(self):
sysmeta = {'x-object-sysmeta-foo': 'sysmeta-foo'}
usermeta = {'x-object-meta-bar': 'meta-bar'}
self.brain.put_container(policy_index=0)
# put object
self._put_object()
# put user meta to first server subset
self.brain.stop_handoff_half()
self._post_object(headers=usermeta)
metadata = self._get_object_metadata()
for key in usermeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], usermeta[key])
self.brain.start_handoff_half()
# put newer object with sysmeta to second server subset
self.brain.stop_primary_half()
self._put_object(headers=sysmeta)
metadata = self._get_object_metadata()
for key in sysmeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta[key])
self.brain.start_primary_half()
# run replicator
get_to_final_state()
# check stale user metadata is not replicated to first server subset
# and sysmeta is unchanged
self.brain.stop_primary_half()
metadata = self._get_object_metadata()
for key in sysmeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta[key])
for key in usermeta:
self.assertFalse(key in metadata)
self.brain.start_primary_half()
# check stale user metadata is removed from second server subset
# and sysmeta is replicated
self.brain.stop_handoff_half()
metadata = self._get_object_metadata()
for key in sysmeta:
self.assertTrue(key in metadata)
self.assertEqual(metadata[key], sysmeta[key])
for key in usermeta:
self.assertFalse(key in metadata)
if __name__ == "__main__":
unittest.main()

View File

@ -152,10 +152,7 @@ class FakeRing(Ring):
def clear_errors(self):
for dev in self.devs:
for key in ('errors', 'last_error'):
try:
del dev[key]
except KeyError:
pass
dev.pop(key, None)
def set_replicas(self, replicas):
self.replicas = replicas
@ -658,7 +655,10 @@ def fake_http_connect(*code_iter, **kwargs):
def getexpect(self):
if isinstance(self.expect_status, (Exception, Timeout)):
raise self.expect_status
return FakeConn(self.expect_status)
headers = {}
if self.expect_status == 409:
headers['X-Backend-Timestamp'] = self.timestamp
return FakeConn(self.expect_status, headers=headers)
def getheaders(self):
etag = self.etag
@ -671,6 +671,7 @@ def fake_http_connect(*code_iter, **kwargs):
headers = {'content-length': len(self.body),
'content-type': 'x-application/test',
'x-timestamp': self.timestamp,
'x-backend-timestamp': self.timestamp,
'last-modified': self.timestamp,
'x-object-meta-test': 'testing',
'x-delete-at': '9876543210',

View File

@ -1692,6 +1692,156 @@ class TestFormPost(unittest.TestCase):
self.assertEquals(exc_info, None)
self.assertTrue('FormPost: expired not an integer' in body)
def test_x_delete_at(self):
delete_at = int(time() + 100)
x_delete_body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="x_delete_at"',
'',
str(delete_at),
]
key = 'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = StringIO('\r\n'.join(x_delete_body_part + body))
env['swift.account/AUTH_test'] = self._fake_cache_env(
'AUTH_test', [key])
self.app = FakeApp(iter([('201 Created', {}, ''),
('201 Created', {}, '')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = ''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEquals(status, '201 Created')
self.assertTrue('201 Created' in body)
self.assertEquals(len(self.app.requests), 2)
self.assertTrue("X-Delete-At" in self.app.requests[0].headers)
self.assertTrue("X-Delete-At" in self.app.requests[1].headers)
self.assertEquals(delete_at,
self.app.requests[0].headers["X-Delete-At"])
self.assertEquals(delete_at,
self.app.requests[1].headers["X-Delete-At"])
def test_x_delete_at_not_int(self):
delete_at = "2014-07-16"
x_delete_body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="x_delete_at"',
'',
str(delete_at),
]
key = 'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = StringIO('\r\n'.join(x_delete_body_part + body))
env['swift.account/AUTH_test'] = self._fake_cache_env(
'AUTH_test', [key])
self.app = FakeApp(iter([('201 Created', {}, ''),
('201 Created', {}, '')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = ''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEquals(status, '400 Bad Request')
self.assertTrue('FormPost: x_delete_at not an integer' in body)
def test_x_delete_after(self):
delete_after = 100
x_delete_body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="x_delete_after"',
'',
str(delete_after),
]
key = 'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = StringIO('\r\n'.join(x_delete_body_part + body))
env['swift.account/AUTH_test'] = self._fake_cache_env(
'AUTH_test', [key])
self.app = FakeApp(iter([('201 Created', {}, ''),
('201 Created', {}, '')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = ''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEquals(status, '201 Created')
self.assertTrue('201 Created' in body)
self.assertEquals(len(self.app.requests), 2)
self.assertTrue("X-Delete-After" in self.app.requests[0].headers)
self.assertTrue("X-Delete-After" in self.app.requests[1].headers)
self.assertEqual(delete_after,
self.app.requests[0].headers["X-Delete-After"])
self.assertEqual(delete_after,
self.app.requests[1].headers["X-Delete-After"])
def test_x_delete_after_not_int(self):
delete_after = "2 days"
x_delete_body_part = [
'------WebKitFormBoundaryNcxTqxSlX7t4TDkR',
'Content-Disposition: form-data; name="x_delete_after"',
'',
str(delete_after),
]
key = 'abc'
sig, env, body = self._make_sig_env_body(
'/v1/AUTH_test/container', '', 1024, 10, int(time() + 86400), key)
env['wsgi.input'] = StringIO('\r\n'.join(x_delete_body_part + body))
env['swift.account/AUTH_test'] = self._fake_cache_env(
'AUTH_test', [key])
self.app = FakeApp(iter([('201 Created', {}, ''),
('201 Created', {}, '')]))
self.auth = tempauth.filter_factory({})(self.app)
self.formpost = formpost.filter_factory({})(self.auth)
status = [None]
headers = [None]
exc_info = [None]
def start_response(s, h, e=None):
status[0] = s
headers[0] = h
exc_info[0] = e
body = ''.join(self.formpost(env, start_response))
status = status[0]
headers = headers[0]
exc_info = exc_info[0]
self.assertEquals(status, '400 Bad Request')
self.assertTrue('FormPost: x_delete_after not an integer' in body)
if __name__ == '__main__':
unittest.main()

View File

@ -110,10 +110,87 @@ class TestListEndpoints(unittest.TestCase):
info['storage_policy'] = self.policy_to_test
(version, account, container, unused) = \
split_path(env['PATH_INFO'], 3, 4, True)
self.assertEquals((version, account, container, unused),
self.expected_path)
self.assertEquals((version, account, container),
self.expected_path[:3])
return info
def test_parse_response_version(self):
expectations = {
'': 1.0, # legacy compat
'/1': 1.0,
'/v1': 1.0,
'/1.0': 1.0,
'/v1.0': 1.0,
'/2': 2.0,
'/v2': 2.0,
'/2.0': 2.0,
'/v2.0': 2.0,
}
accounts = (
'AUTH_test',
'test',
'verybadreseller_prefix',
'verybadaccount',
)
for expected_account in accounts:
for version, expected in expectations.items():
path = '/endpoints%s/%s/c/o' % (version, expected_account)
req = Request.blank(path)
version, account, container, obj = \
self.list_endpoints._parse_path(req)
try:
self.assertEqual(version, expected)
self.assertEqual(account, expected_account)
except AssertionError:
self.fail('Unexpected result from parse path %r: %r != %r'
% (path, (version, account),
(expected, expected_account)))
def test_parse_version_that_looks_like_account(self):
"""
Demonstrate the failure mode for versions that look like accounts.
If you can make _parse_path better and this is the *only* test that
fails, you can delete it ;)
"""
bad_versions = (
'v_3',
'verybadreseller_prefix',
)
for bad_version in bad_versions:
req = Request.blank('/endpoints/%s/a/c/o' % bad_version)
version, account, container, obj = \
self.list_endpoints._parse_path(req)
self.assertEqual(version, 1.0)
self.assertEqual(account, bad_version)
self.assertEqual(container, 'a')
self.assertEqual(obj, 'c/o')
def test_parse_account_that_looks_like_version(self):
"""
Demonstrate the failure mode for accounts that look like versions.
If you can make _parse_path better and this is the *only* test that
fails, you can delete it ;)
"""
bad_accounts = (
'v3.0', 'verybaddaccountwithnoprefix',
)
for bad_account in bad_accounts:
req = Request.blank('/endpoints/%s/c/o' % bad_account)
self.assertRaises(ValueError,
self.list_endpoints._parse_path, req)
even_worse_accounts = {
'v1': 1.0,
'v2.0': 2.0,
}
for bad_account, guessed_version in even_worse_accounts.items():
req = Request.blank('/endpoints/%s/c/o' % bad_account)
version, account, container, obj = \
self.list_endpoints._parse_path(req)
self.assertEqual(version, guessed_version)
self.assertEqual(account, 'c')
self.assertEqual(container, 'o')
self.assertEqual(obj, None)
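
These path-parsing tests document a heuristic: a leading segment that looks numeric ('1', 'v1', '2.0', 'v2.0') is treated as the API version, anything else is treated as the account, and a numeric-looking segment outside the supported versions raises ValueError. A rough standalone sketch of that decision rule (guess_version is a hypothetical helper, not the middleware's _parse_path):

def guess_version(segments):
    """Return (api_version, remaining_segments) for the path after /endpoints."""
    first = segments[0] if segments else ''
    if first and first.lstrip('v').replace('.', '', 1).isdigit():
        version = float(first.lstrip('v'))     # '1', 'v1', '2.0', 'v2.0', ...
        if version not in (1.0, 2.0):
            raise ValueError('Unsupported version %r' % first)
        return version, segments[1:]
    return 1.0, segments                       # legacy: no version segment

assert guess_version(['v2', 'a', 'c', 'o']) == (2.0, ['a', 'c', 'o'])
# 'v_3' is not numeric, so it falls through to the account branch -- the
# failure mode test_parse_version_that_looks_like_account demonstrates
assert guess_version(['v_3', 'a', 'c', 'o']) == (1.0, ['v_3', 'a', 'c', 'o'])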
def test_get_object_ring(self):
self.assertEquals(isinstance(self.list_endpoints.get_object_ring(0),
ring.Ring), True)
@ -121,6 +198,38 @@ class TestListEndpoints(unittest.TestCase):
ring.Ring), True)
self.assertRaises(ValueError, self.list_endpoints.get_object_ring, 99)
def test_parse_path_no_version_specified(self):
req = Request.blank('/endpoints/a/c/o1')
version, account, container, obj = \
self.list_endpoints._parse_path(req)
self.assertEqual(account, 'a')
self.assertEqual(container, 'c')
self.assertEqual(obj, 'o1')
def test_parse_path_with_valid_version(self):
req = Request.blank('/endpoints/v2/a/c/o1')
version, account, container, obj = \
self.list_endpoints._parse_path(req)
self.assertEqual(version, 2.0)
self.assertEqual(account, 'a')
self.assertEqual(container, 'c')
self.assertEqual(obj, 'o1')
def test_parse_path_with_invalid_version(self):
req = Request.blank('/endpoints/v3/a/c/o1')
self.assertRaises(ValueError, self.list_endpoints._parse_path,
req)
def test_parse_path_with_no_account(self):
bad_paths = ('v1', 'v2', '')
for path in bad_paths:
req = Request.blank('/endpoints/%s' % path)
try:
self.list_endpoints._parse_path(req)
self.fail('Expected ValueError to be raised')
except ValueError as err:
self.assertEqual(str(err), 'No account specified')
def test_get_endpoint(self):
# Expected results for objects taken from test_ring
# Expected results for others computed by manually invoking
@ -134,7 +243,7 @@ class TestListEndpoints(unittest.TestCase):
"http://10.1.2.2:6000/sdd1/1/a/c/o1"
])
# test policies with default endpoint name
# test policies with no version endpoint name
expected = [[
"http://10.1.1.1:6000/sdb1/1/a/c/o1",
"http://10.1.2.2:6000/sdd1/1/a/c/o1"], [
@ -245,6 +354,82 @@ class TestListEndpoints(unittest.TestCase):
self.assertEquals(resp.content_type, 'application/json')
self.assertEquals(json.loads(resp.body), expected[pol.idx])
def test_v1_response(self):
req = Request.blank('/endpoints/v1/a/c/o1')
resp = req.get_response(self.list_endpoints)
expected = ["http://10.1.1.1:6000/sdb1/1/a/c/o1",
"http://10.1.2.2:6000/sdd1/1/a/c/o1"]
self.assertEqual(resp.body, json.dumps(expected))
def test_v2_obj_response(self):
req = Request.blank('/endpoints/v2/a/c/o1')
resp = req.get_response(self.list_endpoints)
expected = {
'endpoints': ["http://10.1.1.1:6000/sdb1/1/a/c/o1",
"http://10.1.2.2:6000/sdd1/1/a/c/o1"],
'headers': {'X-Backend-Storage-Policy-Index': "0"},
}
self.assertEqual(resp.body, json.dumps(expected))
for policy in POLICIES:
patch_path = 'swift.common.middleware.list_endpoints' \
'.get_container_info'
mock_get_container_info = lambda *args, **kwargs: \
{'storage_policy': int(policy)}
with mock.patch(patch_path, mock_get_container_info):
resp = req.get_response(self.list_endpoints)
part, nodes = policy.object_ring.get_nodes('a', 'c', 'o1')
[node.update({'part': part}) for node in nodes]
path = 'http://%(ip)s:%(port)s/%(device)s/%(part)s/a/c/o1'
expected = {
'headers': {
'X-Backend-Storage-Policy-Index': str(int(policy))},
'endpoints': [path % node for node in nodes],
}
self.assertEqual(resp.body, json.dumps(expected))
def test_v2_non_obj_response(self):
# account
req = Request.blank('/endpoints/v2/a')
resp = req.get_response(self.list_endpoints)
expected = {
'endpoints': ["http://10.1.2.1:6000/sdc1/0/a",
"http://10.1.1.1:6000/sda1/0/a",
"http://10.1.1.1:6000/sdb1/0/a"],
'headers': {},
}
self.assertEqual(resp.body, json.dumps(expected))
# container
req = Request.blank('/endpoints/v2/a/c')
resp = req.get_response(self.list_endpoints)
expected = {
'endpoints': ["http://10.1.2.2:6000/sdd1/0/a/c",
"http://10.1.1.1:6000/sda1/0/a/c",
"http://10.1.2.1:6000/sdc1/0/a/c"],
'headers': {},
}
self.assertEqual(resp.body, json.dumps(expected))
def test_version_account_response(self):
req = Request.blank('/endpoints/a')
resp = req.get_response(self.list_endpoints)
expected = ["http://10.1.2.1:6000/sdc1/0/a",
"http://10.1.1.1:6000/sda1/0/a",
"http://10.1.1.1:6000/sdb1/0/a"]
self.assertEqual(resp.body, json.dumps(expected))
req = Request.blank('/endpoints/v1.0/a')
resp = req.get_response(self.list_endpoints)
self.assertEqual(resp.body, json.dumps(expected))
req = Request.blank('/endpoints/v2/a')
resp = req.get_response(self.list_endpoints)
expected = {
'endpoints': ["http://10.1.2.1:6000/sdc1/0/a",
"http://10.1.1.1:6000/sda1/0/a",
"http://10.1.1.1:6000/sdb1/0/a"],
'headers': {},
}
self.assertEqual(resp.body, json.dumps(expected))
if __name__ == '__main__':
unittest.main()

View File

@ -451,6 +451,23 @@ class TestTempURL(unittest.TestCase):
self.assertEquals(req.environ['swift.authorize_override'], True)
self.assertEquals(req.environ['REMOTE_USER'], '.wsgi.tempurl')
def test_head_allowed_by_post(self):
method = 'POST'
expires = int(time() + 86400)
path = '/v1/a/c/o'
key = 'abc'
hmac_body = '%s\n%s\n%s' % (method, expires, path)
sig = hmac.new(key, hmac_body, sha1).hexdigest()
req = self._make_request(
path, keys=[key],
environ={'REQUEST_METHOD': 'HEAD',
'QUERY_STRING': 'temp_url_sig=%s&temp_url_expires=%s' % (
sig, expires)})
resp = req.get_response(self.tempurl)
self.assertEquals(resp.status_int, 404)
self.assertEquals(req.environ['swift.authorize_override'], True)
self.assertEquals(req.environ['REMOTE_USER'], '.wsgi.tempurl')
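
The new test relies on the signing recipe used throughout this file: the HMAC-SHA1 of method, expiry and path joined by newlines, keyed with the account's temp-url key, and a signature made for POST also authorizes HEAD. For reference, the same recipe as a standalone client-side snippet (the key and path are example values):

import hmac
from hashlib import sha1
from time import time

key = 'abc'                        # the account's temp-url key (example)
method, path = 'POST', '/v1/a/c/o'
expires = int(time() + 86400)      # valid for one day
hmac_body = '%s\n%s\n%s' % (method, expires, path)
sig = hmac.new(key, hmac_body, sha1).hexdigest()
query_string = 'temp_url_sig=%s&temp_url_expires=%s' % (sig, expires)
# appending query_string to the object URL yields a temp URL; because the
# signature was generated for POST, a HEAD with it is also allowed, which
# is what test_head_allowed_by_post asserts.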
def test_head_otherwise_not_allowed(self):
method = 'PUT'
expires = int(time() + 86400)

View File

@ -149,7 +149,7 @@ class TestUtils(unittest.TestCase):
self.assertEquals((
'container.builder', 'container.ring.gz'
), parse_builder_ring_filename_args(args.split()))
# builer name arg should always fall through
# builder name arg should always fall through
args = 'swift-ring-builder test create'
self.assertEquals((
'test', 'test.ring.gz'

View File

@ -1211,6 +1211,7 @@ class TestSimpleClient(unittest.TestCase):
request.assert_called_with('http://127.0.0.1?format=json', data=None,
headers={'X-Auth-Token': 'token'})
self.assertEqual([None, None], retval)
self.assertEqual(sc.attempts, 2)
@mock.patch('eventlet.green.urllib2.urlopen')
def test_get_with_retries_param(self, mock_urlopen):

View File

@ -16,9 +16,10 @@
"""Tests for swift.common.request_helpers"""
import unittest
from swift.common.swob import Request
from swift.common.request_helpers import is_sys_meta, is_user_meta, \
is_sys_or_user_meta, strip_sys_meta_prefix, strip_user_meta_prefix, \
remove_items
remove_items, copy_header_subset
server_types = ['account', 'container', 'object']
@ -68,3 +69,15 @@ class TestRequestHelpers(unittest.TestCase):
rem = remove_items(src, test)
self.assertEquals(src, {'c': 'd'})
self.assertEquals(rem, {'a': 'b'})
def test_copy_header_subset(self):
src = {'a': 'b',
'c': 'd'}
from_req = Request.blank('/path', environ={}, headers=src)
to_req = Request.blank('/path', {})
test = lambda x: x.lower() == 'a'
copy_header_subset(from_req, to_req, test)
self.assertTrue('A' in to_req.headers)
self.assertEqual(to_req.headers['A'], 'b')
self.assertFalse('c' in to_req.headers)
self.assertFalse('C' in to_req.headers)

View File

@ -2351,6 +2351,77 @@ cluster_dfw1 = http://dfw1.host/v1/
self.assertRaises(OSError, os.remove, nt.name)
def test_lock_file_unlinked_after_open(self):
os_open = os.open
first_pass = [True]
def deleting_open(filename, flags):
# unlink the file after it's opened, but only once.
fd = os_open(filename, flags)
if first_pass[0]:
os.unlink(filename)
first_pass[0] = False
return fd
with NamedTemporaryFile(delete=False) as nt:
with mock.patch('os.open', deleting_open):
with utils.lock_file(nt.name, unlink=True) as f:
self.assertNotEqual(os.fstat(nt.fileno()).st_ino,
os.fstat(f.fileno()).st_ino)
first_pass = [True]
def recreating_open(filename, flags):
# unlink and recreate the file after it's opened
fd = os_open(filename, flags)
if first_pass[0]:
os.unlink(filename)
os.close(os_open(filename, os.O_CREAT | os.O_RDWR))
first_pass[0] = False
return fd
with NamedTemporaryFile(delete=False) as nt:
with mock.patch('os.open', recreating_open):
with utils.lock_file(nt.name, unlink=True) as f:
self.assertNotEqual(os.fstat(nt.fileno()).st_ino,
os.fstat(f.fileno()).st_ino)
def test_lock_file_held_on_unlink(self):
os_unlink = os.unlink
def flocking_unlink(filename):
# make sure the lock is held when we unlink
fd = os.open(filename, os.O_RDWR)
self.assertRaises(
IOError, fcntl.flock, fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
os.close(fd)
os_unlink(filename)
with NamedTemporaryFile(delete=False) as nt:
with mock.patch('os.unlink', flocking_unlink):
with utils.lock_file(nt.name, unlink=True):
pass
def test_lock_file_no_unlink_if_fail(self):
os_open = os.open
with NamedTemporaryFile(delete=True) as nt:
def lock_on_open(filename, flags):
# lock the file on another fd after it's opened.
fd = os_open(filename, flags)
fd2 = os_open(filename, flags)
fcntl.flock(fd2, fcntl.LOCK_EX | fcntl.LOCK_NB)
return fd
try:
timedout = False
with mock.patch('os.open', lock_on_open):
with utils.lock_file(nt.name, unlink=False, timeout=0.01):
pass
except LockTimeout:
timedout = True
self.assert_(timedout)
self.assert_(os.path.exists(nt.name))
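
The lock_file tests above pin down three properties of utils.lock_file with unlink=True: it notices when the path it opened was unlinked or replaced and locks the current file instead, it unlinks while the flock is still held, and it never unlinks when it could not acquire the lock. A bare-bones sketch of that open/flock/re-check/unlink pattern (not Swift's implementation; no timeout handling):

import fcntl
import os
from contextlib import contextmanager

@contextmanager
def simple_lock_file(filename, unlink=True):
    """Open filename, flock it, and make sure the locked fd still matches
    the path before yielding; unlink happens under the lock."""
    while True:
        fd = os.open(filename, os.O_CREAT | os.O_RDWR)
        fcntl.flock(fd, fcntl.LOCK_EX)
        try:
            if os.stat(filename).st_ino == os.fstat(fd).st_ino:
                break                 # we locked the current file; done
        except OSError:
            pass                      # file vanished underneath us; retry
        os.close(fd)                  # someone unlinked/recreated it; retry
    try:
        yield fd
    finally:
        if unlink:
            os.unlink(filename)       # still holding the lock here
        os.close(fd)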
def test_ismount_path_does_not_exist(self):
tmpdir = mkdtemp()
try:

View File

@ -333,10 +333,11 @@ class TestWSGI(unittest.TestCase):
'modify_wsgi_pipeline'):
with mock.patch('swift.common.wsgi.wsgi') as _wsgi:
with mock.patch('swift.common.wsgi.eventlet') as _eventlet:
conf = wsgi.appconfig(conf_file)
logger = logging.getLogger('test')
sock = listen(('localhost', 0))
wsgi.run_server(conf, logger, sock)
with mock.patch('swift.common.wsgi.inspect'):
conf = wsgi.appconfig(conf_file)
logger = logging.getLogger('test')
sock = listen(('localhost', 0))
wsgi.run_server(conf, logger, sock)
self.assertEquals('HTTP/1.0',
_wsgi.HttpProtocol.default_request_version)
self.assertEquals(30, _wsgi.WRITE_TIMEOUT)
@ -354,6 +355,43 @@ class TestWSGI(unittest.TestCase):
self.assert_('custom_pool' in kwargs)
self.assertEquals(1000, kwargs['custom_pool'].size)
def test_run_server_with_latest_eventlet(self):
config = """
[DEFAULT]
swift_dir = TEMPDIR
[pipeline:main]
pipeline = proxy-server
[app:proxy-server]
use = egg:swift#proxy
"""
def argspec_stub(server):
return mock.MagicMock(args=['capitalize_response_headers'])
contents = dedent(config)
with temptree(['proxy-server.conf']) as t:
conf_file = os.path.join(t, 'proxy-server.conf')
with open(conf_file, 'w') as f:
f.write(contents.replace('TEMPDIR', t))
_fake_rings(t)
with nested(
mock.patch('swift.proxy.server.Application.'
'modify_wsgi_pipeline'),
mock.patch('swift.common.wsgi.wsgi'),
mock.patch('swift.common.wsgi.eventlet'),
mock.patch('swift.common.wsgi.inspect',
getargspec=argspec_stub)) as (_, _wsgi, _, _):
conf = wsgi.appconfig(conf_file)
logger = logging.getLogger('test')
sock = listen(('localhost', 0))
wsgi.run_server(conf, logger, sock)
_wsgi.server.assert_called()
args, kwargs = _wsgi.server.call_args
self.assertEquals(kwargs.get('capitalize_response_headers'), False)
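
The mocked swift.common.wsgi.inspect in these tests stands in for a real signature probe: run_server only passes capitalize_response_headers when the installed eventlet's wsgi.server accepts it. The same getargspec-based feature detection in isolation (function names here are illustrative):

import inspect

def call_with_supported_kwargs(func, *args, **maybe_kwargs):
    """Pass a keyword argument only when the callee's signature accepts it."""
    accepted = inspect.getargspec(func).args
    kwargs = dict((k, v) for k, v in maybe_kwargs.items() if k in accepted)
    return func(*args, **kwargs)

def old_wsgi_server(sock, site):                   # older eventlet: no kwarg
    return {}

def new_wsgi_server(sock, site, capitalize_response_headers=True):
    return {'capitalize_response_headers': capitalize_response_headers}

assert call_with_supported_kwargs(
    old_wsgi_server, None, None, capitalize_response_headers=False) == {}
assert call_with_supported_kwargs(
    new_wsgi_server, None, None, capitalize_response_headers=False) == \
    {'capitalize_response_headers': False}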
def test_run_server_conf_dir(self):
config_dir = {
'proxy-server.conf.d/pipeline.conf': """
@ -382,11 +420,12 @@ class TestWSGI(unittest.TestCase):
with mock.patch('swift.common.wsgi.wsgi') as _wsgi:
with mock.patch('swift.common.wsgi.eventlet') as _eventlet:
with mock.patch.dict('os.environ', {'TZ': ''}):
conf = wsgi.appconfig(conf_dir)
logger = logging.getLogger('test')
sock = listen(('localhost', 0))
wsgi.run_server(conf, logger, sock)
self.assert_(os.environ['TZ'] is not '')
with mock.patch('swift.common.wsgi.inspect'):
conf = wsgi.appconfig(conf_dir)
logger = logging.getLogger('test')
sock = listen(('localhost', 0))
wsgi.run_server(conf, logger, sock)
self.assert_(os.environ['TZ'] is not '')
self.assertEquals('HTTP/1.0',
_wsgi.HttpProtocol.default_request_version)

View File

@ -1080,6 +1080,25 @@ class TestDiskFile(unittest.TestCase):
# new fast-post updateable keys are added
self.assertEquals('Value2', df._metadata['X-Object-Meta-Key2'])
def test_disk_file_preserves_sysmeta(self):
# build an object with some meta (ts 41)
orig_metadata = {'X-Object-Sysmeta-Key1': 'Value1',
'Content-Type': 'text/garbage'}
df = self._get_open_disk_file(ts=41, extra_metadata=orig_metadata)
with df.open():
self.assertEquals('1024', df._metadata['Content-Length'])
# write some new metadata (fast POST, don't send orig meta, ts 42)
df = self._simple_get_diskfile()
df.write_metadata({'X-Timestamp': Timestamp(42).internal,
'X-Object-Sysmeta-Key1': 'Value2',
'X-Object-Meta-Key3': 'Value3'})
df = self._simple_get_diskfile()
with df.open():
# non-fast-post updateable keys are preserved
self.assertEquals('text/garbage', df._metadata['Content-Type'])
# original sysmeta keys are preserved
self.assertEquals('Value1', df._metadata['X-Object-Sysmeta-Key1'])
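
The assertions above describe the fast-POST merge rule: metadata from the newer .meta file overlays the .data metadata, but Content-Type and X-Object-Sysmeta-* keys from the original .data file survive. A toy merge illustrating that rule (the preserved-key set is illustrative, not DiskFile's actual logic):

def merge_fast_post_metadata(data_meta, post_meta):
    """Toy illustration: .meta metadata overlays the .data metadata, but
    sysmeta and non-updatable keys from the original .data file win."""
    merged = dict(post_meta)
    for key, value in data_meta.items():
        if key.startswith('X-Object-Sysmeta-') or key in (
                'Content-Type', 'Content-Length', 'ETag', 'name'):
            merged[key] = value
    return merged

orig = {'Content-Type': 'text/garbage', 'X-Object-Sysmeta-Key1': 'Value1'}
post = {'X-Timestamp': '42', 'X-Object-Sysmeta-Key1': 'Value2',
        'X-Object-Meta-Key3': 'Value3'}
merged = merge_fast_post_metadata(orig, post)
assert merged['X-Object-Sysmeta-Key1'] == 'Value1'   # original sysmeta kept
assert merged['Content-Type'] == 'text/garbage'      # non-fast-POST key kept
assert merged['X-Object-Meta-Key3'] == 'Value3'      # new user meta applied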
def test_disk_file_reader_iter(self):
df = self._create_test_file('1234567890')
quarantine_msgs = []

View File

@ -153,11 +153,12 @@ class TestObjectExpirer(TestCase):
def delete_container(*a, **kw):
pass
ukey = u'3'
containers = {
0: set('1-one 2-two 3-three'.split()),
1: set('2-two 3-three 4-four'.split()),
2: set('5-five 6-six'.split()),
3: set('7-seven'.split()),
ukey: set(u'7-seven\u2661'.split()),
}
x = ObjectExpirer({})
x.swift = InternalClient(containers)
@ -168,6 +169,8 @@ class TestObjectExpirer(TestCase):
x.run_once()
self.assertNotEqual(deleted_objects, x.deleted_objects)
deleted_objects = deepcopy(x.deleted_objects)
self.assertEqual(containers[ukey].pop(),
deleted_objects[ukey].pop().decode('utf8'))
self.assertEqual(containers, deleted_objects)
def test_delete_object(self):

View File

@ -689,7 +689,7 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator.partition_times = []
self.headers = {'Content-Length': '0',
'user-agent': 'obj-replicator %s' % os.getpid()}
'user-agent': 'object-replicator %s' % os.getpid()}
self.replicator.logger = mock_logger = mock.MagicMock()
mock_tpool_reraise.return_value = (0, {})

View File

@ -30,6 +30,7 @@ from time import gmtime, strftime, time, struct_time
from tempfile import mkdtemp
from hashlib import md5
import itertools
import tempfile
from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool
@ -39,7 +40,7 @@ from test.unit import FakeLogger, debug_logger, mocked_http_conn
from test.unit import connect_tcp, readuntil2crlfs, patch_policies
from swift.obj import server as object_server
from swift.obj import diskfile
from swift.common import utils, storage_policy
from swift.common import utils, storage_policy, bufferedhttp
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
NullLogger, storage_directory, public, replication
from swift.common import constraints
@ -254,9 +255,9 @@ class TestObjectController(unittest.TestCase):
def test_POST_old_timestamp(self):
ts = time()
timestamp = normalize_timestamp(ts)
orig_timestamp = utils.Timestamp(ts).internal
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
headers={'X-Timestamp': orig_timestamp,
'Content-Type': 'application/x-test',
'X-Object-Meta-1': 'One',
'X-Object-Meta-Two': 'Two'})
@ -267,13 +268,14 @@ class TestObjectController(unittest.TestCase):
# Same timestamp should result in 409
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp,
headers={'X-Timestamp': orig_timestamp,
'X-Object-Meta-3': 'Three',
'X-Object-Meta-4': 'Four',
'Content-Encoding': 'gzip',
'Content-Type': 'application/x-test'})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 409)
self.assertEqual(resp.headers['X-Backend-Timestamp'], orig_timestamp)
# Earlier timestamp should result in 409
timestamp = normalize_timestamp(ts - 1)
@ -286,6 +288,7 @@ class TestObjectController(unittest.TestCase):
'Content-Type': 'application/x-test'})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 409)
self.assertEqual(resp.headers['X-Backend-Timestamp'], orig_timestamp)
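
The new X-Backend-Timestamp assertions here (and in the PUT and DELETE variants further down) check that a 409 now tells the caller which timestamp it lost to. Conceptually the object server's decision is just a comparison of internal-format timestamps; a sketch of that check with made-up names:

def conflict_headers(request_timestamp, on_disk_timestamp):
    """Return the extra 409 headers when the request is not newer than the
    data already on disk, or None when the write may proceed.  Internal
    timestamps are fixed width and zero padded, so string comparison works."""
    if on_disk_timestamp is not None and request_timestamp <= on_disk_timestamp:
        return {'X-Backend-Timestamp': on_disk_timestamp}
    return None

assert conflict_headers('0000001000.00000', '0000001002.00000') == \
    {'X-Backend-Timestamp': '0000001002.00000'}
assert conflict_headers('0000001003.00000', '0000001002.00000') is None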
def test_POST_not_exist(self):
timestamp = normalize_timestamp(time())
@ -635,9 +638,10 @@ class TestObjectController(unittest.TestCase):
def test_PUT_old_timestamp(self):
ts = time()
orig_timestamp = utils.Timestamp(ts).internal
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(ts),
headers={'X-Timestamp': orig_timestamp,
'Content-Length': '6',
'Content-Type': 'application/octet-stream'})
req.body = 'VERIFY'
@ -651,6 +655,7 @@ class TestObjectController(unittest.TestCase):
req.body = 'VERIFY TWO'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 409)
self.assertEqual(resp.headers['X-Backend-Timestamp'], orig_timestamp)
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={
@ -660,6 +665,7 @@ class TestObjectController(unittest.TestCase):
req.body = 'VERIFY THREE'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 409)
self.assertEqual(resp.headers['X-Backend-Timestamp'], orig_timestamp)
def test_PUT_no_etag(self):
req = Request.blank(
@ -730,6 +736,181 @@ class TestObjectController(unittest.TestCase):
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 408)
def test_PUT_system_metadata(self):
# check that sysmeta is stored in diskfile
timestamp = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
req.body = 'VERIFY SYSMETA'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
hash_path('a', 'c', 'o')),
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY SYSMETA')
self.assertEquals(diskfile.read_metadata(objfile),
{'X-Timestamp': timestamp,
'Content-Length': '14',
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'name': '/a/c/o',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
def test_POST_system_metadata(self):
# check that diskfile sysmeta is not changed by a POST
timestamp1 = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp1,
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
req.body = 'VERIFY SYSMETA'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
timestamp2 = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp2,
'X-Object-Meta-1': 'Not One',
'X-Object-Sysmeta-1': 'Not One',
'X-Object-Sysmeta-Two': 'Not Two'})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 202)
# original .data file metadata should be unchanged
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
hash_path('a', 'c', 'o')),
timestamp1 + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY SYSMETA')
self.assertEquals(diskfile.read_metadata(objfile),
{'X-Timestamp': timestamp1,
'Content-Length': '14',
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'name': '/a/c/o',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
# .meta file metadata should have only user meta items
metafile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
hash_path('a', 'c', 'o')),
timestamp2 + '.meta')
self.assert_(os.path.isfile(metafile))
self.assertEquals(diskfile.read_metadata(metafile),
{'X-Timestamp': timestamp2,
'name': '/a/c/o',
'X-Object-Meta-1': 'Not One'})
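
The .meta file contents asserted above fall out of a simple prefix split of the incoming POST headers: X-Object-Meta-* is user metadata and is written to the .meta file, while X-Object-Sysmeta-* is not accepted on POST. A sketch of that split using the prefix convention (the helper name is made up; swift.common.request_helpers exposes is_user_meta/is_sys_meta for real code):

def split_object_metadata(headers):
    """Split request headers into user metadata and system metadata using
    the X-Object-Meta- / X-Object-Sysmeta- prefix convention."""
    user_meta, sys_meta = {}, {}
    for key, value in headers.items():
        lower = key.lower()
        if lower.startswith('x-object-meta-'):
            user_meta[key] = value
        elif lower.startswith('x-object-sysmeta-'):
            sys_meta[key] = value
    return user_meta, sys_meta

user, system = split_object_metadata({'X-Object-Meta-1': 'Not One',
                                      'X-Object-Sysmeta-1': 'Not One'})
# on a fast POST only `user` would end up in the new .meta file
assert user == {'X-Object-Meta-1': 'Not One'}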
def test_PUT_then_fetch_system_metadata(self):
timestamp = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
req.body = 'VERIFY SYSMETA'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
def check_response(resp):
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.content_length, 14)
self.assertEquals(resp.content_type, 'text/plain')
self.assertEquals(resp.headers['content-type'], 'text/plain')
self.assertEquals(
resp.headers['last-modified'],
strftime('%a, %d %b %Y %H:%M:%S GMT',
gmtime(math.ceil(float(timestamp)))))
self.assertEquals(resp.headers['etag'],
'"1000d172764c9dbc3a5798a67ec5bb76"')
self.assertEquals(resp.headers['x-object-meta-1'], 'One')
self.assertEquals(resp.headers['x-object-sysmeta-1'], 'One')
self.assertEquals(resp.headers['x-object-sysmeta-two'], 'Two')
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'})
resp = req.get_response(self.object_controller)
check_response(resp)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.object_controller)
check_response(resp)
def test_PUT_then_POST_then_fetch_system_metadata(self):
timestamp = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'ETag': '1000d172764c9dbc3a5798a67ec5bb76',
'X-Object-Meta-1': 'One',
'X-Object-Sysmeta-1': 'One',
'X-Object-Sysmeta-Two': 'Two'})
req.body = 'VERIFY SYSMETA'
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 201)
timestamp2 = normalize_timestamp(time())
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'},
headers={'X-Timestamp': timestamp2,
'X-Object-Meta-1': 'Not One',
'X-Object-Sysmeta-1': 'Not One',
'X-Object-Sysmeta-Two': 'Not Two'})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 202)
def check_response(resp):
# user meta should be updated but not sysmeta
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.content_length, 14)
self.assertEquals(resp.content_type, 'text/plain')
self.assertEquals(resp.headers['content-type'], 'text/plain')
self.assertEquals(
resp.headers['last-modified'],
strftime('%a, %d %b %Y %H:%M:%S GMT',
gmtime(math.ceil(float(timestamp2)))))
self.assertEquals(resp.headers['etag'],
'"1000d172764c9dbc3a5798a67ec5bb76"')
self.assertEquals(resp.headers['x-object-meta-1'], 'Not One')
self.assertEquals(resp.headers['x-object-sysmeta-1'], 'One')
self.assertEquals(resp.headers['x-object-sysmeta-two'], 'Two')
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'})
resp = req.get_response(self.object_controller)
check_response(resp)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'GET'})
resp = req.get_response(self.object_controller)
check_response(resp)
def test_PUT_container_connection(self):
def mock_http_connect(response, with_exc=False):
@ -1604,10 +1785,10 @@ class TestObjectController(unittest.TestCase):
self.assertTrue(os.path.isfile(ts_1000_file))
self.assertEquals(len(os.listdir(os.path.dirname(ts_1000_file))), 1)
timestamp = normalize_timestamp(1002)
orig_timestamp = utils.Timestamp(1002).internal
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={
'X-Timestamp': timestamp,
'X-Timestamp': orig_timestamp,
'Content-Type': 'application/octet-stream',
'Content-Length': '4',
})
@ -1619,7 +1800,7 @@ class TestObjectController(unittest.TestCase):
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
hash_path('a', 'c', 'o')),
utils.Timestamp(timestamp).internal + '.data')
orig_timestamp + '.data')
self.assertTrue(os.path.isfile(data_1002_file))
self.assertEquals(len(os.listdir(os.path.dirname(data_1002_file))), 1)
@ -1630,6 +1811,7 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': timestamp})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 409)
self.assertEqual(resp.headers['X-Backend-Timestamp'], orig_timestamp)
ts_1001_file = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
@ -1658,10 +1840,10 @@ class TestObjectController(unittest.TestCase):
# updates, making sure container update is called in the correct
# state.
start = time()
timestamp = utils.Timestamp(start)
orig_timestamp = utils.Timestamp(start)
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={
'X-Timestamp': timestamp.internal,
'X-Timestamp': orig_timestamp.internal,
'Content-Type': 'application/octet-stream',
'Content-Length': '4',
})
@ -1685,6 +1867,8 @@ class TestObjectController(unittest.TestCase):
headers={'X-Timestamp': timestamp.internal})
resp = req.get_response(self.object_controller)
self.assertEquals(resp.status_int, 409)
self.assertEqual(resp.headers['x-backend-timestamp'],
orig_timestamp.internal)
objfile = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(0), 'p',
@ -2319,7 +2503,7 @@ class TestObjectController(unittest.TestCase):
given_args,
['127.0.0.1', '1234', 'sdc1', 1, 'PUT', '/a/c/o', {
'x-timestamp': '1', 'x-out': 'set',
'user-agent': 'obj-server %s' % os.getpid(),
'user-agent': 'object-server %s' % os.getpid(),
'X-Backend-Storage-Policy-Index': policy.idx}])
@patch_policies([storage_policy.StoragePolicy(0, 'zero', True),
@ -2399,7 +2583,7 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': utils.Timestamp('12345').internal,
'X-Backend-Storage-Policy-Index': '37',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
'user-agent': 'object-server %d' % os.getpid(),
'X-Backend-Storage-Policy-Index': policy.idx,
'x-trans-id': '-'})})
self.assertEquals(
@ -2417,7 +2601,7 @@ class TestObjectController(unittest.TestCase):
'x-size': '0',
'x-timestamp': utils.Timestamp('12345').internal,
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
'user-agent': 'object-server %d' % os.getpid(),
# system account storage policy is 0
'X-Backend-Storage-Policy-Index': 0,
'x-trans-id': '-'})})
@ -2436,7 +2620,7 @@ class TestObjectController(unittest.TestCase):
'x-size': '0',
'x-timestamp': utils.Timestamp('12345').internal,
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
'user-agent': 'object-server %d' % os.getpid(),
# system account storage policy is 0
'X-Backend-Storage-Policy-Index': 0,
'x-trans-id': '-'})})
@ -2507,7 +2691,7 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': utils.Timestamp('12345').internal,
'X-Backend-Storage-Policy-Index': '26',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
'user-agent': 'object-server %d' % os.getpid(),
'x-trans-id': '-'})})
self.assertEquals(
http_connect_args[1],
@ -2525,7 +2709,7 @@ class TestObjectController(unittest.TestCase):
'x-timestamp': utils.Timestamp('12345').internal,
'X-Backend-Storage-Policy-Index': '26',
'referer': 'PUT http://localhost/sda1/p/a/c/o',
'user-agent': 'obj-server %d' % os.getpid(),
'user-agent': 'object-server %d' % os.getpid(),
'x-trans-id': '-'})})
def test_object_delete_at_aysnc_update(self):
@ -2639,7 +2823,7 @@ class TestObjectController(unittest.TestCase):
'06fbf0b514e5199dfc4e00f42eb5ea83-%s' %
utils.Timestamp(1).internal))),
{'headers': {'x-timestamp': '1', 'x-out': 'set',
'user-agent': 'obj-server %s' % os.getpid(),
'user-agent': 'object-server %s' % os.getpid(),
'X-Backend-Storage-Policy-Index': policy.idx},
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
@ -2680,7 +2864,8 @@ class TestObjectController(unittest.TestCase):
'06fbf0b514e5199dfc4e00f42eb5ea83-%s' %
utils.Timestamp(1).internal))),
{'headers': {'x-timestamp': '1', 'x-out': str(status),
'user-agent': 'obj-server %s' % os.getpid(),
'user-agent':
'object-server %s' % os.getpid(),
'X-Backend-Storage-Policy-Index':
policy.idx},
'account': 'a', 'container': 'c', 'obj': 'o',
@ -2803,7 +2988,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(method, 'PUT')
self.assertEqual(path, '/cdevice/cpartition/a/c/o')
self.assertEqual(headers, HeaderKeyDict({
'user-agent': 'obj-server %s' % os.getpid(),
'user-agent': 'object-server %s' % os.getpid(),
'x-size': '0',
'x-etag': 'd41d8cd98f00b204e9800998ecf8427e',
'x-content-type': 'text/plain',
@ -2844,7 +3029,7 @@ class TestObjectController(unittest.TestCase):
self.assertEqual(data, {
'headers': HeaderKeyDict({
'X-Size': '0',
'User-Agent': 'obj-server %s' % os.getpid(),
'User-Agent': 'object-server %s' % os.getpid(),
'X-Content-Type': 'text/plain',
'X-Timestamp': utils.Timestamp(1).internal,
'X-Trans-Id': '123',
@ -4106,5 +4291,81 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(resp.status_int, 201)
self.assertTrue(os.path.isdir(object_dir))
class TestObjectServer(unittest.TestCase):
def setUp(self):
# dirs
self.tempdir = os.path.join(tempfile.mkdtemp(), 'tmp_test_obj_server')
self.devices = os.path.join(self.tempdir, 'srv/node')
for device in ('sda1', 'sdb1'):
os.makedirs(os.path.join(self.devices, device))
conf = {
'devices': self.devices,
'swift_dir': self.tempdir,
'mount_check': 'false',
}
self.logger = debug_logger('test-object-server')
app = object_server.ObjectController(conf, logger=self.logger)
sock = listen(('127.0.0.1', 0))
self.server = spawn(wsgi.server, sock, app, utils.NullLogger())
self.port = sock.getsockname()[1]
def test_not_found(self):
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
'GET', '/a/c/o')
resp = conn.getresponse()
self.assertEqual(resp.status, 404)
resp.read()
resp.close()
def test_expect_on_put(self):
test_body = 'test'
headers = {
'Expect': '100-continue',
'Content-Length': len(test_body),
'X-Timestamp': utils.Timestamp(time()).internal,
}
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
'PUT', '/a/c/o', headers=headers)
resp = conn.getexpect()
self.assertEqual(resp.status, 100)
conn.send(test_body)
resp = conn.getresponse()
self.assertEqual(resp.status, 201)
resp.read()
resp.close()
def test_expect_on_put_conflict(self):
test_body = 'test'
put_timestamp = utils.Timestamp(time())
headers = {
'Expect': '100-continue',
'Content-Length': len(test_body),
'X-Timestamp': put_timestamp.internal,
}
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
'PUT', '/a/c/o', headers=headers)
resp = conn.getexpect()
self.assertEqual(resp.status, 100)
conn.send(test_body)
resp = conn.getresponse()
self.assertEqual(resp.status, 201)
resp.read()
resp.close()
# and again with same timestamp
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
'PUT', '/a/c/o', headers=headers)
resp = conn.getexpect()
self.assertEqual(resp.status, 409)
headers = HeaderKeyDict(resp.getheaders())
self.assertEqual(headers['X-Backend-Timestamp'], put_timestamp)
resp.read()
resp.close()
if __name__ == '__main__':
unittest.main()

View File

@ -268,6 +268,9 @@ def save_globals():
None)
orig_account_info = getattr(swift.proxy.controllers.Controller,
'account_info', None)
orig_container_info = getattr(swift.proxy.controllers.Controller,
'container_info', None)
try:
yield True
finally:
@ -276,6 +279,7 @@ def save_globals():
swift.proxy.controllers.obj.http_connect = orig_http_connect
swift.proxy.controllers.account.http_connect = orig_http_connect
swift.proxy.controllers.container.http_connect = orig_http_connect
swift.proxy.controllers.Controller.container_info = orig_container_info
def set_http_connect(*args, **kwargs):
@ -2377,34 +2381,85 @@ class TestObjectController(unittest.TestCase):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 9)
# zero error-limited primary nodes -> no handoff warnings
self.app.log_handoffs = True
self.app.logger = FakeLogger()
object_ring.max_more_nodes = 2
self.app.request_node_count = lambda r: 7
object_ring.max_more_nodes = 20
partition, nodes = object_ring.get_nodes('account',
'container',
'object')
collected_nodes = []
for node in self.app.iter_nodes(object_ring,
partition):
for node in self.app.iter_nodes(object_ring, partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.assertEquals(
self.app.logger.log_dict['warning'],
[(('Handoff requested (1)',), {}),
(('Handoff requested (2)',), {})])
self.app.log_handoffs = False
self.app.logger = FakeLogger()
object_ring.max_more_nodes = 2
partition, nodes = object_ring.get_nodes('account',
'container',
'object')
collected_nodes = []
for node in self.app.iter_nodes(object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.assertEquals(len(collected_nodes), 7)
self.assertEquals(self.app.logger.log_dict['warning'], [])
self.assertEquals(self.app.logger.get_increments(), [])
# one error-limited primary node -> one handoff warning
self.app.log_handoffs = True
self.app.logger = FakeLogger()
self.app.request_node_count = lambda r: 7
object_ring.clear_errors()
object_ring._devs[0]['errors'] = 999
object_ring._devs[0]['last_error'] = 2 ** 63 - 1
collected_nodes = []
for node in self.app.iter_nodes(object_ring, partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 7)
self.assertEquals(self.app.logger.log_dict['warning'], [
(('Handoff requested (5)',), {})])
self.assertEquals(self.app.logger.get_increments(),
['handoff_count'])
# two error-limited primary nodes -> two handoff warnings
self.app.log_handoffs = True
self.app.logger = FakeLogger()
self.app.request_node_count = lambda r: 7
object_ring.clear_errors()
for i in range(2):
object_ring._devs[i]['errors'] = 999
object_ring._devs[i]['last_error'] = 2 ** 63 - 1
collected_nodes = []
for node in self.app.iter_nodes(object_ring, partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 7)
self.assertEquals(self.app.logger.log_dict['warning'], [
(('Handoff requested (5)',), {}),
(('Handoff requested (6)',), {})])
self.assertEquals(self.app.logger.get_increments(),
['handoff_count',
'handoff_count'])
# all error-limited primary nodes -> four handoff warnings,
# plus a handoff-all metric
self.app.log_handoffs = True
self.app.logger = FakeLogger()
self.app.request_node_count = lambda r: 10
object_ring.set_replicas(4) # otherwise we run out of handoffs
object_ring.clear_errors()
for i in range(4):
object_ring._devs[i]['errors'] = 999
object_ring._devs[i]['last_error'] = 2 ** 63 - 1
collected_nodes = []
for node in self.app.iter_nodes(object_ring, partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 10)
self.assertEquals(self.app.logger.log_dict['warning'], [
(('Handoff requested (7)',), {}),
(('Handoff requested (8)',), {}),
(('Handoff requested (9)',), {}),
(('Handoff requested (10)',), {})])
self.assertEquals(self.app.logger.get_increments(),
['handoff_count',
'handoff_count',
'handoff_count',
'handoff_count',
'handoff_all_count'])
finally:
object_ring.max_more_nodes = 0
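
The errors/last_error values planted on the ring devices above are what make a primary count as error-limited, so iter_nodes skips it, reaches for a handoff, logs 'Handoff requested (N)' and bumps handoff_count (plus handoff_all_count once every primary is gone). The gate itself is roughly this (threshold names and values are illustrative, not the proxy's configuration):

import time

ERROR_SUPPRESSION_LIMIT = 10       # illustrative values only
ERROR_SUPPRESSION_INTERVAL = 60

def node_error_limited(node, now=None):
    """A device is skipped as a primary when it has accumulated more than
    the limit of errors and the most recent error is still fresh."""
    now = time.time() if now is None else now
    return (node.get('errors', 0) > ERROR_SUPPRESSION_LIMIT and
            node.get('last_error', 0) > now - ERROR_SUPPRESSION_INTERVAL)

# exactly the state the test plants on the first N devices
assert node_error_limited({'errors': 999, 'last_error': 2 ** 63 - 1})
assert not node_error_limited({'errors': 0})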
@ -5178,7 +5233,14 @@ class TestContainerController(unittest.TestCase):
self.app.max_containers_per_account = 12345
controller = proxy_server.ContainerController(self.app, 'account',
'container')
self.assert_status_map(controller.PUT, (201, 201, 201), 403,
self.assert_status_map(controller.PUT,
(200, 200, 201, 201, 201), 201,
missing_container=True)
controller = proxy_server.ContainerController(self.app, 'account',
'container_new')
self.assert_status_map(controller.PUT, (200, 404, 404, 404), 403,
missing_container=True)
self.app.max_containers_per_account = 12345

View File

@ -0,0 +1,361 @@
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
from tempfile import mkdtemp
from urllib import quote
from swift.common.storage_policy import StoragePolicy
from swift.common.swob import Request
from swift.common.utils import mkdirs, split_path
from swift.common.wsgi import monkey_patch_mimetools, WSGIContext
from swift.obj import server as object_server
from swift.proxy import server as proxy
import swift.proxy.controllers
from test.unit import FakeMemcache, debug_logger, FakeRing, \
fake_http_connect, patch_policies
class FakeServerConnection(WSGIContext):
'''Fakes an HTTPConnection to a server instance.'''
def __init__(self, app):
super(FakeServerConnection, self).__init__(app)
self.data = ''
def getheaders(self):
return self._response_headers
def read(self, amt=None):
try:
result = self.resp_iter.next()
return result
except StopIteration:
return ''
def getheader(self, name, default=None):
result = self._response_header_value(name)
return result if result else default
def getresponse(self):
environ = {'REQUEST_METHOD': self.method}
req = Request.blank(self.path, environ, headers=self.req_headers,
body=self.data)
self.resp = self._app_call(req.environ)
self.resp_iter = iter(self.resp)
if self._response_headers is None:
self._response_headers = []
status_parts = self._response_status.split(' ', 1)
self.status = int(status_parts[0])
self.reason = status_parts[1] if len(status_parts) == 2 else ''
return self
def getexpect(self):
class ContinueResponse(object):
status = 100
return ContinueResponse()
def send(self, data):
self.data = data
def __call__(self, ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
self.path = quote('/' + device + '/' + str(partition) + path)
self.method = method
self.req_headers = headers
return self
def get_http_connect(account_func, container_func, object_func):
'''Returns an http_connect function that delegates to
entity-specific http_connect methods based on request path.
'''
def http_connect(ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
a, c, o = split_path(path, 1, 3, True)
if o:
func = object_func
elif c:
func = container_func
else:
func = account_func
resp = func(ipaddr, port, device, partition, method, path,
headers=headers, query_string=query_string)
return resp
return http_connect
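
get_http_connect just looks at how deep the request path goes (account, container or object) and hands the call to the matching fake. A quick illustration of the dispatch, with lambdas standing in for the fake_http_connect factories used in setUp below:

http_connect = get_http_connect(lambda *args, **kwargs: 'account response',
                                lambda *args, **kwargs: 'container response',
                                lambda *args, **kwargs: 'object response')
# '/a/c/o' has an object component, so the object fake handles it
assert http_connect('1.2.3.4', 6000, 'sda1', 0, 'GET', '/a/c/o') == \
    'object response'
assert http_connect('1.2.3.4', 6000, 'sda1', 0, 'GET', '/a/c') == \
    'container response'
assert http_connect('1.2.3.4', 6000, 'sda1', 0, 'GET', '/a') == \
    'account response'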
@patch_policies([StoragePolicy(0, 'zero', True,
object_ring=FakeRing(replicas=1))])
class TestObjectSysmeta(unittest.TestCase):
'''Tests that object sysmeta is correctly handled by the combination
of proxy server and object server.
'''
def _assertStatus(self, resp, expected):
self.assertEqual(resp.status_int, expected,
'Expected %d, got %s'
% (expected, resp.status))
def _assertInHeaders(self, resp, expected):
for key, val in expected.iteritems():
self.assertTrue(key in resp.headers,
'Header %s missing from %s' % (key, resp.headers))
self.assertEqual(val, resp.headers[key],
'Expected header %s:%s, got %s:%s'
% (key, val, key, resp.headers[key]))
def _assertNotInHeaders(self, resp, unexpected):
for key, val in unexpected.iteritems():
self.assertFalse(key in resp.headers,
'Header %s not expected in %s'
% (key, resp.headers))
def setUp(self):
self.app = proxy.Application(None, FakeMemcache(),
logger=debug_logger('proxy-ut'),
account_ring=FakeRing(replicas=1),
container_ring=FakeRing(replicas=1))
monkey_patch_mimetools()
self.testdir = \
os.path.join(mkdtemp(), 'tmp_test_object_server_ObjectController')
mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
conf = {'devices': self.testdir, 'mount_check': 'false'}
self.obj_ctlr = object_server.ObjectController(
conf, logger=debug_logger('obj-ut'))
http_connect = get_http_connect(fake_http_connect(200),
fake_http_connect(200),
FakeServerConnection(self.obj_ctlr))
swift.proxy.controllers.base.http_connect = http_connect
swift.proxy.controllers.obj.http_connect = http_connect
original_sysmeta_headers_1 = {'x-object-sysmeta-test0': 'val0',
'x-object-sysmeta-test1': 'val1'}
original_sysmeta_headers_2 = {'x-object-sysmeta-test2': 'val2'}
changed_sysmeta_headers = {'x-object-sysmeta-test0': '',
'x-object-sysmeta-test1': 'val1 changed'}
new_sysmeta_headers = {'x-object-sysmeta-test3': 'val3'}
original_meta_headers_1 = {'x-object-meta-test0': 'meta0',
'x-object-meta-test1': 'meta1'}
original_meta_headers_2 = {'x-object-meta-test2': 'meta2'}
changed_meta_headers = {'x-object-meta-test0': '',
'x-object-meta-test1': 'meta1 changed'}
new_meta_headers = {'x-object-meta-test3': 'meta3'}
bad_headers = {'x-account-sysmeta-test1': 'bad1'}
def test_PUT_sysmeta_then_GET(self):
path = '/v1/a/c/o'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_meta_headers_1)
hdrs.update(self.bad_headers)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
req = Request.blank(path, environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.original_sysmeta_headers_1)
self._assertInHeaders(resp, self.original_meta_headers_1)
self._assertNotInHeaders(resp, self.bad_headers)
def test_PUT_sysmeta_then_HEAD(self):
path = '/v1/a/c/o'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_meta_headers_1)
hdrs.update(self.bad_headers)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
env = {'REQUEST_METHOD': 'HEAD'}
req = Request.blank(path, environ=env)
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.original_sysmeta_headers_1)
self._assertInHeaders(resp, self.original_meta_headers_1)
self._assertNotInHeaders(resp, self.bad_headers)
def test_sysmeta_replaced_by_PUT(self):
path = '/v1/a/c/o'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_sysmeta_headers_2)
hdrs.update(self.original_meta_headers_1)
hdrs.update(self.original_meta_headers_2)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.changed_sysmeta_headers)
hdrs.update(self.new_sysmeta_headers)
hdrs.update(self.changed_meta_headers)
hdrs.update(self.new_meta_headers)
hdrs.update(self.bad_headers)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
req = Request.blank(path, environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertNotInHeaders(resp, self.original_sysmeta_headers_2)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertNotInHeaders(resp, self.original_meta_headers_2)
def _test_sysmeta_not_updated_by_POST(self):
# check sysmeta is not changed by a POST but user meta is replaced
path = '/v1/a/c/o'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_meta_headers_1)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
env = {'REQUEST_METHOD': 'POST'}
hdrs = dict(self.changed_sysmeta_headers)
hdrs.update(self.new_sysmeta_headers)
hdrs.update(self.changed_meta_headers)
hdrs.update(self.new_meta_headers)
hdrs.update(self.bad_headers)
req = Request.blank(path, environ=env, headers=hdrs)
resp = req.get_response(self.app)
self._assertStatus(resp, 202)
req = Request.blank(path, environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.original_sysmeta_headers_1)
self._assertNotInHeaders(resp, self.new_sysmeta_headers)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertNotInHeaders(resp, self.bad_headers)
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.changed_sysmeta_headers)
hdrs.update(self.new_sysmeta_headers)
hdrs.update(self.bad_headers)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
req = Request.blank(path, environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertNotInHeaders(resp, self.original_sysmeta_headers_2)
def test_sysmeta_not_updated_by_POST(self):
self.app.object_post_as_copy = False
self._test_sysmeta_not_updated_by_POST()
def test_sysmeta_not_updated_by_POST_as_copy(self):
self.app.object_post_as_copy = True
self._test_sysmeta_not_updated_by_POST()
def test_sysmeta_updated_by_COPY(self):
# check sysmeta is updated by a COPY in the same way as user meta
path = '/v1/a/c/o'
dest = '/c/o2'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_sysmeta_headers_2)
hdrs.update(self.original_meta_headers_1)
hdrs.update(self.original_meta_headers_2)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
env = {'REQUEST_METHOD': 'COPY'}
hdrs = dict(self.changed_sysmeta_headers)
hdrs.update(self.new_sysmeta_headers)
hdrs.update(self.changed_meta_headers)
hdrs.update(self.new_meta_headers)
hdrs.update(self.bad_headers)
hdrs.update({'Destination': dest})
req = Request.blank(path, environ=env, headers=hdrs)
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertInHeaders(resp, self.original_sysmeta_headers_2)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertInHeaders(resp, self.original_meta_headers_2)
self._assertNotInHeaders(resp, self.bad_headers)
req = Request.blank('/v1/a/c/o2', environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertInHeaders(resp, self.original_sysmeta_headers_2)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertInHeaders(resp, self.original_meta_headers_2)
self._assertNotInHeaders(resp, self.bad_headers)
def test_sysmeta_updated_by_COPY_from(self):
# check sysmeta is updated by a COPY in the same way as user meta
path = '/v1/a/c/o'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.original_sysmeta_headers_1)
hdrs.update(self.original_sysmeta_headers_2)
hdrs.update(self.original_meta_headers_1)
hdrs.update(self.original_meta_headers_2)
req = Request.blank(path, environ=env, headers=hdrs, body='x')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
env = {'REQUEST_METHOD': 'PUT'}
hdrs = dict(self.changed_sysmeta_headers)
hdrs.update(self.new_sysmeta_headers)
hdrs.update(self.changed_meta_headers)
hdrs.update(self.new_meta_headers)
hdrs.update(self.bad_headers)
hdrs.update({'X-Copy-From': '/c/o'})
req = Request.blank('/v1/a/c/o2', environ=env, headers=hdrs, body='')
resp = req.get_response(self.app)
self._assertStatus(resp, 201)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertInHeaders(resp, self.original_sysmeta_headers_2)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertInHeaders(resp, self.original_meta_headers_2)
self._assertNotInHeaders(resp, self.bad_headers)
req = Request.blank('/v1/a/c/o2', environ={})
resp = req.get_response(self.app)
self._assertStatus(resp, 200)
self._assertInHeaders(resp, self.changed_sysmeta_headers)
self._assertInHeaders(resp, self.new_sysmeta_headers)
self._assertInHeaders(resp, self.original_sysmeta_headers_2)
self._assertInHeaders(resp, self.changed_meta_headers)
self._assertInHeaders(resp, self.new_meta_headers)
self._assertInHeaders(resp, self.original_meta_headers_2)
self._assertNotInHeaders(resp, self.bad_headers)