Make upload workers faster on processing layers

Make upload workers process image layers only once (as a best
effort). This also reworks and simplifies locks management for
individual tasks now managed for the PythonImageUploader class
namespace only.

When fetching source layer, cross-link it for the target
local image, whenever that source already exists. When pushing a
layer to a target registry, do not repeat transferring the same data,
if already pushed earlier for another image.

The 1st time a layer gets uploaded/fetched for an image, that image and
its known path (local or remote) becomes a reference for future
cross-referencing by other images.

Store such information about already processed layers in global view
shared by all workers to speed up the data-transferring jobs they execute.

Having that global view, uploading the 1st image in the tasks list as a
separate (and non-concurrent) job becomes redundant and now will be
executed concurrently with other images.

Based on the dynamically picked multi-workers mode, provide the global
view as a graph with its MP/MT state synchronization as the following:

* use globally shared locking info also containing global layers view
  for MP-workers. With the shared global view state we can no longer
  use local locking objects individual for each task.
* if cannot use multi-process workers, like when executing it via
  Mistral by monkey patched eventlet greenthreads, choose threadinglock
  and multi-threads-safe standard dictionary in the shared class
  namespace to store the global view there
* if it can do MP, pick processlock, which also contains a
  data-race-safe Manager().dict() as the global view shared among
  cooperating OS processes.
* use that global view in a transparent fashion, provided by a special
  classmethod proxying access to the internal state shared for workers.

Ultimately, all that optimizes:

* completion time
* re-fetching of the already processed layers
* local deduplication of layers
* the amount of outbound HTTP requests to registries
* if-layer-exists and other internal logic check executed against the
  in-memory cache firstly.

As layers locking and unlocking becomes a popular action, reduce the
noise of the debug messages it produces.

Closes-bug: #1847225
Related-bug: #1844446

Change-Id: Ie5ef4045b7e22c06551e886f9f9b6f22c8d4bd21
Signed-off-by: Bogdan Dobrelya <bdobreli@redhat.com>
This commit is contained in:
Bogdan Dobrelya 2019-10-15 11:33:16 +02:00
parent 26bd0efd26
commit 46f8129894
7 changed files with 276 additions and 115 deletions

View File

@ -21,6 +21,7 @@ import requests
import shutil
from oslo_log import log as logging
from tripleo_common.utils import image as image_utils
LOG = logging.getLogger(__name__)
@ -143,39 +144,68 @@ def export_stream(target_url, layer, layer_stream, verify_digest=True):
)
if blob_path != expected_blob_path:
os.rename(blob_path, expected_blob_path)
blob_path = expected_blob_path
layer['digest'] = layer_digest
layer['size'] = length
LOG.debug('[%s] Done exporting image layer %s' % (image, digest))
return layer_digest
return (layer_digest, blob_path)
def cross_repo_mount(target_image_url, image_layers, source_layers):
for layer in source_layers:
if layer not in image_layers:
continue
image_url = image_layers[layer]
image, tag = image_tag_from_url(image_url)
dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'blobs')
blob_path = os.path.join(dir_path, '%s.gz' % layer)
if not os.path.exists(blob_path):
LOG.debug('[%s] Layer not found: %s' % (image, blob_path))
continue
target_image, tag = image_tag_from_url(target_image_url)
target_dir_path = os.path.join(
IMAGE_EXPORT_DIR, 'v2', target_image, 'blobs')
make_dir(target_dir_path)
target_blob_path = os.path.join(target_dir_path, '%s.gz' % layer)
if os.path.exists(target_blob_path):
continue
def layer_cross_link(layer, image, blob_path, target_image_url):
    """Hard-link an already fetched layer blob into the target image dir.

    :param layer: layer digest used as the blob file name
    :param image: name of the image the blob was originally fetched for
    :param blob_path: path of the existing source blob
    :param target_image_url: parsed URL of the target image
    """
    dest_image, _ = image_tag_from_url(target_image_url)
    dest_dir = os.path.join(IMAGE_EXPORT_DIR, 'v2', dest_image, 'blobs')
    make_dir(dest_dir)
    dest_blob = os.path.join(dest_dir, '%s.gz' % layer)
    if os.path.exists(dest_blob):
        # blob already present for the target image -- nothing to link
        return
    LOG.debug('[%s] Linking layers: %s -> %s' %
              (image, blob_path, dest_blob))
    # make a hard link so the layers can have independent lifecycles
    os.link(blob_path, dest_blob)
