diff --git a/scripts/tripleo-container-image-prepare b/scripts/tripleo-container-image-prepare index e04c35ee6..d7694e70c 100755 --- a/scripts/tripleo-container-image-prepare +++ b/scripts/tripleo-container-image-prepare @@ -22,6 +22,7 @@ import sys from tripleo_common import constants from tripleo_common.image import image_uploader from tripleo_common.image import kolla_builder +from tripleo_common.utils.locks import processlock import yaml @@ -131,8 +132,10 @@ if __name__ == '__main__': env = yaml.safe_load(f) try: + lock = processlock.ProcessLock() params = kolla_builder.container_images_prepare_multi( - env, roles_data, cleanup=args.cleanup, dry_run=args.dry_run) + env, roles_data, cleanup=args.cleanup, dry_run=args.dry_run, + lock=lock) result = yaml.safe_dump(params, default_flow_style=False) log.info(result) print(result) diff --git a/tripleo_common/image/image_uploader.py b/tripleo_common/image/image_uploader.py index 8ce368b46..f76958c40 100644 --- a/tripleo_common/image/image_uploader.py +++ b/tripleo_common/image/image_uploader.py @@ -30,7 +30,6 @@ from six.moves.urllib import parse import subprocess import tempfile import tenacity -import threading import yaml from oslo_concurrency import processutils @@ -42,6 +41,7 @@ from tripleo_common.image.exception import ImageUploaderException from tripleo_common.image.exception import ImageUploaderThreadException from tripleo_common.image import image_export from tripleo_common.utils import common as common_utils +from tripleo_common.utils.locks import threadinglock LOG = logging.getLogger(__name__) @@ -152,13 +152,13 @@ class ImageUploadManager(BaseImageManager): def __init__(self, config_files=None, dry_run=False, cleanup=CLEANUP_FULL, mirrors=None, registry_credentials=None, - multi_arch=False): + multi_arch=False, lock=None): if config_files is None: config_files = [] super(ImageUploadManager, self).__init__(config_files) self.uploaders = { 'skopeo': SkopeoImageUploader(), - 'python': PythonImageUploader() + 
'python': PythonImageUploader(lock) } self.dry_run = dry_run self.cleanup = cleanup @@ -171,6 +171,7 @@ class ImageUploadManager(BaseImageManager): for uploader in self.uploaders.values(): uploader.registry_credentials = registry_credentials self.multi_arch = multi_arch + self.lock = lock @staticmethod def validate_registry_credentials(creds_data): @@ -243,7 +244,7 @@ class ImageUploadManager(BaseImageManager): tasks.append(UploadTask( image_name, pull_source, push_destination, append_tag, modify_role, modify_vars, self.dry_run, - self.cleanup, multi_arch)) + self.cleanup, multi_arch, self.lock)) # NOTE(mwhahaha): We want to randomize the upload process because of # the shared nature of container layers. Because we multiprocess the @@ -275,12 +276,13 @@ class BaseImageUploader(object): export_registries = set() push_registries = set() - def __init__(self): + def __init__(self, lock=None): self.upload_tasks = [] # A mapping of layer hashs to the image which first copied that # layer to the target self.image_layers = {} self.registry_credentials = {} + self.lock = lock @classmethod def init_registries_cache(cls): @@ -1118,39 +1120,42 @@ class SkopeoImageUploader(BaseImageUploader): class PythonImageUploader(BaseImageUploader): """Upload images using a direct implementation of the registry API""" - uploader_lock = threading.Lock() - uploader_lock_info = set() - @classmethod @tenacity.retry( # Retry until we no longer have collisions retry=tenacity.retry_if_exception_type(ImageUploaderThreadException), wait=tenacity.wait_random_exponential(multiplier=1, max=10) ) - def _layer_fetch_lock(cls, layer): - if layer in cls.uploader_lock_info: + def _layer_fetch_lock(cls, layer, lock=None): + if not lock: + LOG.warning('No lock information provided for layer %s' % layer) + return + if layer in lock.objects(): LOG.debug('[%s] Layer is being fetched by another thread' % layer) raise ImageUploaderThreadException('layer being fetched') - LOG.debug('[%s] Locking layer' % layer) - 
LOG.debug('[%s] Starting acquire for lock' % layer) - with cls.uploader_lock: - if layer in cls.uploader_lock_info: - LOG.debug('[%s] Collision for lock' % layer) + LOG.debug('Locking layer %s' % layer) + LOG.debug('Starting acquire for lock %s' % layer) + with lock.get_lock(): + if layer in lock.objects(): + LOG.debug('Collision for lock %s' % layer) raise ImageUploaderThreadException('layer conflict') - LOG.debug('[%s] Acquired for lock' % layer) - cls.uploader_lock_info.add(layer) - LOG.debug('[%s] Updated lock info' % layer) - LOG.debug('[%s] Got lock on layer' % layer) + LOG.debug('Acquired for lock %s' % layer) + lock.objects().append(layer) + LOG.debug('Updated lock info %s' % layer) + LOG.debug('Got lock on layer %s' % layer) @classmethod - def _layer_fetch_unlock(cls, layer): - LOG.debug('[%s] Unlocking layer' % layer) - LOG.debug('[%s] Starting acquire for lock' % layer) - with cls.uploader_lock: - LOG.debug('[%s] Acquired for unlock' % layer) - if layer in cls.uploader_lock_info: - cls.uploader_lock_info.remove(layer) - LOG.debug('[%s] Updated lock info' % layer) - LOG.debug('[%s] Released lock on layer' % layer) + def _layer_fetch_unlock(cls, layer, lock=None): + if not lock: + LOG.warning('No lock information provided for layer %s' % layer) + return + LOG.debug('Unlocking layer %s' % layer) + LOG.debug('Starting acquire for lock %s' % layer) + with lock.get_lock(): + LOG.debug('Acquired for unlock %s' % layer) + if layer in lock.objects(): + lock.objects().remove(layer) + LOG.debug('Updated lock info %s' % layer) + LOG.debug('Released lock on layer %s' % layer) def upload_image(self, task): """Upload image from a task @@ -1176,6 +1181,8 @@ class PythonImageUploader(BaseImageUploader): if t.dry_run: return [] + lock = t.lock + target_username, target_password = self.credentials_for_registry( t.target_image_url.netloc) target_session = self.authenticate( @@ -1266,7 +1273,8 @@ class PythonImageUploader(BaseImageUploader): source_session=source_session, 
                 target_session=target_session,
                 source_layers=source_layers,
-                multi_arch=t.multi_arch
+                multi_arch=t.multi_arch,
+                lock=lock
             )
         except Exception:
             LOG.error('[%s] Failed uploading the target '
@@ -1482,19 +1490,20 @@ class PythonImageUploader(BaseImageUploader):
     def _copy_layer_registry_to_registry(cls, source_url, target_url,
                                          layer,
                                          source_session=None,
-                                         target_session=None):
+                                         target_session=None,
+                                         lock=None):
         layer_entry = {'digest': layer}
         try:
-            cls._layer_fetch_lock(layer)
+            cls._layer_fetch_lock(layer, lock)
             if cls._target_layer_exists_registry(
                     target_url, layer_entry, [layer_entry], target_session):
-                cls._layer_fetch_unlock(layer)
+                cls._layer_fetch_unlock(layer, lock)
                 return
         except ImageUploaderThreadException:
             # skip trying to unlock, because that's what threw the exception
             raise
         except Exception:
-            cls._layer_fetch_unlock(layer)
+            cls._layer_fetch_unlock(layer, lock)
             raise
 
         digest = layer_entry['digest']
@@ -1512,7 +1521,7 @@ class PythonImageUploader(BaseImageUploader):
             else:
                 return layer_val
         finally:
-            cls._layer_fetch_unlock(layer)
+            cls._layer_fetch_unlock(layer, lock)
 
     @classmethod
     def _assert_scheme(cls, url, scheme):
@@ -1534,7 +1543,8 @@ class PythonImageUploader(BaseImageUploader):
                                    source_session=None,
                                    target_session=None,
                                    source_layers=None,
-                                   multi_arch=False):
+                                   multi_arch=False,
+                                   lock=None):
         cls._assert_scheme(source_url, 'docker')
         cls._assert_scheme(target_url, 'docker')
 
@@ -1556,7 +1566,8 @@ class PythonImageUploader(BaseImageUploader):
                     source_url, target_url,
                     layer=layer,
                     source_session=source_session,
-                    target_session=target_session
+                    target_session=target_session,
+                    lock=lock
                 ))
 
         jobs_count = len(copy_jobs)
@@ -2051,6 +2062,24 @@ class PythonImageUploader(BaseImageUploader):
             image_url = parse.urlparse('containers-storage:%s' % image)
             self._delete(image_url)
 
+    def _get_executor(self):
+        """Get executor type based on lock object
+
+        We check to see if the lock object is not set or if it is a threading
+        lock. 
We cannot check if it is a ProcessLock due to the side effect + of trying to include ProcessLock when running under Mistral breaks + Mistral. + """ + if not self.lock or isinstance(self.lock, threadinglock.ThreadingLock): + # workers will scale from 2 to 8 based on the cpu count // 2 + workers = min(max(2, processutils.get_worker_count() // 2), 8) + return futures.ThreadPoolExecutor(max_workers=workers) + else: + # there really isn't an improvement with > 4 workers due to the + # container layer overlaps. The higher the workers, the more + # RAM required which can lead to OOMs. It's best to limit to 4 + return futures.ProcessPoolExecutor(max_workers=4) + def run_tasks(self): if not self.upload_tasks: return @@ -2060,9 +2089,7 @@ class PythonImageUploader(BaseImageUploader): # same base layers local_images.extend(upload_task(args=self.upload_tasks.pop())) - # workers will be half the CPU, with a minimum of 2 - workers = max(2, processutils.get_worker_count() // 2) - with futures.ThreadPoolExecutor(max_workers=workers) as p: + with self._get_executor() as p: for result in p.map(upload_task, self.upload_tasks): local_images.extend(result) LOG.info('result %s' % local_images) @@ -2076,7 +2103,7 @@ class UploadTask(object): def __init__(self, image_name, pull_source, push_destination, append_tag, modify_role, modify_vars, dry_run, cleanup, - multi_arch): + multi_arch, lock=None): self.image_name = image_name self.pull_source = pull_source self.push_destination = push_destination @@ -2086,6 +2113,7 @@ class UploadTask(object): self.dry_run = dry_run self.cleanup = cleanup self.multi_arch = multi_arch + self.lock = lock if ':' in image_name: image = image_name.rpartition(':')[0] diff --git a/tripleo_common/image/kolla_builder.py b/tripleo_common/image/kolla_builder.py index d3b21ea68..0f9201998 100644 --- a/tripleo_common/image/kolla_builder.py +++ b/tripleo_common/image/kolla_builder.py @@ -27,6 +27,7 @@ from osc_lib.i18n import _ from oslo_log import log as logging from 
tripleo_common.image import base from tripleo_common.image import image_uploader +from tripleo_common.utils.locks import threadinglock CONTAINER_IMAGE_PREPARE_PARAM_STR = None @@ -135,7 +136,8 @@ def set_neutron_driver(pd, mapping_args): def container_images_prepare_multi(environment, roles_data, dry_run=False, - cleanup=image_uploader.CLEANUP_FULL): + cleanup=image_uploader.CLEANUP_FULL, + lock=None): """Perform multiple container image prepares and merge result Given the full heat environment and roles data, perform multiple image @@ -146,10 +148,14 @@ def container_images_prepare_multi(environment, roles_data, dry_run=False, :param environment: Heat environment for deployment :param roles_data: Roles file data used to filter services + :param lock: a locking object to use when handling uploads :returns: dict containing merged container image parameters from all prepare operations """ + if not lock: + lock = threadinglock.ThreadingLock() + pd = environment.get('parameter_defaults', {}) cip = pd.get('ContainerImagePrepare') # if user does not provide a ContainerImagePrepare, use the defaults. 
@@ -207,7 +213,8 @@ def container_images_prepare_multi(environment, roles_data, dry_run=False, modify_only_with_labels=modify_only_with_labels, mirrors=mirrors, registry_credentials=creds, - multi_arch=multi_arch + multi_arch=multi_arch, + lock=lock ) env_params.update(prepare_data['image_params']) @@ -222,7 +229,8 @@ def container_images_prepare_multi(environment, roles_data, dry_run=False, cleanup=cleanup, mirrors=mirrors, registry_credentials=creds, - multi_arch=multi_arch + multi_arch=multi_arch, + lock=lock ) uploader.upload() return env_params @@ -246,7 +254,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE, append_tag=None, modify_role=None, modify_vars=None, modify_only_with_labels=None, mirrors=None, registry_credentials=None, - multi_arch=False): + multi_arch=False, lock=None): """Perform container image preparation :param template_file: path to Jinja2 file containing all image entries @@ -280,6 +288,8 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE, value. 
:param multi_arch: boolean whether to prepare every architecture of each image + + :param lock: a locking object to use when handling uploads :returns: dict with entries for the supplied output_env_file or output_images_file """ @@ -287,6 +297,9 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE, if mapping_args is None: mapping_args = {} + if not lock: + lock = threadinglock.ThreadingLock() + def ffunc(entry): imagename = entry.get('imagename', '') if service_filter is not None: @@ -312,7 +325,8 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE, manager = image_uploader.ImageUploadManager( mirrors=mirrors, registry_credentials=registry_credentials, - multi_arch=multi_arch + multi_arch=multi_arch, + lock=lock ) uploader = manager.uploader('python') images = [i.get('imagename', '') for i in result] diff --git a/tripleo_common/tests/image/test_image_uploader.py b/tripleo_common/tests/image/test_image_uploader.py index b9cb446f1..3b0a628e5 100644 --- a/tripleo_common/tests/image/test_image_uploader.py +++ b/tripleo_common/tests/image/test_image_uploader.py @@ -1315,7 +1315,8 @@ class TestPythonImageUploader(base.TestCase): source_session=source_session, target_session=target_session, source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'], - multi_arch=False + multi_arch=False, + lock=None ) @mock.patch('tripleo_common.image.image_uploader.' @@ -1549,7 +1550,8 @@ class TestPythonImageUploader(base.TestCase): source_session=source_session, target_session=target_session, source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'], - multi_arch=False + multi_arch=False, + lock=None ) @mock.patch('tripleo_common.image.image_uploader.' 
@@ -1680,7 +1682,8 @@ class TestPythonImageUploader(base.TestCase): source_session=source_session, target_session=target_session, source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'], - multi_arch=False + multi_arch=False, + lock=None ) _copy_registry_to_local.assert_called_once_with(unmodified_target_url) run_modify_playbook.assert_called_once_with( diff --git a/tripleo_common/tests/image/test_kolla_builder.py b/tripleo_common/tests/image/test_kolla_builder.py index 92bc673d1..64910379e 100644 --- a/tripleo_common/tests/image/test_kolla_builder.py +++ b/tripleo_common/tests/image/test_kolla_builder.py @@ -928,6 +928,7 @@ class TestPrepare(base.TestCase): @mock.patch('tripleo_common.image.image_uploader.ImageUploadManager', autospec=True) def test_container_images_prepare_multi(self, mock_im, mock_cip): + mock_lock = mock.MagicMock() mapping_args = { 'namespace': 't', 'name_prefix': '', @@ -981,7 +982,8 @@ class TestPrepare(base.TestCase): }, ] - image_params = kb.container_images_prepare_multi(env, roles_data) + image_params = kb.container_images_prepare_multi(env, roles_data, + lock=mock_lock) mock_cip.assert_has_calls([ mock.call( @@ -1004,7 +1006,8 @@ class TestPrepare(base.TestCase): registry_credentials={ 'docker.io': {'my_username': 'my_password'} }, - multi_arch=False + multi_arch=False, + lock=mock_lock ), mock.call( excludes=['nova', 'neutron'], @@ -1026,7 +1029,8 @@ class TestPrepare(base.TestCase): registry_credentials={ 'docker.io': {'my_username': 'my_password'} }, - multi_arch=False + multi_arch=False, + lock=mock_lock ) ]) @@ -1046,6 +1050,7 @@ class TestPrepare(base.TestCase): @mock.patch('tripleo_common.image.image_uploader.ImageUploadManager', autospec=True) def test_container_images_prepare_multi_dry_run(self, mock_im, mock_cip): + mock_lock = mock.MagicMock() mapping_args = { 'namespace': 't', 'name_prefix': '', @@ -1094,7 +1099,8 @@ class TestPrepare(base.TestCase): }, ] - image_params = kb.container_images_prepare_multi(env, roles_data, 
True) + image_params = kb.container_images_prepare_multi(env, roles_data, True, + lock=mock_lock) mock_cip.assert_has_calls([ mock.call( @@ -1113,7 +1119,8 @@ class TestPrepare(base.TestCase): modify_vars=None, mirrors={}, registry_credentials=None, - multi_arch=False + multi_arch=False, + lock=mock_lock ), mock.call( excludes=['nova', 'neutron'], @@ -1131,13 +1138,14 @@ class TestPrepare(base.TestCase): modify_vars={'foo_version': '1.0.1'}, mirrors={}, registry_credentials=None, - multi_arch=False + multi_arch=False, + lock=mock_lock ) ]) mock_im.assert_called_once_with(mock.ANY, dry_run=True, cleanup='full', mirrors={}, registry_credentials=None, - multi_arch=False) + multi_arch=False, lock=mock_lock) self.assertEqual( { diff --git a/tripleo_common/utils/locks/__init__.py b/tripleo_common/utils/locks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tripleo_common/utils/locks/base.py b/tripleo_common/utils/locks/base.py new file mode 100644 index 000000000..e707edc6c --- /dev/null +++ b/tripleo_common/utils/locks/base.py @@ -0,0 +1,21 @@ +# Copyright 2019 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ + +class BaseLock(object): + def get_lock(self): + return self._lock + + def objects(self): + return self._objects diff --git a/tripleo_common/utils/locks/processlock.py b/tripleo_common/utils/locks/processlock.py new file mode 100644 index 000000000..c927eb0c2 --- /dev/null +++ b/tripleo_common/utils/locks/processlock.py @@ -0,0 +1,29 @@ +# Copyright 2019 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# NOTE(mwhahaha): this class cannot be imported under Mistral because the +# multiprocessor.Manager inclusion breaks things due to the service launching +# to handle the multiprocess work. + +import multiprocessing +from tripleo_common.utils.locks import base + + +class ProcessLock(base.BaseLock): + # the manager cannot live in __init__ + _mgr = multiprocessing.Manager() + + def __init__(self): + self._lock = self._mgr.Lock() + self._objects = self._mgr.list() diff --git a/tripleo_common/utils/locks/threadinglock.py b/tripleo_common/utils/locks/threadinglock.py new file mode 100644 index 000000000..14f582f3f --- /dev/null +++ b/tripleo_common/utils/locks/threadinglock.py @@ -0,0 +1,22 @@ +# Copyright 2019 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import threading
+from tripleo_common.utils.locks import base
+
+
+class ThreadingLock(base.BaseLock):
+    def __init__(self):
+        self._lock = threading.Lock()
+        self._objects = []