Make executor type dynamic

When we run the tripleo-container-image-prepare script, it performs
better under python2 when the process leverages a ProcessPoolExecutor.
Rather than using threads, we should use processes to handle the image
upload work. Currently, the image data processing ends up effectively
single threaded because of the GIL. By switching to the
ProcessPoolExecutor, we eliminate the locking that occurs during the
data processing, as the work is handled in each separate process.
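
As a rough, self-contained illustration of the GIL effect described above
(burn() is a hypothetical stand-in for the per-layer data processing, not
code from this change), a process pool finishes CPU-bound work in parallel
while a thread pool runs it effectively serially:

from concurrent import futures
import time


def burn(n):
    # pure-Python CPU-bound work; it holds the GIL, so threads cannot
    # run it in parallel the way separate processes can
    return sum(i * i for i in range(n))


if __name__ == '__main__':
    work = [2000000] * 8
    for executor_cls in (futures.ThreadPoolExecutor,
                         futures.ProcessPoolExecutor):
        start = time.time()
        with executor_cls(max_workers=4) as pool:
            list(pool.map(burn, work))
        print(executor_cls.__name__, round(time.time() - start, 2), 'seconds')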

Unfortunately, we cannot leverage the ProcessPoolExecutor when the same
code is run under Mistral. In order to make the code work in both
contexts, we need to make the execution type dynamic. This change
introduces two types of lock objects that are used to determine which
type of executor to use when processing the images for upload.

Additionally, this change limits the number of concurrent image upload
processes to 4 when using the ProcessPoolExecutor, and caps the number
of threads at a maximum of 8, based on (cpu count / 2) with a minimum
of 2.
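
A minimal sketch of the resulting selection logic (multiprocessing.cpu_count()
stands in here for oslo's processutils.get_worker_count(); the real
implementation is PythonImageUploader._get_executor() in the diff below):

from concurrent import futures
import multiprocessing

from tripleo_common.utils.locks import threadinglock


def get_executor(lock=None):
    # no lock, or a threading lock: stay with threads, scaled to the host
    if lock is None or isinstance(lock, threadinglock.ThreadingLock):
        workers = min(max(2, multiprocessing.cpu_count() // 2), 8)
        return futures.ThreadPoolExecutor(max_workers=workers)
    # a process-capable lock: use processes, capped at 4 because more
    # workers mostly add RAM pressure given how container layers overlap
    return futures.ProcessPoolExecutor(max_workers=4)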

 Conflicts:
	tripleo_common/image/image_uploader.py

Change-Id: I60507eba9884a0660fe269da5ad27b0e57a70ca8
Related-Bug: #1844446
(cherry picked from commit 60afc0eec4)
Alex Schultz 2019-10-02 09:05:58 -06:00 committed by Emilien Macchi
parent f91ffa55e5
commit 7994173916
9 changed files with 183 additions and 55 deletions

scripts/tripleo-container-image-prepare

@@ -22,6 +22,7 @@ import sys
from tripleo_common import constants
from tripleo_common.image import image_uploader
from tripleo_common.image import kolla_builder
from tripleo_common.utils.locks import processlock
import yaml
@@ -130,8 +131,10 @@ if __name__ == '__main__':
env = yaml.safe_load(f)
try:
lock = processlock.ProcessLock()
params = kolla_builder.container_images_prepare_multi(
env, roles_data, cleanup=args.cleanup, dry_run=args.dry_run)
env, roles_data, cleanup=args.cleanup, dry_run=args.dry_run,
lock=lock)
result = yaml.safe_dump(params, default_flow_style=False)
log.info(result)
print(result)

tripleo_common/image/image_uploader.py

@@ -30,7 +30,6 @@ from six.moves.urllib import parse
import subprocess
import tempfile
import tenacity
import threading
import yaml
from oslo_concurrency import processutils
@@ -41,6 +40,7 @@ from tripleo_common.image.exception import ImageNotFoundException
from tripleo_common.image.exception import ImageUploaderException
from tripleo_common.image.exception import ImageUploaderThreadException
from tripleo_common.image import image_export
from tripleo_common.utils.locks import threadinglock
LOG = logging.getLogger(__name__)
@@ -148,13 +148,13 @@ class ImageUploadManager(BaseImageManager):
def __init__(self, config_files=None,
dry_run=False, cleanup=CLEANUP_FULL,
mirrors=None, registry_credentials=None,
multi_arch=False):
multi_arch=False, lock=None):
if config_files is None:
config_files = []
super(ImageUploadManager, self).__init__(config_files)
self.uploaders = {
'skopeo': SkopeoImageUploader(),
'python': PythonImageUploader()
'python': PythonImageUploader(lock)
}
self.dry_run = dry_run
self.cleanup = cleanup
@@ -167,6 +167,7 @@ class ImageUploadManager(BaseImageManager):
for uploader in self.uploaders.values():
uploader.registry_credentials = registry_credentials
self.multi_arch = multi_arch
self.lock = lock
@staticmethod
def validate_registry_credentials(creds_data):
@@ -239,7 +240,7 @@ class ImageUploadManager(BaseImageManager):
tasks.append(UploadTask(
image_name, pull_source, push_destination,
append_tag, modify_role, modify_vars, self.dry_run,
self.cleanup, multi_arch))
self.cleanup, multi_arch, self.lock))
# NOTE(mwhahaha): We want to randomize the upload process because of
# the shared nature of container layers. Because we multiprocess the
@@ -271,12 +272,13 @@ class BaseImageUploader(object):
export_registries = set()
push_registries = set()
def __init__(self):
def __init__(self, lock=None):
self.upload_tasks = []
# A mapping of layer hashes to the image which first copied that
# layer to the target
self.image_layers = {}
self.registry_credentials = {}
self.lock = lock
@classmethod
def init_registries_cache(cls):
@@ -1114,39 +1116,42 @@ class SkopeoImageUploader(BaseImageUploader):
class PythonImageUploader(BaseImageUploader):
"""Upload images using a direct implementation of the registry API"""
uploader_lock = threading.Lock()
uploader_lock_info = set()
@classmethod
@tenacity.retry( # Retry until we no longer have collisions
retry=tenacity.retry_if_exception_type(ImageUploaderThreadException),
wait=tenacity.wait_random_exponential(multiplier=1, max=10)
)
def _layer_fetch_lock(cls, layer):
if layer in cls.uploader_lock_info:
def _layer_fetch_lock(cls, layer, lock=None):
if not lock:
LOG.warning('No lock information provided for layer %s' % layer)
return
if layer in lock.objects():
LOG.debug('[%s] Layer is being fetched by another thread' % layer)
raise ImageUploaderThreadException('layer being fetched')
LOG.debug('[%s] Locking layer' % layer)
LOG.debug('[%s] Starting acquire for lock' % layer)
with cls.uploader_lock:
if layer in cls.uploader_lock_info:
LOG.debug('[%s] Collision for lock' % layer)
LOG.debug('Locking layer %s' % layer)
LOG.debug('Starting acquire for lock %s' % layer)
with lock.get_lock():
if layer in lock.objects():
LOG.debug('Collision for lock %s' % layer)
raise ImageUploaderThreadException('layer conflict')
LOG.debug('[%s] Acquired for lock' % layer)
cls.uploader_lock_info.add(layer)
LOG.debug('[%s] Updated lock info' % layer)
LOG.debug('[%s] Got lock on layer' % layer)
LOG.debug('Acquired for lock %s' % layer)
lock.objects().append(layer)
LOG.debug('Updated lock info %s' % layer)
LOG.debug('Got lock on layer %s' % layer)
@classmethod
def _layer_fetch_unlock(cls, layer):
LOG.debug('[%s] Unlocking layer' % layer)
LOG.debug('[%s] Starting acquire for lock' % layer)
with cls.uploader_lock:
LOG.debug('[%s] Acquired for unlock' % layer)
if layer in cls.uploader_lock_info:
cls.uploader_lock_info.remove(layer)
LOG.debug('[%s] Updated lock info' % layer)
LOG.debug('[%s] Released lock on layer' % layer)
def _layer_fetch_unlock(cls, layer, lock=None):
if not lock:
LOG.warning('No lock information provided for layer %s' % layer)
return
LOG.debug('Unlocking layer %s' % layer)
LOG.debug('Starting acquire for lock %s' % layer)
with lock.get_lock():
LOG.debug('Acquired for unlock %s' % layer)
if layer in lock.objects():
lock.objects().remove(layer)
LOG.debug('Updated lock info %s' % layer)
LOG.debug('Released lock on layer %s' % layer)
def upload_image(self, task):
"""Upload image from a task
@@ -1172,6 +1177,8 @@ class PythonImageUploader(BaseImageUploader):
if t.dry_run:
return []
lock = t.lock
target_username, target_password = self.credentials_for_registry(
t.target_image_url.netloc)
target_session = self.authenticate(
@@ -1262,7 +1269,8 @@ class PythonImageUploader(BaseImageUploader):
source_session=source_session,
target_session=target_session,
source_layers=source_layers,
multi_arch=t.multi_arch
multi_arch=t.multi_arch,
lock=lock
)
except Exception:
LOG.error('[%s] Failed uploading the target '
@@ -1478,19 +1486,20 @@ class PythonImageUploader(BaseImageUploader):
def _copy_layer_registry_to_registry(cls, source_url, target_url,
layer,
source_session=None,
target_session=None):
target_session=None,
lock=None):
layer_entry = {'digest': layer}
cls._layer_fetch_lock(layer)
try:
if cls._target_layer_exists_registry(
target_url, layer_entry, [layer_entry], target_session):
cls._layer_fetch_unlock(layer)
cls._layer_fetch_unlock(layer, lock)
return
except ImageUploaderThreadException:
# skip trying to unlock, because that's what threw the exception
raise
except Exception:
cls._layer_fetch_unlock(layer)
cls._layer_fetch_unlock(layer, lock)
raise
digest = layer_entry['digest']
@@ -1508,7 +1517,7 @@ class PythonImageUploader(BaseImageUploader):
else:
return layer_val
finally:
cls._layer_fetch_unlock(layer)
cls._layer_fetch_unlock(layer, lock)
@classmethod
def _assert_scheme(cls, url, scheme):
@@ -1530,7 +1539,8 @@ class PythonImageUploader(BaseImageUploader):
source_session=None,
target_session=None,
source_layers=None,
multi_arch=False):
multi_arch=False,
lock=None):
cls._assert_scheme(source_url, 'docker')
cls._assert_scheme(target_url, 'docker')
@@ -1552,7 +1562,8 @@ class PythonImageUploader(BaseImageUploader):
source_url, target_url,
layer=layer,
source_session=source_session,
target_session=target_session
target_session=target_session,
lock=lock
))
jobs_count = len(copy_jobs)
@@ -2053,6 +2064,24 @@ class PythonImageUploader(BaseImageUploader):
image_url = parse.urlparse('containers-storage:%s' % image)
self._delete(image_url)
def _get_executor(self):
"""Get executor type based on lock object
We check whether the lock object is unset or whether it is a threading
lock. We cannot check whether it is a ProcessLock, because importing
ProcessLock while running under Mistral breaks Mistral.
"""
if not self.lock or isinstance(self.lock, threadinglock.ThreadingLock):
# workers will scale from 2 to 8 based on the cpu count // 2
workers = min(max(2, processutils.get_worker_count() // 2), 8)
return futures.ThreadPoolExecutor(max_workers=workers)
else:
# there really isn't an improvement with > 4 workers due to the
# container layer overlaps. The more workers, the more RAM is
# required, which can lead to OOMs. It's best to limit it to 4
return futures.ProcessPoolExecutor(max_workers=4)
def run_tasks(self):
if not self.upload_tasks:
return
@@ -2062,9 +2091,7 @@ class PythonImageUploader(BaseImageUploader):
# same base layers
local_images.extend(upload_task(args=self.upload_tasks.pop()))
# workers will be half the CPU, with a minimum of 2
workers = max(2, processutils.get_worker_count() // 2)
with futures.ThreadPoolExecutor(max_workers=workers) as p:
with self._get_executor() as p:
for result in p.map(upload_task, self.upload_tasks):
local_images.extend(result)
LOG.info('result %s' % local_images)
@@ -2078,7 +2105,7 @@ class UploadTask(object):
def __init__(self, image_name, pull_source, push_destination,
append_tag, modify_role, modify_vars, dry_run, cleanup,
multi_arch):
multi_arch, lock=None):
self.image_name = image_name
self.pull_source = pull_source
self.push_destination = push_destination
@@ -2088,6 +2115,7 @@ class UploadTask(object):
self.dry_run = dry_run
self.cleanup = cleanup
self.multi_arch = multi_arch
self.lock = lock
if ':' in image_name:
image = image_name.rpartition(':')[0]
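
The _layer_fetch_lock()/_layer_fetch_unlock() changes above boil down to a
small collision-retry protocol: a shared list of in-flight layers guarded by
a lock, with tenacity backing off and retrying when two workers want the same
layer. A stripped-down sketch of that protocol, using illustrative names
(DemoLock, LayerConflict, fetch_layer) rather than the real classes:

import threading

import tenacity


class LayerConflict(Exception):
    pass


class DemoLock(object):
    # mirrors the BaseLock contract introduced by this change
    def __init__(self):
        self._lock = threading.Lock()
        self._objects = []

    def get_lock(self):
        return self._lock

    def objects(self):
        return self._objects


@tenacity.retry(  # retry until we no longer collide with another worker
    retry=tenacity.retry_if_exception_type(LayerConflict),
    wait=tenacity.wait_random_exponential(multiplier=1, max=10))
def fetch_layer(lock, layer):
    with lock.get_lock():
        if layer in lock.objects():
            raise LayerConflict(layer)
        lock.objects().append(layer)
    try:
        # ... copy the layer to the target registry here ...
        pass
    finally:
        with lock.get_lock():
            lock.objects().remove(layer)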

tripleo_common/image/kolla_builder.py

@@ -26,6 +26,7 @@ import yaml
from tripleo_common.image import base
from tripleo_common.image import image_uploader
from tripleo_common.utils.locks import threadinglock
CONTAINER_IMAGE_PREPARE_PARAM_STR = None
@@ -131,7 +132,8 @@ def set_neutron_driver(pd, mapping_args):
def container_images_prepare_multi(environment, roles_data, dry_run=False,
cleanup=image_uploader.CLEANUP_FULL):
cleanup=image_uploader.CLEANUP_FULL,
lock=None):
"""Perform multiple container image prepares and merge result
Given the full heat environment and roles data, perform multiple image
@@ -142,10 +144,14 @@ def container_images_prepare_multi(environment, roles_data, dry_run=False,
:param environment: Heat environment for deployment
:param roles_data: Roles file data used to filter services
:param lock: a locking object to use when handling uploads
:returns: dict containing merged container image parameters from all
prepare operations
"""
if not lock:
lock = threadinglock.ThreadingLock()
pd = environment.get('parameter_defaults', {})
cip = pd.get('ContainerImagePrepare')
if not cip:
@@ -200,7 +206,8 @@ def container_images_prepare_multi(environment, roles_data, dry_run=False,
modify_only_with_labels=modify_only_with_labels,
mirrors=mirrors,
registry_credentials=creds,
multi_arch=multi_arch
multi_arch=multi_arch,
lock=lock
)
env_params.update(prepare_data['image_params'])
@@ -215,7 +222,8 @@ def container_images_prepare_multi(environment, roles_data, dry_run=False,
cleanup=cleanup,
mirrors=mirrors,
registry_credentials=creds,
multi_arch=multi_arch
multi_arch=multi_arch,
lock=lock
)
uploader.upload()
return env_params
@@ -239,7 +247,7 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
append_tag=None, modify_role=None,
modify_vars=None, modify_only_with_labels=None,
mirrors=None, registry_credentials=None,
multi_arch=False):
multi_arch=False, lock=None):
"""Perform container image preparation
:param template_file: path to Jinja2 file containing all image entries
@@ -273,6 +281,8 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
value.
:param multi_arch: boolean whether to prepare every architecture of
each image
:param lock: a locking object to use when handling uploads
:returns: dict with entries for the supplied output_env_file or
output_images_file
"""
@@ -280,6 +290,9 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
if mapping_args is None:
mapping_args = {}
if not lock:
lock = threadinglock.ThreadingLock()
def ffunc(entry):
imagename = entry.get('imagename', '')
if service_filter is not None:
@@ -305,7 +318,8 @@ def container_images_prepare(template_file=DEFAULT_TEMPLATE_FILE,
manager = image_uploader.ImageUploadManager(
mirrors=mirrors,
registry_credentials=registry_credentials,
multi_arch=multi_arch
multi_arch=multi_arch,
lock=lock
)
uploader = manager.uploader('python')
images = [i.get('imagename', '') for i in result]

tripleo_common/tests/image/test_image_uploader.py

@@ -1252,7 +1252,8 @@ class TestPythonImageUploader(base.TestCase):
source_session=source_session,
target_session=target_session,
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
multi_arch=False
multi_arch=False,
lock=None
)
@mock.patch('tripleo_common.image.image_uploader.'
@@ -1486,7 +1487,8 @@ class TestPythonImageUploader(base.TestCase):
source_session=source_session,
target_session=target_session,
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
multi_arch=False
multi_arch=False,
lock=None
)
@mock.patch('tripleo_common.image.image_uploader.'
@@ -1617,7 +1619,8 @@ class TestPythonImageUploader(base.TestCase):
source_session=source_session,
target_session=target_session,
source_layers=['sha256:aaa', 'sha256:bbb', 'sha256:ccc'],
multi_arch=False
multi_arch=False,
lock=None
)
_copy_registry_to_local.assert_called_once_with(unmodified_target_url)
run_modify_playbook.assert_called_once_with(

tripleo_common/tests/image/test_kolla_builder.py

@@ -1026,6 +1026,7 @@ class TestPrepare(base.TestCase):
@mock.patch('tripleo_common.image.image_uploader.ImageUploadManager',
autospec=True)
def test_container_images_prepare_multi(self, mock_im, mock_cip):
mock_lock = mock.MagicMock()
mapping_args = {
'namespace': 't',
'name_prefix': '',
@@ -1079,7 +1080,8 @@ class TestPrepare(base.TestCase):
},
]
image_params = kb.container_images_prepare_multi(env, roles_data)
image_params = kb.container_images_prepare_multi(env, roles_data,
lock=mock_lock)
mock_cip.assert_has_calls([
mock.call(
@@ -1102,7 +1104,8 @@ class TestPrepare(base.TestCase):
registry_credentials={
'docker.io': {'my_username': 'my_password'}
},
multi_arch=False
multi_arch=False,
lock=mock_lock
),
mock.call(
excludes=['nova', 'neutron'],
@@ -1124,7 +1127,8 @@ class TestPrepare(base.TestCase):
registry_credentials={
'docker.io': {'my_username': 'my_password'}
},
multi_arch=False
multi_arch=False,
lock=mock_lock
)
])
@@ -1144,6 +1148,7 @@ class TestPrepare(base.TestCase):
@mock.patch('tripleo_common.image.image_uploader.ImageUploadManager',
autospec=True)
def test_container_images_prepare_multi_dry_run(self, mock_im, mock_cip):
mock_lock = mock.MagicMock()
mapping_args = {
'namespace': 't',
'name_prefix': '',
@@ -1192,7 +1197,8 @@ class TestPrepare(base.TestCase):
},
]
image_params = kb.container_images_prepare_multi(env, roles_data, True)
image_params = kb.container_images_prepare_multi(env, roles_data, True,
lock=mock_lock)
mock_cip.assert_has_calls([
mock.call(
@@ -1211,7 +1217,8 @@ class TestPrepare(base.TestCase):
modify_vars=None,
mirrors={},
registry_credentials=None,
multi_arch=False
multi_arch=False,
lock=mock_lock
),
mock.call(
excludes=['nova', 'neutron'],
@@ -1229,13 +1236,14 @@ class TestPrepare(base.TestCase):
modify_vars={'foo_version': '1.0.1'},
mirrors={},
registry_credentials=None,
multi_arch=False
multi_arch=False,
lock=mock_lock
)
])
mock_im.assert_called_once_with(mock.ANY, dry_run=True, cleanup='full',
mirrors={}, registry_credentials=None,
multi_arch=False)
multi_arch=False, lock=mock_lock)
self.assertEqual(
{

tripleo_common/utils/locks/__init__.py

tripleo_common/utils/locks/base.py

@@ -0,0 +1,21 @@
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class BaseLock(object):
def get_lock(self):
return self._lock
def objects(self):
return self._objects

tripleo_common/utils/locks/processlock.py

@@ -0,0 +1,29 @@
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# NOTE(mwhahaha): this class cannot be imported under Mistral because the
# multiprocessing.Manager inclusion breaks things due to the service that
# is launched to handle the multiprocess work.
import multiprocessing
from tripleo_common.utils.locks import base
class ProcessLock(base.BaseLock):
# the manager cannot live in __init__
_mgr = multiprocessing.Manager()
def __init__(self):
self._lock = self._mgr.Lock()
self._objects = self._mgr.list()
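
Because both the Lock and the list above are served by a
multiprocessing.Manager, their proxies can be pickled and handed to
ProcessPoolExecutor workers, which is what lets the uploader share its
in-flight layer bookkeeping across processes. A hedged usage sketch
(mark_layer is an illustrative helper, not part of the change):

from concurrent import futures

from tripleo_common.utils.locks import processlock


def mark_layer(args):
    lock, layer = args
    with lock.get_lock():             # proxy to the Manager-held Lock
        lock.objects().append(layer)  # proxy to the Manager-held list
    return layer


if __name__ == '__main__':
    lock = processlock.ProcessLock()
    layers = ['sha256:aaa', 'sha256:bbb', 'sha256:ccc']
    with futures.ProcessPoolExecutor(max_workers=4) as pool:
        list(pool.map(mark_layer, [(lock, layer) for layer in layers]))
    print(lock.objects()[:])  # all three layers end up in the shared list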

tripleo_common/utils/locks/threadinglock.py

@@ -0,0 +1,22 @@
# Copyright 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from tripleo_common.utils.locks import base
class ThreadingLock(base.BaseLock):
def __init__(self):
self._lock = threading.Lock()
self._objects = []  # a list, because callers append()/remove() layers
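
Both subclasses satisfy the same two-method BaseLock contract, so the
uploader's locking code works unchanged with either one; which flavour gets
built is decided by the caller (the tripleo-container-image-prepare script
constructs a ProcessLock, while kolla_builder defaults to a ThreadingLock
when none is passed, keeping the multiprocessing import out of the Mistral
path). A small hedged check, with track_layer as an illustrative helper:

from tripleo_common.utils.locks import processlock
from tripleo_common.utils.locks import threadinglock


def track_layer(lock, layer):
    # the same calls the uploader makes, regardless of the lock flavour
    with lock.get_lock():
        if layer not in lock.objects():
            lock.objects().append(layer)


if __name__ == '__main__':
    for lock in (threadinglock.ThreadingLock(), processlock.ProcessLock()):
        track_layer(lock, 'sha256:aaa')
        assert 'sha256:aaa' in lock.objects()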