Merge pull request #12 from pushpesh/functionalnosetestremove

Functionalnosetestremove
This commit is contained in:
Thiago da Silva 2014-05-07 07:56:13 -04:00
commit b6d1671b61
10 changed files with 1 additions and 1697 deletions

View File

@ -7,9 +7,4 @@ nosetests --exe $@
func1=$?
cd -
cd ${SRC_DIR}/test/functionalnosetests
nosetests --exe $@
func2=$?
cd -
exit $((func1 + func2))
exit $func1

View File

@ -1,175 +0,0 @@
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from httplib import HTTPException
import os
import socket
import sys
from time import sleep
from test import get_config
from swiftclient import get_auth, http_connection
conf = get_config('func_test')
web_front_end = conf.get('web_front_end', 'integral')
normalized_urls = conf.get('normalized_urls', False)
# If no conf was read, we will fall back to old school env vars
swift_test_auth = os.environ.get('SWIFT_TEST_AUTH')
swift_test_user = [os.environ.get('SWIFT_TEST_USER'), None, None]
swift_test_key = [os.environ.get('SWIFT_TEST_KEY'), None, None]
swift_test_tenant = ['', '', '']
swift_test_perm = ['', '', '']
if conf:
swift_test_auth_version = str(conf.get('auth_version', '1'))
swift_test_auth = 'http'
if conf.get('auth_ssl', 'no').lower() in ('yes', 'true', 'on', '1'):
swift_test_auth = 'https'
if 'auth_prefix' not in conf:
conf['auth_prefix'] = '/'
try:
suffix = '://%(auth_host)s:%(auth_port)s%(auth_prefix)s' % conf
swift_test_auth += suffix
except KeyError:
pass # skip
if swift_test_auth_version == "1":
swift_test_auth += 'v1.0'
if 'account' in conf:
swift_test_user[0] = '%(account)s:%(username)s' % conf
else:
swift_test_user[0] = '%(username)s' % conf
swift_test_key[0] = conf['password']
try:
swift_test_user[1] = '%s%s' % (
'%s:' % conf['account2'] if 'account2' in conf else '',
conf['username2'])
swift_test_key[1] = conf['password2']
except KeyError as err:
pass # old conf, no second account tests can be run
try:
swift_test_user[2] = '%s%s' % ('%s:' % conf['account'] if 'account'
in conf else '', conf['username3'])
swift_test_key[2] = conf['password3']
except KeyError as err:
pass # old conf, no third account tests can be run
for _ in range(3):
swift_test_perm[_] = swift_test_user[_]
else:
swift_test_user[0] = conf['username']
swift_test_tenant[0] = conf['account']
swift_test_key[0] = conf['password']
swift_test_user[1] = conf['username2']
swift_test_tenant[1] = conf['account2']
swift_test_key[1] = conf['password2']
swift_test_user[2] = conf['username3']
swift_test_tenant[2] = conf['account']
swift_test_key[2] = conf['password3']
for _ in range(3):
swift_test_perm[_] = swift_test_tenant[_] + ':' \
+ swift_test_user[_]
skip = not all([swift_test_auth, swift_test_user[0], swift_test_key[0]])
if skip:
print >>sys.stderr, 'SKIPPING FUNCTIONAL TESTS DUE TO NO CONFIG'
skip2 = not all([not skip, swift_test_user[1], swift_test_key[1]])
if not skip and skip2:
print >>sys.stderr, \
'SKIPPING SECOND ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
skip3 = not all([not skip, swift_test_user[2], swift_test_key[2]])
if not skip and skip3:
print >>sys.stderr, \
'SKIPPING THIRD ACCOUNT FUNCTIONAL TESTS DUE TO NO CONFIG FOR THEM'
class AuthError(Exception):
pass
class InternalServerError(Exception):
pass
url = [None, None, None]
token = [None, None, None]
parsed = [None, None, None]
conn = [None, None, None]
def retry(func, *args, **kwargs):
"""
You can use the kwargs to override the 'retries' (default: 5) and
'use_account' (default: 1).
"""
global url, token, parsed, conn
retries = kwargs.get('retries', 5)
use_account = 1
if 'use_account' in kwargs:
use_account = kwargs['use_account']
del kwargs['use_account']
use_account -= 1
attempts = 0
backoff = 1
while attempts <= retries:
attempts += 1
try:
if not url[use_account] or not token[use_account]:
url[use_account], token[use_account] = \
get_auth(swift_test_auth, swift_test_user[use_account],
swift_test_key[use_account],
snet=False,
tenant_name=swift_test_tenant[use_account],
auth_version=swift_test_auth_version,
os_options={})
parsed[use_account] = conn[use_account] = None
if not parsed[use_account] or not conn[use_account]:
parsed[use_account], conn[use_account] = \
http_connection(url[use_account])
return func(url[use_account], token[use_account],
parsed[use_account], conn[use_account],
*args, **kwargs)
except (socket.error, HTTPException):
if attempts > retries:
raise
parsed[use_account] = conn[use_account] = None
except AuthError:
url[use_account] = token[use_account] = None
continue
except InternalServerError:
pass
if attempts <= retries:
sleep(backoff)
backoff *= 2
raise Exception('No result after %s retries.' % retries)
def check_response(conn):
resp = conn.getresponse()
if resp.status == 401:
resp.read()
raise AuthError()
elif resp.status // 100 == 5:
resp.read()
raise InternalServerError()
return resp

