Improve ThreadPoolExecutor usage

This change addresses a few different issues with our ThreadPoolExecutor
usage.

Previously we were inefficiently looping over the job results, which was
a blocking call rather than an asynchronous one. This change switches the
threadpool usage to as_completed so we handle jobs as they complete
rather than blocking until the specific job we're looking at completes.

Additionally this switches to use a with statement for the executor so
the threads get cleaned up correctly when we're done with the block.

See example comments:
https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example

The _inspect call was using a thread pool executor to run session calls,
but it was essentially a serially executing function, so it has been
updated to remove the executor and just make the requests as needed.

Change-Id: Ia7abec997f4e503e1a2db82e05d4fe6e8696defc
Related-Bug: #1844446
This commit is contained in:
Alex Schultz 2019-09-24 12:04:42 -06:00
parent 813ccbcdfb
commit a1d89c7d63
2 changed files with 67 additions and 69 deletions

View File

@ -582,18 +582,14 @@ class BaseImageUploader(object):
)
manifest_headers = {'Accept': MEDIA_MANIFEST_V2}
p = futures.ThreadPoolExecutor(max_workers=2)
manifest_f = p.submit(
session.get, manifest_url, headers=manifest_headers, timeout=30)
tags_f = p.submit(session.get, tags_url, timeout=30)
manifest_r = manifest_f.result()
manifest_r = session.get(manifest_url, headers=manifest_headers,
timeout=30)
if manifest_r.status_code in (403, 404):
raise ImageNotFoundException('Not found image: %s' %
image_url.geturl())
cls.check_status(session=session, request=manifest_r)
tags_r = tags_f.result()
tags_r = session.get(tags_url, timeout=30)
cls.check_status(session=session, request=tags_r)
manifest_str = cls._get_response_text(manifest_r)
@ -621,9 +617,8 @@ class BaseImageUploader(object):
}
config_url = cls._build_url(
image_url, CALL_BLOB % parts)
config_f = p.submit(
session.get, config_url, headers=config_headers, timeout=30)
config_r = config_f.result()
config_r = session.get(config_url, headers=config_headers,
timeout=30)
cls.check_status(session=session, request=config_r)
config = config_r.json()
@ -660,7 +655,9 @@ class BaseImageUploader(object):
if catalog_resp.status_code in [200]:
catalog = catalog_resp.json()
elif catalog_resp.status_code in [404]:
catalog = {}
# just return since the catalog returned a 404
LOG.debug('catalog_url return 404')
return []
else:
raise ImageUploaderException(
'Image registry made invalid response: %s' %
@ -671,14 +668,14 @@ class BaseImageUploader(object):
for repo in catalog.get('repositories', []):
image = '%s/%s' % (registry, repo)
tags_get_args.append((self, image, session))
p = futures.ThreadPoolExecutor(max_workers=16)
images = []
for image, tags in p.map(tags_for_image, tags_get_args):
if not tags:
continue
for tag in tags:
images.append('%s:%s' % (image, tag))
with futures.ThreadPoolExecutor(max_workers=16) as p:
for image, tags in p.map(tags_for_image, tags_get_args):
if not tags:
continue
for tag in tags:
images.append('%s:%s' % (image, tag))
return images
def inspect(self, image, session=None):
@ -783,12 +780,12 @@ class BaseImageUploader(object):
discover_args = []
for image in images:
discover_args.append((self, image, tag_from_label))
p = futures.ThreadPoolExecutor(max_workers=16)
versioned_images = {}
for image, versioned_image in p.map(discover_tag_from_inspect,
discover_args):
versioned_images[image] = versioned_image
with futures.ThreadPoolExecutor(max_workers=16) as p:
for image, versioned_image in p.map(discover_tag_from_inspect,
discover_args):
versioned_images[image] = versioned_image
return versioned_images
def discover_image_tag(self, image, tag_from_label=None,
@ -1071,9 +1068,9 @@ class SkopeoImageUploader(BaseImageUploader):
# workers will be the CPU count minus 1, with a minimum of 2
workers = max(2, (processutils.get_worker_count() - 1))
p = futures.ThreadPoolExecutor(max_workers=workers)
for result in p.map(upload_task, self.upload_tasks):
local_images.extend(result)
with futures.ThreadPoolExecutor(max_workers=workers) as p:
for result in p.map(upload_task, self.upload_tasks):
local_images.extend(result)
LOG.info('result %s' % local_images)
# Do cleanup after all the uploads so common layers don't get deleted
@ -1417,23 +1414,23 @@ class PythonImageUploader(BaseImageUploader):
# Upload all layers
copy_jobs = []
p = futures.ThreadPoolExecutor(max_workers=4)
if source_layers:
for layer in source_layers:
copy_jobs.append(p.submit(
cls._copy_layer_registry_to_registry,
source_url, target_url,
layer=layer,
source_session=source_session,
target_session=target_session
))
for job in copy_jobs:
e = job.exception()
if e:
raise e
image = job.result()
if image:
LOG.debug('Upload complete for layer: %s' % image)
with futures.ThreadPoolExecutor(max_workers=4) as p:
if source_layers:
for layer in source_layers:
copy_jobs.append(p.submit(
cls._copy_layer_registry_to_registry,
source_url, target_url,
layer=layer,
source_session=source_session,
target_session=target_session
))
for job in futures.as_completed(copy_jobs):
e = job.exception()
if e:
raise e
image = job.result()
if image:
LOG.debug('Upload complete for layer: %s' % image)
for source_manifest in source_manifests:
manifest = json.loads(source_manifest)
@ -1759,21 +1756,20 @@ class PythonImageUploader(BaseImageUploader):
# Upload all layers
copy_jobs = []
p = futures.ThreadPoolExecutor(max_workers=4)
for layer in manifest['layers']:
layer_entry = layers_by_digest[layer['digest']]
copy_jobs.append(p.submit(
cls._copy_layer_local_to_registry,
target_url, session, layer, layer_entry
))
for job in copy_jobs:
e = job.exception()
if e:
raise e
image = job.result()
if image:
LOG.debug('Upload complete for layer: %s' % image)
with futures.ThreadPoolExecutor(max_workers=4) as p:
for layer in manifest['layers']:
layer_entry = layers_by_digest[layer['digest']]
copy_jobs.append(p.submit(
cls._copy_layer_local_to_registry,
target_url, session, layer, layer_entry
))
for job in futures.as_completed(copy_jobs):
e = job.exception()
if e:
raise e
image = job.result()
if image:
LOG.debug('Upload complete for layer: %s' % image)
manifest_str = json.dumps(manifest, indent=3)
cls._copy_manifest_config_to_registry(
@ -1909,12 +1905,12 @@ class PythonImageUploader(BaseImageUploader):
# same base layers
local_images.extend(upload_task(args=self.upload_tasks.pop()))
# workers will be the CPU count minus 1, with a minimum of 2
workers = max(2, (processutils.get_worker_count() - 1))
p = futures.ThreadPoolExecutor(max_workers=workers)
for result in p.map(upload_task, self.upload_tasks):
local_images.extend(result)
LOG.info('result %s' % local_images)
# workers will be half the CPU, with a minimum of 2
workers = max(2, processutils.get_worker_count() // 2)
with futures.ThreadPoolExecutor(max_workers=workers) as p:
for result in p.map(upload_task, self.upload_tasks):
local_images.extend(result)
LOG.info('result %s' % local_images)
# Do cleanup after all the uploads so common layers don't get deleted
# repeatedly

View File

@ -479,11 +479,13 @@ class TestBaseImageUploader(base.TestCase):
@mock.patch('concurrent.futures.ThreadPoolExecutor')
def test_discover_image_tags(self, mock_pool):
mock_pool.return_value.map.return_value = (
mock_map = mock.Mock()
mock_map.return_value = (
('docker.io/t/foo', 'a'),
('docker.io/t/bar', 'b'),
('docker.io/t/baz', 'c')
)
mock_pool.return_value.__enter__.return_value.map = mock_map
images = [
'docker.io/t/foo',
'docker.io/t/bar',
@ -497,7 +499,7 @@ class TestBaseImageUploader(base.TestCase):
},
self.uploader.discover_image_tags(images, 'rdo_release')
)
mock_pool.return_value.map.assert_called_once_with(
mock_map.assert_called_once_with(
image_uploader.discover_tag_from_inspect,
[
(self.uploader, 'docker.io/t/foo', 'rdo_release'),
@ -832,12 +834,14 @@ class TestBaseImageUploader(base.TestCase):
@mock.patch('concurrent.futures.ThreadPoolExecutor')
def test_list(self, mock_pool):
mock_pool.return_value.map.return_value = (
mock_map = mock.Mock()
mock_map.return_value = (
('localhost:8787/t/foo', ['a']),
('localhost:8787/t/bar', ['b']),
('localhost:8787/t/baz', ['c', 'd']),
('localhost:8787/t/bink', [])
)
mock_pool.return_value.__enter__.return_value.map = mock_map
session = mock.Mock()
response = mock.Mock()
response.status_code = 200
@ -854,7 +858,7 @@ class TestBaseImageUploader(base.TestCase):
],
self.uploader.list('localhost:8787', session=session)
)
mock_pool.return_value.map.assert_called_once_with(
mock_map.assert_called_once_with(
image_uploader.tags_for_image,
[
(self.uploader, 'localhost:8787/t/foo', session),
@ -863,14 +867,12 @@ class TestBaseImageUploader(base.TestCase):
(self.uploader, 'localhost:8787/t/bink', session)
])
@mock.patch('concurrent.futures.ThreadPoolExecutor')
def test_list_404(self, mock_pool):
def test_list_404(self):
# setup bits
session = mock.Mock()
response = mock.Mock()
response.status_code = 404
session.get.return_value = response
mock_pool.return_value.map.return_value = ()
# execute function
return_val = self.uploader.list('localhost:8787', session=session)
# check status of things