Merge "Refactor registry request actions" into stable/train

This commit is contained in:
Zuul 2020-03-21 06:08:46 +00:00 committed by Gerrit Code Review
commit da2cc6203a
2 changed files with 400 additions and 189 deletions

View File

@ -153,6 +153,173 @@ class MakeSession(object):
self.session.close()
class RegistrySessionHelper(object):
""" Class with various registry session helpers
This class contains a bunch of static methods to be used when making
session requests against a container registry. The methods are primarily
used to handle authentication/reauthentication for the requests against
registries that require auth.
"""
@staticmethod
def check_status(session, request, allow_reauth=True):
""" Check request status and trigger reauth
This function can be used to check if we need to perform authentication
for a container registry request because we've gotten a 401.
"""
hash_request_id = hashlib.sha1(str(request.url).encode())
request_id = hash_request_id.hexdigest()
text = getattr(request, 'text', 'unknown')
reason = getattr(request, 'reason', 'unknown')
status_code = getattr(request, 'status_code', None)
headers = getattr(request, 'headers', {})
session_headers = getattr(session, 'headers', {})
if status_code >= 300:
LOG.info(
'Non-2xx: id {}, status {}, reason {}, text {}'.format(
request_id,
status_code,
reason,
text
)
)
if status_code == 401:
LOG.warning(
'Failure: id {}, status {}, reason {} text {}'.format(
request_id,
status_code,
reason,
text
)
)
LOG.debug(
'Request headers after 401: id {}, headers {}'.format(
request_id,
headers
)
)
LOG.debug(
'Session headers after 401: id {}, headers {}'.format(
request_id,
session_headers
)
)
www_auth = headers.get(
'www-authenticate',
headers.get(
'Www-Authenticate'
)
)
if www_auth:
error = None
# Handle docker.io shenanigans. docker.io will return 401
# for 403 and 404 but provide an error string. Other registries
# like registry.redhat.io and quay.io do not do this. So if
# we find an error string, check to see if we should reauth.
do_reauth = allow_reauth
if 'error=' in www_auth:
error = re.search('error="(.*?)"', www_auth).group(1)
LOG.warning(
'Error detected in auth headers: error {}'.format(
error
)
)
do_reauth = (error == 'invalid_token' and allow_reauth)
if do_reauth:
if hasattr(session, 'reauthenticate'):
reauth = int(session.headers.get('_TripleOReAuth', 0))
reauth += 1
session.headers['_TripleOReAuth'] = str(reauth)
LOG.warning(
'Re-authenticating: id {}, count {}'.format(
request_id,
reauth
)
)
session.reauthenticate(**session.auth_args)
request.raise_for_status()
@staticmethod
def _action(action, request_session, *args, **kwargs):
""" Perform a session action and retry if auth fails
This function dynamically performs a specific type of call
using the provided session (get, patch, post, etc). It will
attempt a single re-authentication if the initial request
fails with a 401.
"""
_action = getattr(request_session, action)
try:
req = _action(*args, **kwargs)
RegistrySessionHelper.check_status(session=request_session,
request=req)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
req = _action(*args, **kwargs)
RegistrySessionHelper.check_status(session=request_session,
request=req)
else:
raise
return req
@staticmethod
def get(request_session, *args, **kwargs):
""" Perform a get and retry if auth fails
This function is designed to be used when we perform a get to
an authenticated source. This function will attempt a single
re-authentication request if the first one fails.
"""
return RegistrySessionHelper._action('get',
request_session,
*args,
**kwargs)
@staticmethod
def patch(request_session, *args, **kwargs):
""" Perform a patch and retry if auth fails
This function is designed to be used when we perform a path to
an authenticated source. This function will attempt a single
re-authentication request if the first one fails.
"""
return RegistrySessionHelper._action('patch',
request_session,
*args,
**kwargs)
@staticmethod
def post(request_session, *args, **kwargs):
""" Perform a post and retry if auth fails
This function is designed to be used when we perform a post to
an authenticated source. This function will attempt a single
re-authentication request if the first one fails.
"""
return RegistrySessionHelper._action('post',
request_session,
*args,
**kwargs)
@staticmethod
def put(request_session, *args, **kwargs):
""" Perform a put and retry if auth fails
This function is designed to be used when we perform a put to
an authenticated source. This function will attempt a single
re-authentication request if the first one fails.
"""
return RegistrySessionHelper._action('put',
request_session,
*args,
**kwargs)
class ImageUploadManager(BaseImageManager):
"""Manage the uploading of image files
@ -493,84 +660,6 @@ class BaseImageUploader(object):
response.encoding = encoding
return response.text
@staticmethod
def check_status(session, request, allow_reauth=True):
hash_request_id = hashlib.sha1(str(request.url).encode())
request_id = hash_request_id.hexdigest()
text = getattr(request, 'text', 'unknown')
reason = getattr(request, 'reason', 'unknown')
status_code = getattr(request, 'status_code', None)
headers = getattr(request, 'headers', {})
session_headers = getattr(session, 'headers', {})
if status_code >= 300:
LOG.info(
'Non-2xx: id {}, status {}, reason {}, text {}'.format(
request_id,
status_code,
reason,
text
)
)
if status_code == 401:
LOG.warning(
'Failure: id {}, status {}, reason {} text {}'.format(
request_id,
status_code,
reason,
text
)
)
LOG.debug(
'Request headers after 401: id {}, headers {}'.format(
request_id,
headers
)
)
LOG.debug(
'Session headers after 401: id {}, headers {}'.format(
request_id,
session_headers
)
)
www_auth = headers.get(
'www-authenticate',
headers.get(
'Www-Authenticate'
)
)
if www_auth:
error = None
# Handle docker.io shenanigans. docker.io will return 401
# for 403 and 404 but provide an error string. Other registries
# like registry.redhat.io and quay.io do not do this. So if
# we find an error string, check to see if we should reauth.
do_reauth = allow_reauth
if 'error=' in www_auth:
error = re.search('error="(.*?)"', www_auth).group(1)
LOG.warning(
'Error detected in auth headers: error {}'.format(
error
)
)
do_reauth = (error == 'invalid_token' and allow_reauth)
if do_reauth:
if hasattr(session, 'reauthenticate'):
reauth = int(session.headers.get('_TripleOReAuth', 0))
reauth += 1
session.headers['_TripleOReAuth'] = str(reauth)
LOG.warning(
'Re-authenticating: id {}, count {}'.format(
request_id,
reauth
)
)
session.reauthenticate(**session.auth_args)
request.raise_for_status()
@classmethod
def _build_url(cls, url, path):
netloc = url.netloc
@ -621,15 +710,21 @@ class BaseImageUploader(object):
)
manifest_headers = {'Accept': MEDIA_MANIFEST_V2}
manifest_r = session.get(manifest_url, headers=manifest_headers,
timeout=30)
if manifest_r.status_code in (403, 404):
raise ImageNotFoundException('Not found image: %s' %
image_url.geturl())
cls.check_status(session=session, request=manifest_r)
try:
manifest_r = RegistrySessionHelper.get(
session,
manifest_url,
headers=manifest_headers,
timeout=30
)
except requests.exceptions.HTTPError as e:
if e.response.status_code in (403, 404):
raise ImageNotFoundException('Not found image: %s' %
image_url.geturl())
else:
raise
tags_r = session.get(tags_url, timeout=30)
cls.check_status(session=session, request=tags_r)
tags_r = RegistrySessionHelper.get(session, tags_url, timeout=30)
manifest_str = cls._get_response_text(manifest_r)
@ -656,9 +751,12 @@ class BaseImageUploader(object):
}
config_url = cls._build_url(
image_url, CALL_BLOB % parts)
config_r = session.get(config_url, headers=config_headers,
timeout=30)
cls.check_status(session=session, request=config_r)
config_r = RegistrySessionHelper.get(
session,
config_url,
headers=config_headers,
timeout=30
)
config = config_r.json()
tags = tags_r.json()['tags']
@ -974,8 +1072,7 @@ class BaseImageUploader(object):
'mount': layer,
'from': existing_name
}
r = session.post(url, data=data, timeout=30)
cls.check_status(session=session, request=r)
r = RegistrySessionHelper.post(session, url, data=data, timeout=30)
LOG.debug('%s %s' % (r.status_code, r.reason))
@ -1471,11 +1568,18 @@ class PythonImageUploader(BaseImageUploader):
upload_req_url = cls._build_url(
image_url,
path=CALL_UPLOAD % {'image': image})
r = session.post(upload_req_url, timeout=30)
if r.status_code in (501, 403, 404, 405):
cls.export_registries.add(image_url.netloc)
return True
cls.check_status(session=session, request=r)
try:
RegistrySessionHelper.post(
session,
upload_req_url,
timeout=30
)
except requests.exceptions.HTTPError as e:
if e.response.status_code in (501, 403, 404, 405):
cls.export_registries.add(image_url.netloc)
return True
else:
raise
cls.push_registries.add(image_url.netloc)
return False
@ -1501,10 +1605,18 @@ class PythonImageUploader(BaseImageUploader):
manifest_headers = {'Accept': MEDIA_MANIFEST_V2_LIST}
else:
manifest_headers = {'Accept': MEDIA_MANIFEST_V2}
r = session.get(url, headers=manifest_headers, timeout=30)
if r.status_code in (403, 404):
raise ImageNotFoundException('Not found image: %s' % url)
cls.check_status(session=session, request=r)
try:
r = RegistrySessionHelper.get(
session,
url,
headers=manifest_headers,
timeout=30
)
except requests.exceptions.HTTPError as e:
if e.response.status_code in (403, 404):
raise ImageNotFoundException('Not found image: %s' % url)
else:
raise
return cls._get_response_text(r)
def _collect_manifests_layers(self, image_url, session,
@ -1549,8 +1661,11 @@ class PythonImageUploader(BaseImageUploader):
upload_req_url = cls._build_url(
image_url,
path=CALL_UPLOAD % {'image': image})
r = session.post(upload_req_url, timeout=30)
cls.check_status(session=session, request=r)
r = RegistrySessionHelper.post(
session,
upload_req_url,
timeout=30
)
return r.headers['Location']
@classmethod
@ -1580,7 +1695,8 @@ class PythonImageUploader(BaseImageUploader):
source_blob_url, stream=True, timeout=30) as blob_req:
# TODO(aschultz): unsure if necessary or if only when using .text
blob_req.encoding = 'utf-8'
cls.check_status(session=session, request=blob_req)
RegistrySessionHelper.check_status(session=session,
request=blob_req)
for data in blob_req.iter_content(chunk_size):
LOG.debug("[%s] Read %i bytes for %s" %
(image, len(data), digest))
@ -1731,30 +1847,11 @@ class PythonImageUploader(BaseImageUploader):
CALL_BLOB % parts
)
# Because the image layer fetching can exceed the auth
# token lifetime, we may have a bad token here and don't want
# to retry all of the layer fetching to just fetch the config
# data. Let's try a single retry here (as check_status with
# reauth by default).
try:
r = source_session.get(source_config_url, timeout=30)
cls.check_status(
session=source_session,
request=r
)
except requests.exceptions.HTTPError as e:
LOG.debug('[%s] Config fetch failed, retrying: %s' %
(image, source_config_url))
if e.response.status_code == 401:
# check_status should have reauthed so try on more
# time and raise again if we still have problems.
r = source_session.get(source_config_url, timeout=30)
cls.check_status(
session=source_session,
request=r
)
else:
raise
r = RegistrySessionHelper.get(
source_session,
source_config_url,
timeout=30
)
config_str = cls._get_response_text(r)
manifest['config']['size'] = len(config_str)
@ -1812,7 +1909,8 @@ class PythonImageUploader(BaseImageUploader):
upload_url = cls._upload_url(
target_url,
session=target_session)
r = target_session.put(
r = RegistrySessionHelper.put(
target_session,
upload_url,
timeout=30,
params={
@ -1824,7 +1922,6 @@ class PythonImageUploader(BaseImageUploader):
'Content-Type': 'application/octet-stream'
}
)
cls.check_status(session=target_session, request=r)
# Upload the manifest
image, tag = cls._image_tag_from_url(target_url)
@ -1838,18 +1935,22 @@ class PythonImageUploader(BaseImageUploader):
LOG.debug('[%s] Uploading manifest of type %s to: %s' %
(image, manifest_type, manifest_url))
r = target_session.put(
manifest_url,
timeout=30,
data=manifest_str.encode('utf-8'),
headers={
'Content-Type': manifest_type
}
)
if r.status_code == 400:
LOG.error(cls._get_response_text(r))
raise ImageUploaderException('Pushing manifest failed')
cls.check_status(session=target_session, request=r)
try:
r = RegistrySessionHelper.put(
target_session,
manifest_url,
timeout=30,
data=manifest_str.encode('utf-8'),
headers={
'Content-Type': manifest_type
}
)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 400:
LOG.error(cls._get_response_text(r))
raise ImageUploaderException('Pushing manifest failed')
else:
raise
@classmethod
@tenacity.retry( # Retry up to 5 times with jittered exponential backoff
@ -2049,7 +2150,8 @@ class PythonImageUploader(BaseImageUploader):
chunk_length = len(chunk)
upload_url = cls._upload_url(
target_url, session, upload_resp)
upload_resp = session.patch(
upload_resp = RegistrySessionHelper.patch(
session,
upload_url,
timeout=30,
data=chunk,
@ -2060,21 +2162,20 @@ class PythonImageUploader(BaseImageUploader):
'Content-Type': 'application/octet-stream'
}
)
cls.check_status(session=session, request=upload_resp)
length += chunk_length
layer_digest = 'sha256:%s' % calc_digest.hexdigest()
LOG.debug('[%s] Calculated layer digest' % layer_digest)
upload_url = cls._upload_url(
target_url, session, upload_resp)
upload_resp = session.put(
upload_resp = RegistrySessionHelper.put(
session,
upload_url,
timeout=30,
params={
'digest': layer_digest
},
)
cls.check_status(session=session, request=upload_resp)
layer['digest'] = layer_digest
layer['size'] = length
return (layer_digest, cls._build_url(target_url, target_url.path))

View File

@ -46,6 +46,128 @@ filedata = six.u(
""")
class TestRegistrySessionHelper(base.TestCase):
def setUp(self):
super(TestRegistrySessionHelper, self).setUp()
def test_check_status(self):
session = mock.Mock()
raise_for_status_mock = mock.Mock()
request = mock.Mock()
request.raise_for_status = raise_for_status_mock
request.status_code = 200
image_uploader.RegistrySessionHelper.check_status(session, request)
raise_for_status_mock.assert_called_once()
def test_check_status_reauth(self):
session = mock.Mock()
session_reauth_mock = mock.Mock()
session.headers = {}
session.auth_args = {}
session.reauthenticate = session_reauth_mock
raise_for_status_mock = mock.Mock()
request = mock.Mock()
request.headers = {'www-authenticate': 'foo'}
request.raise_for_status = raise_for_status_mock
request.status_code = 401
image_uploader.RegistrySessionHelper.check_status(session, request)
session_reauth_mock.assert_called_once_with()
raise_for_status_mock.assert_called_once()
@mock.patch('tripleo_common.image.image_uploader.RegistrySessionHelper'
'.check_status')
def test_action(self, mock_status):
request_session = mock.Mock()
mock_get = mock.Mock()
mock_get.return_value = {}
request_session.get = mock_get
image_uploader.RegistrySessionHelper._action('get', request_session)
mock_get.assert_called_once_with()
mock_status.assert_called_once_with(session=request_session,
request={})
@mock.patch('tripleo_common.image.image_uploader.RegistrySessionHelper'
'.check_status')
def test_action_reauth(self, mock_status):
exc_response = mock.Mock()
exc_response.status_code = 401
auth_exc = requests.exceptions.HTTPError(response=exc_response)
mock_status.side_effect = [auth_exc, True]
request_session = mock.Mock()
mock_get = mock.Mock()
mock_get.return_value = {}
request_session.get = mock_get
image_uploader.RegistrySessionHelper._action('get', request_session)
get_call = mock.call()
get_calls = [get_call, get_call]
mock_get.assert_has_calls(get_calls)
status_call = mock.call(session=request_session, request={})
status_calls = [status_call, status_call]
mock_status.assert_has_calls(status_calls)
@mock.patch('tripleo_common.image.image_uploader.RegistrySessionHelper'
'.check_status')
def test_action_reauth_fail(self, mock_status):
exc_response = mock.Mock()
exc_response.status_code = 404
auth_exc = requests.exceptions.HTTPError(response=exc_response)
mock_status.side_effect = auth_exc
request_session = mock.Mock()
mock_get = mock.Mock()
mock_get.return_value = {}
request_session.get = mock_get
self.assertRaises(requests.exceptions.HTTPError,
image_uploader.RegistrySessionHelper._action,
'get',
request_session)
mock_get.assert_called_once_with()
mock_status.assert_called_once_with(session=request_session,
request={})
@mock.patch('tripleo_common.image.image_uploader.RegistrySessionHelper'
'._action')
def test_get(self, mock_action):
request_session = mock.Mock()
image_uploader.RegistrySessionHelper.get(request_session)
mock_action.assert_called_once_with('get',
request_session)
@mock.patch('tripleo_common.image.image_uploader.RegistrySessionHelper'
'._action')
def test_patch(self, mock_action):
request_session = mock.Mock()
image_uploader.RegistrySessionHelper.patch(request_session)
mock_action.assert_called_once_with('patch',
request_session)
@mock.patch('tripleo_common.image.image_uploader.RegistrySessionHelper'
'._action')
def test_post(self, mock_action):
request_session = mock.Mock()
image_uploader.RegistrySessionHelper.post(request_session)
mock_action.assert_called_once_with('post',
request_session)
@mock.patch('tripleo_common.image.image_uploader.RegistrySessionHelper'
'._action')
def test_put(self, mock_action):
request_session = mock.Mock()
image_uploader.RegistrySessionHelper.put(request_session)
mock_action.assert_called_once_with('put',
request_session)
class TestImageUploadManager(base.TestCase):
def setUp(self):
super(TestImageUploadManager, self).setUp()
@ -54,7 +176,7 @@ class TestImageUploadManager(base.TestCase):
self.filelist = files
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.check_status')
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader._fetch_manifest')
@mock.patch('tripleo_common.image.image_uploader.'
@ -988,7 +1110,7 @@ class TestSkopeoImageUploader(base.TestCase):
self.uploader._inspect.retry.sleep = mock.Mock()
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.check_status')
@mock.patch('os.environ')
@mock.patch('subprocess.Popen')
@mock.patch('tripleo_common.image.image_uploader.'
@ -1273,7 +1395,7 @@ class TestPythonImageUploader(base.TestCase):
self.requests = self.useFixture(rm_fixture.Fixture())
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.check_status')
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader.authenticate')
@mock.patch('tripleo_common.image.image_uploader.'
@ -1368,7 +1490,7 @@ class TestPythonImageUploader(base.TestCase):
)
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.check_status')
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader.authenticate')
@mock.patch('tripleo_common.image.image_uploader.'
@ -1442,7 +1564,7 @@ class TestPythonImageUploader(base.TestCase):
])
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.check_status')
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader.authenticate')
@mock.patch('tripleo_common.image.image_uploader.'
@ -1511,7 +1633,7 @@ class TestPythonImageUploader(base.TestCase):
])
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.check_status')
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader.authenticate')
@mock.patch('tripleo_common.image.image_uploader.'
@ -1602,7 +1724,7 @@ class TestPythonImageUploader(base.TestCase):
)
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.check_status')
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader.authenticate')
@mock.patch('tripleo_common.image.image_uploader.'
@ -1800,7 +1922,7 @@ class TestPythonImageUploader(base.TestCase):
session=target_session)
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.check_status')
def test_fetch_manifest(self, check_status):
url = urlparse('docker://docker.io/t/nova-api:tripleo-current')
manifest = '{"layers": []}'
@ -1821,7 +1943,7 @@ class TestPythonImageUploader(base.TestCase):
}
)
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.check_status')
def test_upload_url(self, check_status):
# test with previous request
previous_request = mock.Mock()
@ -1948,14 +2070,16 @@ class TestPythonImageUploader(base.TestCase):
)
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'PythonImageUploader._copy_manifest_config_to_registry')
@mock.patch('tripleo_common.image.image_uploader.'
'RegistrySessionHelper.get')
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader._upload_url')
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader.'
'_copy_layer_registry_to_registry')
def test_copy_registry_to_registry(self, _copy_layer, _upload_url,
check_status):
mock_get, mock_copy_manifest):
source_url = urlparse('docker://docker.io/t/nova-api:latest')
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
@ -1963,7 +2087,7 @@ class TestPythonImageUploader(base.TestCase):
source_session = mock.Mock()
target_session = mock.Mock()
source_session.get.return_value.text = '{}'
mock_get.return_value.text = '{}'
manifest = json.dumps({
'mediaType': image_uploader.MEDIA_MANIFEST_V2,
@ -1988,7 +2112,8 @@ class TestPythonImageUploader(base.TestCase):
target_session=target_session
)
source_session.get.assert_called_once_with(
mock_get.assert_called_once_with(
source_session,
'https://registry-1.docker.io/v2/t/nova-api/blobs/sha256:1234',
timeout=30
)
@ -2006,36 +2131,22 @@ class TestPythonImageUploader(base.TestCase):
'distribution.manifest.v2+json',
}
target_session.put.assert_has_calls([
mock_copy_manifest.assert_has_calls([
mock.call(
'https://192.168.2.1:5000/v2/upload',
data='{}'.encode('utf-8'),
headers={
'Content-Length':
'2',
'Content-Type':
'application/octet-stream'
},
params={'digest': 'sha256:1234'},
timeout=30
),
mock.call(
'https://192.168.2.1:5000/v2/t/nova-api/manifests/latest',
data=mock.ANY,
headers={
'Content-Type': 'application/vnd.docker.'
'distribution.manifest.v2+json'
},
timeout=30
),
target_url=target_url,
manifest_str=mock.ANY,
config_str='{}',
target_session=target_session,
multi_arch=False
)
])
put_manifest = json.loads(
target_session.put.call_args[1]['data'].decode('utf-8')
mock_copy_manifest.call_args[1]['manifest_str']
)
self.assertEqual(target_manifest, put_manifest)
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.check_status')
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader._build_url')
@mock.patch('tripleo_common.image.image_uploader.'
@ -2152,13 +2263,13 @@ class TestPythonImageUploader(base.TestCase):
self.assertEqual(expected_manifest, call_manifest)
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader.check_status')
'RegistrySessionHelper.put')
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader._build_url')
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader._image_tag_from_url')
def test_copy_manifest_config_to_registry_oci(self, image_tag_mock,
build_url_mock, status_mock):
build_url_mock, put_mock):
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
@ -2167,9 +2278,7 @@ class TestPythonImageUploader(base.TestCase):
build_url = 'https://192.168.2.1:5000/v2/t/nova-api'
build_url_mock.return_value = build_url
target_session = mock.Mock()
target_put = mock.Mock()
target_put.return_value.text = '{}'
target_session.put = target_put
put_mock.return_value.text = '{}'
config_str = None
@ -2206,15 +2315,16 @@ class TestPythonImageUploader(base.TestCase):
target_session=target_session
)
calls = [mock.call(build_url,
calls = [mock.call(target_session,
build_url,
data=mock.ANY,
headers=expected_headers,
timeout=30)]
target_put.assert_has_calls(calls)
put_mock.assert_has_calls(calls)
# We're seeing ordering issues with the py27 checking this field
# so switch to checking it this way
call_manifest = json.loads(
target_put.call_args[1]['data'].decode('utf-8')
put_mock.call_args[1]['data'].decode('utf-8')
)
self.assertEqual(expected_manifest, call_manifest)