Improve ThreadPoolExecutor usage
This change addresses a few different issues with our ThreadPoolExecutor usage. Previously we were inefficiently looping over the job results, which was a blocking call and not asynchronous. This change switches the thread pool usage to use as_completed, so we handle jobs as they complete rather than blocking until the specific job we are looking at completes. Additionally, this switches to using a with statement for the executor so the threads get cleaned up correctly when we're done with the block. See example comments: https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example The _inspect call was using a thread pool executor to run session calls but was essentially a serially executing function, so it has been updated to remove the executor and just call the requests as needed. Change-Id: Ia7abec997f4e503e1a2db82e05d4fe6e8696defc Related-Bug: #1844446
This commit is contained in:
parent
813ccbcdfb
commit
a1d89c7d63
|
@ -582,18 +582,14 @@ class BaseImageUploader(object):
|
|||
)
|
||||
manifest_headers = {'Accept': MEDIA_MANIFEST_V2}
|
||||
|
||||
p = futures.ThreadPoolExecutor(max_workers=2)
|
||||
manifest_f = p.submit(
|
||||
session.get, manifest_url, headers=manifest_headers, timeout=30)
|
||||
tags_f = p.submit(session.get, tags_url, timeout=30)
|
||||
|
||||
manifest_r = manifest_f.result()
|
||||
manifest_r = session.get(manifest_url, headers=manifest_headers,
|
||||
timeout=30)
|
||||
if manifest_r.status_code in (403, 404):
|
||||
raise ImageNotFoundException('Not found image: %s' %
|
||||
image_url.geturl())
|
||||
cls.check_status(session=session, request=manifest_r)
|
||||
|
||||
tags_r = tags_f.result()
|
||||
tags_r = session.get(tags_url, timeout=30)
|
||||
cls.check_status(session=session, request=tags_r)
|
||||
|
||||
manifest_str = cls._get_response_text(manifest_r)
|
||||
|
@ -621,9 +617,8 @@ class BaseImageUploader(object):
|
|||
}
|
||||
config_url = cls._build_url(
|
||||
image_url, CALL_BLOB % parts)
|
||||
config_f = p.submit(
|
||||
session.get, config_url, headers=config_headers, timeout=30)
|
||||
config_r = config_f.result()
|
||||
config_r = session.get(config_url, headers=config_headers,
|
||||
timeout=30)
|
||||
cls.check_status(session=session, request=config_r)
|
||||
config = config_r.json()
|
||||
|
||||
|
@ -660,7 +655,9 @@ class BaseImageUploader(object):
|
|||
if catalog_resp.status_code in [200]:
|
||||
catalog = catalog_resp.json()
|
||||
elif catalog_resp.status_code in [404]:
|
||||
catalog = {}
|
||||
# just return since the catalog returned a 404
|
||||
LOG.debug('catalog_url return 404')
|
||||
return []
|
||||
else:
|
||||
raise ImageUploaderException(
|
||||
'Image registry made invalid response: %s' %
|
||||
|
@ -671,14 +668,14 @@ class BaseImageUploader(object):
|
|||
for repo in catalog.get('repositories', []):
|
||||
image = '%s/%s' % (registry, repo)
|
||||
tags_get_args.append((self, image, session))
|
||||
p = futures.ThreadPoolExecutor(max_workers=16)
|
||||
|
||||
images = []
|
||||
for image, tags in p.map(tags_for_image, tags_get_args):
|
||||
if not tags:
|
||||
continue
|
||||
for tag in tags:
|
||||
images.append('%s:%s' % (image, tag))
|
||||
with futures.ThreadPoolExecutor(max_workers=16) as p:
|
||||
for image, tags in p.map(tags_for_image, tags_get_args):
|
||||
if not tags:
|
||||
continue
|
||||
for tag in tags:
|
||||
images.append('%s:%s' % (image, tag))
|
||||
return images
|
||||
|
||||
def inspect(self, image, session=None):
|
||||
|
@ -783,12 +780,12 @@ class BaseImageUploader(object):
|
|||
discover_args = []
|
||||
for image in images:
|
||||
discover_args.append((self, image, tag_from_label))
|
||||
p = futures.ThreadPoolExecutor(max_workers=16)
|
||||
|
||||
versioned_images = {}
|
||||
for image, versioned_image in p.map(discover_tag_from_inspect,
|
||||
discover_args):
|
||||
versioned_images[image] = versioned_image
|
||||
with futures.ThreadPoolExecutor(max_workers=16) as p:
|
||||
for image, versioned_image in p.map(discover_tag_from_inspect,
|
||||
discover_args):
|
||||
versioned_images[image] = versioned_image
|
||||
return versioned_images
|
||||
|
||||
def discover_image_tag(self, image, tag_from_label=None,
|
||||
|
@ -1071,9 +1068,9 @@ class SkopeoImageUploader(BaseImageUploader):
|
|||
|
||||
# workers will be half the CPU count, to a minimum of 2
|
||||
workers = max(2, (processutils.get_worker_count() - 1))
|
||||
p = futures.ThreadPoolExecutor(max_workers=workers)
|
||||
for result in p.map(upload_task, self.upload_tasks):
|
||||
local_images.extend(result)
|
||||
with futures.ThreadPoolExecutor(max_workers=workers) as p:
|
||||
for result in p.map(upload_task, self.upload_tasks):
|
||||
local_images.extend(result)
|
||||
LOG.info('result %s' % local_images)
|
||||
|
||||
# Do cleanup after all the uploads so common layers don't get deleted
|
||||
|
@ -1417,23 +1414,23 @@ class PythonImageUploader(BaseImageUploader):
|
|||
|
||||
# Upload all layers
|
||||
copy_jobs = []
|
||||
p = futures.ThreadPoolExecutor(max_workers=4)
|
||||
if source_layers:
|
||||
for layer in source_layers:
|
||||
copy_jobs.append(p.submit(
|
||||
cls._copy_layer_registry_to_registry,
|
||||
source_url, target_url,
|
||||
layer=layer,
|
||||
source_session=source_session,
|
||||
target_session=target_session
|
||||
))
|
||||
for job in copy_jobs:
|
||||
e = job.exception()
|
||||
if e:
|
||||
raise e
|
||||
image = job.result()
|
||||
if image:
|
||||
LOG.debug('Upload complete for layer: %s' % image)
|
||||
with futures.ThreadPoolExecutor(max_workers=4) as p:
|
||||
if source_layers:
|
||||
for layer in source_layers:
|
||||
copy_jobs.append(p.submit(
|
||||
cls._copy_layer_registry_to_registry,
|
||||
source_url, target_url,
|
||||
layer=layer,
|
||||
source_session=source_session,
|
||||
target_session=target_session
|
||||
))
|
||||
for job in futures.as_completed(copy_jobs):
|
||||
e = job.exception()
|
||||
if e:
|
||||
raise e
|
||||
image = job.result()
|
||||
if image:
|
||||
LOG.debug('Upload complete for layer: %s' % image)
|
||||
|
||||
for source_manifest in source_manifests:
|
||||
manifest = json.loads(source_manifest)
|
||||
|
@ -1759,21 +1756,20 @@ class PythonImageUploader(BaseImageUploader):
|
|||
|
||||
# Upload all layers
|
||||
copy_jobs = []
|
||||
p = futures.ThreadPoolExecutor(max_workers=4)
|
||||
for layer in manifest['layers']:
|
||||
layer_entry = layers_by_digest[layer['digest']]
|
||||
|
||||
copy_jobs.append(p.submit(
|
||||
cls._copy_layer_local_to_registry,
|
||||
target_url, session, layer, layer_entry
|
||||
))
|
||||
for job in copy_jobs:
|
||||
e = job.exception()
|
||||
if e:
|
||||
raise e
|
||||
image = job.result()
|
||||
if image:
|
||||
LOG.debug('Upload complete for layer: %s' % image)
|
||||
with futures.ThreadPoolExecutor(max_workers=4) as p:
|
||||
for layer in manifest['layers']:
|
||||
layer_entry = layers_by_digest[layer['digest']]
|
||||
copy_jobs.append(p.submit(
|
||||
cls._copy_layer_local_to_registry,
|
||||
target_url, session, layer, layer_entry
|
||||
))
|
||||
for job in futures.as_completed(copy_jobs):
|
||||
e = job.exception()
|
||||
if e:
|
||||
raise e
|
||||
image = job.result()
|
||||
if image:
|
||||
LOG.debug('Upload complete for layer: %s' % image)
|
||||
|
||||
manifest_str = json.dumps(manifest, indent=3)
|
||||
cls._copy_manifest_config_to_registry(
|
||||
|
@ -1909,12 +1905,12 @@ class PythonImageUploader(BaseImageUploader):
|
|||
# same base layers
|
||||
local_images.extend(upload_task(args=self.upload_tasks.pop()))
|
||||
|
||||
# workers will the CPU count minus 1, with a minimum of 2
|
||||
workers = max(2, (processutils.get_worker_count() - 1))
|
||||
p = futures.ThreadPoolExecutor(max_workers=workers)
|
||||
for result in p.map(upload_task, self.upload_tasks):
|
||||
local_images.extend(result)
|
||||
LOG.info('result %s' % local_images)
|
||||
# workers will be half the CPU, with a minimum of 2
|
||||
workers = max(2, processutils.get_worker_count() // 2)
|
||||
with futures.ThreadPoolExecutor(max_workers=workers) as p:
|
||||
for result in p.map(upload_task, self.upload_tasks):
|
||||
local_images.extend(result)
|
||||
LOG.info('result %s' % local_images)
|
||||
|
||||
# Do cleanup after all the uploads so common layers don't get deleted
|
||||
# repeatedly
|
||||
|
|
|
@ -479,11 +479,13 @@ class TestBaseImageUploader(base.TestCase):
|
|||
|
||||
@mock.patch('concurrent.futures.ThreadPoolExecutor')
|
||||
def test_discover_image_tags(self, mock_pool):
|
||||
mock_pool.return_value.map.return_value = (
|
||||
mock_map = mock.Mock()
|
||||
mock_map.return_value = (
|
||||
('docker.io/t/foo', 'a'),
|
||||
('docker.io/t/bar', 'b'),
|
||||
('docker.io/t/baz', 'c')
|
||||
)
|
||||
mock_pool.return_value.__enter__.return_value.map = mock_map
|
||||
images = [
|
||||
'docker.io/t/foo',
|
||||
'docker.io/t/bar',
|
||||
|
@ -497,7 +499,7 @@ class TestBaseImageUploader(base.TestCase):
|
|||
},
|
||||
self.uploader.discover_image_tags(images, 'rdo_release')
|
||||
)
|
||||
mock_pool.return_value.map.assert_called_once_with(
|
||||
mock_map.assert_called_once_with(
|
||||
image_uploader.discover_tag_from_inspect,
|
||||
[
|
||||
(self.uploader, 'docker.io/t/foo', 'rdo_release'),
|
||||
|
@ -832,12 +834,14 @@ class TestBaseImageUploader(base.TestCase):
|
|||
|
||||
@mock.patch('concurrent.futures.ThreadPoolExecutor')
|
||||
def test_list(self, mock_pool):
|
||||
mock_pool.return_value.map.return_value = (
|
||||
mock_map = mock.Mock()
|
||||
mock_map.return_value = (
|
||||
('localhost:8787/t/foo', ['a']),
|
||||
('localhost:8787/t/bar', ['b']),
|
||||
('localhost:8787/t/baz', ['c', 'd']),
|
||||
('localhost:8787/t/bink', [])
|
||||
)
|
||||
mock_pool.return_value.__enter__.return_value.map = mock_map
|
||||
session = mock.Mock()
|
||||
response = mock.Mock()
|
||||
response.status_code = 200
|
||||
|
@ -854,7 +858,7 @@ class TestBaseImageUploader(base.TestCase):
|
|||
],
|
||||
self.uploader.list('localhost:8787', session=session)
|
||||
)
|
||||
mock_pool.return_value.map.assert_called_once_with(
|
||||
mock_map.assert_called_once_with(
|
||||
image_uploader.tags_for_image,
|
||||
[
|
||||
(self.uploader, 'localhost:8787/t/foo', session),
|
||||
|
@ -863,14 +867,12 @@ class TestBaseImageUploader(base.TestCase):
|
|||
(self.uploader, 'localhost:8787/t/bink', session)
|
||||
])
|
||||
|
||||
@mock.patch('concurrent.futures.ThreadPoolExecutor')
|
||||
def test_list_404(self, mock_pool):
|
||||
def test_list_404(self):
|
||||
# setup bits
|
||||
session = mock.Mock()
|
||||
response = mock.Mock()
|
||||
response.status_code = 404
|
||||
session.get.return_value = response
|
||||
mock_pool.return_value.map.return_value = ()
|
||||
# execute function
|
||||
return_val = self.uploader.list('localhost:8787', session=session)
|
||||
# check status of things
|
||||
|
|
Loading…
Reference in New Issue