Using HttpNfcLease to transfer vmdk files.

The current VMware driver supported only "sparse" and "preallocated"
vmware_disktype property set in a "vmdk" glance image. Both of these were just
copied over as *-flat.vmdk files into the vmfs or nfs file system of the
underlying datastore. This was used during copy_image_to_volume() api.
Unfortunately for a vsan datastore this work flow breaks since there is no
access to the flat vmdk file in the underlying datastore.

This patch introduces a new vmware_disktype for a glance image called
"streamOptimized". This is a format generated when a VM/vApp is exported using
the HttpNfc APIs. AS the name suggests this is a highly optimized format for
streaming in chunks and thus would result in much faster upload / download
speeds. The driver's copy_volume_to_image() implementation now always uploads
the vmdk contents using HttpNfc api so that the glance image ends up in the
"streamOptimized" disk type. Also the driver's copy_image_to_volume()
implementation now understands a "streamOptmized" disk type and uses HttpNfc to
import that vmdk into a backing VM.

Note that the same "streamOptmized" glance image format will also be supported
by VMware nova driver. This change is in a different patch -
https://review.openstack.org/#/c/53976/

Patch Set 4: Removing changes to requirements.txt that got in by mistake.
Patch Set 5: Fixing a small bug around progress updates.
Patch Set 6: Addressing comments from Avishay.

Fixes bug: 1229998

Change-Id: I6b55945cb61efded826e0bcf7e2a678ebbbbd9d3
This commit is contained in:
Subramanian Neelakantan 2013-10-11 08:54:18 +05:30
parent 26b58e2c10
commit 7b64653931
7 changed files with 543 additions and 161 deletions

View File