def cross_repo_mount(target_image_url, image_layers, source_layers,
                     uploaded_layers=None):
    """Link source layers into the target image instead of re-fetching them.

    For each layer of the source image, find an existing local copy --
    either via the global ``uploaded_layers`` view ('local' scope) or via
    the legacy ``image_layers`` mapping -- and hard-link it under the
    target image's blobs directory.

    :param target_image_url: parsed URL of the target image
    :param image_layers: mapping of layer digest -> source image URL
    :param source_layers: list of layer digests of the source image
    :param uploaded_layers: optional global view of already processed
        layers shared among upload workers
    :returns: dict of layer digest -> {'known_path': ..., 'ref_image': ...}
        for every layer that was cross-linked
    """
    linked_layers = {}
    for layer in source_layers:
        known_path, ref_image = image_utils.uploaded_layers_details(
            uploaded_layers, layer, scope='local')
        if layer not in image_layers and not ref_image:
            continue
        image_url = image_layers.get(layer, None)
        if image_url:
            image, _ = image_tag_from_url(image_url)
        else:
            image = ref_image
        if not image:
            continue
        if known_path and ref_image:
            blob_path = known_path
            # BUG FIX: compare before overriding `image`, otherwise the
            # `ref_image != image` branch could never be taken
            if ref_image != image:
                LOG.debug('[%s] Layer ref. by image %s already exists '
                          'at %s' % (image, ref_image, known_path))
            else:
                LOG.debug('[%s] Layer already exists at %s'
                          % (image, known_path))
            image = ref_image
        else:
            dir_path = os.path.join(IMAGE_EXPORT_DIR, 'v2', image, 'blobs')
            blob_path = os.path.join(dir_path, '%s.gz' % layer)
            if not os.path.exists(blob_path):
                LOG.debug('[%s] Layer not found: %s' % (image, blob_path))
                continue
        layer_cross_link(layer, image, blob_path, target_image_url)
        linked_layers.update({layer: {'known_path': blob_path,
                                      'ref_image': image}})
    return linked_layers
def export_manifest_config(target_url,
manifest_str,
manifest_type,

View File

@ -40,6 +40,7 @@ from tripleo_common.image.exception import ImageNotFoundException
from tripleo_common.image.exception import ImageUploaderException
from tripleo_common.image.exception import ImageUploaderThreadException
from tripleo_common.image import image_export
from tripleo_common.utils import image as image_utils
from tripleo_common.utils.locks import threadinglock
@ -164,8 +165,9 @@ class ImageUploadManager(BaseImageManager):
super(ImageUploadManager, self).__init__(config_files)
self.uploaders = {
'skopeo': SkopeoImageUploader(),
'python': PythonImageUploader(lock)
'python': PythonImageUploader()
}
self.uploaders['python'].init_global_state(lock)
self.dry_run = dry_run
self.cleanup = cleanup
if mirrors:
@ -177,7 +179,6 @@ class ImageUploadManager(BaseImageManager):
for uploader in self.uploaders.values():
uploader.registry_credentials = registry_credentials
self.multi_arch = multi_arch
self.lock = lock
@staticmethod
def validate_registry_credentials(creds_data):
@ -250,7 +251,7 @@ class ImageUploadManager(BaseImageManager):
tasks.append(UploadTask(
image_name, pull_source, push_destination,
append_tag, modify_role, modify_vars, self.dry_run,
self.cleanup, multi_arch, self.lock))
self.cleanup, multi_arch))
# NOTE(mwhahaha): We want to randomize the upload process because of
# the shared nature of container layers. Because we multiprocess the
@ -282,13 +283,12 @@ class BaseImageUploader(object):
export_registries = set()
push_registries = set()
def __init__(self, lock=None):
def __init__(self):
self.upload_tasks = []
# A mapping of layer hashs to the image which first copied that
# layer to the target
self.image_layers = {}
self.registry_credentials = {}
self.lock = lock
@classmethod
def init_registries_cache(cls):
@ -912,8 +912,14 @@ class BaseImageUploader(object):
name = target_image_url.path.split(':')[0][1:]
export = netloc in cls.export_registries
if export:
image_export.cross_repo_mount(
target_image_url, image_layers, source_layers)
linked_layers = image_export.cross_repo_mount(
target_image_url, image_layers, source_layers,
uploaded_layers=cls._global_view_proxy())
# track linked layers globally for future references
for layer, info in linked_layers.items():
cls._track_uploaded_layers(
layer, known_path=info['known_path'],
image_ref=info['ref_image'], scope='local')
return
if netloc in cls.insecure_registries:
@ -923,17 +929,24 @@ class BaseImageUploader(object):
url = '%s://%s/v2/%s/blobs/uploads/' % (scheme, netloc, name)
for layer in source_layers:
if layer in image_layers:
known_path, existing_name = image_utils.uploaded_layers_details(
cls._global_view_proxy(), layer, scope='remote')
if layer not in image_layers and not existing_name:
continue
if not existing_name:
existing_name = image_layers[layer].path.split(':')[0][1:]
LOG.info('[%s] Cross repository blob mount from %s' %
(layer, existing_name))
data = {
'mount': layer,
'from': existing_name
}
r = session.post(url, data=data, timeout=30)
cls.check_status(session=session, request=r)
LOG.debug('%s %s' % (r.status_code, r.reason))
if existing_name != name:
LOG.debug('[%s] Layer %s ref. by image %s already exists '
'at %s' % (name, layer, existing_name, known_path))
LOG.info('[%s] Cross repository blob mount from %s' %
(layer, existing_name))
data = {
'mount': layer,
'from': existing_name
}
r = session.post(url, data=data, timeout=30)
cls.check_status(session=session, request=r)
LOG.debug('%s %s' % (r.status_code, r.reason))
class SkopeoImageUploader(BaseImageUploader):
@ -1124,43 +1137,87 @@ class SkopeoImageUploader(BaseImageUploader):
class PythonImageUploader(BaseImageUploader):
"""Upload images using a direct implementation of the registry API"""
uploaded_layers = {} # provides global view for multi-threading workers
lock = None # provides global locking info plus global view, if MP is used
@classmethod
def init_global_state(cls, lock):
    """Bind the shared lock/global-view object to the class namespace.

    Only the first call takes effect; subsequent calls keep the lock
    already stored on the class so all workers share one state object.

    :param lock: a ThreadingLock or ProcessLock instance (may also carry
        the MP-shared ``_global_view`` dict) -- TODO confirm against
        tripleo_common.utils.locks
    """
    if not cls.lock:
        cls.lock = lock
