diff --git a/packetary/tests/test_packetary.py b/packetary/library/__init__.py similarity index 75% rename from packetary/tests/test_packetary.py rename to packetary/library/__init__.py index 7970569..60d4f22 100644 --- a/packetary/tests/test_packetary.py +++ b/packetary/library/__init__.py @@ -15,17 +15,6 @@ # under the License. -""" -test_packetary ----------------------------------- +import eventlet -Tests for `packetary` module. -""" - -from packetary.tests import base - - -class TestPacketary(base.TestCase): - - def test_something(self): - pass +eventlet.monkey_patch() diff --git a/packetary/library/checksum.py b/packetary/library/checksum.py new file mode 100644 index 0000000..7243389 --- /dev/null +++ b/packetary/library/checksum.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- + +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import functools +import hashlib + + +class _HashComposite(object): + """Combines several hash methods.""" + + def __init__(self, hash_objects): + self.hash_objects = hash_objects + + def update(self, data): + """Updates the hash objects with the string arg. + + For more details see doc of hashlib.update. + """ + for o in self.hash_objects: + o.update(data) + + def hexdigest(self): + """Returns the list of appropriate hexdigests of hash_objects. + + For more details see doc of hashlib.hexdigest. + """ + return [o.hexdigest() for o in self.hash_objects] + + +def _new_composite(methods): + """Creates new composite method.""" + + def wrapper(): + return _HashComposite([x() for x in methods]) + return wrapper + + +def _checksum(method): + """Makes function to calculate checksum for stream.""" + @functools.wraps(method) + def calculate(stream, chunksize=16 * 1024): + """Calculates checksum for binary stream. + + :param stream: file-like object opened in binary mode. + :return: the checksum of content in terms of method. + """ + + s = method() + while True: + chunk = stream.read(chunksize) + if not chunk: + break + s.update(chunk) + return s.hexdigest() + return calculate + + +md5 = _checksum(hashlib.md5) + +sha1 = _checksum(hashlib.sha1) + +sha256 = _checksum(hashlib.sha256) + + +def composite(*methods): + """Calculate several checksum at one time.""" + return _checksum(_new_composite( + [getattr(hashlib, x) for x in methods] + )) diff --git a/packetary/library/connections.py b/packetary/library/connections.py new file mode 100644 index 0000000..6d42fd4 --- /dev/null +++ b/packetary/library/connections.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- + +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import six +import six.moves.http_client as http_client +import six.moves.urllib.request as urllib_request +import six.moves.urllib_error as urllib_error +import time + +from packetary.library.streams import StreamWrapper + + +logger = logging.getLogger(__package__) + + +RETRYABLE_ERRORS = (http_client.HTTPException, IOError) + + +class RangeError(urllib_error.URLError): + pass + + +class RetryableRequest(urllib_request.Request): + offset = 0 + retries_left = 1 + start_time = 0 + + +class ResumableResponse(StreamWrapper): + """The http-response wrapper to add resume ability. + + Allows to resume read from same position if connection is lost. + """ + + def __init__(self, request, response, opener): + """Initialises. + + :param request: the original http request + :param response: the original http response + :param opener: the instance of urllib.OpenerDirector + """ + super(ResumableResponse, self).__init__(response) + self.request = request + self.opener = opener + + def read_chunk(self, chunksize): + """Overrides super class method.""" + while 1: + try: + chunk = self.stream.read(chunksize) + self.request.offset += len(chunk) + return chunk + except RETRYABLE_ERRORS as e: + response = self.opener.error( + self.request.get_type(), self.request, + self.stream, 502, six.text_type(e), self.stream.info() + ) + self.stream = response.stream + + +class RetryHandler(urllib_request.BaseHandler): + """urllib Handler to add ability for retrying on server errors.""" + + @staticmethod + def http_request(request): + """Initialises http request.""" + logger.debug("start request: %s", request.get_full_url()) + if request.offset > 0: + request.add_header('Range', 'bytes=%d-' % request.offset) + request.start_time = time.time() + return request + + def http_response(self, request, response): + """Wraps response in a ResumableResponse. + + Checks that partial request completed successfully. + """ + # the server should response partial content if range is specified + logger.debug( + "finish request: %s - %d (%s), duration - %d ms.", + request.get_full_url(), response.getcode(), response.msg, + int((time.time() - request.start_time) * 1000) + ) + if request.offset > 0 and response.getcode() != 206: + raise RangeError("Server does not support ranges.") + return ResumableResponse(request, response, self.parent) + + def http_error(self, req, fp, code, msg, hdrs): + """Checks error code and retries request if it is allowed.""" + if code >= 500 and req.retries_left > 0: + req.retries_left -= 1 + logger.warning( + "fail request: %s - %d(%s), retries left - %d.", + req.get_full_url(), code, msg, req.retries_left + ) + return self.parent.open(req) + + https_request = http_request + https_response = http_response + + +class Connection(object): + """Helper class to deal with streams.""" + + def __init__(self, opener, retries_num): + """Initializes. + + :param opener: the instance of urllib.OpenerDirector + :param retries_num: the number of allowed retries + """ + self.opener = opener + self.retries_num = retries_num + + def make_request(self, url, offset=0): + """Makes new http request. + + :param url: the remote file`s url + :param offset: the number of bytes from begin, that will be skipped + :return: The new http request + """ + + if url.startswith("/"): + url = "file://" + url + + request = RetryableRequest(url) + request.retries_left = self.retries_num + request.offset = offset + return request + + def open_stream(self, url, offset=0): + """Opens remote file for streaming. + + :param url: the remote file`s url + :param offset: the number of bytes from begin, that will be skipped + """ + + request = self.make_request(url, offset) + while 1: + try: + return self.opener.open(request) + except (RangeError, urllib_error.HTTPError): + raise + except RETRYABLE_ERRORS as e: + if request.retries_left <= 0: + raise + request.retries_left -= 1 + logger.exception( + "Failed to open url - %s: %s. retries left - %d.", + url, six.text_type(e), request.retries_left + ) + + def retrieve(self, url, filename, offset=0): + """Downloads remote file. + + :param url: the remote file`s url + :param filename: the file`s name, that includes path on local fs + :param offset: the number of bytes from begin, that will be skipped + """ + + self._ensure_dir_exists(filename) + fd = os.open(filename, os.O_CREAT | os.O_WRONLY) + try: + self._copy_stream(fd, url, offset) + except RangeError: + if offset == 0: + raise + logger.warning( + "Failed to resume download, starts from begin: %s", url + ) + self._copy_stream(fd, url, 0) + finally: + os.fsync(fd) + os.close(fd) + + @staticmethod + def _ensure_dir_exists(dst): + """Checks that directory exists and creates otherwise.""" + target_dir = os.path.dirname(dst) + try: + os.makedirs(target_dir) + except OSError as e: + if e.errno != 17: + raise + + def _copy_stream(self, fd, url, offset): + """Copies remote file to local. + + :param fd: the file`s descriptor + :param url: the remote file`s url + :param offset: the number of bytes from begin, that will be skipped + """ + + source = self.open_stream(url, offset) + os.ftruncate(fd, offset) + os.lseek(fd, offset, os.SEEK_SET) + chunk_size = 16 * 1024 + while 1: + chunk = source.read(chunk_size) + if not chunk: + break + os.write(fd, chunk) + + +class ConnectionContext(object): + """Helper class acquire and release connection within context.""" + def __init__(self, connection, on_exit): + self.connection = connection + self.on_exit = on_exit + + def __enter__(self): + return self.connection + + def __exit__(self, *_): + self.on_exit(self.connection) + + +class ConnectionsPool(object): + """Controls the number of simultaneously opened connections.""" + + MIN_CONNECTIONS_COUNT = 1 + + def __init__(self, count=0, proxy=None, secure_proxy=None, retries_num=0): + """Initialises. + + :param count: the number of allowed simultaneously connections + :param proxy: the url of proxy for http-connections + :param secure_proxy: the url of proxy for https-connections + :param retries_num: the number of allowed retries + """ + if proxy: + proxies = { + "http": proxy, + "https": secure_proxy or proxy, + } + else: + proxies = None + + opener = urllib_request.build_opener( + RetryHandler(), + urllib_request.ProxyHandler(proxies) + ) + + limit = max(count, self.MIN_CONNECTIONS_COUNT) + connections = six.moves.queue.Queue() + while limit > 0: + connections.put(Connection(opener, retries_num)) + limit -= 1 + + self.free = connections + + def get(self, timeout=None): + """Gets the free connection. + + Blocks in case if there is no free connections. + + :param timeout: the timeout in seconds to wait. + by default infinity waiting. + """ + return ConnectionContext( + self.free.get(timeout=timeout), self._release + ) + + def _release(self, connection): + """Puts back connection to free connections.""" + self.free.put(connection) diff --git a/packetary/library/executor.py b/packetary/library/executor.py new file mode 100644 index 0000000..7e708d1 --- /dev/null +++ b/packetary/library/executor.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- + +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import with_statement + +import logging +import six + +from eventlet.greenpool import GreenPool + + +logger = logging.getLogger(__package__) + + +class AsynchronousSection(object): + """Allows calling function asynchronously with waiting on exit.""" + + MIN_POOL_SIZE = 1 + + def __init__(self, size=0, ignore_errors_num=0): + """Initialises. + + :param size: the max number of parallel tasks + :param ignore_errors_num: + number of errors which does not stop the execution + """ + + self.executor = GreenPool(max(size, self.MIN_POOL_SIZE)) + self.ignore_errors_num = ignore_errors_num + self.errors = 0 + self.tasks = set() + + def __enter__(self): + self.errors = 0 + return self + + def __exit__(self, etype, *_): + self.wait(etype is not None) + + def execute(self, func, *args, **kwargs): + """Calls function asynchronously.""" + if 0 <= self.ignore_errors_num < self.errors: + raise RuntimeError("Too many errors.") + + gt = self.executor.spawn(func, *args, **kwargs) + self.tasks.add(gt) + gt.link(self.on_complete) + + def on_complete(self, gt): + """Callback to handle task completion.""" + + try: + gt.wait() + except Exception as e: + self.errors += 1 + logger.exception("Task failed: %s", six.text_type(e)) + finally: + self.tasks.discard(gt) + + def wait(self, ignore_errors=False): + """Waits until all tasks will be completed. + + Do not use directly, will be called from context manager. + """ + self.executor.waitall() + if not ignore_errors and self.errors > 0: + raise RuntimeError( + "Operations completed with errors. See log for more details." + ) diff --git a/packetary/library/streams.py b/packetary/library/streams.py new file mode 100644 index 0000000..558a27b --- /dev/null +++ b/packetary/library/streams.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- + +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import zlib + + +class StreamWrapper(object): + """Helper class to implement stream wrappers. + + It is base-class for Streamers, + that provides functionality to transform stream on the fly. + The wrapped stream may return data more that required, + the extra read data will be kept in the internal buffer till + next read. + """ + + CHUNK_SIZE = 1024 + + def __init__(self, stream): + """Initializes. + + :param stream: file-like object opened in binary mode. + """ + self.stream = stream + self.unread_tail = b"" + + def __getattr__(self, item): + return getattr(self.stream, item) + + def _read_tail(self): + tmp = self.unread_tail + self.unread_tail = b"" + return tmp + + def _align_chunk(self, chunk, size): + self.unread_tail = chunk[size:] + return chunk[:size] + + def read_chunk(self, chunksize): + """Overrides this method to change default behaviour.""" + return self.stream.read(chunksize) + + def read(self, size=-1): + result = self._read_tail() + if size < 0: + while True: + chunk = self.read_chunk(self.CHUNK_SIZE) + if not chunk: + break + result += chunk + else: + if len(result) > size: + result = self._align_chunk(result, size) + size -= len(result) + while size > 0: + chunk = self.read_chunk(max(self.CHUNK_SIZE, size)) + if not chunk: + break + if len(chunk) > size: + chunk = self._align_chunk(chunk, size) + size -= len(chunk) + result += chunk + return result + + def readline(self): + pos = self.unread_tail.find(b"\n") + if pos >= 0: + line = self._align_chunk(self.unread_tail, pos + 1) + else: + line = self._read_tail() + while True: + chunk = self.read_chunk(self.CHUNK_SIZE) + if not chunk: + break + pos = chunk.find(b"\n") + if pos >= 0: + line += self._align_chunk(chunk, pos + 1) + break + line += chunk + return line + + def readlines(self): + while True: + line = self.readline() + if not line: + break + yield line + + def __iter__(self): + return self.readlines() + + +class GzipDecompress(StreamWrapper): + """The decompress stream.""" + + def __init__(self, stream): + super(GzipDecompress, self).__init__(stream) + # Magic parameter makes zlib module understand gzip header + # http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib + # This works on cpython and pypy, but not jython. + self.decompress = zlib.decompressobj(16 + zlib.MAX_WBITS) + + def read_chunk(self, chunksize): + if self.decompress.unconsumed_tail: + return self.decompress.decompress( + self.decompress.unconsumed_tail, chunksize + ) + + chunk = self.stream.read(chunksize) + if not chunk: + return self.decompress.flush() + return self.decompress.decompress(chunk, chunksize) diff --git a/packetary/tests/test_checksums.py b/packetary/tests/test_checksums.py new file mode 100644 index 0000000..8060e00 --- /dev/null +++ b/packetary/tests/test_checksums.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- + +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from packetary.library import checksum +from packetary.tests import base + + +class TestChecksum(base.TestCase): + def test_checksum(self): + stream = six.BytesIO(b"line1\nline2\nline3\n") + checksums = { + checksum.md5: "cc3d5ed5fda53dfa81ea6aa951d7e1fe", + checksum.sha1: "8c84f6f36dd2230d3e9c954fa436e5fda90b1957", + checksum.sha256: "66663af9c7aa341431a8ee2ff27b72" + "abd06c9218f517bb6fef948e4803c19e03" + } + for chunksize in (8, 256): + for algo, expected in six.iteritems(checksums): + stream.seek(0) + self.assertEqual( + expected, algo(stream, chunksize) + ) + + def test_composite(self): + stream = six.BytesIO(b"line1\nline2\nline3\n") + result = checksum.composite('md5', 'sha1', 'sha256')(stream) + self.assertEqual( + [ + "cc3d5ed5fda53dfa81ea6aa951d7e1fe", + "8c84f6f36dd2230d3e9c954fa436e5fda90b1957", + "66663af9c7aa341431a8ee2ff27b72" + "abd06c9218f517bb6fef948e4803c19e03" + ], + result + ) diff --git a/packetary/tests/test_connections.py b/packetary/tests/test_connections.py new file mode 100644 index 0000000..33d4ef2 --- /dev/null +++ b/packetary/tests/test_connections.py @@ -0,0 +1,249 @@ +# -*- coding: utf-8 -*- + +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import six +import time + +from packetary.library import connections +from packetary.tests import base + + +class TestConnectionsPool(base.TestCase): + def test_get_connection(self): + pool = connections.ConnectionsPool(count=2) + self.assertEqual(2, pool.free.qsize()) + with pool.get(): + self.assertEqual(1, pool.free.qsize()) + self.assertEqual(2, pool.free.qsize()) + + def _check_proxies(self, pool, http_proxy, https_proxy): + with pool.get() as c: + for h in c.opener.handlers: + if isinstance(h, connections.urllib_request.ProxyHandler): + self.assertEqual( + (http_proxy, https_proxy), + (h.proxies["http"], h.proxies["https"]) + ) + break + else: + self.fail("ProxyHandler should be in list of handlers.") + + def test_set_proxy(self): + pool = connections.ConnectionsPool(count=1, proxy="http://localhost") + self._check_proxies(pool, "http://localhost", "http://localhost") + pool = connections.ConnectionsPool( + proxy="http://localhost", secure_proxy="https://localhost") + self._check_proxies(pool, "http://localhost", "https://localhost") + + def test_reliability(self): + pool = connections.ConnectionsPool(count=0, retries_num=2) + self.assertEqual(1, pool.free.qsize()) + with pool.get() as c: + self.assertEqual(2, c.retries_num) + for h in c.opener.handlers: + if isinstance(h, connections.RetryHandler): + break + else: + self.fail("RetryHandler should be in list of handlers.") + + +class TestConnection(base.TestCase): + def setUp(self): + super(TestConnection, self).setUp() + self.connection = connections.Connection(mock.MagicMock(), 2) + + def test_make_request(self): + request = self.connection.make_request("/test/file", 0) + self.assertIsInstance(request, connections.RetryableRequest) + self.assertEqual("file:///test/file", request.get_full_url()) + self.assertEqual(0, request.offset) + self.assertEqual(2, request.retries_left) + request2 = self.connection.make_request("http://server/path", 100) + self.assertEqual("http://server/path", request2.get_full_url()) + self.assertEqual(100, request2.offset) + + def test_open_stream(self): + self.connection.open_stream("/test/file") + self.assertEqual(1, self.connection.opener.open.call_count) + args = self.connection.opener.open.call_args[0] + self.assertIsInstance(args[0], connections.RetryableRequest) + self.assertEqual(2, args[0].retries_left) + + @mock.patch("packetary.library.connections.logger") + def test_retries_on_io_error(self, logger): + self.connection.opener.open.side_effect = [ + IOError("I/O error"), + mock.MagicMock() + ] + self.connection.open_stream("/test/file") + self.assertEqual(2, self.connection.opener.open.call_count) + logger.exception.assert_called_with( + "Failed to open url - %s: %s. retries left - %d.", + "/test/file", "I/O error", 1 + ) + + self.connection.opener.open.side_effect = IOError("I/O error") + with self.assertRaises(IOError): + self.connection.open_stream("/test/file") + logger.exception.assert_called_with( + "Failed to open url - %s: %s. retries left - %d.", + "/test/file", "I/O error", 0 + ) + + def test_raise_other_errors(self): + self.connection.opener.open.side_effect = \ + connections.urllib_error.HTTPError("", 500, "", {}, None) + + with self.assertRaises(connections.urllib_error.URLError): + self.connection.open_stream("/test/file") + + self.assertEqual(1, self.connection.opener.open.call_count) + + @mock.patch("packetary.library.connections.os") + def test_retrieve_from_offset(self, os): + os.path.mkdirs.side_effect = OSError(17, "") + os.open.return_value = 1 + response = mock.MagicMock() + self.connection.opener.open.return_value = response + response.read.side_effect = [b"test", b""] + self.connection.retrieve("/file/src", "/file/dst", 10) + os.lseek.assert_called_once_with(1, 10, os.SEEK_SET) + os.ftruncate.assert_called_once_with(1, 10) + self.assertEqual(1, os.write.call_count) + os.fsync.assert_called_once_with(1) + os.close.assert_called_once_with(1) + + @mock.patch.multiple( + "packetary.library.connections", + logger=mock.DEFAULT, + os=mock.DEFAULT + ) + def test_retrieve_from_offset_fail(self, os, logger): + os.path.mkdirs.side_effect = OSError(17, "") + os.open.return_value = 1 + response = mock.MagicMock() + self.connection.opener.open.side_effect = [ + connections.RangeError("error"), response + ] + response.read.side_effect = [b"test", b""] + self.connection.retrieve("/file/src", "/file/dst", 10) + logger.warning.assert_called_once_with( + "Failed to resume download, starts from begin: %s", + "/file/src" + ) + os.lseek.assert_called_once_with(1, 0, os.SEEK_SET) + os.ftruncate.assert_called_once_with(1, 0) + self.assertEqual(1, os.write.call_count) + os.fsync.assert_called_once_with(1) + os.close.assert_called_once_with(1) + + +@mock.patch("packetary.library.connections.logger") +class TestRetryHandler(base.TestCase): + def setUp(self): + super(TestRetryHandler, self).setUp() + self.handler = connections.RetryHandler() + self.handler.add_parent(mock.MagicMock()) + + def test_start_request(self, logger): + request = mock.MagicMock() + request.offset = 0 + request.get_full_url.return_value = "/file/test" + request = self.handler.http_request(request) + request.start_time <= time.time() + logger.debug.assert_called_with("start request: %s", "/file/test") + request.offset = 1 + request = self.handler.http_request(request) + request.add_header.assert_called_once_with('Range', 'bytes=1-') + + def test_handle_response(self, logger): + request = mock.MagicMock() + request.offset = 0 + request.start_time.__rsub__.return_value = 0.01 + request.get_full_url.return_value = "/file/test" + response = mock.MagicMock() + response.getcode.return_value = 200 + response.msg = "test" + r = self.handler.http_response(request, response) + self.assertIsInstance(r, connections.ResumableResponse) + logger.debug.assert_called_with( + "finish request: %s - %d (%s), duration - %d ms.", + "/file/test", 200, "test", 10 + ) + + def test_handle_partial_response(self, _): + request = mock.MagicMock() + request.offset = 1 + request.get_full_url.return_value = "/file/test" + response = mock.MagicMock() + response.getcode.return_value = 200 + response.msg = "test" + with self.assertRaises(connections.RangeError): + self.handler.http_response(request, response) + response.getcode.return_value = 206 + self.handler.http_response(request, response) + + def test_error(self, logger): + request = mock.MagicMock() + request.get_full_url.return_value = "/test" + request.retries_left = 1 + self.handler.http_error( + request, mock.MagicMock(), 500, "error", mock.MagicMock() + ) + logger.warning.assert_called_with( + "fail request: %s - %d(%s), retries left - %d.", + "/test", 500, "error", 0 + ) + self.handler.http_error( + request, mock.MagicMock(), 500, "error", mock.MagicMock() + ) + self.handler.parent.open.assert_called_once_with(request) + + +class TestResumeableResponse(base.TestCase): + def setUp(self): + super(TestResumeableResponse, self).setUp() + self.request = mock.MagicMock() + self.opener = mock.MagicMock() + self.stream = mock.MagicMock() + + def test_resume_read(self): + self.request.offset = 0 + response = connections.ResumableResponse( + self.request, + self.stream, + self.opener + ) + self.stream.read.side_effect = [ + b"chunk1", IOError(), b"chunk2", b"" + ] + self.opener.error.return_value = response + data = response.read() + self.assertEqual(b"chunk1chunk2", data) + self.assertEqual(12, self.request.offset) + self.assertEqual(1, self.opener.error.call_count) + + def test_read(self): + self.request.offset = 0 + response = connections.ResumableResponse( + self.request, + six.BytesIO(b"line1\nline2\nline3\n"), + self.opener + ) + self.assertEqual( + b"line1\nline2\nline3\n", response.read() + ) diff --git a/packetary/tests/test_executor.py b/packetary/tests/test_executor.py new file mode 100644 index 0000000..59d036e --- /dev/null +++ b/packetary/tests/test_executor.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- + +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import threading +import time + +from packetary.library import executor +from packetary.tests import base + + +def _raise_value_error(*_): + raise ValueError("error") + + +@mock.patch("packetary.library.executor.logger") +class TestAsynchronousSection(base.TestCase): + def setUp(self): + super(TestAsynchronousSection, self).setUp() + self.results = [] + + def test_isolation(self, _): + section1 = executor.AsynchronousSection() + section2 = executor.AsynchronousSection() + event = threading.Event() + section1.execute(event.wait) + section2.execute(time.sleep, 0) + section2.wait() + event.set() + section1.wait() + + def test_ignore_errors(self, logger): + section = executor.AsynchronousSection(ignore_errors_num=1) + section.execute(_raise_value_error) + section.execute(time.sleep, 0) + section.wait(ignore_errors=True) + self.assertEqual(1, section.errors) + logger.exception.assert_called_with( + "Task failed: %s", "error" + ) + + def test_fail_if_too_many_errors(self, _): + section = executor.AsynchronousSection(ignore_errors_num=0) + section.execute(_raise_value_error) + section.wait(ignore_errors=True) + with self.assertRaisesRegexp(RuntimeError, "Too many errors"): + section.execute(time.sleep, 0) + + with self.assertRaisesRegexp( + RuntimeError, "Operations completed with errors"): + section.wait(ignore_errors=False) diff --git a/packetary/tests/test_streams.py b/packetary/tests/test_streams.py new file mode 100644 index 0000000..77fc956 --- /dev/null +++ b/packetary/tests/test_streams.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- + +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import gzip +import six + +from packetary.library import streams +from packetary.tests import base + + +class TestBufferedStream(base.TestCase): + def setUp(self): + super(TestBufferedStream, self).setUp() + self.stream = streams.StreamWrapper( + six.BytesIO(b"line1\nline2\nline3\n") + ) + + def test_read(self): + self.stream.CHUNK_SIZE = 10 + chunk = self.stream.read(5) + self.assertEqual(b"line1", chunk) + self.assertEqual(b"\nline", self.stream.unread_tail) + chunk = self.stream.read(1024) + self.assertEqual(b"\nline2\nline3\n", chunk) + self.assertEqual(b"", self.stream.unread_tail) + + def test_readline(self): + self.stream.CHUNK_SIZE = 12 + chunk = self.stream.readline() + self.assertEqual(b"line1\n", chunk) + self.assertEqual(b"line2\n", self.stream.unread_tail) + lines = list(self.stream.readlines()) + self.assertEqual([b"line2\n", b"line3\n"], lines) + self.assertEqual(b"", self.stream.unread_tail) + + def test_readlines(self): + self.stream.CHUNK_SIZE = 12 + lines = list(self.stream.readlines()) + self.assertEqual( + [b"line1\n", b"line2\n", b"line3\n"], + lines) + + +class TestGzipDecompress(base.TestCase): + @classmethod + def setUpClass(cls): + cls.gzipped = six.BytesIO() + gz = gzip.GzipFile(fileobj=cls.gzipped, mode="w") + gz.write(b"line1\nline2\nline3\n") + gz.flush() + gz.close() + + def setUp(self): + super(TestGzipDecompress, self).setUp() + self.gzipped.seek(0) + self.stream = streams.GzipDecompress(self.gzipped) + + def test_read(self): + chunk = self.stream.read(5) + self.assertEqual(b"line1", chunk) + self.assertEqual(b"\nline2\nline3\n", self.stream.unread_tail) + chunk = self.stream.read(1024) + self.assertEqual(b"\nline2\nline3\n", chunk) + self.assertEqual(b"", self.stream.unread_tail) + + def test_readline(self): + self.stream.CHUNK_SIZE = 12 + chunk = self.stream.readline() + self.assertEqual(b"line1\n", chunk) + self.assertEqual(b"line2\nl", self.stream.unread_tail) + lines = list(self.stream.readlines()) + self.assertEqual([b"line2\n", b"line3\n"], lines) + self.assertEqual(b"", self.stream.unread_tail) + + def test_readlines(self): + self.stream.CHUNK_SIZE = 12 + lines = list(self.stream.readlines()) + self.assertEqual( + [b"line1\n", b"line2\n", b"line3\n"], + lines) diff --git a/requirements.txt b/requirements.txt index 08b0f01..2aee4db 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,6 @@ # of appearance. Changing the order has an impact on the overall integration # process, which may cause wedges in the gate later. -pbr>=1.6 Babel>=1.3 +eventlet>=0.17 +pbr>=1.6 diff --git a/tox.ini b/tox.ini index ab6ca39..ebc1142 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] minversion = 1.6 -envlist = py34,py26,py27,pep8 +envlist = py34,py27,py26,pep8 skipsdist = True [testenv] @@ -9,22 +9,22 @@ install_command = pip install -U {opts} {packages} setenv = VIRTUAL_ENV={envdir} deps = -r{toxinidir}/test-requirements.txt -commands = python setup.py test --slowest --testr-args='{posargs}' +commands = python setup.py test --slowest --testr-args='{posargs:packetary}' [testenv:pep8] -commands = flake8 +commands = flake8 {posargs:packetary} [testenv:venv] -commands = {posargs} +commands = {posargs:packetary} [testenv:cover] -commands = python setup.py test --coverage --testr-args='{posargs}' +commands = python setup.py test --coverage --testr-args='{posargs:packetary}' [testenv:docs] commands = python setup.py build_sphinx [testenv:debug] -commands = oslo_debug_helper {posargs} +commands = oslo_debug_helper {posargs:packetary} [flake8] # E123, E125 skipped as they are invalid PEP-8. @@ -32,4 +32,4 @@ commands = oslo_debug_helper {posargs} show-source = True ignore = E123,E125 builtins = _ -exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build,util,.idea +exclude=*egg,*lib/python*,*openstack/common*,.git,.idea,.tox,.venv,build,dist,doc