From e57116d9c45c53ab5395025a32076c7c5bedbe45 Mon Sep 17 00:00:00 2001 From: Alex Schultz Date: Thu, 26 Sep 2019 14:30:13 -0600 Subject: [PATCH] Implement threading locks around layers When we fetch layers, we shouldn't fetch the same layers multiple times. This change adds some locking basked on layer hashes to prevent multiple threads from trying to fetch the same layer at the same time. Change-Id: I477219b7dca1e6cfa02a278c55a0cc1a9833d007 Related-Bug: #1844446 --- tripleo_common/image/exception.py | 5 ++ tripleo_common/image/image_uploader.py | 68 +++++++++++++++++++++++--- 2 files changed, 66 insertions(+), 7 deletions(-) diff --git a/tripleo_common/image/exception.py b/tripleo_common/image/exception.py index b72668c28..a46fe350b 100644 --- a/tripleo_common/image/exception.py +++ b/tripleo_common/image/exception.py @@ -26,5 +26,10 @@ class ImageUploaderException(Exception): pass +class ImageUploaderThreadException(Exception): + """Conflict during thread processing""" + pass + + class ImageNotFoundException(Exception): pass diff --git a/tripleo_common/image/image_uploader.py b/tripleo_common/image/image_uploader.py index 792dce58a..8c0cdcb04 100644 --- a/tripleo_common/image/image_uploader.py +++ b/tripleo_common/image/image_uploader.py @@ -30,6 +30,8 @@ from six.moves.urllib import parse import subprocess import tempfile import tenacity +import threading +import time import yaml from oslo_concurrency import processutils @@ -38,6 +40,7 @@ from tripleo_common.actions import ansible from tripleo_common.image.base import BaseImageManager from tripleo_common.image.exception import ImageNotFoundException from tripleo_common.image.exception import ImageUploaderException +from tripleo_common.image.exception import ImageUploaderThreadException from tripleo_common.image import image_export from tripleo_common.utils import common as common_utils @@ -1100,6 +1103,40 @@ class SkopeoImageUploader(BaseImageUploader): class PythonImageUploader(BaseImageUploader): """Upload images using a direct implementation of the registry API""" + uploader_lock = threading.Lock() + uploader_lock_info = set() + + @classmethod + @tenacity.retry( # Retry until we no longer have collisions + retry=tenacity.retry_if_exception_type(ImageUploaderThreadException), + wait=tenacity.wait_random_exponential(multiplier=1, max=10) + ) + def _layer_fetch_lock(cls, layer): + LOG.debug('Locking layer %s' % layer) + while layer in cls.uploader_lock_info: + LOG.debug('%s is being fetched by another thread' % layer) + time.sleep(0.5) + LOG.debug('Starting acquire for lock %s' % layer) + with cls.uploader_lock: + if layer in cls.uploader_lock_info: + LOG.debug('Collision for lock %s' % layer) + raise ImageUploaderThreadException('layer conflict') + LOG.debug('Acquired for lock %s' % layer) + cls.uploader_lock_info.add(layer) + LOG.debug('Updated lock info %s' % layer) + LOG.debug('Got lock on layer %s' % layer) + + @classmethod + def _layer_fetch_unlock(cls, layer): + LOG.debug('Unlocking layer %s' % layer) + LOG.debug('Starting acquire for lock %s' % layer) + with cls.uploader_lock: + LOG.debug('Acquired for unlock %s' % layer) + if layer in cls.uploader_lock_info: + cls.uploader_lock_info.remove(layer) + LOG.debug('Updated lock info %s' % layer) + LOG.debug('Released lock on layer %s' % layer) + def upload_image(self, task): """Upload image from a task @@ -1388,18 +1425,35 @@ class PythonImageUploader(BaseImageUploader): source_session=None, target_session=None): layer_entry = {'digest': layer} - if cls._target_layer_exists_registry(target_url, layer_entry, - [layer_entry], target_session): - return + cls._layer_fetch_lock(layer) + try: + if cls._target_layer_exists_registry( + target_url, layer_entry, [layer_entry], target_session): + cls._layer_fetch_unlock(layer) + return + except ImageUploaderThreadException: + # skip trying to unlock, because that's what threw the exception + raise + except Exception: + cls._layer_fetch_unlock(layer) + raise digest = layer_entry['digest'] LOG.debug('Uploading layer: %s' % digest) calc_digest = hashlib.sha256() - layer_stream = cls._layer_stream_registry( - digest, source_url, calc_digest, source_session) - return cls._copy_stream_to_registry( - target_url, layer_entry, calc_digest, layer_stream, target_session) + try: + layer_stream = cls._layer_stream_registry( + digest, source_url, calc_digest, source_session) + layer_val = cls._copy_stream_to_registry( + target_url, layer_entry, calc_digest, layer_stream, + target_session) + except Exception: + raise + else: + return layer_val + finally: + cls._layer_fetch_unlock(layer) @classmethod def _assert_scheme(cls, url, scheme):