Merge "Skopeo based uploader"

This commit is contained in:
Zuul 2018-10-09 07:52:13 +00:00 committed by Gerrit Code Review
commit 92100d71d0
2 changed files with 519 additions and 23 deletions

View File

@ -22,7 +22,7 @@ import os
import requests
import shutil
import six
from six.moves import urllib
from six.moves.urllib.parse import urlparse
import subprocess
import tempfile
import tenacity
@ -56,6 +56,9 @@ CLEANUP = (
)
DEFAULT_UPLOADER = 'docker'
def get_undercloud_registry():
addr = 'localhost'
if 'br-ctlplane' in netifaces.interfaces():
@ -82,7 +85,7 @@ class ImageUploadManager(BaseImageManager):
self.cleanup = cleanup
def discover_image_tag(self, image, tag_from_label=None):
uploader = self.uploader('docker')
uploader = self.uploader(DEFAULT_UPLOADER)
return uploader.discover_image_tag(
image, tag_from_label=tag_from_label)
@ -113,7 +116,7 @@ class ImageUploadManager(BaseImageManager):
for item in upload_images:
image_name = item.get('imagename')
uploader = item.get('uploader', 'docker')
uploader = item.get('uploader', DEFAULT_UPLOADER)
pull_source = item.get('pull_source')
push_destination = self.get_push_destination(item)
@ -142,6 +145,8 @@ class ImageUploader(object):
def get_uploader(uploader):
if uploader == 'docker':
return DockerImageUploader()
if uploader == 'skopeo':
return SkopeoImageUploader()
raise ImageUploaderException('Unknown image uploader type')
@abc.abstractmethod
@ -178,6 +183,9 @@ class BaseImageUploader(ImageUploader):
self.upload_tasks = []
self.secure_registries = set(SECURE_REGISTRIES)
self.insecure_registries = set()
# A mapping of layer hashs to the image which first copied that
# layer to the target
self.image_layers = {}
def cleanup(self):
pass
@ -215,13 +223,15 @@ class BaseImageUploader(ImageUploader):
@staticmethod
def run_modify_playbook(modify_role, modify_vars,
source_image, target_image, append_tag):
source_image, target_image, append_tag,
container_build_tool='docker'):
vars = {}
if modify_vars:
vars.update(modify_vars)
vars['source_image'] = source_image
vars['target_image'] = target_image
vars['modified_append_tag'] = append_tag
vars['container_build_tool'] = container_build_tool
LOG.info('Playbook variables: \n%s' % yaml.safe_dump(
vars, default_flow_style=False))
playbook = [{
@ -241,7 +251,8 @@ class BaseImageUploader(ImageUploader):
action = ansible.AnsiblePlaybookAction(
playbook=playbook,
work_dir=work_dir,
verbosity=3
verbosity=3,
extra_env_variables=dict(os.environ)
)
result = action.run(None)
log_path = result.get('log_path')
@ -277,13 +288,13 @@ class BaseImageUploader(ImageUploader):
def _image_digest(image, insecure_registries):
image_url = BaseImageUploader._image_to_url(image)
insecure = image_url.netloc in insecure_registries
i = BaseImageUploader._inspect(image_url.geturl(), insecure)
i = BaseImageUploader._inspect(image_url, insecure)
return i.get('Digest')
@staticmethod
def _image_labels(image, insecure):
image_url = BaseImageUploader._image_to_url(image)
i = BaseImageUploader._inspect(image_url.geturl(), insecure)
i = BaseImageUploader._inspect(image_url, insecure)
return i.get('Labels', {}) or {}
@staticmethod
@ -302,7 +313,8 @@ class BaseImageUploader(ImageUploader):
wait=tenacity.wait_random_exponential(multiplier=1, max=10),
stop=tenacity.stop_after_attempt(5)
)
def _inspect(image, insecure=False):
def _inspect(image_url, insecure=False):
image = image_url.geturl()
cmd = ['skopeo', 'inspect']
@ -333,7 +345,7 @@ class BaseImageUploader(ImageUploader):
def _image_to_url(image):
if '://' not in image:
image = 'docker://' + image
return urllib.parse.urlparse(image)
return urlparse(image)
@staticmethod
def _discover_tag_from_inspect(i, image, tag_from_label=None,
@ -403,7 +415,7 @@ class BaseImageUploader(ImageUploader):
fallback_tag=None):
image_url = self._image_to_url(image)
insecure = self.is_insecure_registry(image_url.netloc)
i = self._inspect(image_url.geturl(), insecure)
i = self._inspect(image_url, insecure)
return self._discover_tag_from_inspect(i, image, tag_from_label,
fallback_tag)
@ -430,7 +442,8 @@ class BaseImageUploader(ImageUploader):
self.is_insecure_registry(self._image_to_url(push_destination).netloc)
self.upload_tasks.append((image_name, pull_source, push_destination,
self.insecure_registries, append_tag,
modify_role, modify_vars, dry_run, cleanup))
modify_role, modify_vars, dry_run, cleanup,
self.image_layers))
def is_insecure_registry(self, registry_host):
if registry_host in self.secure_registries:
@ -450,6 +463,29 @@ class BaseImageUploader(ImageUploader):
self.secure_registries.add(registry_host)
return False
@staticmethod
def _cross_repo_mount(target_image_url, image_layers,
source_layers, insecure_registries):
netloc = target_image_url.netloc
name = target_image_url.path.split(':')[0][1:]
if netloc in insecure_registries:
scheme = 'http'
else:
scheme = 'https'
url = '%s://%s/v2/%s/blobs/uploads/' % (scheme, netloc, name)
for layer in source_layers:
if layer in image_layers:
existing_name = image_layers[layer].path.split(':')[0][1:]
LOG.info('Cross repository blob mount %s from %s' %
(layer, existing_name))
data = {
'mount': layer,
'from': existing_name
}
r = requests.post(url, data=data)
LOG.debug('%s %s' % (r.status_code, r.reason))
class DockerImageUploader(BaseImageUploader):
"""Upload images using docker pull/tag/push"""
@ -457,7 +493,7 @@ class DockerImageUploader(BaseImageUploader):
@staticmethod
def upload_image(image_name, pull_source, push_destination,
insecure_registries, append_tag, modify_role,
modify_vars, dry_run, cleanup):
modify_vars, dry_run, cleanup, image_layers):
LOG.info('imagename: %s' % image_name)
names = BaseImageUploader.source_target_names(
image_name, pull_source, push_destination, append_tag)
@ -582,15 +618,184 @@ class DockerImageUploader(BaseImageUploader):
self.cleanup(local_images)
class SkopeoImageUploader(BaseImageUploader):
"""Upload images using skopeo copy"""
@staticmethod
def upload_image(image_name, pull_source, push_destination,
insecure_registries, append_tag, modify_role,
modify_vars, dry_run, cleanup, image_layers):
LOG.info('imagename: %s' % image_name)
names = BaseImageUploader.source_target_names(
image_name, pull_source, push_destination, append_tag)
source_image = names['source_image']
source_image_url = BaseImageUploader._image_to_url(source_image)
source_image_local_url = urlparse('containers-storage:%s'
% source_image)
append_tag = names['append_tag']
target_image_source_tag = names['target_image_source_tag']
target_image = names['target_image']
target_image_url = BaseImageUploader._image_to_url(target_image)
target_image_local_url = urlparse('containers-storage:%s' %
target_image)
if dry_run:
return []
if modify_role and BaseImageUploader._image_exists(
target_image, insecure_registries):
LOG.warning('Skipping upload for modified image %s' %
target_image)
return []
source_inspect = BaseImageUploader._inspect(source_image_url)
source_layers = source_inspect.get('Layers', [])
BaseImageUploader._cross_repo_mount(
target_image_url, image_layers, source_layers, insecure_registries)
to_cleanup = []
if modify_role:
# Copy from source registry to local storage
SkopeoImageUploader._copy(
source_image_url,
source_image_local_url,
insecure_registries
)
if cleanup in (CLEANUP_FULL, CLEANUP_PARTIAL):
to_cleanup = [source_image]
BaseImageUploader.run_modify_playbook(
modify_role, modify_vars, source_image,
target_image_source_tag, append_tag,
container_build_tool='buildah')
# Inspect to confirm the playbook created the target image
BaseImageUploader._inspect(target_image_local_url)
if cleanup == CLEANUP_FULL:
to_cleanup.append(target_image)
# Copy from local storage to target registry
SkopeoImageUploader._copy(
target_image_local_url,
target_image_url,
insecure_registries
)
for layer in source_layers:
image_layers.setdefault(layer, target_image_url)
LOG.warning('Completed modify and upload for image %s' %
image_name)
else:
SkopeoImageUploader._copy(
source_image_url,
target_image_url,
insecure_registries
)
LOG.warning('Completed upload for image %s' % image_name)
for layer in source_layers:
image_layers.setdefault(layer, target_image_url)
return to_cleanup
@staticmethod
@tenacity.retry( # Retry up to 5 times with jittered exponential backoff
reraise=True,
wait=tenacity.wait_random_exponential(multiplier=1, max=10),
stop=tenacity.stop_after_attempt(5)
)
def _copy(source_url, target_url, insecure_registries):
source = source_url.geturl()
target = target_url.geturl()
LOG.info('Copying from %s to %s' % (source, target))
cmd = ['skopeo', 'copy']
if source_url.netloc in insecure_registries:
cmd.append('--src-tls-verify=false')
if target_url.netloc in insecure_registries:
cmd.append('--dest-tls-verify=false')
cmd.append(source)
cmd.append(target)
LOG.info('Running %s' % ' '.join(cmd))
env = os.environ.copy()
process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE)
out, err = process.communicate()
LOG.info(out)
if process.returncode != 0:
raise ImageUploaderException('Error copying image:\n%s\n%s' %
(' '.join(cmd), err))
return out
@staticmethod
def _delete(image_url, insecure=False):
image = image_url.geturl()
LOG.info('Deleting %s' % image)
cmd = ['skopeo', 'delete']
if insecure:
cmd.append('--tls-verify=false')
cmd.append(image)
LOG.info('Running %s' % ' '.join(cmd))
env = os.environ.copy()
process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE)
out, err = process.communicate()
LOG.info(out)
if process.returncode != 0:
raise ImageUploaderException('Error deleting image:\n%s\n%s' %
(' '.join(cmd), err))
return out
def cleanup(self, local_images):
if not local_images:
return []
for image in sorted(local_images):
if not image:
continue
LOG.warning('Removing local copy of %s' % image)
image_url = urlparse('containers-storage:%s' % image)
SkopeoImageUploader._delete(image_url)
def run_tasks(self):
if not self.upload_tasks:
return
local_images = []
# Pull a single image first, to avoid duplicate pulls of the
# same base layers
first = self.upload_tasks.pop()
result = self.upload_image(*first)
local_images.extend(result)
# workers will be half the CPU count, to a minimum of 2
workers = max(2, processutils.get_worker_count() // 2)
p = futures.ThreadPoolExecutor(max_workers=workers)
for result in p.map(skopeo_upload, self.upload_tasks):
local_images.extend(result)
LOG.info('result %s' % local_images)
# Do cleanup after all the uploads so common layers don't get deleted
# repeatedly
self.cleanup(local_images)
def docker_upload(args):
return DockerImageUploader.upload_image(*args)
def skopeo_upload(args):
return SkopeoImageUploader.upload_image(*args)
def discover_tag_from_inspect(args):
image, tag_from_label, insecure_registries = args
image_url = BaseImageUploader._image_to_url(image)
insecure = image_url.netloc in insecure_registries
i = BaseImageUploader._inspect(image_url.geturl(), insecure)
i = BaseImageUploader._inspect(image_url, insecure)
if ':' in image_url.path:
# break out the tag from the url to be the fallback tag
path = image.rpartition(':')

View File

@ -16,8 +16,11 @@
import json
import mock
import operator
import os
import requests
import six
from six.moves.urllib.parse import urlparse
import tempfile
import urllib3
from oslo_concurrency import processutils
@ -47,6 +50,8 @@ class TestImageUploadManager(base.TestCase):
files.append('testfile')
self.filelist = files
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader._inspect')
@mock.patch('tripleo_common.image.base.open',
mock.mock_open(read_data=filedata), create=True)
@mock.patch('tripleo_common.image.image_uploader.'
@ -61,8 +66,9 @@ class TestImageUploadManager(base.TestCase):
@mock.patch('tripleo_common.image.image_uploader.'
'get_undercloud_registry', return_value='192.0.2.0:8787')
def test_file_parsing(self, mock_gur, mockdocker, mockioctl, mockpath,
mock_images_match, mock_is_insecure):
mock_images_match, mock_is_insecure, mock_inspect):
mock_inspect.return_value = {}
manager = image_uploader.ImageUploadManager(self.filelist, debug=True)
parsed_data = manager.upload()
mockpath(self.filelist[0])
@ -433,7 +439,8 @@ class TestDockerImageUploader(base.TestCase):
None,
None,
False,
'full'
'full',
{}
)
)
@ -465,7 +472,8 @@ class TestDockerImageUploader(base.TestCase):
None,
None,
False,
'full')
'full',
{})
self.dockermock.assert_called_once_with(
base_url='unix://var/run/docker.sock', version='auto')
@ -504,7 +512,8 @@ class TestDockerImageUploader(base.TestCase):
None,
None,
False,
'full'
'full',
{}
)
)
@ -541,7 +550,8 @@ class TestDockerImageUploader(base.TestCase):
'target_image': '%s:%s' % (push_image, tag),
'modified_append_tag': append_tag,
'source_image': '%s:%s' % (image, tag),
'foo_version': '1.0.1'
'foo_version': '1.0.1',
'container_build_tool': 'docker'
}
}],
'hosts': 'localhost'
@ -559,7 +569,8 @@ class TestDockerImageUploader(base.TestCase):
'add-foo-plugin',
{'foo_version': '1.0.1'},
False,
'partial'
'partial',
{}
)
)
@ -569,7 +580,11 @@ class TestDockerImageUploader(base.TestCase):
self.dockermock.return_value.pull.assert_called_once_with(
image, tag=tag, stream=True)
mock_ansible.assert_called_once_with(
playbook=playbook, work_dir=mock.ANY, verbosity=3)
playbook=playbook,
work_dir=mock.ANY,
verbosity=3,
extra_env_variables=mock.ANY
)
self.dockermock.return_value.tag.assert_not_called()
self.dockermock.return_value.push.assert_called_once_with(
push_image,
@ -598,7 +613,8 @@ class TestDockerImageUploader(base.TestCase):
ImageUploaderException,
self.uploader.upload_image,
image + ':' + tag, None, push_destination, set(), append_tag,
'add-foo-plugin', {'foo_version': '1.0.1'}, False, 'full'
'add-foo-plugin', {'foo_version': '1.0.1'}, False, 'full',
{}
)
self.dockermock.assert_called_once_with(
@ -630,7 +646,8 @@ class TestDockerImageUploader(base.TestCase):
'add-foo-plugin',
{'foo_version': '1.0.1'},
True,
'full'
'full',
{}
)
self.dockermock.assert_not_called()
@ -659,7 +676,8 @@ class TestDockerImageUploader(base.TestCase):
'add-foo-plugin',
{'foo_version': '1.0.1'},
False,
'full'
'full',
{}
)
self.dockermock.assert_not_called()
@ -740,3 +758,276 @@ class TestDockerImageUploader(base.TestCase):
dockerc.push.assert_has_calls([
mock.call(image, tag=None, stream=True)
])
class TestSkopeoImageUploader(base.TestCase):
def setUp(self):
super(TestSkopeoImageUploader, self).setUp()
self.uploader = image_uploader.SkopeoImageUploader()
self.uploader._copy.retry.sleep = mock.Mock()
self.uploader._inspect.retry.sleep = mock.Mock()
@mock.patch('os.environ')
@mock.patch('subprocess.Popen')
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader._inspect')
def test_upload_image(self, mock_inspect, mock_popen, mock_environ):
mock_process = mock.Mock()
mock_process.communicate.return_value = ('copy complete', '')
mock_process.returncode = 0
mock_popen.return_value = mock_process
mock_environ.copy.return_value = {}
mock_inspect.return_value = {}
image = 'docker.io/t/nova-api'
tag = 'latest'
push_destination = 'localhost:8787'
self.assertEqual(
[],
self.uploader.upload_image(
image + ':' + tag,
None,
push_destination,
set(),
None,
None,
None,
False,
'full',
{}
)
)
mock_popen.assert_called_once_with([
'skopeo',
'copy',
'docker://docker.io/t/nova-api:latest',
'docker://localhost:8787/t/nova-api:latest'],
env={}, stdout=-1
)
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader._inspect')
@mock.patch('tripleo_common.image.image_uploader.'
'SkopeoImageUploader._copy')
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader._image_exists')
@mock.patch('tripleo_common.actions.'
'ansible.AnsiblePlaybookAction', autospec=True)
def test_modify_upload_image(self, mock_ansible, mock_exists, mock_copy,
mock_inspect):
mock_exists.return_value = False
mock_inspect.return_value = {}
with tempfile.NamedTemporaryFile(delete=False) as logfile:
self.addCleanup(os.remove, logfile.name)
mock_ansible.return_value.run.return_value = {
'log_path': logfile.name
}
image = 'docker.io/t/nova-api'
tag = 'latest'
append_tag = 'modify-123'
push_destination = 'localhost:8787'
push_image = 'localhost:8787/t/nova-api'
playbook = [{
'tasks': [{
'import_role': {
'name': 'add-foo-plugin'
},
'name': 'Import role add-foo-plugin',
'vars': {
'target_image': '%s:%s' % (push_image, tag),
'modified_append_tag': append_tag,
'source_image': '%s:%s' % (image, tag),
'foo_version': '1.0.1',
'container_build_tool': 'buildah'
}
}],
'hosts': 'localhost'
}]
# test response for a partial cleanup
self.assertEqual(
['docker.io/t/nova-api:latest'],
self.uploader.upload_image(
image + ':' + tag,
None,
push_destination,
set(),
append_tag,
'add-foo-plugin',
{'foo_version': '1.0.1'},
False,
'partial',
{}
)
)
insecure = set()
mock_inspect.assert_has_calls([
mock.call(urlparse(
'docker://docker.io/t/nova-api:latest'
)),
mock.call(urlparse(
'containers-storage:localhost:8787/t/nova-api:latestmodify-123'
))
])
mock_copy.assert_has_calls([
mock.call(
urlparse('docker://docker.io/t/nova-api:latest'),
urlparse('containers-storage:docker.io/t/nova-api:latest'),
insecure
),
mock.call(
urlparse('containers-storage:localhost:8787/'
't/nova-api:latestmodify-123'),
urlparse('docker://localhost:8787/'
't/nova-api:latestmodify-123'),
insecure
)
])
mock_ansible.assert_called_once_with(
playbook=playbook,
work_dir=mock.ANY,
verbosity=3,
extra_env_variables=mock.ANY
)
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader._inspect')
@mock.patch('tripleo_common.image.image_uploader.'
'SkopeoImageUploader._copy')
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader._image_exists')
@mock.patch('tripleo_common.actions.'
'ansible.AnsiblePlaybookAction', autospec=True)
def test_modify_image_failed(self, mock_ansible, mock_exists, mock_copy,
mock_inspect):
mock_exists.return_value = False
mock_inspect.return_value = {}
image = 'docker.io/t/nova-api'
tag = 'latest'
append_tag = 'modify-123'
push_destination = 'localhost:8787'
error = processutils.ProcessExecutionError(
'', 'ouch', -1, 'ansible-playbook')
mock_ansible.return_value.run.side_effect = error
self.assertRaises(
ImageUploaderException,
self.uploader.upload_image,
image + ':' + tag, None, push_destination, set(), append_tag,
'add-foo-plugin', {'foo_version': '1.0.1'}, False, 'full',
{}
)
insecure = set()
mock_copy.assert_called_once_with(
urlparse('docker://docker.io/t/nova-api:latest'),
urlparse('containers-storage:docker.io/t/nova-api:latest'),
insecure
)
@mock.patch('subprocess.Popen')
@mock.patch('tripleo_common.actions.'
'ansible.AnsiblePlaybookAction', autospec=True)
def test_modify_upload_image_dry_run(self, mock_ansible, mock_popen):
mock_process = mock.Mock()
mock_popen.return_value = mock_process
image = 'docker.io/t/nova-api'
tag = 'latest'
append_tag = 'modify-123'
push_destination = 'localhost:8787'
result = self.uploader.upload_image(
image + ':' + tag,
None,
push_destination,
set(),
append_tag,
'add-foo-plugin',
{'foo_version': '1.0.1'},
True,
'full',
{}
)
mock_ansible.assert_not_called()
mock_process.communicate.assert_not_called()
self.assertEqual([], result)
@mock.patch('tripleo_common.image.image_uploader.'
'BaseImageUploader._inspect')
@mock.patch('tripleo_common.actions.'
'ansible.AnsiblePlaybookAction', autospec=True)
def test_modify_image_existing(self, mock_ansible, mock_inspect):
mock_inspect.return_value = {'Digest': 'a'}
image = 'docker.io/t/nova-api'
tag = 'latest'
append_tag = 'modify-123'
push_destination = 'localhost:8787'
result = self.uploader.upload_image(
image + ':' + tag,
None,
push_destination,
set(),
append_tag,
'add-foo-plugin',
{'foo_version': '1.0.1'},
False,
'full',
{}
)
mock_ansible.assert_not_called()
self.assertEqual([], result)
@mock.patch('os.environ')
@mock.patch('subprocess.Popen')
def test_copy_retry(self, mock_popen, mock_environ):
mock_success = mock.Mock()
mock_success.communicate.return_value = ('copy complete', '')
mock_success.returncode = 0
mock_failure = mock.Mock()
mock_failure.communicate.return_value = ('', 'ouch')
mock_failure.returncode = 1
mock_popen.side_effect = [
mock_failure,
mock_failure,
mock_failure,
mock_failure,
mock_success
]
mock_environ.copy.return_value = {}
source = urlparse('docker://docker.io/t/nova-api')
target = urlparse('containers_storage:docker.io/t/nova-api')
self.uploader._copy(source, target, set())
self.assertEqual(mock_failure.communicate.call_count, 4)
self.assertEqual(mock_success.communicate.call_count, 1)
@mock.patch('os.environ')
@mock.patch('subprocess.Popen')
def test_copy_retry_failure(self, mock_popen, mock_environ):
mock_failure = mock.Mock()
mock_failure.communicate.return_value = ('', 'ouch')
mock_failure.returncode = 1
mock_popen.return_value = mock_failure
mock_environ.copy.return_value = {}
source = urlparse('docker://docker.io/t/nova-api')
target = urlparse('containers_storage:docker.io/t/nova-api')
self.assertRaises(
ImageUploaderException, self.uploader._copy, source, target, set())
self.assertEqual(mock_failure.communicate.call_count, 5)