Remove six

This mostly affects tests. Nothing too complicated

Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
Change-Id: Iabc78f651e1d48db35638280722f8019798eccd6
This commit is contained in:
Stephen Finucane 2022-02-17 11:37:13 +00:00
parent 4983b90983
commit fa137a5bf1
17 changed files with 116 additions and 175 deletions

View File

@ -1,3 +1 @@
requests>=2.4.0
six>=1.9.0

View File

@ -40,7 +40,7 @@ import datetime
import json
import time
from six.moves.urllib.parse import urljoin
from urllib.parse import urljoin
# Note that while we import keystoneauth1 here, we *don't* need to add it to
# requirements.txt -- this entire module only makes sense (and should only be

View File

@ -22,11 +22,10 @@ import logging
import warnings
from requests.exceptions import RequestException, SSLError
from six.moves import http_client
from six.moves.urllib.parse import quote as _quote, unquote
from six.moves.urllib.parse import urljoin, urlparse, urlunparse
import http.client as http_client
from urllib.parse import quote as _quote, unquote
from urllib.parse import urljoin, urlparse, urlunparse
from time import sleep, time
import six
from swiftclient import version as swiftclient_version
from swiftclient.exceptions import ClientException
@ -165,34 +164,20 @@ def http_log(args, kwargs, resp, body):
def parse_header_string(data):
if not isinstance(data, (six.text_type, six.binary_type)):
if not isinstance(data, (str, bytes)):
data = str(data)
if six.PY2:
if isinstance(data, six.text_type):
# Under Python2 requests only returns binary_type, but if we get
# some stray text_type input, this should prevent unquote from
# interpreting %-encoded data as raw code-points.
data = data.encode('utf8')
if isinstance(data, bytes):
# Under Python3 requests only returns text_type and tosses (!) the
# rest of the headers. If that ever changes, this should be a sane
# approach.
try:
unquoted = unquote(data).decode('utf8')
data = data.decode('ascii')
except UnicodeDecodeError:
try:
return data.decode('utf8')
except UnicodeDecodeError:
return quote(data).decode('utf8')
else:
if isinstance(data, six.binary_type):
# Under Python3 requests only returns text_type and tosses (!) the
# rest of the headers. If that ever changes, this should be a sane
# approach.
try:
data = data.decode('ascii')
except UnicodeDecodeError:
data = quote(data)
try:
unquoted = unquote(data, errors='strict')
except UnicodeDecodeError:
return data
data = quote(data)
try:
unquoted = unquote(data, errors='strict')
except UnicodeDecodeError:
return data
return unquoted
@ -201,20 +186,18 @@ def quote(value, safe='/'):
Patched version of urllib.quote that encodes utf8 strings before quoting.
On Python 3, call directly urllib.parse.quote().
"""
if six.PY3:
return _quote(value, safe=safe)
return _quote(encode_utf8(value), safe)
return _quote(value, safe=safe)
def encode_utf8(value):
if type(value) in six.integer_types + (float, bool):
if type(value) in (int, float, bool):
# As of requests 2.11.0, headers must be byte- or unicode-strings.
# Convert some known-good types as a convenience for developers.
# Note that we *don't* convert subclasses, as they may have overriddden
# __str__ or __repr__.
# See https://github.com/kennethreitz/requests/pull/3366 for more info
value = str(value)
if isinstance(value, six.text_type):
if isinstance(value, str):
value = value.encode('utf8')
return value
@ -226,7 +209,7 @@ def encode_meta_headers(headers):
value = encode_utf8(value)
header = header.lower()
if (isinstance(header, six.string_types) and
if (isinstance(header, str) and
header.startswith(USER_METADATA_TYPE)):
header = encode_utf8(header)
@ -457,12 +440,12 @@ class HTTPConnection(object):
old_getheader = self.resp.raw.getheader
def _decode_header(string):
if string is None or six.PY2:
if string is None:
return string
return string.encode('iso-8859-1').decode('utf-8')
def _encode_header(string):
if string is None or six.PY2:
if string is None:
return string
return string.encode('utf-8').decode('iso-8859-1')
@ -1441,7 +1424,7 @@ def put_object(url, token=None, container=None, name=None, contents=None,
warnings.warn(warn_msg, stacklevel=2)
# Match requests's is_stream test
if hasattr(contents, '__iter__') and not isinstance(contents, (
six.text_type, six.binary_type, list, tuple, dict)):
str, bytes, list, tuple, dict)):
contents = iter_wrapper(contents)
conn.request('PUT', path, contents, headers)

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from six.moves import urllib
import urllib
class ClientException(Exception):

View File

@ -13,11 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
import sys
from concurrent.futures import ThreadPoolExecutor
from six.moves.queue import PriorityQueue
from queue import PriorityQueue
class OutputManager(object):
@ -70,12 +69,8 @@ class OutputManager(object):
self.print_pool.submit(self._write, data, self.print_stream)
def _write(self, data, stream):
if six.PY3:
stream.buffer.write(data)
stream.flush()
if six.PY2:
stream.write(data)
stream.flush()
stream.buffer.write(data)
stream.flush()
def print_msg(self, msg, *fmt_args):
if fmt_args:
@ -100,8 +95,6 @@ class OutputManager(object):
def _print(self, item, stream=None):
if stream is None:
stream = self.print_stream
if six.PY2 and isinstance(item, six.text_type):
item = item.encode('utf8')
print(item, file=stream)
def _print_error(self, item, count=1):

View File

@ -21,6 +21,7 @@ from concurrent.futures import as_completed, CancelledError, TimeoutError
from copy import deepcopy
from errno import EEXIST, ENOENT
from hashlib import md5
from io import StringIO
from os import environ, makedirs, stat, utime
from os.path import (
basename, dirname, getmtime, getsize, isdir, join, sep as os_path_sep
@ -29,10 +30,9 @@ from posixpath import join as urljoin
from random import shuffle
from time import time
from threading import Thread
from six import Iterator, StringIO, string_types, text_type
from six.moves.queue import Queue
from six.moves.queue import Empty as QueueEmpty
from six.moves.urllib.parse import quote
from queue import Queue
from queue import Empty as QueueEmpty
from urllib.parse import quote
import json
@ -54,7 +54,7 @@ DISK_BUFFER = 2 ** 16
logger = logging.getLogger("swiftclient.service")
class ResultsIterator(Iterator):
class ResultsIterator:
def __init__(self, futures):
self.futures = interruptable_as_completed(futures)
@ -321,10 +321,10 @@ class SwiftUploadObject(object):
options to be specified separately for each individual object.
"""
def __init__(self, source, object_name=None, options=None):
if isinstance(source, string_types):
if isinstance(source, str):
self.object_name = object_name or source
elif source is None or hasattr(source, 'read'):
if not object_name or not isinstance(object_name, string_types):
if not object_name or not isinstance(object_name, str):
raise SwiftError('Object names must be specified as '
'strings for uploads from None or file '
'like objects.')
@ -347,7 +347,7 @@ class SwiftPostObject(object):
specified separately for each individual object.
"""
def __init__(self, object_name, options=None):
if not (isinstance(object_name, string_types) and object_name):
if not (isinstance(object_name, str) and object_name):
raise SwiftError(
"Object names must be specified as non-empty strings"
)
@ -361,7 +361,7 @@ class SwiftDeleteObject(object):
specified separately for each individual object.
"""
def __init__(self, object_name, options=None):
if not (isinstance(object_name, string_types) and object_name):
if not (isinstance(object_name, str) and object_name):
raise SwiftError(
"Object names must be specified as non-empty strings"
)
@ -377,7 +377,7 @@ class SwiftCopyObject(object):
destination and fresh_metadata should be set in options
"""
def __init__(self, object_name, options=None):
if not (isinstance(object_name, string_types) and object_name):
if not (isinstance(object_name, str) and object_name):
raise SwiftError(
"Object names must be specified as non-empty strings"
)
@ -835,7 +835,7 @@ class SwiftService(object):
post_objects = []
for o in objects:
if isinstance(o, string_types):
if isinstance(o, str):
obj = SwiftPostObject(o)
post_objects.append(obj)
elif isinstance(o, SwiftPostObject):
@ -1637,7 +1637,7 @@ class SwiftService(object):
upload_objects = []
for o in objects:
if isinstance(o, string_types):
if isinstance(o, str):
obj = SwiftUploadObject(o, urljoin(pseudo_folder,
o.lstrip('/')))
upload_objects.append(obj)
@ -2035,7 +2035,7 @@ class SwiftService(object):
segment_results.sort(key=lambda di: di['segment_index'])
for seg in segment_results:
seg_loc = seg['segment_location'].lstrip('/')
if isinstance(seg_loc, text_type):
if isinstance(seg_loc, str):
seg_loc = seg_loc.encode('utf-8')
manifest_data = json.dumps([
@ -2578,7 +2578,7 @@ class SwiftService(object):
delete_objects = []
for o in objects:
if isinstance(o, string_types):
if isinstance(o, str):
obj = SwiftDeleteObject(o)
delete_objects.append(obj)
elif isinstance(o, SwiftDeleteObject):
@ -2933,7 +2933,7 @@ class SwiftService(object):
copy_objects = []
for o in objects:
if isinstance(o, string_types):
if isinstance(o, str):
obj = SwiftCopyObject(o, options)
copy_objects.append(obj)
elif isinstance(o, SwiftCopyObject):

View File

@ -25,8 +25,7 @@ import warnings
from os import environ, walk, _exit as os_exit
from os.path import isfile, isdir, join
from six import text_type, PY2
from six.moves.urllib.parse import unquote, urlparse
from urllib.parse import unquote, urlparse
from sys import argv as sys_argv, exit, stderr, stdin
from time import gmtime, strftime
@ -191,10 +190,6 @@ def st_delete(parser, args, output_manager, return_parser=False):
for o, err in r.get('result', {}).get('Errors', []):
# o will be of the form quote("/<cont>/<obj>")
o = unquote(o)
if PY2:
# In PY3, unquote(unicode) uses utf-8 like we
# want, but PY2 uses latin-1
o = o.encode('latin-1').decode('utf-8')
output_manager.error('Error Deleting: {0}: {1}'
.format(o[1:], err))
try:
@ -1931,7 +1926,7 @@ def add_default_args(parser):
def main(arguments=None):
argv = sys_argv if arguments is None else arguments
argv = [a if isinstance(a, text_type) else a.decode('utf-8') for a in argv]
argv = [a if isinstance(a, str) else a.decode('utf-8') for a in argv]
parser = argparse.ArgumentParser(
add_help=False, formatter_class=HelpFormatter, usage='''

View File

@ -13,17 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Miscellaneous utility functions for use with Swift."""
from calendar import timegm
try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping
from collections.abc import Mapping
import gzip
import hashlib
import hmac
import io
import json
import logging
import six
import time
import traceback
@ -42,7 +40,7 @@ def config_true_value(value):
This function comes from swift.common.utils.config_true_value()
"""
return value is True or \
(isinstance(value, six.string_types) and value.lower() in TRUE_VALUES)
(isinstance(value, str) and value.lower() in TRUE_VALUES)
def prt_bytes(num_bytes, human_flag):
@ -134,7 +132,7 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
except ValueError:
raise ValueError(TIME_ERRMSG)
if isinstance(path, six.binary_type):
if isinstance(path, bytes):
try:
path_for_body = path.decode('utf-8')
except UnicodeDecodeError:
@ -165,7 +163,7 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
('prefix:' if prefix else '') + path_for_body]
if ip_range:
if isinstance(ip_range, six.binary_type):
if isinstance(ip_range, bytes):
try:
ip_range = ip_range.decode('utf-8')
except UnicodeDecodeError:
@ -177,7 +175,7 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
hmac_body = u'\n'.join(hmac_parts)
# Encode to UTF-8 for py3 compatibility
if not isinstance(key, six.binary_type):
if not isinstance(key, bytes):
key = key.encode('utf-8')
sig = hmac.new(key, hmac_body.encode('utf-8'), hashlib.sha1).hexdigest()
@ -194,7 +192,7 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
if prefix:
temp_url += u'&temp_url_prefix={}'.format(parts[4])
# Have return type match path from caller
if isinstance(path, six.binary_type):
if isinstance(path, bytes):
return temp_url.encode('utf-8')
else:
return temp_url
@ -202,7 +200,7 @@ def generate_temp_url(path, seconds, key, method, absolute=False,
def get_body(headers, body):
if headers.get('content-encoding') == 'gzip':
with gzip.GzipFile(fileobj=six.BytesIO(body), mode='r') as gz:
with gzip.GzipFile(fileobj=io.BytesIO(body), mode='r') as gz:
nbody = gz.read()
return nbody
return body
@ -224,7 +222,7 @@ def split_request_headers(options, prefix=''):
if isinstance(options, Mapping):
options = options.items()
for item in options:
if isinstance(item, six.string_types):
if isinstance(item, str):
if ':' not in item:
raise ValueError(
"Metadata parameter %s must contain a ':'.\n"
@ -401,8 +399,6 @@ def n_groups(seq, n):
def normalize_manifest_path(path):
if six.PY2 and isinstance(path, six.text_type):
path = path.encode('utf-8')
if path.startswith('/'):
return path[1:]
return path

View File

@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import os
from six.moves import configparser
TEST_CONFIG = None

View File

@ -17,8 +17,6 @@ import unittest
import time
from io import BytesIO
import six
import swiftclient
from . import TEST_CONFIG
@ -417,12 +415,6 @@ class TestFunctional(unittest.TestCase):
# https://bugs.python.org/issue37093
# We'll have to settle for just testing that the POST doesn't blow up
# with a UnicodeDecodeError
if six.PY2:
headers = self.conn.head_object(
self.containername, self.objectname)
self.assertIn(u'x-object-meta-\U0001f44d', headers)
self.assertEqual(u'\U0001f44d',
headers.get(u'x-object-meta-\U0001f44d'))
def test_copy_object(self):
self.conn.put_object(

View File

@ -14,7 +14,7 @@
# limitations under the License.
import mock
from six import StringIO
from io import StringIO
import unittest
from swiftclient import command_helpers as h

View File

@ -12,13 +12,13 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from queue import Queue, Empty
import sys
import unittest
import threading
import six
from concurrent.futures import as_completed
from six.moves.queue import Queue, Empty
from time import sleep
from swiftclient import multithreading as mt
@ -216,11 +216,7 @@ class TestOutputManager(unittest.TestCase):
# The threads should have been cleaned up
self.assertEqual(starting_thread_count, threading.active_count())
if six.PY3:
over_the = "over the '\u062a\u062a'\n"
else:
over_the = "over the u'\\u062a\\u062a'\n"
# We write to the CaptureStream so no decoding is performed
over_the = "over the '\u062a\u062a'\n"
self.assertEqual(''.join([
'one-argument\n',
'one fish, 88 fish\n',

View File

@ -13,20 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import builtins
import contextlib
import io
import mock
import os
import six
import tempfile
import unittest
import time
import json
from io import BytesIO
from concurrent.futures import Future
from hashlib import md5
from mock import Mock, PropertyMock
from six.moves.queue import Queue, Empty as QueueEmptyError
from six import BytesIO
from queue import Queue, Empty as QueueEmptyError
from time import sleep
import swiftclient
@ -46,12 +47,6 @@ for key in os.environ:
clean_os_environ[key] = ''
if six.PY2:
import __builtin__ as builtins
else:
import builtins
class TestSwiftPostObject(unittest.TestCase):
def setUp(self):
@ -1268,7 +1263,7 @@ class TestService(unittest.TestCase):
for obj in objects:
with mock.patch('swiftclient.service.Connection') as mock_conn, \
mock.patch.object(builtins, 'open') as mock_open:
mock_open.return_value = six.StringIO('asdf')
mock_open.return_value = io.StringIO('asdf')
mock_conn.return_value.head_object.side_effect = \
ClientException('Not Found', http_status=404)
mock_conn.return_value.put_object.return_value =\
@ -2318,7 +2313,7 @@ class TestServiceDownload(_TestServiceBase):
def test_download_object_job(self):
mock_conn = self._get_mock_connection()
objcontent = six.BytesIO(b'objcontent')
objcontent = io.BytesIO(b'objcontent')
mock_conn.get_object.side_effect = [
({'content-type': 'text/plain',
'etag': '2cbbfe139a744d6abbe695e17f3c1991'},
@ -2360,7 +2355,7 @@ class TestServiceDownload(_TestServiceBase):
def test_download_object_job_with_mtime(self):
mock_conn = self._get_mock_connection()
objcontent = six.BytesIO(b'objcontent')
objcontent = io.BytesIO(b'objcontent')
mock_conn.get_object.side_effect = [
({'content-type': 'text/plain',
'etag': '2cbbfe139a744d6abbe695e17f3c1991',
@ -2406,7 +2401,7 @@ class TestServiceDownload(_TestServiceBase):
def test_download_object_job_bad_mtime(self):
mock_conn = self._get_mock_connection()
objcontent = six.BytesIO(b'objcontent')
objcontent = io.BytesIO(b'objcontent')
mock_conn.get_object.side_effect = [
({'content-type': 'text/plain',
'etag': '2cbbfe139a744d6abbe695e17f3c1991',
@ -2451,7 +2446,7 @@ class TestServiceDownload(_TestServiceBase):
def test_download_object_job_ignore_mtime(self):
mock_conn = self._get_mock_connection()
objcontent = six.BytesIO(b'objcontent')
objcontent = io.BytesIO(b'objcontent')
mock_conn.get_object.side_effect = [
({'content-type': 'text/plain',
'etag': '2cbbfe139a744d6abbe695e17f3c1991',

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import contextlib
from genericpath import getmtime
import getpass
@ -26,8 +27,6 @@ import unittest
import textwrap
from time import localtime, mktime, strftime, strptime
import six
import swiftclient
from swiftclient.service import SwiftError
import swiftclient.shell
@ -46,10 +45,7 @@ try:
except ImportError:
InsecureRequestWarning = None
if six.PY2:
BUILTIN_OPEN = '__builtin__.open'
else:
BUILTIN_OPEN = 'builtins.open'
BUILTIN_OPEN = 'builtins.open'
mocked_os_environ = {
'ST_AUTH': 'http://localhost:8080/auth/v1.0',
@ -631,7 +627,7 @@ class TestShell(unittest.TestCase):
@mock.patch('swiftclient.service.makedirs')
@mock.patch('swiftclient.service.Connection')
def test_download(self, connection, makedirs):
objcontent = six.BytesIO(b'objcontent')
objcontent = io.BytesIO(b'objcontent')
connection.return_value.get_object.side_effect = [
({'content-type': 'text/plain',
'etag': '2cbbfe139a744d6abbe695e17f3c1991'},
@ -666,7 +662,7 @@ class TestShell(unittest.TestCase):
makedirs.reset_mock()
# Test downloading single object
objcontent = six.BytesIO(b'objcontent')
objcontent = io.BytesIO(b'objcontent')
connection.return_value.get_object.side_effect = [
({'content-type': 'text/plain',
'etag': '2cbbfe139a744d6abbe695e17f3c1991'},
@ -682,7 +678,7 @@ class TestShell(unittest.TestCase):
self.assertEqual([], makedirs.mock_calls)
# Test downloading without md5 checks
objcontent = six.BytesIO(b'objcontent')
objcontent = io.BytesIO(b'objcontent')
connection.return_value.get_object.side_effect = [
({'content-type': 'text/plain',
'etag': '2cbbfe139a744d6abbe695e17f3c1991'},
@ -700,7 +696,7 @@ class TestShell(unittest.TestCase):
self.assertEqual([], makedirs.mock_calls)
# Test downloading single object to stdout
objcontent = six.BytesIO(b'objcontent')
objcontent = io.BytesIO(b'objcontent')
connection.return_value.get_object.side_effect = [
({'content-type': 'text/plain',
'etag': '2cbbfe139a744d6abbe695e17f3c1991'},
@ -3246,7 +3242,7 @@ class TestAuth(MockHttpTest):
}
mock_resp = self.fake_http_connection(200, headers=headers)
with mock.patch('swiftclient.client.http_connection', new=mock_resp):
stdout = six.StringIO()
stdout = io.StringIO()
with mock.patch('sys.stdout', new=stdout):
argv = [
'',
@ -3265,7 +3261,7 @@ class TestAuth(MockHttpTest):
def test_auth_verbose(self):
with mock.patch('swiftclient.client.http_connection') as mock_conn:
stdout = six.StringIO()
stdout = io.StringIO()
with mock.patch('sys.stdout', new=stdout):
argv = [
'',
@ -3289,7 +3285,7 @@ class TestAuth(MockHttpTest):
os_options = {'tenant_name': 'demo'}
with mock.patch('swiftclient.client.get_auth_keystone',
new=fake_get_auth_keystone(os_options)):
stdout = six.StringIO()
stdout = io.StringIO()
with mock.patch('sys.stdout', new=stdout):
argv = [
'',
@ -3310,7 +3306,7 @@ class TestAuth(MockHttpTest):
def test_auth_verbose_v2(self):
with mock.patch('swiftclient.client.get_auth_keystone') \
as mock_keystone:
stdout = six.StringIO()
stdout = io.StringIO()
with mock.patch('sys.stdout', new=stdout):
argv = [
'',

View File

@ -17,15 +17,14 @@ import gzip
import json
import logging
import mock
import six
import io
import socket
import string
import unittest
import warnings
import tempfile
from hashlib import md5
from six import binary_type
from six.moves.urllib.parse import urlparse
from urllib.parse import urlparse
from requests.exceptions import RequestException
from .utils import (MockHttpTest, fake_get_auth_keystone, StubResponse,
@ -214,12 +213,12 @@ class TestHttpHelpers(MockHttpTest):
self.assertEqual(len(headers), len(r))
# ensure non meta headers are not encoded
self.assertIs(type(r.get('abc')), binary_type)
self.assertIs(type(r.get('abc')), bytes)
del r['abc']
for k, v in r.items():
self.assertIs(type(k), binary_type)
self.assertIs(type(v), binary_type)
self.assertIs(type(k), bytes)
self.assertIs(type(v), bytes)
self.assertIn(v, (b'123', b'12.3', b'True'))
def test_set_user_agent_default(self):
@ -1329,7 +1328,7 @@ class TestPutObject(MockHttpTest):
c.http_connection = self.fake_http_connection(200)
args = ('http://www.test.com', 'TOKEN', 'container', 'obj', 'body', 4)
value = c.put_object(*args)
self.assertIsInstance(value, six.string_types)
self.assertIsInstance(value, str)
self.assertEqual(value, EMPTY_ETAG)
self.assertRequests([
('PUT', '/container/obj', 'body', {
@ -1340,7 +1339,7 @@ class TestPutObject(MockHttpTest):
def test_unicode_ok(self):
conn = c.http_connection(u'http://www.test.com/')
mock_file = six.StringIO(u'\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91')
mock_file = io.StringIO(u'\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91')
args = (u'\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91',
u'\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91',
u'\u5929\u7a7a\u4e2d\u7684\u4e4c\u4e91',
@ -1354,7 +1353,7 @@ class TestPutObject(MockHttpTest):
conn[1].getresponse = resp.fake_response
conn[1]._request = resp._fake_request
value = c.put_object(*args, headers=headers, http_conn=conn)
self.assertIsInstance(value, six.string_types)
self.assertIsInstance(value, str)
# Test for RFC-2616 encoded symbols
self.assertIn(("a-b", b".x:yz mn:fg:lp"),
resp.buffer)
@ -1364,7 +1363,7 @@ class TestPutObject(MockHttpTest):
def test_chunk_warning(self):
conn = c.http_connection('http://www.test.com/')
mock_file = six.StringIO('asdf')
mock_file = io.StringIO('asdf')
args = ('asdf', 'asdf', 'asdf', 'asdf', mock_file)
resp = MockHttpResponse()
conn[1].getresponse = resp.fake_response
@ -1960,7 +1959,7 @@ class TestHTTPConnection(MockHttpTest):
self.assertFalse(resp.read())
self.assertTrue(resp.closed)
def test_response_python3_headers(self):
def test_response_headers(self):
'''Test latin1-encoded headers.
'''
_, conn = c.http_connection(u'http://www.test.com/')
@ -2547,7 +2546,7 @@ class TestConnection(MockHttpTest):
class LocalContents(object):
def __init__(self, tell_value=0):
self.data = six.BytesIO(string.ascii_letters.encode() * 10)
self.data = io.BytesIO(string.ascii_letters.encode() * 10)
self.data.seek(tell_value)
self.reads = []
self.seeks = []
@ -2845,7 +2844,7 @@ class TestLogging(MockHttpTest):
c.http_connection = self.fake_http_connection(200)
args = ('http://www.test.com', 'asdf', 'asdf', 'asdf', 'asdf')
value = c.put_object(*args)
self.assertIsInstance(value, six.string_types)
self.assertIsInstance(value, str)
def test_head_error(self):
c.http_connection = self.fake_http_connection(500)
@ -2859,7 +2858,7 @@ class TestLogging(MockHttpTest):
self.assertEqual(exc_context.exception.http_status, 404)
def test_content_encoding_gzip_body_is_logged_decoded(self):
buf = six.BytesIO()
buf = io.BytesIO()
gz = gzip.GzipFile(fileobj=buf, mode='w')
data = {"test": u"\u2603"}
decoded_body = json.dumps(data).encode('utf-8')

View File

@ -14,10 +14,10 @@
# limitations under the License.
import gzip
import io
import json
import unittest
import mock
import six
import tempfile
from time import gmtime, localtime, mktime, strftime, strptime
from hashlib import md5, sha1
@ -142,7 +142,7 @@ class TestTempURL(unittest.TestCase):
url = u.generate_temp_url(self.url, self.seconds,
self.key, self.method)
key = self.key
if not isinstance(key, six.binary_type):
if not isinstance(key, bytes):
key = key.encode('utf-8')
self.assertEqual(url, self.expected_url)
self.assertEqual(hmac_mock.mock_calls, [
@ -170,10 +170,10 @@ class TestTempURL(unittest.TestCase):
self.key, self.method,
ip_range=ip_range)
key = self.key
if not isinstance(key, six.binary_type):
if not isinstance(key, bytes):
key = key.encode('utf-8')
if isinstance(ip_range, six.binary_type):
if isinstance(ip_range, bytes):
ip_range_expected_url = (
expected_url + ip_range.decode('utf-8')
)
@ -215,7 +215,7 @@ class TestTempURL(unittest.TestCase):
lt = localtime()
expires = strftime(u.EXPIRES_ISO8601_FORMAT[:-1], lt)
if not isinstance(self.expected_url, six.string_types):
if not isinstance(self.expected_url, str):
expected_url = self.expected_url.replace(
b'1400003600', bytes(str(int(mktime(lt))), encoding='ascii'))
else:
@ -228,7 +228,7 @@ class TestTempURL(unittest.TestCase):
expires = strftime(u.SHORT_EXPIRES_ISO8601_FORMAT, lt)
lt = strptime(expires, u.SHORT_EXPIRES_ISO8601_FORMAT)
if not isinstance(self.expected_url, six.string_types):
if not isinstance(self.expected_url, str):
expected_url = self.expected_url.replace(
b'1400003600', bytes(str(int(mktime(lt))), encoding='ascii'))
else:
@ -246,11 +246,11 @@ class TestTempURL(unittest.TestCase):
self.key, self.method,
iso8601=True)
key = self.key
if not isinstance(key, six.binary_type):
if not isinstance(key, bytes):
key = key.encode('utf-8')
expires = strftime(u.EXPIRES_ISO8601_FORMAT, gmtime(1400003600))
if not isinstance(self.url, six.string_types):
if not isinstance(self.url, str):
self.assertTrue(url.endswith(bytes(expires, 'utf-8')))
else:
self.assertTrue(url.endswith(expires))
@ -280,7 +280,7 @@ class TestTempURL(unittest.TestCase):
url = u.generate_temp_url(path, self.seconds,
self.key, self.method, prefix=True)
key = self.key
if not isinstance(key, six.binary_type):
if not isinstance(key, bytes):
key = key.encode('utf-8')
self.assertEqual(url, expected_url)
self.assertEqual(hmac_mock.mock_calls, [
@ -299,7 +299,7 @@ class TestTempURL(unittest.TestCase):
@mock.patch('hmac.HMAC.hexdigest', return_value="temp_url_signature")
def test_generate_absolute_expiry_temp_url(self, hmac_mock):
if isinstance(self.expected_url, six.binary_type):
if isinstance(self.expected_url, bytes):
expected_url = self.expected_url.replace(
b'1400003600', b'2146636800')
else:
@ -486,7 +486,7 @@ class TestReadableToIterable(unittest.TestCase):
class TestLengthWrapper(unittest.TestCase):
def test_stringio(self):
contents = six.StringIO(u'a' * 50 + u'b' * 50)
contents = io.StringIO(u'a' * 50 + u'b' * 50)
contents.seek(22)
data = u.LengthWrapper(contents, 42, True)
s = u'a' * 28 + u'b' * 14
@ -506,7 +506,7 @@ class TestLengthWrapper(unittest.TestCase):
self.assertEqual(md5(s.encode()).hexdigest(), data.get_md5sum())
def test_bytesio(self):
contents = six.BytesIO(b'a' * 50 + b'b' * 50)
contents = io.BytesIO(b'a' * 50 + b'b' * 50)
contents.seek(22)
data = u.LengthWrapper(contents, 42, True)
s = b'a' * 28 + b'b' * 14
@ -613,7 +613,7 @@ class TestApiResponeParser(unittest.TestCase):
self.assertEqual({u't\xe9st': u'\xff'}, result)
def test_gzipped_utf8(self):
buf = six.BytesIO()
buf = io.BytesIO()
gz = gzip.GzipFile(fileobj=buf, mode='w')
gz.write(u'{"test": "\u2603"}'.encode('utf8'))
gz.close()
@ -631,7 +631,7 @@ class TestGetBody(unittest.TestCase):
self.assertEqual({'test': u'\u2603'}, result)
def test_gzipped_body(self):
buf = six.BytesIO()
buf = io.BytesIO()
gz = gzip.GzipFile(fileobj=buf, mode='w')
gz.write(u'{"test": "\u2603"}'.encode('utf8'))
gz.close()

View File

@ -12,17 +12,19 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import io
import importlib
import os
import sys
from requests import RequestException
from requests.structures import CaseInsensitiveDict
from time import sleep
import unittest
from requests import RequestException
from requests.structures import CaseInsensitiveDict
import mock
import six
import os
from six.moves import reload_module
from six.moves.urllib.parse import urlparse, ParseResult
from urllib.parse import urlparse, ParseResult
from swiftclient import client as c
from swiftclient import shell as s
from swiftclient.utils import EMPTY_ETAG
@ -406,7 +408,7 @@ class MockHttpTest(unittest.TestCase):
# un-hygienic mocking on the swiftclient.client module; which may lead
# to some unfortunate test order dependency bugs by way of the broken
# window theory if any other modules are similarly patched
reload_module(c)
importlib.reload(c)
class CaptureStreamPrinter(object):
@ -421,24 +423,20 @@ class CaptureStreamPrinter(object):
# No encoding, just convert the raw bytes into a str for testing
# The below call also validates that we have a byte string.
self._captured_stream.write(
data if isinstance(data, six.binary_type) else data.encode('utf8'))
data if isinstance(data, bytes) else data.encode('utf8'))
class CaptureStream(object):
def __init__(self, stream):
self.stream = stream
self._buffer = six.BytesIO()
self._buffer = io.BytesIO()
self._capture = CaptureStreamPrinter(self._buffer)
self.streams = [self._capture]
@property
def buffer(self):
if six.PY3:
return self._buffer
else:
raise AttributeError(
'Output stream has no attribute "buffer" in Python2')
return self._buffer
def flush(self):
pass