diff --git a/fuel_agent/errors.py b/fuel_agent/errors.py
index b531ba6..aec2182 100644
--- a/fuel_agent/errors.py
+++ b/fuel_agent/errors.py
@@ -134,3 +134,11 @@ class GrubUtilsError(BaseError):
 
 class FsUtilsError(BaseError):
     pass
+
+
+class HttpUrlConnectionError(BaseError):
+    pass
+
+
+class HttpUrlInvalidContentLength(BaseError):
+    pass
diff --git a/fuel_agent/tests/test_artifact_utils.py b/fuel_agent/tests/test_artifact_utils.py
index ee3a236..29af053 100644
--- a/fuel_agent/tests/test_artifact_utils.py
+++ b/fuel_agent/tests/test_artifact_utils.py
@@ -14,10 +14,16 @@
 
 import mock
 from oslotest import base as test_base
-import requests
 import zlib
 
+from oslo.config import cfg
+
+from fuel_agent import errors
 from fuel_agent.utils import artifact_utils as au
+from fuel_agent.utils import utils
+
+
+CONF = cfg.CONF
 
 
 class TestTarget(test_base.BaseTestCase):
@@ -57,15 +63,29 @@ class TestLocalFile(test_base.BaseTestCase):
 
 
 class TestHttpUrl(test_base.BaseTestCase):
-    @mock.patch.object(requests, 'get')
-    def test_httpurl_iter(self, mock_r_get):
+    @mock.patch.object(utils, 'init_http_request')
+    def test_httpurl_init_ok(self, mock_req):
+        mock_req.return_value = mock.Mock(headers={'content-length': 123})
+        httpurl = au.HttpUrl('fake_url')
+        self.assertEqual(123, httpurl.length)
+        mock_req.assert_called_once_with('fake_url')
+
+    @mock.patch.object(utils, 'init_http_request')
+    def test_httpurl_init_invalid_content_length(self, mock_req):
+        mock_req.return_value = mock.Mock(headers={'content-length':
+                                                   'invalid'})
+        self.assertRaises(errors.HttpUrlInvalidContentLength, au.HttpUrl,
+                          'fake_url')
+
+    @mock.patch.object(utils, 'init_http_request')
+    def test_httpurl_next_ok(self, mock_req):
         content = ['fake content #1', 'fake content #2']
-        mock_r_get.return_value.iter_content.return_value = content
-        mock_r_get.return_value.status_code = 200
+        req_mock = mock.Mock(headers={'content-length': 30})
+        req_mock.raw.read.side_effect = content
+        mock_req.return_value = req_mock
         httpurl = au.HttpUrl('fake_url')
         for data in enumerate(httpurl):
             self.assertEqual(content[data[0]], data[1])
-        self.assertEqual('fake_url', httpurl.url)
 
 
 class TestGunzipStream(test_base.BaseTestCase):
diff --git a/fuel_agent/tests/test_utils.py b/fuel_agent/tests/test_utils.py
index f4dc208..82dcf46 100644
--- a/fuel_agent/tests/test_utils.py
+++ b/fuel_agent/tests/test_utils.py
@@ -16,12 +16,18 @@
 import testtools
 
 import mock
+from oslo.config import cfg
+import requests
 import stevedore
+import urllib3
 
 from fuel_agent import errors
 from fuel_agent.utils import utils
 
 
+CONF = cfg.CONF
+
+
 class ExecuteTestCase(testtools.TestCase):
     """This class is partly based on the same class in openstack/ironic."""
 
@@ -79,3 +85,43 @@ class ExecuteTestCase(testtools.TestCase):
         utils.render_and_save('fake_dir', 'fake_tmpl_name', 'fake_data',
                               'fake_file_name')
         mock_open.assert_called_once_with('fake_file_name', 'w')