@classmethod
@tenacity.retry( # Retry until we no longer have collisions
retry=tenacity.retry_if_exception_type(ImageUploaderThreadException),
wait=tenacity.wait_random_exponential(multiplier=1, max=10)
)
def _layer_fetch_lock(cls, layer, lock=None):
if not lock:
def _layer_fetch_lock(cls, layer):
if not cls.lock:
LOG.warning('No lock information provided for layer %s' % layer)
return
if layer in lock.objects():
if layer in cls.lock.objects():
LOG.debug('[%s] Layer is being fetched by another thread' % layer)
raise ImageUploaderThreadException('layer being fetched')
LOG.debug('Locking layer %s' % layer)
LOG.debug('Starting acquire for lock %s' % layer)
with lock.get_lock():
if layer in lock.objects():
known_path, image = image_utils.uploaded_layers_details(
cls._global_view_proxy(), layer, scope='local')
if not known_path or not image:
known_path, image = image_utils.uploaded_layers_details(
cls._global_view_proxy(), layer, scope='remote')
if image and known_path:
# already processed layers needs no further locking
return
with cls.lock.get_lock():
if layer in cls.lock.objects():
LOG.debug('Collision for lock %s' % layer)
raise ImageUploaderThreadException('layer conflict')
LOG.debug('Acquired for lock %s' % layer)
lock.objects().append(layer)
LOG.debug('Updated lock info %s' % layer)
cls.lock.objects().append(layer)
LOG.debug('Got lock on layer %s' % layer)
@classmethod
def _layer_fetch_unlock(cls, layer, lock=None):
if not lock:
def _layer_fetch_unlock(cls, layer):
if not cls.lock:
LOG.warning('No lock information provided for layer %s' % layer)
return
LOG.debug('Unlocking layer %s' % layer)
LOG.debug('Starting acquire for lock %s' % layer)
with lock.get_lock():
LOG.debug('Acquired for unlock %s' % layer)
while layer in lock.objects():
lock.objects().remove(layer)
LOG.debug('Updated lock info %s' % layer)
with cls.lock.get_lock():
while layer in cls.lock.objects():
cls.lock.objects().remove(layer)
LOG.debug('Released lock on layer %s' % layer)
@classmethod
def _global_view_proxy(cls, value=None, forget=False):
    """Proxy reads and writes of the global processed-layers view.

    All access happens under the class-wide lock. Three modes:

    * ``value`` given with ``forget=True``: drop the entry keyed by
      ``value`` from the class-local dict and, when present, from the
      MP-shared ``lock._global_view`` as well
    * ``value`` given (a dict): merge it into both views
    * no ``value``: return the consolidated view (class-local dict
      overlaid with the MP-shared one when it exists)

    Returns None (with a warning) when no lock has been initialized.
    """
    if not cls.lock:
        LOG.warning('No lock information provided for value %s' % value)
        return
    with cls.lock.get_lock():
        if value and forget:
            cls.uploaded_layers.pop(value, None)
            # ProcessLock carries an MP-shared dict; keep it in sync too
            if hasattr(cls.lock, '_global_view'):
                cls.lock._global_view.pop(value, None)
        elif value:
            cls.uploaded_layers.update(value)
            if hasattr(cls.lock, '_global_view'):
                cls.lock._global_view.update(value)
        if not value:
            # return global view consolidated among MP/MT workers state
            if hasattr(cls.lock, '_global_view'):
                consolidated_view = cls.uploaded_layers.copy()
                consolidated_view.update(cls.lock._global_view)
                return consolidated_view
            else:
                return cls.uploaded_layers