View File

@ -1,203 +0,0 @@
#!/usr/bin/python
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from nose import SkipTest
from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
from swift_testing import check_response, retry, skip, web_front_end
class TestAccount(unittest.TestCase):
def test_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, value):
conn.request('POST', parsed.path, '',
{'X-Auth-Token': token, 'X-Account-Meta-Test': value})
return check_response(conn)
def head(url, token, parsed, conn):
conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
def get(url, token, parsed, conn):
conn.request('GET', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(post, '')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-test'), None)
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-test'), None)
resp = retry(post, 'Value')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-test'), 'Value')
def test_unicode_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
conn.request('POST', parsed.path, '',
{'X-Auth-Token': token, name: value})
return check_response(conn)
def head(url, token, parsed, conn):
conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
uni_key = u'X-Account-Meta-uni\u0E12'
uni_value = u'uni\u0E12'
if (web_front_end == 'integral'):
resp = retry(post, uni_key, '1')
resp.read()
self.assertTrue(resp.status in (201, 204))
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader(uni_key.encode('utf-8')), '1')
resp = retry(post, 'X-Account-Meta-uni', uni_value)
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('X-Account-Meta-uni'),
uni_value.encode('utf-8'))
if (web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader(uni_key.encode('utf-8')),
uni_value.encode('utf-8'))
def test_multi_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
conn.request('POST', parsed.path, '',
{'X-Auth-Token': token, name: value})
return check_response(conn)
def head(url, token, parsed, conn):
conn.request('HEAD', parsed.path, '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(post, 'X-Account-Meta-One', '1')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-one'), '1')
resp = retry(post, 'X-Account-Meta-Two', '2')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-account-meta-one'), '1')
self.assertEquals(resp.getheader('x-account-meta-two'), '2')
def test_bad_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):
headers = {'X-Auth-Token': token}
headers.update(extra_headers)
conn.request('POST', parsed.path, '', headers)
return check_response(conn)
resp = retry(post,
{'X-Account-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(
post,
{'X-Account-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(post,
{'X-Account-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(
post,
{'X-Account-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
self.assertEquals(resp.status, 400)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 204)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Account-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 400)
headers = {}
header_value = 'k' * MAX_META_VALUE_LENGTH
size = 0
x = 0
while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
size += 4 + MAX_META_VALUE_LENGTH
headers['X-Account-Meta-%04d' % x] = header_value
x += 1
if MAX_META_OVERALL_SIZE - size > 1:
headers['X-Account-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 204)
headers['X-Account-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 400)
if __name__ == '__main__':
unittest.main()

View File

@ -1,687 +0,0 @@
#!/usr/bin/python
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import unittest
from nose import SkipTest
from uuid import uuid4
from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
from swift_testing import check_response, retry, skip, skip2, skip3, \
swift_test_perm, web_front_end
class TestContainer(unittest.TestCase):
def setUp(self):
if skip:
raise SkipTest
self.name = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
def tearDown(self):
if skip:
raise SkipTest
def get(url, token, parsed, conn):
conn.request('GET', parsed.path + '/' + self.name + '?format=json',
'', {'X-Auth-Token': token})
return check_response(conn)
def delete(url, token, parsed, conn, obj):
conn.request('DELETE',
'/'.join([parsed.path, self.name, obj['name']]), '',
{'X-Auth-Token': token})
return check_response(conn)
while True:
resp = retry(get)
body = resp.read()
self.assert_(resp.status // 100 == 2, resp.status)
objs = json.loads(body)
if not objs:
break
for obj in objs:
resp = retry(delete, obj)
resp.read()
self.assertEquals(resp.status, 204)
def delete(url, token, parsed, conn):
conn.request('DELETE', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
def test_multi_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token, name: value})
return check_response(conn)
def head(url, token, parsed, conn):
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(post, 'X-Container-Meta-One', '1')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-one'), '1')
resp = retry(post, 'X-Container-Meta-Two', '2')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-one'), '1')
self.assertEquals(resp.getheader('x-container-meta-two'), '2')
def test_unicode_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token, name: value})
return check_response(conn)
def head(url, token, parsed, conn):
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
uni_key = u'X-Container-Meta-uni\u0E12'
uni_value = u'uni\u0E12'
if (web_front_end == 'integral'):
resp = retry(post, uni_key, '1')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader(uni_key.encode('utf-8')), '1')
resp = retry(post, 'X-Container-Meta-uni', uni_value)
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('X-Container-Meta-uni'),
uni_value.encode('utf-8'))
if (web_front_end == 'integral'):
resp = retry(post, uni_key, uni_value)
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader(uni_key.encode('utf-8')),
uni_value.encode('utf-8'))
def test_PUT_metadata(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn, name, value):
conn.request('PUT', parsed.path + '/' + name, '',
{'X-Auth-Token': token,
'X-Container-Meta-Test': value})
return check_response(conn)
def head(url, token, parsed, conn, name):
conn.request('HEAD', parsed.path + '/' + name, '',
{'X-Auth-Token': token})
return check_response(conn)
def get(url, token, parsed, conn, name):
conn.request('GET', parsed.path + '/' + name, '',
{'X-Auth-Token': token})
return check_response(conn)
def delete(url, token, parsed, conn, name):
conn.request('DELETE', parsed.path + '/' + name, '',
{'X-Auth-Token': token})
return check_response(conn)
name = uuid4().hex
resp = retry(put, name, 'Value')
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(head, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(get, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
name = uuid4().hex
resp = retry(put, name, '')
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(head, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), None)
resp = retry(get, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), None)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
def test_POST_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, value):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Meta-Test': value})
return check_response(conn)
def head(url, token, parsed, conn):
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
def get(url, token, parsed, conn):
conn.request('GET', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), None)
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), None)
resp = retry(post, 'Value')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
def test_PUT_bad_metadata(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn, name, extra_headers):
headers = {'X-Auth-Token': token}
headers.update(extra_headers)
conn.request('PUT', parsed.path + '/' + name, '', headers)
return check_response(conn)
def delete(url, token, parsed, conn, name):
conn.request('DELETE', parsed.path + '/' + name, '',
{'X-Auth-Token': token})
return check_response(conn)
name = uuid4().hex
resp = retry(
put, name,
{'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
name = uuid4().hex
resp = retry(
put, name,
{'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 404)
name = uuid4().hex
resp = retry(
put, name,
{'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
name = uuid4().hex
resp = retry(
put, name,
{'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 404)
name = uuid4().hex
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
name = uuid4().hex
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 404)
name = uuid4().hex
headers = {}
header_value = 'k' * MAX_META_VALUE_LENGTH
size = 0
x = 0
while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
size += 4 + MAX_META_VALUE_LENGTH
headers['X-Container-Meta-%04d' % x] = header_value
x += 1
if MAX_META_OVERALL_SIZE - size > 1:
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(put, name, headers)
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
name = uuid4().hex
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(put, name, headers)
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 404)
def test_POST_bad_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):
headers = {'X-Auth-Token': token}
headers.update(extra_headers)
conn.request('POST', parsed.path + '/' + self.name, '', headers)
return check_response(conn)
resp = retry(
post,
{'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(
post,
{'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(
post,
{'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(
post,
{'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
self.assertEquals(resp.status, 400)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 204)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 400)
headers = {}
header_value = 'k' * MAX_META_VALUE_LENGTH
size = 0
x = 0
while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
size += 4 + MAX_META_VALUE_LENGTH
headers['X-Container-Meta-%04d' % x] = header_value
x += 1
if MAX_META_OVERALL_SIZE - size > 1:
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 204)
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 400)
def test_public_container(self):
if skip:
raise SkipTest
def get(url, token, parsed, conn):
conn.request('GET', parsed.path + '/' + self.name)
return check_response(conn)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
except Exception as err:
self.assert_(str(err).startswith('No result after '), err)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Read': '.r:*,.rlistings'})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(get)
resp.read()
self.assertEquals(resp.status, 204)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token, 'X-Container-Read': ''})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
except Exception as err:
self.assert_(str(err).startswith('No result after '), err)
def test_cross_account_container(self):
if skip or skip2:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
def get1(url, token, parsed, conn):
first_account[0] = parsed.path
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get1)
resp.read()
# Ensure we can't access the container with the second account
def get2(url, token, parsed, conn):
conn.request('GET', first_account[0] + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 403)
# Make the container accessible by the second account
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Read': swift_test_perm[1],
'X-Container-Write': swift_test_perm[1]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can now use the container with the second account
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 204)
# Make the container private again
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token, 'X-Container-Read': '',
'X-Container-Write': ''})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can't access the container with the second account again
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 403)
def test_cross_account_public_container(self):
if skip or skip2:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
def get1(url, token, parsed, conn):
first_account[0] = parsed.path
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get1)
resp.read()
# Ensure we can't access the container with the second account
def get2(url, token, parsed, conn):
conn.request('GET', first_account[0] + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 403)
# Make the container completely public
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Read': '.r:*,.rlistings'})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can now read the container with the second account
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 204)
# But we shouldn't be able to write with the second account
def put2(url, token, parsed, conn):
conn.request('PUT', first_account[0] + '/' + self.name + '/object',
'test object', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put2, use_account=2)
resp.read()
self.assertEquals(resp.status, 403)
# Now make the container also writeable by the second account
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Write': swift_test_perm[1]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can still read the container with the second account
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 204)
# And that we can now write with the second account
resp = retry(put2, use_account=2)
resp.read()
self.assertEquals(resp.status, 201)
def test_nonadmin_user(self):
if skip or skip3:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
def get1(url, token, parsed, conn):
first_account[0] = parsed.path
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get1)
resp.read()
# Ensure we can't access the container with the third account
def get3(url, token, parsed, conn):
conn.request('GET', first_account[0] + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get3, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# Make the container accessible by the third account
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Read': swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can now read the container with the third account
resp = retry(get3, use_account=3)
resp.read()
self.assertEquals(resp.status, 204)
# But we shouldn't be able to write with the third account
def put3(url, token, parsed, conn):
conn.request('PUT', first_account[0] + '/' + self.name + '/object',
'test object', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put3, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# Now make the container also writeable by the third account
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Write': swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can still read the container with the third account
resp = retry(get3, use_account=3)
resp.read()
self.assertEquals(resp.status, 204)
# And that we can now write with the third account
resp = retry(put3, use_account=3)
resp.read()
self.assertEquals(resp.status, 201)
def test_long_name_content_type(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn):
container_name = 'X' * 2048
conn.request('PUT', '%s/%s' % (parsed.path, container_name),
'there', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 400)
self.assertEquals(resp.getheader('Content-Type'),
'text/html; charset=UTF-8')
def test_null_name(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/abc%%00def' % parsed.path, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
if (web_front_end == 'apache2'):
self.assertEquals(resp.status, 404)
else:
self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
self.assertEquals(resp.status, 412)
if __name__ == '__main__':
unittest.main()

View File

@ -1,602 +0,0 @@
#!/usr/bin/python
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from nose import SkipTest
from uuid import uuid4
from swift_testing import check_response, retry, skip, skip3, \
swift_test_perm, web_front_end
class TestObject(unittest.TestCase):
def setUp(self):
if skip:
raise SkipTest
self.container = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
self.obj = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (
parsed.path, self.container, self.obj), 'test',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
def tearDown(self):
if skip:
raise SkipTest
def delete(url, token, parsed, conn, obj):
conn.request('DELETE',
'%s/%s/%s' % (parsed.path, self.container, obj),
'', {'X-Auth-Token': token})
return check_response(conn)
# get list of objects in container
def list(url, token, parsed, conn):
conn.request('GET',
'%s/%s' % (parsed.path, self.container),
'', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(list)
object_listing = resp.read()
self.assertEquals(resp.status, 200)
# iterate over object listing and delete all objects
for obj in object_listing.splitlines():
resp = retry(delete, obj)
resp.read()
self.assertEquals(resp.status, 204)
# delete the container
def delete(url, token, parsed, conn):
conn.request('DELETE', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
def test_copy_object(self):
if skip:
raise SkipTest
source = '%s/%s' % (self.container, self.obj)
dest = '%s/%s' % (self.container, 'test_copy')
# get contents of source
def get_source(url, token, parsed, conn):
conn.request('GET',
'%s/%s' % (parsed.path, source),
'', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get_source)
source_contents = resp.read()
self.assertEquals(resp.status, 200)
self.assertEquals(source_contents, 'test')
# copy source to dest with X-Copy-From
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s' % (parsed.path, dest), '',
{'X-Auth-Token': token,
'Content-Length': '0',
'X-Copy-From': source})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# contents of dest should be the same as source
def get_dest(url, token, parsed, conn):
conn.request('GET',
'%s/%s' % (parsed.path, dest),
'', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get_dest)
dest_contents = resp.read()
self.assertEquals(resp.status, 200)
self.assertEquals(dest_contents, source_contents)
# delete the copy
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s' % (parsed.path, dest), '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
# verify dest does not exist
resp = retry(get_dest)
resp.read()
self.assertEquals(resp.status, 404)
# copy source to dest with COPY
def copy(url, token, parsed, conn):
conn.request('COPY', '%s/%s' % (parsed.path, source), '',
{'X-Auth-Token': token,
'Destination': dest})
return check_response(conn)
resp = retry(copy)
resp.read()
self.assertEquals(resp.status, 201)
# contents of dest should be the same as source
resp = retry(get_dest)
dest_contents = resp.read()
self.assertEquals(resp.status, 200)
self.assertEquals(dest_contents, source_contents)
# delete the copy
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
def test_public_object(self):
if skip:
raise SkipTest
def get(url, token, parsed, conn):
conn.request('GET',
'%s/%s/%s' % (parsed.path, self.container, self.obj))
return check_response(conn)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
except Exception as err:
self.assert_(str(err).startswith('No result after '))
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token,
'X-Container-Read': '.r:*'})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(get)
resp.read()
self.assertEquals(resp.status, 200)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.container, '',
{'X-Auth-Token': token, 'X-Container-Read': ''})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
except Exception as err:
self.assert_(str(err).startswith('No result after '))
def test_private_object(self):
if skip or skip3:
raise SkipTest
# Ensure we can't access the object with the third account
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/%s' % (
parsed.path, self.container, self.obj), '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# create a shared container writable by account3
shared_container = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s' % (
parsed.path, shared_container), '',
{'X-Auth-Token': token,
'X-Container-Read': swift_test_perm[2],
'X-Container-Write': swift_test_perm[2]})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# verify third account can not copy from private container
def copy(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (
parsed.path, shared_container, 'private_object'), '',
{'X-Auth-Token': token,
'Content-Length': '0',
'X-Copy-From': '%s/%s' % (self.container, self.obj)})
return check_response(conn)
resp = retry(copy, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# verify third account can write "obj1" to shared container
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/%s' % (
parsed.path, shared_container, 'obj1'), 'test',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put, use_account=3)
resp.read()
self.assertEquals(resp.status, 201)
# verify third account can copy "obj1" to shared container
def copy2(url, token, parsed, conn):
conn.request('COPY', '%s/%s/%s' % (
parsed.path, shared_container, 'obj1'), '',
{'X-Auth-Token': token,
'Destination': '%s/%s' % (shared_container, 'obj1')})
return check_response(conn)
resp = retry(copy2, use_account=3)
resp.read()
self.assertEquals(resp.status, 201)
# verify third account STILL can not copy from private container
def copy3(url, token, parsed, conn):
conn.request('COPY', '%s/%s/%s' % (
parsed.path, self.container, self.obj), '',
{'X-Auth-Token': token,
'Destination': '%s/%s' % (shared_container,
'private_object')})
return check_response(conn)
resp = retry(copy3, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# clean up "obj1"
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/%s' % (
parsed.path, shared_container, 'obj1'), '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
# clean up shared_container
def delete(url, token, parsed, conn):
conn.request('DELETE',
parsed.path + '/' + shared_container, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
def test_manifest(self):
if skip:
raise SkipTest
# Data for the object segments
segments1 = ['one', 'two', 'three', 'four', 'five']
segments2 = ['six', 'seven', 'eight']
segments3 = ['nine', 'ten', 'eleven']
# Upload the first set of segments
def put(url, token, parsed, conn, objnum):
conn.request('PUT', '%s/%s/segments1/%s' % (
parsed.path, self.container, str(objnum)), segments1[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments1)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
# Upload the manifest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/manifest' % (
parsed.path, self.container), '', {
'X-Auth-Token': token,
'X-Object-Manifest': '%s/segments1/' % self.container,
'Content-Type': 'text/jibberish', 'Content-Length': '0'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest (should get all the segments as the body)
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1))
self.assertEquals(resp.status, 200)
self.assertEquals(resp.getheader('content-type'), 'text/jibberish')
# Get with a range at the start of the second segment
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {
'X-Auth-Token': token, 'Range': 'bytes=3-'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1[1:]))
self.assertEquals(resp.status, 206)
# Get with a range in the middle of the second segment
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {
'X-Auth-Token': token, 'Range': 'bytes=5-'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1)[5:])
self.assertEquals(resp.status, 206)
# Get with a full start and stop range
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {
'X-Auth-Token': token, 'Range': 'bytes=5-10'})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1)[5:11])
self.assertEquals(resp.status, 206)
# Upload the second set of segments
def put(url, token, parsed, conn, objnum):
conn.request('PUT', '%s/%s/segments2/%s' % (
parsed.path, self.container, str(objnum)), segments2[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments2)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest (should still be the first segments of course)
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments1))
self.assertEquals(resp.status, 200)
# Update the manifest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/manifest' % (
parsed.path, self.container), '', {
'X-Auth-Token': token,
'X-Object-Manifest': '%s/segments2/' % self.container,
'Content-Length': '0'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest (should be the second set of segments now)
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments2))
self.assertEquals(resp.status, 200)
if not skip3:
# Ensure we can't access the manifest with the third account
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
conn.request('POST', '%s/%s' % (parsed.path, self.container),
'', {'X-Auth-Token': token,
'X-Container-Read': swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
self.assertEquals(resp.read(), ''.join(segments2))
self.assertEquals(resp.status, 200)
# Create another container for the third set of segments
acontainer = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', parsed.path + '/' + acontainer, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Upload the third set of segments in the other container
def put(url, token, parsed, conn, objnum):
conn.request('PUT', '%s/%s/segments3/%s' % (
parsed.path, acontainer, str(objnum)), segments3[objnum],
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments3)):
resp = retry(put, objnum)
resp.read()
self.assertEquals(resp.status, 201)
# Update the manifest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/manifest' % (
parsed.path, self.container), '',
{'X-Auth-Token': token,
'X-Object-Manifest': '%s/segments3/' % acontainer,
'Content-Length': '0'})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
# Get the manifest to ensure it's the third set of segments
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get)
self.assertEquals(resp.read(), ''.join(segments3))
self.assertEquals(resp.status, 200)
if not skip3:
# Ensure we can't access the manifest with the third account
# (because the segments are in a protected container even if the
# manifest itself is not).
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# Grant access to the third account
def post(url, token, parsed, conn):
conn.request('POST', '%s/%s' % (parsed.path, acontainer),
'', {'X-Auth-Token': token,
'X-Container-Read': swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# The third account should be able to get the manifest now
def get(url, token, parsed, conn):
conn.request('GET', '%s/%s/manifest' % (
parsed.path, self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(get, use_account=3)
self.assertEquals(resp.read(), ''.join(segments3))
self.assertEquals(resp.status, 200)
# Delete the manifest
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/manifest' % (
parsed.path,
self.container), '', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the third set of segments
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/segments3/%s' % (
parsed.path, acontainer, str(objnum)), '',
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments3)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the second set of segments
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/segments2/%s' % (
parsed.path, self.container, str(objnum)), '',
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments2)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the first set of segments
def delete(url, token, parsed, conn, objnum):
conn.request('DELETE', '%s/%s/segments1/%s' % (
parsed.path, self.container, str(objnum)), '',
{'X-Auth-Token': token})
return check_response(conn)
for objnum in xrange(len(segments1)):
resp = retry(delete, objnum)
resp.read()
self.assertEquals(resp.status, 204)
# Delete the extra container
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s' % (parsed.path, acontainer), '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
def test_delete_content_type(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/hi' % (parsed.path, self.container),
'there', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
def delete(url, token, parsed, conn):
conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
'', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
self.assertEquals(resp.getheader('Content-Type'),
'text/html; charset=UTF-8')
def test_null_name(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/%s/abc%%00def' % (
parsed.path,
self.container), 'test', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
if (web_front_end == 'apache2'):
self.assertEquals(resp.status, 404)
else:
self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
self.assertEquals(resp.status, 412)
if __name__ == '__main__':
unittest.main()

View File

@ -78,12 +78,6 @@ nosetests -v --exe \
--with-html-output \
--html-out-file functional_tests/gluster-swift-generic-functional-result.html \
test/functional || fail "Functional tests failed"
nosetests -v --exe \
--with-xunit \
--xunit-file functional_tests/gluster-swift-functionalnosetests-TC-report.xml \
--with-html-output \
--html-out-file functional_tests/gluster-swift-functionalnosetests-result.html \
test/functionalnosetests || fail "Functional-nose tests failed"
cleanup
exit 0

View File

@ -70,12 +70,6 @@ run_generic_tests()
--with-html-output \
--html-out-file functional_tests/gluster-swift-gswauth-generic-functional-result.html \
test/functional || fail "Functional tests failed"
nosetests -v --exe \
--with-xunit \
--xunit-file functional_tests/gluster-swift-gswauth-functionalnosetests-TC-report.xml \
--with-html-output \
--html-out-file functional_tests/gluster-swift-gswauth-functionalnosetests-result.html \
test/functionalnosetests || fail "Functional-nose tests failed"
}
### MAIN ###

View File

@ -97,12 +97,6 @@ nosetests -v --exe \
--with-html-output \
--html-out-file functional_tests/gluster-swift-keystone-generic-functional-result.html \
test/functional || fail "Functional tests failed"
nosetests -v --exe \
--with-xunit \
--xunit-file functional_tests/gluster-swift-keystone-functionalnosetests-TC-report.xml \
--with-html-output \
--html-out-file functional_tests/gluster-swift-keystone-functionalnosetests-result.html \
test/functionalnosetests || fail "Functional-nose tests failed"
cleanup
exit 0

View File

@ -91,12 +91,6 @@ nosetests -v --exe \
--with-html-output \
--html-out-file functional_tests/gluster-swift-swiftkerbauth-generic-functional-result.html \
test/functional_auth/swiftkerbauth || fail "Functional tests failed"
#nosetests -v --exe \
# --with-xunit \
# --xunit-file functional_tests/gluster-swift-swiftkerbauth-functionalnosetests-TC-report.xml \
# --with-html-output \
# --html-out-file functional_tests/gluster-swift-swiftkerbauth-functionalnosetests-result.html \
# test/functional || fail "Functional-nose tests failed"
cleanup
exit 0