+
+    @mock.patch.object(requests, 'get')
+    def test_init_http_request_ok(self, mock_req):
+        utils.init_http_request('fake_url')
+        mock_req.assert_called_once_with(
+            'fake_url', stream=True, timeout=CONF.http_request_timeout,
+            headers={'Range': 'bytes=0-'})
+
+    @mock.patch('time.sleep')
+    @mock.patch.object(requests, 'get')
+    def test_init_http_request_non_critical_errors(self, mock_req, mock_s):
+        mock_ok = mock.Mock()
+        mock_req.side_effect = [urllib3.exceptions.DecodeError(),
+                                urllib3.exceptions.ProxyError(),
+                                requests.exceptions.ConnectionError(),
+                                requests.exceptions.Timeout(),
+                                requests.exceptions.TooManyRedirects(),
+                                mock_ok]
+        req_obj = utils.init_http_request('fake_url')
+        self.assertEqual(mock_ok, req_obj)
+
+    @mock.patch.object(requests, 'get')
+    def test_init_http_request_wrong_http_status(self, mock_req):
+        mock_fail = mock.Mock()
+        mock_fail.raise_for_status.side_effect = KeyError()
+        mock_req.return_value = mock_fail
+        self.assertRaises(KeyError, utils.init_http_request, 'fake_url')
+
+    @mock.patch('time.sleep')
+    @mock.patch.object(requests, 'get')
+    def test_init_http_request_max_retries_exceeded(self, mock_req, mock_s):
+        mock_req.side_effect = requests.exceptions.ConnectionError()
+        self.assertRaises(errors.HttpUrlConnectionError,
+                          utils.init_http_request, 'fake_url')
+
+    @mock.patch.object(requests, 'get')
+    def test_init_http_request_byte_range_ignored(self, mock_req):
+        mock_req.return_value = mock.Mock(status_code=200)
+        self.assertRaises(errors.HttpUrlConnectionError,
+                          utils.init_http_request, 'fake_url', 10)
diff --git a/fuel_agent/utils/artifact_utils.py b/fuel_agent/utils/artifact_utils.py
index f3e539f..122bcde 100644
--- a/fuel_agent/utils/artifact_utils.py
+++ b/fuel_agent/utils/artifact_utils.py
@@ -13,15 +13,29 @@
 # limitations under the License.
 
 import abc
-import requests
 import tarfile
 import tempfile
 import zlib
 
+from oslo.config import cfg
+
+from fuel_agent import errors
 from fuel_agent.openstack.common import log as logging
+from fuel_agent.utils import utils
 
 LOG = logging.getLogger(__name__)
 
