Merge "[IBP] Add http connection tracking to fuel-agent"
This commit is contained in:
commit
9947cf9c92
|
@ -134,3 +134,11 @@ class GrubUtilsError(BaseError):
|
|||
|
||||
class FsUtilsError(BaseError):
|
||||
pass
|
||||
|
||||
|
||||
class HttpUrlConnectionError(BaseError):
|
||||
pass
|
||||
|
||||
|
||||
class HttpUrlInvalidContentLength(BaseError):
|
||||
pass
|
||||
|
|
|
@ -14,10 +14,16 @@
|
|||
|
||||
import mock
|
||||
from oslotest import base as test_base
|
||||
import requests
|
||||
import zlib
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from fuel_agent import errors
|
||||
from fuel_agent.utils import artifact_utils as au
|
||||
from fuel_agent.utils import utils
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class TestTarget(test_base.BaseTestCase):
|
||||
|
@ -57,15 +63,29 @@ class TestLocalFile(test_base.BaseTestCase):
|
|||
|
||||
|
||||
class TestHttpUrl(test_base.BaseTestCase):
|
||||
@mock.patch.object(requests, 'get')
|
||||
def test_httpurl_iter(self, mock_r_get):
|
||||
@mock.patch.object(utils, 'init_http_request')
|
||||
def test_httpurl_init_ok(self, mock_req):
|
||||
mock_req.return_value = mock.Mock(headers={'content-length': 123})
|
||||
httpurl = au.HttpUrl('fake_url')
|
||||
self.assertEqual(123, httpurl.length)
|
||||
mock_req.assert_called_once_with('fake_url')
|
||||
|
||||
@mock.patch.object(utils, 'init_http_request')
|
||||
def test_httpurl_init_invalid_content_length(self, mock_req):
|
||||
mock_req.return_value = mock.Mock(headers={'content-length':
|
||||
'invalid'})
|
||||
self.assertRaises(errors.HttpUrlInvalidContentLength, au.HttpUrl,
|
||||
'fake_url')
|
||||
|
||||
@mock.patch.object(utils, 'init_http_request')
|
||||
def test_httpurl_next_ok(self, mock_req):
|
||||
content = ['fake content #1', 'fake content #2']
|
||||
mock_r_get.return_value.iter_content.return_value = content
|
||||
mock_r_get.return_value.status_code = 200
|
||||
req_mock = mock.Mock(headers={'content-length': 30})
|
||||
req_mock.raw.read.side_effect = content
|
||||
mock_req.return_value = req_mock
|
||||
httpurl = au.HttpUrl('fake_url')
|
||||
for data in enumerate(httpurl):
|
||||
self.assertEqual(content[data[0]], data[1])
|
||||
self.assertEqual('fake_url', httpurl.url)
|
||||
|
||||
|
||||
class TestGunzipStream(test_base.BaseTestCase):
|
||||
|
|
|
@ -16,12 +16,18 @@
|
|||
import testtools
|
||||
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
import requests
|
||||
import stevedore
|
||||
import urllib3
|
||||
|
||||
from fuel_agent import errors
|
||||
from fuel_agent.utils import utils
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class ExecuteTestCase(testtools.TestCase):
|
||||
"""This class is partly based on the same class in openstack/ironic."""
|
||||
|
||||
|
@ -79,3 +85,37 @@ class ExecuteTestCase(testtools.TestCase):
|
|||
utils.render_and_save('fake_dir', 'fake_tmpl_name', 'fake_data',
|
||||
'fake_file_name')
|
||||
mock_open.assert_called_once_with('fake_file_name', 'w')
|
||||
|
||||
@mock.patch.object(requests, 'get')
|
||||
def test_init_http_request_ok(self, mock_req):
|
||||
utils.init_http_request('fake_url')
|
||||
mock_req.assert_called_once_with(
|
||||
'fake_url', stream=True, timeout=CONF.http_request_timeout,
|
||||
headers={'Range': 'bytes=0-'})
|
||||
|
||||
@mock.patch('time.sleep')
|
||||
@mock.patch.object(requests, 'get')
|
||||
def test_init_http_request_non_critical_errors(self, mock_req, mock_s):
|
||||
mock_ok = mock.Mock()
|
||||
mock_req.side_effect = [urllib3.exceptions.DecodeError(),
|
||||
urllib3.exceptions.ProxyError(),
|
||||
requests.exceptions.ConnectionError(),
|
||||
requests.exceptions.Timeout(),
|
||||
requests.exceptions.TooManyRedirects(),
|
||||
mock_ok]
|
||||
req_obj = utils.init_http_request('fake_url')
|
||||
self.assertEqual(mock_ok, req_obj)
|
||||
|
||||
@mock.patch.object(requests, 'get')
|
||||
def test_init_http_request_wrong_http_status(self, mock_req):
|
||||
mock_fail = mock.Mock()
|
||||
mock_fail.raise_for_status.side_effect = KeyError()
|
||||
mock_req.return_value = mock_fail
|
||||
self.assertRaises(KeyError, utils.init_http_request, 'fake_url')
|
||||
|
||||
@mock.patch('time.sleep')
|
||||
@mock.patch.object(requests, 'get')
|
||||
def test_init_http_request_max_retries_exceeded(self, mock_req, mock_s):
|
||||
mock_req.side_effect = requests.exceptions.ConnectionError()
|
||||
self.assertRaises(errors.HttpUrlConnectionError,
|
||||
utils.init_http_request, 'fake_url')
|
||||
|
|
|
@ -13,15 +13,29 @@
|
|||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import requests
|
||||
import tarfile
|
||||
import tempfile
|
||||
import zlib
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from fuel_agent import errors
|
||||
from fuel_agent.openstack.common import log as logging
|
||||
from fuel_agent.utils import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
au_opts = [
|
||||
cfg.IntOpt(
|
||||
'data_chunk_size',
|
||||
default=1048576,
|
||||
help='Size of data chunk to operate with images'
|
||||
),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(au_opts)
|
||||
|
||||
|
||||
class Target(object):
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
@ -56,7 +70,7 @@ class LocalFile(Target):
|
|||
def next(self):
|
||||
if not self.fileobj:
|
||||
self.fileobj = open(self.filename, 'rb')
|
||||
buffer = self.fileobj.read(1048576)
|
||||
buffer = self.fileobj.read(CONF.data_chunk_size)
|
||||
if buffer:
|
||||
return buffer
|
||||
else:
|
||||
|
@ -67,12 +81,34 @@ class LocalFile(Target):
|
|||
class HttpUrl(Target):
|
||||
def __init__(self, url):
|
||||
self.url = str(url)
|
||||
self.response_obj = utils.init_http_request(self.url)
|
||||
self.processed_bytes = 0
|
||||
try:
|
||||
self.length = int(self.response_obj.headers['content-length'])
|
||||
except (ValueError, KeyError):
|
||||
raise errors.HttpUrlInvalidContentLength(
|
||||
'Can not get content length for %s' % self.url)
|
||||
else:
|
||||
LOG.debug('Expected content length %s for %s' % (self.length,
|
||||
self.url))
|
||||
|
||||
def __iter__(self):
|
||||
response = requests.get(self.url, stream=True)
|
||||
if response.status_code != 200:
|
||||
raise Exception('Can not get %s' % self.url)
|
||||
return iter(response.iter_content(1048576))
|
||||
def next(self):
|
||||
while self.processed_bytes < self.length:
|
||||
try:
|
||||
data = self.response_obj.raw.read(CONF.data_chunk_size)
|
||||
if not data:
|
||||
raise errors.HttpUrlConnectionError(
|
||||
'Could not receive data: URL=%s, range=%s' %
|
||||
(self.url, self.processed_bytes))
|
||||
except Exception as exc:
|
||||
LOG.exception(exc)
|
||||
self.response_obj = utils.init_http_request(
|
||||
self.url, self.processed_bytes)
|
||||
continue
|
||||
else:
|
||||
self.processed_bytes += len(data)
|
||||
return data
|
||||
raise StopIteration()
|
||||
|
||||
|
||||
class GunzipStream(Target):
|
||||
|
@ -133,7 +169,7 @@ class ForwardFileStream(Target):
|
|||
self.chunk = None
|
||||
self.position = position
|
||||
|
||||
def read(self, length=1048576):
|
||||
def read(self, length=CONF.data_chunk_size):
|
||||
# NOTE(kozhukalov): default lenght = 1048576 is not usual behaviour,
|
||||
# but that is ok for our use case.
|
||||
if self.closed:
|
||||
|
|
|
@ -18,9 +18,13 @@ import os
|
|||
import re
|
||||
import shlex
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
import jinja2
|
||||
from oslo.config import cfg
|
||||
import requests
|
||||
import stevedore.driver
|
||||
import urllib3
|
||||
|
||||
from fuel_agent import errors
|
||||
from fuel_agent.openstack.common import log as logging
|
||||
|
@ -28,6 +32,27 @@ from fuel_agent.openstack.common import log as logging
|
|||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
u_opts = [
|
||||
cfg.IntOpt(
|
||||
'http_max_retries',
|
||||
default=30,
|
||||
help='Maximum retries count for http requests. 0 means infinite',
|
||||
),
|
||||
cfg.FloatOpt(
|
||||
'http_request_timeout',
|
||||
default=1.0,
|
||||
help='Http request timeout in seconds',
|
||||
),
|
||||
cfg.FloatOpt(
|
||||
'http_retry_delay',
|
||||
default=2.0,
|
||||
help='Delay in seconds before the next http request retry',
|
||||
),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(u_opts)
|
||||
|
||||
|
||||
#NOTE(agordeev): signature compatible with execute from oslo
|
||||
def execute(*cmd, **kwargs):
|
||||
|
@ -116,3 +141,34 @@ def render_and_save(tmpl_dir, tmpl_names, tmpl_data, file_name):
|
|||
raise errors.TemplateWriteError(
|
||||
'Something goes wrong while trying to save'
|
||||
'templated data to {0}'.format(file_name))
|
||||
|
||||
|
||||
def init_http_request(url, byte_range=0):
|
||||
LOG.debug('Trying to initialize http request object %s, byte range: %s'
|
||||
% (url, byte_range))
|
||||
retry = 0
|
||||
while True:
|
||||
if (CONF.http_max_retries == 0) or retry <= CONF.http_max_retries:
|
||||
try:
|
||||
response_obj = requests.get(
|
||||
url, stream=True,
|
||||
timeout=CONF.http_request_timeout,
|
||||
headers={'Range': 'bytes=%s-' % byte_range})
|
||||
except (urllib3.exceptions.DecodeError,
|
||||
urllib3.exceptions.ProxyError,
|
||||
requests.exceptions.ConnectionError,
|
||||
requests.exceptions.Timeout,
|
||||
requests.exceptions.TooManyRedirects) as e:
|
||||
LOG.debug('Got non-critical error when accessing to %s '
|
||||
'on %s attempt: %s' % (url, retry + 1, e))
|
||||
else:
|
||||
LOG.debug('Successful http request to %s on %s retry' %
|
||||
(url, retry + 1))
|
||||
break
|
||||
retry += 1
|
||||
time.sleep(CONF.http_retry_delay)
|
||||
else:
|
||||
raise errors.HttpUrlConnectionError(
|
||||
'Exceeded maximum http request retries for %s' % url)
|
||||
response_obj.raise_for_status()
|
||||
return response_obj
|
||||
|
|
|
@ -9,3 +9,4 @@ pbr>=0.7.0
|
|||
Jinja2
|
||||
stevedore>=0.15
|
||||
requests>=1.2.3
|
||||
urllib3>=1.7
|
||||
|
|
Loading…
Reference in New Issue