@classmethod
def _track_uploaded_layers(cls, layer, known_path=None, image_ref=None,
                           forget=False, scope='remote'):
    """Record (or forget) a processed layer in the shared global view.

    :param layer: layer digest being tracked
    :param known_path: where the layer data lives (local blob path or
        remote URL)
    :param image_ref: image that first processed the layer
    :param forget: when True, remove the layer from the view for all
        scopes instead of adding it
    :param scope: 'local' or 'remote' -- which kind of path is recorded
    """
    if forget:
        LOG.debug('Untracking processed layer %s for any scope' % layer)
        cls._global_view_proxy(value=layer, forget=True)
    else:
        LOG.debug('Tracking processed layer %s for %s scope'
                  % (layer, scope))
        cls._global_view_proxy(
            value={layer: {scope: {'ref': image_ref, 'path': known_path}}})
def upload_image(self, task):
"""Upload image from a task
@ -1185,8 +1242,6 @@ class PythonImageUploader(BaseImageUploader):
if t.dry_run:
return []
lock = t.lock
target_username, target_password = self.credentials_for_registry(
t.target_image_url.netloc)
target_session = self.authenticate(
@ -1277,8 +1332,7 @@ class PythonImageUploader(BaseImageUploader):
source_session=source_session,
target_session=target_session,
source_layers=source_layers,
multi_arch=t.multi_arch,
lock=lock
multi_arch=t.multi_arch
)
except Exception:
LOG.error('[%s] Failed uploading the target '
@ -1494,38 +1548,58 @@ class PythonImageUploader(BaseImageUploader):
def _copy_layer_registry_to_registry(cls, source_url, target_url,
layer,
source_session=None,
target_session=None,
lock=None):
target_session=None):
layer_entry = {'digest': layer}
try:
cls._layer_fetch_lock(layer, lock)
cls._layer_fetch_lock(layer)
if cls._target_layer_exists_registry(
target_url, layer_entry, [layer_entry], target_session):
cls._layer_fetch_unlock(layer, lock)
cls._layer_fetch_unlock(layer)
return
known_path, ref_image = image_utils.uploaded_layers_details(
cls._global_view_proxy(), layer, scope='local')
if known_path and ref_image:
# cross-link target from local source, skip fetching it again
image_export.layer_cross_link(
layer, ref_image, known_path, target_url)
cls._layer_fetch_unlock(layer)
return
except ImageUploaderThreadException:
# skip trying to unlock, because that's what threw the exception
raise
except Exception:
cls._layer_fetch_unlock(layer, lock)
cls._layer_fetch_unlock(layer)
raise
digest = layer_entry['digest']
LOG.debug('[%s] Uploading layer' % digest)
calc_digest = hashlib.sha256()
known_path = None
layer_val = None
try:
layer_stream = cls._layer_stream_registry(
digest, source_url, calc_digest, source_session)
layer_val = cls._copy_stream_to_registry(
layer_val, known_path = cls._copy_stream_to_registry(
target_url, layer_entry, calc_digest, layer_stream,
target_session)
except (IOError, requests.exceptions.HTTPError):
cls._track_uploaded_layers(layer, forget=True, scope='remote')
LOG.error('[%s] Failed processing layer for the target '
'image %s' % (layer, target_url.geturl()))
raise
except Exception:
raise
else:
if layer_val and known_path:
image_ref = target_url.path.split(':')[0][1:]
uploaded = parse.urlparse(known_path).scheme
cls._track_uploaded_layers(
layer_val, known_path=known_path, image_ref=image_ref,
scope=('remote' if uploaded else 'local'))
return layer_val
finally:
cls._layer_fetch_unlock(layer, lock)
cls._layer_fetch_unlock(layer)
@classmethod
def _assert_scheme(cls, url, scheme):
@ -1547,8 +1621,7 @@ class PythonImageUploader(BaseImageUploader):
source_session=None,
target_session=None,
source_layers=None,
multi_arch=False,
lock=None):
multi_arch=False):
cls._assert_scheme(source_url, 'docker')
cls._assert_scheme(target_url, 'docker')
@ -1570,8 +1643,7 @@ class PythonImageUploader(BaseImageUploader):
source_url, target_url,
layer=layer,
source_session=source_session,
target_session=target_session,
lock=lock
target_session=target_session
))
jobs_count = len(copy_jobs)
@ -1742,27 +1814,40 @@ class PythonImageUploader(BaseImageUploader):
def _target_layer_exists_registry(cls, target_url, layer, check_layers,
session):
image, tag = cls._image_tag_from_url(target_url)
norm_image = (image[1:] if image.startswith('/') else image)
parts = {
'image': image,
'tag': tag
}
# Do a HEAD call for the supplied digests
# to see if the layer is already in the registry
layer_found = None
# Check in global view or do a HEAD call for the supplied
# digests to see if the layer is already in the registry
for l in check_layers:
if not l:
continue
parts['digest'] = l['digest']
blob_url = cls._build_url(
target_url, CALL_BLOB % parts)
if session.head(blob_url, timeout=30).status_code == 200:
LOG.debug('[%s] Layer already exists: %s' %
(image, l['digest']))
layer['digest'] = l['digest']
if 'size' in l:
layer['size'] = l['size']
if 'mediaType' in l:
layer['mediaType'] = l['mediaType']
return True
known_path, ref_image = image_utils.uploaded_layers_details(
cls._global_view_proxy(), l['digest'], scope='remote')
if ref_image == norm_image:
LOG.debug('[%s] Layer %s already exists at %s' %
(image, l['digest'], known_path))
layer_found = l
break
else:
parts['digest'] = l['digest']
blob_url = cls._build_url(
target_url, CALL_BLOB % parts)
if session.head(blob_url, timeout=30).status_code == 200:
LOG.debug('[%s] Layer already exists: %s' %
(image, l['digest']))
layer_found = l
break
if layer_found:
layer['digest'] = layer_found['digest']
if 'size' in layer_found:
layer['size'] = layer_found['size']
if 'mediaType' in layer_found:
layer['mediaType'] = layer_found['mediaType']
return True
return False
@classmethod
@ -1809,8 +1894,8 @@ class PythonImageUploader(BaseImageUploader):
def _copy_layer_local_to_registry(cls, target_url,
session, layer, layer_entry):
# Do a HEAD call for the compressed-diff-digest and diff-digest
# to see if the layer is already in the registry
# Check in global view or do a HEAD call for the compressed-diff-digest
# and diff-digest to see if the layer is already in the registry
check_layers = []
compressed_digest = layer_entry.get('compressed-diff-digest')
if compressed_digest:
@ -1835,10 +1920,29 @@ class PythonImageUploader(BaseImageUploader):
LOG.debug('[%s] Uploading layer' % layer_id)
calc_digest = hashlib.sha256()
layer_stream = cls._layer_stream_local(layer_id, calc_digest)
return cls._copy_stream_to_registry(target_url, layer, calc_digest,
layer_stream, session,
verify_digest=False)
known_path = None
layer_val = None
try:
layer_stream = cls._layer_stream_local(layer_id, calc_digest)
layer_val, known_path = cls._copy_stream_to_registry(
target_url, layer, calc_digest, layer_stream, session,
verify_digest=False)
except (IOError, requests.exceptions.HTTPError):
cls._track_uploaded_layers(
layer['digest'], forget=True, scope='remote')
LOG.error('[%s] Failed processing layer for the target '
'image %s' % (layer['digest'], target_url.geturl()))
raise
except Exception:
raise
else:
if layer_val and known_path:
image_ref = target_url.path.split(':')[0][1:]
uploaded = parse.urlparse(known_path).scheme
cls._track_uploaded_layers(
layer_val, known_path=known_path, image_ref=image_ref,
scope=('remote' if uploaded else 'local'))
return layer_val
@classmethod
def _copy_stream_to_registry(cls, target_url, layer, calc_digest,
@ -1887,7 +1991,7 @@ class PythonImageUploader(BaseImageUploader):
cls.check_status(session=session, request=upload_resp)
layer['digest'] = layer_digest
layer['size'] = length
return layer_digest
return (layer_digest, cls._build_url(target_url, target_url.path))
@classmethod
@tenacity.retry( # Retry up to 5 times with jittered exponential backoff
@ -2089,10 +2193,6 @@ class PythonImageUploader(BaseImageUploader):
return
local_images = []
# Pull a single image first, to avoid duplicate pulls of the
# same base layers
local_images.extend(upload_task(args=self.upload_tasks.pop()))
with self._get_executor() as p:
for result in p.map(upload_task, self.upload_tasks):
local_images.extend(result)
@ -2107,7 +2207,7 @@ class UploadTask(object):
def __init__(self, image_name, pull_source, push_destination,
append_tag, modify_role, modify_vars, dry_run, cleanup,
multi_arch, lock=None):
multi_arch):
self.image_name = image_name
self.pull_source = pull_source
self.push_destination = push_destination
@ -2117,7 +2217,6 @@ class UploadTask(object):
self.dry_run = dry_run
self.cleanup = cleanup
self.multi_arch = multi_arch
self.lock = lock
if ':' in image_name:
image = image_name.rpartition(':')[0]

View File

@ -378,7 +378,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
del(entry['services'])
params.update(
detect_insecure_registries(params))
detect_insecure_registries(params, lock=lock))
return_data = {}
if output_env_file:
@ -388,7 +388,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
return return_data
def detect_insecure_registries(params):
def detect_insecure_registries(params, lock=None):
"""Detect insecure registries in image parameters
:param params: dict of container image parameters
@ -396,7 +396,7 @@ def detect_insecure_registries(params):
merged into other parameters
"""
insecure = set()
uploader = image_uploader.ImageUploadManager().uploader('python')
uploader = image_uploader.ImageUploadManager(lock=lock).uploader('python')
for image in params.values():
host = image.split('/')[0]
if uploader.is_insecure_registry(host):

View File

@ -89,7 +89,7 @@ class TestImageExport(base.TestCase):
}
calc_digest = hashlib.sha256()
layer_stream = io.BytesIO(blob_compressed)
layer_digest = image_export.export_stream(
layer_digest, _ = image_export.export_stream(
target_url, layer, layer_stream, verify_digest=False
)
self.assertEqual(compressed_digest, layer_digest)
@ -145,7 +145,8 @@ class TestImageExport(base.TestCase):
target_blob_path = os.path.join(target_blob_dir, 'sha256:1234.gz')
# call with missing source, no change
image_export.cross_repo_mount(target_url, image_layers, source_layers)
image_export.cross_repo_mount(target_url, image_layers, source_layers,
uploaded_layers={})
self.assertFalse(os.path.exists(source_blob_path))
self.assertFalse(os.path.exists(target_blob_path))
@ -155,7 +156,8 @@ class TestImageExport(base.TestCase):
self.assertTrue(os.path.exists(source_blob_path))
# call with existing source
image_export.cross_repo_mount(target_url, image_layers, source_layers)
image_export.cross_repo_mount(target_url, image_layers, source_layers,
uploaded_layers={})
self.assertTrue(os.path.exists(target_blob_path))
with open(target_blob_path, 'r') as f:
self.assertEqual('blob', f.read())

View File

@ -1318,8 +1318,7 @@ class TestPythonImageUploader(base.TestCase):
source_session=source_session,
target_session=target_session,
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
multi_arch=False,
lock=None
multi_arch=False
)
@mock.patch('tripleo_common.image.image_uploader.'
@ -1553,8 +1552,7 @@ class TestPythonImageUploader(base.TestCase):
source_session=source_session,
target_session=target_session,
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
multi_arch=False,
lock=None
multi_arch=False
)
@mock.patch('tripleo_common.image.image_uploader.'
@ -1685,8 +1683,7 @@ class TestPythonImageUploader(base.TestCase):
source_session=source_session,
target_session=target_session,
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
multi_arch=False,
lock=None
multi_arch=False
)
_copy_registry_to_local.assert_called_once_with(unmodified_target_url)
run_modify_playbook.assert_called_once_with(
@ -1816,7 +1813,8 @@ class TestPythonImageUploader(base.TestCase):
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader._upload_url')
def test_copy_layer_registry_to_registry(self, _upload_url):
@mock.patch('tripleo_common.utils.image.uploaded_layers_details')
def test_copy_layer_registry_to_registry(self, global_check, _upload_url):
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
source_url = urlparse('docker://docker.io/t/nova-api:latest')
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
@ -1835,6 +1833,7 @@ class TestPythonImageUploader(base.TestCase):
layer = layer_entry['digest']
# layer already exists at destination
global_check.return_value = (None, None)
self.requests.head(
'https://192.168.2.1:5000/v2/t/nova-api/blobs/%s' % blob_digest,
status_code=200
@ -2022,8 +2021,9 @@ class TestPythonImageUploader(base.TestCase):
@mock.patch('subprocess.Popen')
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader._upload_url')
def test_copy_layer_local_to_registry(self, _upload_url, mock_popen,
mock_exists):
@mock.patch('tripleo_common.utils.image.uploaded_layers_details')
def test_copy_layer_local_to_registry(self, global_check, _upload_url,
mock_popen, mock_exists):
mock_exists.return_value = True
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
@ -2048,6 +2048,7 @@ class TestPythonImageUploader(base.TestCase):
}
# layer already exists at destination
global_check.return_value = (None, None)
self.requests.head(
'https://192.168.2.1:5000/v2/t/'
'nova-api/blobs/%s' % compressed_digest,
@ -2117,6 +2118,7 @@ class TestPythonImageUploader(base.TestCase):
layer
)
@mock.patch('tripleo_common.utils.image.uploaded_layers_details')
@mock.patch('tripleo_common.image.image_uploader.'
'PythonImageUploader._image_manifest_config')
@mock.patch('tripleo_common.image.image_uploader.'
@ -2127,11 +2129,12 @@ class TestPythonImageUploader(base.TestCase):
'PythonImageUploader._upload_url')
def test_copy_local_to_registry(self, _upload_url, _containers_json,
_copy_layer_local_to_registry,
_image_manifest_config):
_image_manifest_config, _global_check):
source_url = urlparse('containers-storage:/t/nova-api:latest')
target_url = urlparse('docker://192.168.2.1:5000/t/nova-api:latest')
target_session = requests.Session()
_upload_url.return_value = 'https://192.168.2.1:5000/v2/upload'
_global_check.return_value = (None, None)
layers = [{
"compressed-diff-digest": "sha256:aeb786",
"compressed-size": 74703002,

View File

@ -0,0 +1,26 @@
# Copyright 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def uploaded_layers_details(uploaded_layers, layer, scope):
    """Return the known path and owning image of an already processed layer.

    Looks up *layer* in the *uploaded_layers* global view and, when an
    entry exists for the requested *scope*, returns its recorded path
    and the image that first processed the layer.

    :param uploaded_layers: mapping of layer digest ->
        {scope: {'ref': image, 'path': path}}; may be None or empty
        (callers such as cross_repo_mount default it to None)
    :param layer: layer digest to look up; may be None/empty
    :param scope: 'local' or 'remote'
    :returns: tuple (known_path, image) -- both None when unknown
    """
    known_path = None
    image = None
    # tolerate a missing global view instead of raising AttributeError
    if layer and uploaded_layers:
        known_layer = uploaded_layers.get(layer, None)
        if known_layer and scope in known_layer:
            known_path = known_layer[scope].get('path', None)
            image = known_layer[scope].get('ref', None)
    return (known_path, image)

View File

@ -23,6 +23,7 @@ from tripleo_common.utils.locks import base
class ProcessLock(base.BaseLock):
# the manager cannot live in __init__
_mgr = multiprocessing.Manager()
_global_view = _mgr.dict()
def __init__(self):
self._lock = self._mgr.Lock()