+au_opts = [
+    cfg.IntOpt(
+        'data_chunk_size',
+        default=1048576,
+        help='Size of data chunk to operate with images'
+    ),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(au_opts)
+
 
 class Target(object):
     __metaclass__ = abc.ABCMeta
@@ -56,7 +70,7 @@ class LocalFile(Target):
     def next(self):
         if not self.fileobj:
             self.fileobj = open(self.filename, 'rb')
-        buffer = self.fileobj.read(1048576)
+        buffer = self.fileobj.read(CONF.data_chunk_size)
         if buffer:
             return buffer
         else:
@@ -67,12 +81,45 @@ class LocalFile(Target):
 class HttpUrl(Target):
+    """Target that reads data from an HTTP URL chunk by chunk."""
     def __init__(self, url):
         self.url = str(url)
+        self.response_obj = utils.init_http_request(self.url)
+        self.processed_bytes = 0
+        try:
+            self.length = int(self.response_obj.headers['content-length'])
+        except (ValueError, KeyError):
+            raise errors.HttpUrlInvalidContentLength(
+                'Can not get content length for %s' % self.url)
+        else:
+            LOG.debug('Expected content length %s for %s' % (self.length,
+                                                             self.url))
 
-    def __iter__(self):
-        response = requests.get(self.url, stream=True)
-        if response.status_code != 200:
-            raise Exception('Can not get %s' % self.url)
-        return iter(response.iter_content(1048576))
+    def next(self):
+        """Return the next chunk of data, reconnecting on read failures.
+
+        Up to CONF.data_chunk_size bytes are read from the raw response.
+        If the read raises or returns no data before self.length bytes
+        were received, the error is logged, a new request is opened at
+        offset self.processed_bytes and the read is retried.
+
+        :returns: non-empty chunk of data
+        :raises StopIteration: once self.length bytes have been returned
+        """
+        while self.processed_bytes < self.length:
+            try:
+                data = self.response_obj.raw.read(CONF.data_chunk_size)
+                if not data:
+                    raise errors.HttpUrlConnectionError(
+                        'Could not receive data: URL=%s, range=%s' %
+                        (self.url, self.processed_bytes))
+            except Exception as exc:
+                LOG.exception(exc)
+                self.response_obj = utils.init_http_request(
+                    self.url, self.processed_bytes)
+                continue
+            else:
+                self.processed_bytes += len(data)
+                return data
+        raise StopIteration()
 
 
 class GunzipStream(Target):
@@ -133,7 +180,12 @@ class ForwardFileStream(Target):
         self.chunk = None
         self.position = position
 
-    def read(self, length=1048576):
+    def read(self, length=None):
         # NOTE(kozhukalov): default lenght = 1048576 is not usual behaviour,
         # but that is ok for our use case.
+        # A default of CONF.data_chunk_size in the signature would be
+        # evaluated once at import time, so the configured value is
+        # looked up on each call instead.
+        if length is None:
+            length = CONF.data_chunk_size
         if self.closed:
diff --git a/fuel_agent/utils/utils.py b/fuel_agent/utils/utils.py
index 3d03f4e..6258bb1 100644
--- a/fuel_agent/utils/utils.py
+++ b/fuel_agent/utils/utils.py
@@ -18,9 +18,13 @@ import os
 import re
 import shlex
 import subprocess
+import time
 
 import jinja2
+from oslo.config import cfg
+import requests
 import stevedore.driver
+import urllib3
 
 from fuel_agent import errors
 from fuel_agent.openstack.common import log as logging
@@ -28,6 +32,27 @@ from fuel_agent.openstack.common import log as logging
 
 LOG = logging.getLogger(__name__)
 
+u_opts = [
+    cfg.IntOpt(
+        'http_max_retries',
+        default=30,
+        help='Maximum retries count for http requests. 0 means infinite',
+    ),
+    cfg.FloatOpt(
+        'http_request_timeout',
+        default=1.0,
+        help='Http request timeout in seconds',
+    ),
+    cfg.FloatOpt(
+        'http_retry_delay',
+        default=2.0,
+        help='Delay in seconds before the next http request retry',
+    ),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(u_opts)
+
 
 #NOTE(agordeev): signature compatible with execute from oslo
 def execute(*cmd, **kwargs):
@@ -116,3 +141,52 @@ def render_and_save(tmpl_dir, tmpl_names, tmpl_data, file_name):
         raise errors.TemplateWriteError(
             'Something goes wrong while trying to save'
             'templated data to {0}'.format(file_name))
+
+
+def init_http_request(url, byte_range=0):
+    """Open a streaming HTTP GET request, retrying non-critical errors.
+
+    Exceptions raised by raise_for_status() on the response are not
+    retried and propagate to the caller.
+
+    :param url: URL to request
+    :param byte_range: offset of the first byte to ask for, sent to the
+        server as the ``Range: bytes=<offset>-`` header
+    :returns: response object returned by requests.get(stream=True)
+    :raises errors.HttpUrlConnectionError: if the retry limit is exceeded
+        or the server does not answer a non-zero byte_range with 206
+    """
+    LOG.debug('Trying to initialize http request object %s, byte range: %s'
+              % (url, byte_range))
+    retry = 0
+    while True:
+        if (CONF.http_max_retries == 0) or retry <= CONF.http_max_retries:
+            try:
+                response_obj = requests.get(
+                    url, stream=True,
+                    timeout=CONF.http_request_timeout,
+                    headers={'Range': 'bytes=%s-' % byte_range})
+            except (urllib3.exceptions.DecodeError,
+                    urllib3.exceptions.ProxyError,
+                    requests.exceptions.ConnectionError,
+                    requests.exceptions.Timeout,
+                    requests.exceptions.TooManyRedirects) as e:
+                LOG.debug('Got non-critical error when accessing to %s '
+                          'on %s attempt: %s' % (url, retry + 1, e))
+            else:
+                LOG.debug('Successful http request to %s on %s retry' %
+                          (url, retry + 1))
+                break
+            retry += 1
+            time.sleep(CONF.http_retry_delay)
+        else:
+            raise errors.HttpUrlConnectionError(
+                'Exceeded maximum http request retries for %s' % url)
+    response_obj.raise_for_status()
+    if byte_range and response_obj.status_code != 206:
+        # A server may ignore the Range header and answer 200 with the
+        # whole body; continuing a partially received download from such
+        # a response would silently corrupt the data.
+        raise errors.HttpUrlConnectionError(
+            'Server ignored byte range %s for %s' % (byte_range, url))
+    return response_obj
diff --git a/requirements.txt b/requirements.txt
index 8380c0f..9e92ee7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,3 +9,4 @@ pbr>=0.7.0
 Jinja2
 stevedore>=0.15
 requests>=1.2.3
+urllib3>=1.7