@ -1321,6 +1321,7 @@ class VMwareEsxVmdkDriverTestCase(test.TestCase):
image_meta = FakeObject()
image_meta['disk_format'] = 'vmdk'
image_meta['size'] = 1 * units.MiB
image_meta['properties'] = {'vmware_disktype': 'preallocated'}
image_service = m.CreateMock(glance.GlanceImageService)
image_service.show(mox.IgnoreArg(), image_id).AndReturn(image_meta)
volume = FakeObject()
@ -1354,14 +1355,87 @@ class VMwareEsxVmdkDriverTestCase(test.TestCase):
client.options.transport.cookiejar = cookies
m.StubOutWithMock(self._vim.__class__, 'client')
self._vim.client = client
m.StubOutWithMock(vmware_images, 'fetch_image')
m.StubOutWithMock(vmware_images, 'fetch_flat_image')
timeout = self._config.vmware_image_transfer_timeout_secs
vmware_images.fetch_image(mox.IgnoreArg(), timeout, image_service,
image_id, host=self.IP,
data_center_name=datacenter_name,
datastore_name=datastore_name,
cookies=cookies,
file_path=flat_vmdk_path)
vmware_images.fetch_flat_image(mox.IgnoreArg(), timeout, image_service,
image_id, image_size=image_meta['size'],
host=self.IP,
data_center_name=datacenter_name,
datastore_name=datastore_name,
cookies=cookies,
file_path=flat_vmdk_path)
m.ReplayAll()
self._driver.copy_image_to_volume(mox.IgnoreArg(), volume,
image_service, image_id)
m.UnsetStubs()
m.VerifyAll()
def test_copy_image_to_volume_stream_optimized(self):
"""Test copy_image_to_volume.
Test with an acceptable vmdk disk format and streamOptimized disk type.
"""
m = self.mox
m.StubOutWithMock(self._driver.__class__, 'session')
self._driver.session = self._session
m.StubOutWithMock(api.VMwareAPISession, 'vim')
self._session.vim = self._vim
m.StubOutWithMock(self._driver.__class__, 'volumeops')
self._driver.volumeops = self._volumeops
image_id = 'image-id'
size = 5 * units.GiB
size_kb = float(size) / units.KiB
size_gb = float(size) / units.GiB
# image_service.show call
image_meta = FakeObject()
image_meta['disk_format'] = 'vmdk'
image_meta['size'] = size
image_meta['properties'] = {'vmware_disktype': 'streamOptimized'}
image_service = m.CreateMock(glance.GlanceImageService)
image_service.show(mox.IgnoreArg(), image_id).AndReturn(image_meta)
# _select_ds_for_volume call
(host, rp, folder, summary) = (FakeObject(), FakeObject(),
FakeObject(), FakeObject())
summary.name = "datastore-1"
m.StubOutWithMock(self._driver, '_select_ds_for_volume')
self._driver._select_ds_for_volume(size_gb).AndReturn((host, rp,
folder,
summary))
# _get_disk_type call
vol_name = 'volume name'
volume = FakeObject()
volume['name'] = vol_name
volume['size'] = size_gb
volume['volume_type_id'] = None # _get_disk_type will return 'thin'
disk_type = 'thin'
# _get_create_spec call
m.StubOutWithMock(self._volumeops, '_get_create_spec')
self._volumeops._get_create_spec(vol_name, 0, disk_type,
summary.name)
# vim.client.factory.create call
class FakeFactory(object):
def create(self, name):
return mox.MockAnything()
client = FakeObject()
client.factory = FakeFactory()
m.StubOutWithMock(self._vim.__class__, 'client')
self._vim.client = client
# fetch_stream_optimized_image call
timeout = self._config.vmware_image_transfer_timeout_secs
m.StubOutWithMock(vmware_images, 'fetch_stream_optimized_image')
vmware_images.fetch_stream_optimized_image(mox.IgnoreArg(), timeout,
image_service, image_id,
session=self._session,
host=self.IP,
resource_pool=rp,
vm_folder=folder,
vm_create_spec=
mox.IgnoreArg(),
image_size=size)
m.ReplayAll()
self._driver.copy_image_to_volume(mox.IgnoreArg(), volume,
@ -1421,6 +1495,9 @@ class VMwareEsxVmdkDriverTestCase(test.TestCase):
project_id = 'project-owner-id-123'
volume = FakeObject()
volume['name'] = vol_name
size_gb = 5
size = size_gb * units.GiB
volume['size'] = size_gb
volume['project_id'] = project_id
volume['instance_uuid'] = None
volume['attached_host'] = None
@ -1434,48 +1511,17 @@ class VMwareEsxVmdkDriverTestCase(test.TestCase):
vmdk_file_path = '[%s] %s' % (datastore_name, file_path)
m.StubOutWithMock(self._volumeops, 'get_vmdk_path')
self._volumeops.get_vmdk_path(backing).AndReturn(vmdk_file_path)
tmp_vmdk = '[datastore1] %s.vmdk' % image_id
# volumeops.get_host
host = FakeMor('Host', 'my_host')
m.StubOutWithMock(self._volumeops, 'get_host')
self._volumeops.get_host(backing).AndReturn(host)
# volumeops.get_dc
datacenter_name = 'my_datacenter'
datacenter = FakeMor('Datacenter', datacenter_name)
m.StubOutWithMock(self._volumeops, 'get_dc')
self._volumeops.get_dc(host).AndReturn(datacenter)
# volumeops.copy_vmdk_file
m.StubOutWithMock(self._volumeops, 'copy_vmdk_file')
self._volumeops.copy_vmdk_file(datacenter, vmdk_file_path, tmp_vmdk)
# host_ip
host_ip = self.IP
# volumeops.get_entity_name
m.StubOutWithMock(self._volumeops, 'get_entity_name')
self._volumeops.get_entity_name(datacenter).AndReturn(datacenter_name)
# cookiejar
client = FakeObject()
client.options = FakeObject()
client.options.transport = FakeObject()
cookies = FakeObject()
client.options.transport.cookiejar = cookies
m.StubOutWithMock(self._vim.__class__, 'client')
self._vim.client = client
# flat_vmdk
flat_vmdk_file = '%s-flat.vmdk' % image_id
# vmware_images.upload_image
timeout = self._config.vmware_image_transfer_timeout_secs
host_ip = self.IP
m.StubOutWithMock(vmware_images, 'upload_image')
vmware_images.upload_image(mox.IgnoreArg(), timeout, image_service,
image_id, project_id, host=host_ip,
data_center_name=datacenter_name,
datastore_name=datastore_name,
cookies=cookies,
file_path=flat_vmdk_file,
snapshot_name=image_meta['name'],
image_id, project_id, session=self._session,
host=host_ip, vm=backing,
vmdk_file_path=vmdk_file_path,
vmdk_size=size,
image_name=image_id,
image_version=1)
# volumeops.delete_vmdk_file
m.StubOutWithMock(self._volumeops, 'delete_vmdk_file')
self._volumeops.delete_vmdk_file(tmp_vmdk, datacenter)
m.ReplayAll()
self._driver.copy_volume_to_image(mox.IgnoreArg(), volume,

View File

@ -271,3 +271,38 @@ class VMwareAPISession(object):
LOG.exception(_("Task: %(task)s failed with error: %(err)s.") %
{'task': task, 'err': excep})
done.send_exception(excep)
def wait_for_lease_ready(self, lease):
done = event.Event()
loop = loopingcall.FixedIntervalLoopingCall(self._poll_lease,
lease,
done)
loop.start(self._task_poll_interval)
done.wait()
loop.stop()
def _poll_lease(self, lease, done):
try:
state = self.invoke_api(vim_util, 'get_object_property',
self.vim, lease, 'state')
if state == 'ready':
# done
LOG.debug(_("Lease is ready."))
done.send()
return
elif state == 'initializing':
LOG.debug(_("Lease initializing..."))
return
elif state == 'error':
error_msg = self.invoke_api(vim_util, 'get_object_property',
self.vim, lease, 'error')
LOG.exception(error_msg)
excep = error_util.VimFaultException([], error_msg)
done.send_exception(excep)
else:
# unknown state - complain
error_msg = _("Error: unknown lease state %s.") % state
raise error_util.VimFaultException([], error_msg)
except Exception as excep:
LOG.exception(excep)
done.send_exception(excep)

View File

@ -36,22 +36,26 @@ class ThreadSafePipe(queue.LightQueue):
"""The pipe to hold the data which the reader writes to and the writer
reads from.
"""
def __init__(self, maxsize, transfer_size):
def __init__(self, maxsize, max_transfer_size):
queue.LightQueue.__init__(self, maxsize)
self.transfer_size = transfer_size
self.max_transfer_size = max_transfer_size
self.transferred = 0
def read(self, chunk_size):
"""Read data from the pipe.
Chunksize if ignored for we have ensured that the data chunks written
Chunksize is ignored for we have ensured that the data chunks written
to the pipe by readers is the same as the chunks asked for by Writer.
"""
if self.transferred < self.transfer_size:
if self.transferred < self.max_transfer_size:
data_item = self.get()
self.transferred += len(data_item)
LOG.debug(_("Read %(bytes)s out of %(max)s from ThreadSafePipe.") %
{'bytes': self.transferred,
'max': self.max_transfer_size})
return data_item
else:
LOG.debug(_("Completed transfer of size %s.") % self.transferred)
return ""
def write(self, data):
@ -64,7 +68,7 @@ class ThreadSafePipe(queue.LightQueue):
def tell(self):
"""Get size of the file to be read."""
return self.transfer_size
return self.max_transfer_size
def close(self):
"""A place-holder to maintain consistency."""
@ -76,13 +80,13 @@ class GlanceWriteThread(object):
it is in correct ('active')state.
"""
def __init__(self, context, input, image_service, image_id,
def __init__(self, context, input_file, image_service, image_id,
image_meta=None):
if not image_meta:
image_meta = {}
self.context = context
self.input = input
self.input_file = input_file
self.image_service = image_service
self.image_id = image_id
self.image_meta = image_meta
@ -97,10 +101,13 @@ class GlanceWriteThread(object):
Function to do the image data transfer through an update
and thereon checks if the state is 'active'.
"""
LOG.debug(_("Initiating image service update on image: %(image)s "
"with meta: %(meta)s") % {'image': self.image_id,
'meta': self.image_meta})
self.image_service.update(self.context,
self.image_id,
self.image_meta,
data=self.input)
data=self.input_file)
self._running = True
while self._running:
try:
@ -109,6 +116,8 @@ class GlanceWriteThread(object):
image_status = image_meta.get('status')
if image_status == 'active':
self.stop()
LOG.debug(_("Glance image: %s is now active.") %
self.image_id)
self.done.send(True)
# If the state is killed, then raise an exception.
elif image_status == 'killed':
@ -150,9 +159,9 @@ class IOThread(object):
output file till the transfer is completely done.
"""
def __init__(self, input, output):
self.input = input
self.output = output
def __init__(self, input_file, output_file):
self.input_file = input_file
self.output_file = output_file
self._running = False
self.got_exception = False
@ -160,15 +169,19 @@ class IOThread(object):
self.done = event.Event()
def _inner():
"""Read data from the input and write the same to the output."""
"""Read data from input and write the same to output."""
self._running = True
while self._running:
try:
data = self.input.read(None)
data = self.input_file.read(None)
if not data:
self.stop()
self.done.send(True)
self.output.write(data)
self.output_file.write(data)
if hasattr(self.input_file, "update_progress"):
self.input_file.update_progress()
if hasattr(self.output_file, "update_progress"):
self.output_file.update_progress()
greenthread.sleep(IO_THREAD_SLEEP_TIME)
except Exception as exc:
self.stop()

View File

@ -28,6 +28,8 @@ import urllib2
import urlparse
from cinder.openstack.common import log as logging
from cinder.volume.drivers.vmware import error_util
from cinder.volume.drivers.vmware import vim_util
LOG = logging.getLogger(__name__)
USER_AGENT = 'OpenStack-ESX-Adapter'
@ -63,20 +65,12 @@ class GlanceFileRead(object):
class VMwareHTTPFile(object):
"""Base class for HTTP file."""
"""Base class for VMDK file access over HTTP."""
def __init__(self, file_handle):
self.eof = False
self.file_handle = file_handle
def set_eof(self, eof):
"""Set the end of file marker."""
self.eof = eof
def get_eof(self):
"""Check if the end of file has been reached."""
return self.eof
def close(self):
"""Close the file handle."""
try:
@ -121,6 +115,28 @@ class VMwareHTTPFile(object):
return '%s://[%s]' % (scheme, host)
return '%s://%s' % (scheme, host)
def _fix_esx_url(self, url, host):
"""Fix netloc if it is a ESX host.
For a ESX host the netloc is set to '*' in the url returned in
HttpNfcLeaseInfo. The netloc is right IP when talking to a VC.
"""
urlp = urlparse.urlparse(url)
if urlp.netloc == '*':
scheme, _, path, params, query, fragment = urlp
url = urlparse.urlunparse((scheme, host, path, params,
query, fragment))
return url
def find_vmdk_url(self, lease_info, host):
"""Find the URL corresponding to a vmdk disk in lease info."""
url = None
for deviceUrl in lease_info.deviceUrl:
if deviceUrl.disk:
url = self._fix_esx_url(deviceUrl.url, host)
break
return url
class VMwareHTTPWriteFile(VMwareHTTPFile):
"""VMware file write handler class."""
@ -159,28 +175,165 @@ class VMwareHTTPWriteFile(VMwareHTTPFile):
super(VMwareHTTPWriteFile, self).close()
class VMwareHTTPReadFile(VMwareHTTPFile):
"""VMware file read handler class."""
class VMwareHTTPWriteVmdk(VMwareHTTPFile):
"""Write VMDK over HTTP using VMware HttpNfcLease."""
def __init__(self, host, data_center_name, datastore_name, cookies,
file_path, scheme='https'):
soap_url = self.get_soap_url(scheme, host)
base_url = '%s/folder/%s' % (soap_url, urllib.pathname2url(file_path))
param_list = {'dcPath': data_center_name, 'dsName': datastore_name}
base_url = base_url + '?' + urllib.urlencode(param_list)
def __init__(self, session, host, rp_ref, vm_folder_ref, vm_create_spec,
vmdk_size):
"""Initialize a writer for vmdk file.
:param session: a valid api session to ESX/VC server
:param host: the ESX or VC host IP
:param rp_ref: resource pool into which backing VM is imported
:param vm_folder_ref: VM folder in ESX/VC inventory to use as parent
of backing VM
:param vm_create_spec: backing VM created using this create spec
:param vmdk_size: VMDK size to be imported into backing VM
"""
self._session = session
self._vmdk_size = vmdk_size
self._progress = 0
lease = session.invoke_api(session.vim, 'ImportVApp', rp_ref,
spec=vm_create_spec, folder=vm_folder_ref)
session.wait_for_lease_ready(lease)
self._lease = lease
lease_info = session.invoke_api(vim_util, 'get_object_property',
session.vim, lease, 'info')
# Find the url for vmdk device
url = self.find_vmdk_url(lease_info, host)
if not url:
msg = _("Could not retrieve URL from lease.")
LOG.exception(msg)
raise error_util.VimException(msg)
LOG.info(_("Opening vmdk url: %s for write.") % url)
# Prepare the http connection to the vmdk url
cookies = session.vim.client.options.transport.cookiejar
_urlparse = urlparse.urlparse(url)
scheme, netloc, path, params, query, fragment = _urlparse
if scheme == 'http':
conn = httplib.HTTPConnection(netloc)
elif scheme == 'https':
conn = httplib.HTTPSConnection(netloc)
if query:
path = path + '?' + query
conn.putrequest('PUT', path)
conn.putheader('User-Agent', USER_AGENT)
conn.putheader('Content-Length', str(vmdk_size))
conn.putheader('Overwrite', 't')
conn.putheader('Cookie', self._build_vim_cookie_headers(cookies))
conn.putheader('Content-Type', 'binary/octet-stream')
conn.endheaders()
self.conn = conn
VMwareHTTPFile.__init__(self, conn)
def write(self, data):
"""Write to the file."""
self._progress += len(data)
LOG.debug(_("Written %s bytes to vmdk.") % self._progress)
self.file_handle.send(data)
def update_progress(self):
"""Updates progress to lease.
This call back to the lease is essential to keep the lease alive
across long running write operations.
"""
percent = int(float(self._progress) / self._vmdk_size * 100)
try:
LOG.debug(_("Updating progress to %s percent.") % percent)
self._session.invoke_api(self._session.vim,
'HttpNfcLeaseProgress',
self._lease, percent=percent)
except error_util.VimException as ex:
LOG.exception(ex)
raise ex
def close(self):
"""End the lease and close the connection."""
state = self._session.invoke_api(vim_util, 'get_object_property',
self._session.vim,
self._lease, 'state')
if state == 'ready':
self._session.invoke_api(self._session.vim, 'HttpNfcLeaseComplete',
self._lease)
LOG.debug(_("Lease released."))
else:
LOG.debug(_("Lease is already in state: %s.") % state)
super(VMwareHTTPWriteVmdk, self).close()
class VMwareHTTPReadVmdk(VMwareHTTPFile):
"""read VMDK over HTTP using VMware HttpNfcLease."""
def __init__(self, session, host, vm_ref, vmdk_path, vmdk_size):
"""Initialize a writer for vmdk file.
During an export operation the vmdk disk is converted to a
stream-optimized sparse disk format. So the size of the VMDK
after export may be smaller than the current vmdk disk size.
:param session: a valid api session to ESX/VC server
:param host: the ESX or VC host IP
:param vm_ref: backing VM whose vmdk is to be exported
:param vmdk_path: datastore relative path to vmdk file to be exported
:param vmdk_size: current disk size of vmdk file to be exported
"""
self._session = session
self._vmdk_size = vmdk_size
self._progress = 0
lease = session.invoke_api(session.vim, 'ExportVm', vm_ref)
session.wait_for_lease_ready(lease)
self._lease = lease
lease_info = session.invoke_api(vim_util, 'get_object_property',
session.vim, lease, 'info')
# find the right disk url corresponding to given vmdk_path
url = self.find_vmdk_url(lease_info, host)
if not url:
msg = _("Could not retrieve URL from lease.")
LOG.exception(msg)
raise error_util.VimException(msg)
LOG.info(_("Opening vmdk url: %s for read.") % url)
cookies = session.vim.client.options.transport.cookiejar
headers = {'User-Agent': USER_AGENT,
'Cookie': self._build_vim_cookie_headers(cookies)}
request = urllib2.Request(base_url, None, headers)
request = urllib2.Request(url, None, headers)
conn = urllib2.urlopen(request)
VMwareHTTPFile.__init__(self, conn)
def read(self, chunk_size):
"""Read a chunk of data."""
# We are ignoring the chunk size passed for we want the pipe to hold
# data items of the chunk-size that Glance Client uses for read
# while writing.
"""Read a chunk from file"""
self._progress += READ_CHUNKSIZE
LOG.debug(_("Read %s bytes from vmdk.") % self._progress)
return self.file_handle.read(READ_CHUNKSIZE)
def get_size(self):
"""Get size of the file to be read."""
return self.file_handle.headers.get('Content-Length', -1)
def update_progress(self):
"""Updates progress to lease.
This call back to the lease is essential to keep the lease alive
across long running read operations.
"""
percent = int(float(self._progress) / self._vmdk_size * 100)
try:
LOG.debug(_("Updating progress to %s percent.") % percent)
self._session.invoke_api(self._session.vim,
'HttpNfcLeaseProgress',
self._lease, percent=percent)
except error_util.VimException as ex:
LOG.exception(ex)
raise ex
def close(self):
"""End the lease and close the connection."""
state = self._session.invoke_api(vim_util, 'get_object_property',
self._session.vim,
self._lease, 'state')
if state == 'ready':
self._session.invoke_api(self._session.vim, 'HttpNfcLeaseComplete',
self._lease)
LOG.debug(_("Lease released."))
else:
LOG.debug(_("Lease is already in state: %s.") % state)
super(VMwareHTTPReadVmdk, self).close()

View File

@ -319,6 +319,43 @@ class VMwareEsxVmdkDriver(driver.VolumeDriver):
def _relocate_backing(self, size_gb, backing, host):
pass
def _select_ds_for_volume(self, size_gb):
"""Select datastore that can accommodate a volume of given size.
Returns the selected datastore summary along with a compute host and
its resource pool and folder where the volume can be created
:return: (host, rp, folder, summary)
"""
retrv_result = self.volumeops.get_hosts()
while retrv_result:
hosts = retrv_result.objects
if not hosts:
break
(selected_host, rp, folder, summary) = (None, None, None, None)
for host in hosts:
host = host.obj
try:
(dss, rp) = self.volumeops.get_dss_rp(host)
(folder, summary) = self._get_folder_ds_summary(size_gb,
rp, dss)
selected_host = host
break
except error_util.VimException as excep:
LOG.warn(_("Unable to find suitable datastore for volume "
"of size: %(vol)s GB under host: %(host)s. "
"More details: %(excep)s") %
{'vol': size_gb,
'host': host.obj, 'excep': excep})
if selected_host:
self.volumeops.cancel_retrieval(retrv_result)
return (selected_host, rp, folder, summary)
retrv_result = self.volumeops.continue_retrieval(retrv_result)
msg = _("Unable to find host to accommodate a disk of size: %s "
"in the inventory.") % size_gb
LOG.error(msg)
raise error_util.VimException(msg)
def _create_backing_in_inventory(self, volume):
"""Creates backing under any suitable host.
@ -612,27 +649,18 @@ class VMwareEsxVmdkDriver(driver.VolumeDriver):
LOG.error(msg)
raise exception.ImageUnacceptable(msg)
def copy_image_to_volume(self, context, volume, image_service, image_id):
"""Creates volume from image.
def _fetch_flat_image(self, context, volume, image_service, image_id,
image_size):
"""Creates a volume from flat glance image.
Creates a backing for the volume under the ESX/VC server and
copies the VMDK flat file from the glance image content.
The method supports only image with VMDK disk format.
:param context: context
:param volume: Volume object
:param image_service: Glance image service
:param image_id: Glance image id
The method assumes glance image is VMDK disk format and its
vmware_disktype is "sparse" or "preallocated", but not
"streamOptimized"
"""
LOG.debug(_("Copy glance image: %s to create new volume.") % image_id)
# Verify glance image is vmdk disk format
metadata = image_service.show(context, image_id)
disk_format = metadata['disk_format']
VMwareEsxVmdkDriver._validate_disk_format(disk_format)
# Set volume size in GB from image metadata
volume['size'] = float(metadata['size']) / units.GiB
volume['size'] = float(image_size) / units.GiB
# First create empty backing in the inventory
backing = self._create_backing_in_inventory(volume)
@ -653,12 +681,13 @@ class VMwareEsxVmdkDriver(driver.VolumeDriver):
cookies = self.session.vim.client.options.transport.cookiejar
LOG.debug(_("Fetching glance image: %(id)s to server: %(host)s.") %
{'id': image_id, 'host': host_ip})
vmware_images.fetch_image(context, timeout, image_service,
image_id, host=host_ip,
data_center_name=datacenter_name,
datastore_name=datastore_name,
cookies=cookies,
file_path=flat_vmdk_path)
vmware_images.fetch_flat_image(context, timeout, image_service,
image_id, image_size=image_size,
host=host_ip,
data_center_name=datacenter_name,
datastore_name=datastore_name,
cookies=cookies,
file_path=flat_vmdk_path)
LOG.info(_("Done copying image: %(id)s to volume: %(vol)s.") %
{'id': image_id, 'vol': volume['name']})
except Exception as excep:
@ -669,68 +698,148 @@ class VMwareEsxVmdkDriver(driver.VolumeDriver):
self.volumeops.delete_backing(backing)
raise excep
def _fetch_stream_optimized_image(self, context, volume, image_service,
image_id, image_size):
"""Creates volume from image using HttpNfc VM import.
Uses Nfc API to download the VMDK file from Glance. Nfc creates the
backing VM that wraps the VMDK in the ESX/VC inventory.
This method assumes glance image is VMDK disk format and its
vmware_disktype is 'streamOptimized'.
"""
try:
# find host in which to create the volume
size_gb = volume['size']
(host, rp, folder, summary) = self._select_ds_for_volume(size_gb)
except error_util.VimException as excep:
LOG.exception(_("Exception in _select_ds_for_volume: %s.") % excep)
raise excep
LOG.debug(_("Selected datastore %(ds)s for new volume of size "
"%(size)s GB.") % {'ds': summary.name, 'size': size_gb})
# prepare create spec for backing vm
disk_type = VMwareEsxVmdkDriver._get_disk_type(volume)
# The size of stream optimized glance image is often suspect,
# so better let VC figure out the disk capacity during import.
dummy_disk_size = 0
vm_create_spec = self.volumeops._get_create_spec(volume['name'],
dummy_disk_size,
disk_type,
summary.name)
# convert vm_create_spec to vm_import_spec
cf = self.session.vim.client.factory
vm_import_spec = cf.create('ns0:VirtualMachineImportSpec')
vm_import_spec.configSpec = vm_create_spec
try:
# fetching image from glance will also create the backing
timeout = self.configuration.vmware_image_transfer_timeout_secs
host_ip = self.configuration.vmware_host_ip
LOG.debug(_("Fetching glance image: %(id)s to server: %(host)s.") %
{'id': image_id, 'host': host_ip})
vmware_images.fetch_stream_optimized_image(context, timeout,
image_service,
image_id,
session=self.session,
host=host_ip,
resource_pool=rp,
vm_folder=folder,
vm_create_spec=
vm_import_spec,
image_size=image_size)
except exception.CinderException as excep:
LOG.exception(_("Exception in copy_image_to_volume: %s.") % excep)
backing = self.volumeops.get_backing(volume['name'])
if backing:
LOG.exception(_("Deleting the backing: %s") % backing)
# delete the backing
self.volumeops.delete_backing(backing)
raise excep
LOG.info(_("Done copying image: %(id)s to volume: %(vol)s.") %
{'id': image_id, 'vol': volume['name']})
def copy_image_to_volume(self, context, volume, image_service, image_id):
"""Creates volume from image.
This method only supports Glance image of VMDK disk format.
Uses flat vmdk file copy for "sparse" and "preallocated" disk types
Uses HttpNfc import API for "streamOptimized" disk types. This API
creates a backing VM that wraps the VMDK in the ESX/VC inventory.
:param context: context
:param volume: Volume object
:param image_service: Glance image service
:param image_id: Glance image id
"""
LOG.debug(_("Copy glance image: %s to create new volume.") % image_id)
# Verify glance image is vmdk disk format
metadata = image_service.show(context, image_id)
VMwareEsxVmdkDriver._validate_disk_format(metadata['disk_format'])
# Get disk_type for vmdk disk
disk_type = None
properties = metadata['properties']
if properties and 'vmware_disktype' in properties:
disk_type = properties['vmware_disktype']
if disk_type == 'streamOptimized':
self._fetch_stream_optimized_image(context, volume, image_service,
image_id, metadata['size'])
else:
self._fetch_flat_image(context, volume, image_service, image_id,
metadata['size'])
def copy_volume_to_image(self, context, volume, image_service, image_meta):
"""Creates glance image from volume.
Upload of only available volume is supported.
Upload of only available volume is supported. The uploaded glance image
has a vmdk disk type of "streamOptimized" that can only be downloaded
using the HttpNfc API.
Steps followed are:
1. Get the name of the vmdk file which the volume points to right now.
Can be a chain of snapshots, so we need to know the last in the
chain.
2. Call CopyVirtualDisk which coalesces the disk chain to form a
single vmdk, rather a .vmdk metadata file and a -flat.vmdk disk
data file.
3. Now upload the -flat.vmdk file to the image store.
4. Delete the coalesced .vmdk and -flat.vmdk created.
2. Use Nfc APIs to upload the contents of the vmdk file to glance.
"""
# if volume is attached raise exception
if volume['instance_uuid'] or volume['attached_host']:
msg = _("Upload to glance of attached volume is not supported.")
LOG.error(msg)
raise exception.InvalidVolume(msg)
# validate disk format is vmdk
LOG.debug(_("Copy Volume: %s to new image.") % volume['name'])
VMwareEsxVmdkDriver._validate_disk_format(image_meta['disk_format'])
# get backing vm of volume and its vmdk path
backing = self.volumeops.get_backing(volume['name'])
if not backing:
LOG.info(_("Backing not found, creating for volume: %s") %
volume['name'])
backing = self._create_backing_in_inventory(volume)
vmdk_file_path = self.volumeops.get_vmdk_path(backing)
datastore_name = volumeops.split_datastore_path(vmdk_file_path)[0]
# Create a copy of the vmdk into a tmp file
image_id = image_meta['id']
tmp_vmdk_file_path = '[%s] %s.vmdk' % (datastore_name, image_id)
host = self.volumeops.get_host(backing)
datacenter = self.volumeops.get_dc(host)
self.volumeops.copy_vmdk_file(datacenter, vmdk_file_path,
tmp_vmdk_file_path)
try:
# Upload image from copy of -flat.vmdk
timeout = self.configuration.vmware_image_transfer_timeout_secs
host_ip = self.configuration.vmware_host_ip
datacenter_name = self.volumeops.get_entity_name(datacenter)
cookies = self.session.vim.client.options.transport.cookiejar
flat_vmdk_copy = '%s-flat.vmdk' % image_id
# Upload image from vmdk
timeout = self.configuration.vmware_image_transfer_timeout_secs
host_ip = self.configuration.vmware_host_ip
vmware_images.upload_image(context, timeout, image_service,
image_meta['id'],
volume['project_id'], host=host_ip,
data_center_name=datacenter_name,
datastore_name=datastore_name,
cookies=cookies,
file_path=flat_vmdk_copy,
snapshot_name=image_meta['name'],
image_version=1)
LOG.info(_("Done copying volume %(vol)s to a new image %(img)s") %
{'vol': volume['name'], 'img': image_meta['name']})
finally:
# Delete the coalesced .vmdk and -flat.vmdk created
self.volumeops.delete_vmdk_file(tmp_vmdk_file_path, datacenter)
vmware_images.upload_image(context, timeout, image_service,
image_meta['id'],
volume['project_id'],
session=self.session,
host=host_ip,
vm=backing,
vmdk_file_path=vmdk_file_path,
vmdk_size=volume['size'] * units.GiB,
image_name=image_meta['name'],
image_version=1)
LOG.info(_("Done copying volume %(vol)s to a new image %(img)s") %
{'vol': volume['name'], 'img': image_meta['name']})
class VMwareVcVmdkDriver(VMwareEsxVmdkDriver):

View File

@ -30,7 +30,7 @@ LOG = logging.getLogger(__name__)
QUEUE_BUFFER_SIZE = 10
def start_transfer(context, timeout_secs, read_file_handle, data_size,
def start_transfer(context, timeout_secs, read_file_handle, max_data_size,
write_file_handle=None, image_service=None, image_id=None,
image_meta=None):
"""Start the data transfer from the reader to the writer.
@ -45,7 +45,7 @@ def start_transfer(context, timeout_secs, read_file_handle, data_size,
# The pipe that acts as an intermediate store of data for reader to write
# to and writer to grab from.
thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, data_size)
thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, max_data_size)
# The read thread. In case of glance it is the instance of the
# GlanceFileRead class. The glance client read returns an iterator
# and this class wraps that iterator to provide datachunks in calls
@ -91,11 +91,11 @@ def start_transfer(context, timeout_secs, read_file_handle, data_size,
write_file_handle.close()
def fetch_image(context, timeout_secs, image_service, image_id, **kwargs):
"""Download image from the glance image server."""
LOG.debug(_("Downloading image: %s from glance image server.") % image_id)
metadata = image_service.show(context, image_id)
file_size = int(metadata['size'])
def fetch_flat_image(context, timeout_secs, image_service, image_id, **kwargs):
"""Download flat image from the glance image server."""
LOG.debug(_("Downloading image: %s from glance image server as a flat vmdk"
" file.") % image_id)
file_size = int(kwargs.get('image_size'))
read_iter = image_service.download(context, image_id)
read_handle = rw_util.GlanceFileRead(read_iter)
write_handle = rw_util.VMwareHTTPWriteFile(kwargs.get('host'),
@ -109,25 +109,50 @@ def fetch_image(context, timeout_secs, image_service, image_id, **kwargs):
LOG.info(_("Downloaded image: %s from glance image server.") % image_id)
def fetch_stream_optimized_image(context, timeout_secs, image_service,
image_id, **kwargs):
"""Download stream optimized image from glance image server."""
LOG.debug(_("Downloading image: %s from glance image server using HttpNfc"
" import.") % image_id)
file_size = int(kwargs.get('image_size'))
read_iter = image_service.download(context, image_id)
read_handle = rw_util.GlanceFileRead(read_iter)
write_handle = rw_util.VMwareHTTPWriteVmdk(kwargs.get('session'),
kwargs.get('host'),
kwargs.get('resource_pool'),
kwargs.get('vm_folder'),
kwargs.get('vm_create_spec'),
file_size)
start_transfer(context, timeout_secs, read_handle, file_size,
write_file_handle=write_handle)
LOG.info(_("Downloaded image: %s from glance image server.") % image_id)
def upload_image(context, timeout_secs, image_service, image_id, owner_id,
**kwargs):
"""Upload the snapshot vm disk file to Glance image server."""
LOG.debug(_("Uploading image: %s to the Glance image server.") % image_id)
read_handle = rw_util.VMwareHTTPReadFile(kwargs.get('host'),
kwargs.get('data_center_name'),
kwargs.get('datastore_name'),
kwargs.get('cookies'),
kwargs.get('file_path'))
file_size = read_handle.get_size()
"""Upload the vm's disk file to Glance image server."""
LOG.debug(_("Uploading image: %s to the Glance image server using HttpNfc"
" export.") % image_id)
file_size = kwargs.get('vmdk_size')
read_handle = rw_util.VMwareHTTPReadVmdk(kwargs.get('session'),
kwargs.get('host'),
kwargs.get('vm'),
kwargs.get('vmdk_file_path'),
file_size)
# The properties and other fields that we need to set for the image.
# Important to set the 'size' to 0 here. Otherwise the glance client
# uses the volume size which may not be image size after upload since
# it is converted to a stream-optimized sparse disk
image_metadata = {'disk_format': 'vmdk',
'is_public': 'false',
'name': kwargs.get('snapshot_name'),
'name': kwargs.get('image_name'),
'status': 'active',
'container_format': 'bare',
'size': file_size,
'size': 0,
'properties': {'vmware_image_version':
kwargs.get('image_version'),
'vmware_disktype': 'streamOptimized',
'owner_id': owner_id}}
start_transfer(context, timeout_secs, read_handle, file_size,
image_service=image_service, image_id=image_id,

View File

@ -299,7 +299,8 @@ class VMwareVolumeOps(object):
controller_spec.device = controller_device
disk_device = cf.create('ns0:VirtualDisk')
disk_device.capacityInKB = int(size_kb)
# for very small disks allocate at least 1KB
disk_device.capacityInKB = max(1, int(size_kb))
disk_device.key = -101
disk_device.unitNumber = 0
disk_device.controllerKey = -100
@ -308,7 +309,7 @@ class VMwareVolumeOps(object):
disk_device_bkng.eagerlyScrub = True
elif disk_type == 'thin':
disk_device_bkng.thinProvisioned = True
disk_device_bkng.fileName = '[%s]' % ds_name
disk_device_bkng.fileName = ''
disk_device_bkng.diskMode = 'persistent'
disk_device.backing = disk_device_bkng
disk_spec = cf.create('ns0:VirtualDeviceConfigSpec')