VMware: Refactor the image transfer

The image transfer is unnecessary complicated and buggy. When
transferring streamOptimized images we have to update the progress of
the NFC lease to prevent timeouts.
This patch replaces the complex usage of blocking queues and threads with
a simple read+write loop. It has the same performance and the code is
much cleaner. The NFC lease is updated with the loopingcall utility.

Closes-Bug: #1546454
Closes-Bug: #1278690
Related-Bug: #1495429
Change-Id: I96e8e0682bcc642a2a5c4b7d2851812bef60d2ff
This commit is contained in:
Radoslav Gerganov 2016-02-17 10:35:59 +02:00 committed by Tracy Jones
parent 98161f0d4c
commit 2df83abaa0
4 changed files with 40 additions and 315 deletions

View File

@ -69,12 +69,12 @@ class VMwareImagesTestCase(test.NoDBTestCase):
side_effect=fake_read_handle),
mock.patch.object(rw_handles, 'FileWriteHandle',
side_effect=fake_write_handle),
mock.patch.object(images, 'start_transfer'),
mock.patch.object(images, 'image_transfer'),
mock.patch.object(images.IMAGE_API, 'get',
return_value=image_data),
mock.patch.object(images.IMAGE_API, 'download',
return_value=read_iter),
) as (glance_read, http_write, start_transfer, image_show,
) as (glance_read, http_write, image_transfer, image_show,
image_download):
images.fetch_image(context, instance,
host, port, dc_name,
@ -83,10 +83,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
glance_read.assert_called_once_with(read_iter)
http_write.assert_called_once_with(host, port, dc_name, ds_name, None,
file_path, image_data['size'])
start_transfer.assert_called_once_with(
context, read_file_handle,
image_data['size'],
write_file_handle=write_file_handle)
image_transfer.assert_called_once_with(read_file_handle,
write_file_handle)
image_download.assert_called_once_with(context, instance['image_ref'])
image_show.assert_called_once_with(context, instance['image_ref'])
@ -118,13 +116,13 @@ class VMwareImagesTestCase(test.NoDBTestCase):
with test.nested(
mock.patch.object(images.IMAGE_API, 'get'),
mock.patch.object(images.IMAGE_API, 'download'),
mock.patch.object(images, 'start_transfer'),
mock.patch.object(images, 'image_transfer'),
mock.patch.object(images, '_build_shadow_vm_config_spec'),
mock.patch.object(session, '_call_method'),
mock.patch.object(vm_util, 'get_vmdk_info')
) as (mock_image_api_get,
mock_image_api_download,
mock_start_transfer,
mock_image_transfer,
mock_build_shadow_vm_config_spec,
mock_call_method,
mock_get_vmdk_info):
@ -171,8 +169,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
mock_tar_open.assert_called_once_with(mode='r|',
fileobj=mock_read_handle)
mock_start_transfer.assert_called_once_with(context,
mock_read_handle, 512, write_file_handle=mock_write_handle)
mock_image_transfer.assert_called_once_with(mock_read_handle,
mock_write_handle)
mock_get_vmdk_info.assert_called_once_with(
session, mock.sentinel.vm_ref, 'fake-vm')
mock_call_method.assert_called_once_with(
@ -189,13 +187,13 @@ class VMwareImagesTestCase(test.NoDBTestCase):
with test.nested(
mock.patch.object(images.IMAGE_API, 'get'),
mock.patch.object(images.IMAGE_API, 'download'),
mock.patch.object(images, 'start_transfer'),
mock.patch.object(images, 'image_transfer'),
mock.patch.object(images, '_build_shadow_vm_config_spec'),
mock.patch.object(session, '_call_method'),
mock.patch.object(vm_util, 'get_vmdk_info')
) as (mock_image_api_get,
mock_image_api_download,
mock_start_transfer,
mock_image_transfer,
mock_build_shadow_vm_config_spec,
mock_call_method,
mock_get_vmdk_info):
@ -221,9 +219,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
context, instance, session, 'fake-vm', 'fake-datastore',
vm_folder_ref, res_pool_ref)
mock_start_transfer.assert_called_once_with(context,
mock_read_handle, 512, write_file_handle=mock_write_handle)
mock_image_transfer.assert_called_once_with(mock_read_handle,
mock_write_handle)
mock_call_method.assert_called_once_with(
session.vim, "UnregisterVM", mock.sentinel.vm_ref)
mock_get_vmdk_info.assert_called_once_with(

View File

@ -1,33 +0,0 @@
# Copyright (c) 2014 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nova import exception
from nova import test
from nova.virt.vmwareapi import io_util
@mock.patch.object(io_util, 'IMAGE_API')
class GlanceWriteThreadTestCase(test.NoDBTestCase):
def test_start_image_update_service_exception(self, mocked):
mocked.update.side_effect = exception.ImageNotAuthorized(
image_id='image')
write_thread = io_util.GlanceWriteThread(
None, None, image_id=None)
write_thread.start()
self.assertRaises(exception.ImageNotAuthorized, write_thread.wait)
write_thread.stop()
write_thread.close()

View File

@ -23,17 +23,17 @@ import tarfile
from lxml import etree
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import strutils
from oslo_utils import units
from oslo_vmware import rw_handles
import six
from nova import exception
from nova.i18n import _, _LE, _LI
from nova.i18n import _, _LI
from nova import image
from nova.objects import fields
from nova.virt.vmwareapi import constants
from nova.virt.vmwareapi import io_util
from nova.virt.vmwareapi import vm_util
# NOTE(mdbooth): We use use_linked_clone below, but don't have to import it
@ -47,6 +47,8 @@ LOG = logging.getLogger(__name__)
IMAGE_API = image.API()
QUEUE_BUFFER_SIZE = 10
NFC_LEASE_UPDATE_PERIOD = 60 # update NFC lease every 60sec.
CHUNK_SIZE = 64 * units.Ki # default chunk size for image transfer
class VMwareImage(object):
@ -191,60 +193,22 @@ def get_vsphere_location(context, image_id):
return None
def start_transfer(context, read_file_handle, data_size,
write_file_handle=None, image_id=None, image_meta=None):
"""Start the data transfer from the reader to the writer.
Reader writes to the pipe and the writer reads from the pipe. This means
that the total transfer time boils down to the slower of the read/write
and not the addition of the two times.
"""
if not image_meta:
image_meta = {}
# The pipe that acts as an intermediate store of data for reader to write
# to and writer to grab from.
thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, data_size)
# The read thread. In case of glance it is the instance of the
# GlanceFileRead class. The glance client read returns an iterator
# and this class wraps that iterator to provide datachunks in calls
# to read.
read_thread = io_util.IOThread(read_file_handle, thread_safe_pipe)
# In case of Glance - VMware transfer, we just need a handle to the
# HTTP Connection that is to send transfer data to the VMware datastore.
if write_file_handle:
write_thread = io_util.IOThread(thread_safe_pipe, write_file_handle)
# In case of VMware - Glance transfer, we relinquish VMware HTTP file read
# handle to Glance Client instance, but to be sure of the transfer we need
# to be sure of the status of the image on glance changing to active.
# The GlanceWriteThread handles the same for us.
elif image_id:
write_thread = io_util.GlanceWriteThread(context, thread_safe_pipe,
image_id, image_meta)
# Start the read and write threads.
read_event = read_thread.start()
write_event = write_thread.start()
def image_transfer(read_handle, write_handle):
# write_handle could be an NFC lease, so we need to periodically
# update its progress
update_cb = getattr(write_handle, 'update_progress', lambda: None)
updater = loopingcall.FixedIntervalLoopingCall(update_cb)
try:
# Wait on the read and write events to signal their end
read_event.wait()
write_event.wait()
except Exception as exc:
# In case of any of the reads or writes raising an exception,
# stop the threads so that we un-necessarily don't keep the other one
# waiting.
read_thread.stop()
write_thread.stop()
# Log and raise the exception.
LOG.exception(_LE('Transfer data failed'))
raise exception.NovaException(exc)
updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
while True:
data = read_handle.read(CHUNK_SIZE)
if not data:
break
write_handle.write(data)
finally:
# No matter what, try closing the read and write handles, if it so
# applies.
read_file_handle.close()
if write_file_handle:
write_file_handle.close()
updater.stop()
read_handle.close()
write_handle.close()
def upload_iso_to_datastore(iso_path, instance, **kwargs):
@ -289,8 +253,7 @@ def fetch_image(context, instance, host, port, dc_name, ds_name, file_path,
read_file_handle = rw_handles.ImageReadHandle(read_iter)
write_file_handle = rw_handles.FileWriteHandle(
host, port, dc_name, ds_name, cookies, file_path, file_size)
start_transfer(context, read_file_handle, file_size,
write_file_handle=write_file_handle)
image_transfer(read_file_handle, write_file_handle)
LOG.debug("Downloaded image file data %(image_ref)s to "
"%(upload_name)s on the data store "
"%(data_store_name)s",
@ -390,10 +353,7 @@ def fetch_image_stream_optimized(context, instance, session, vm_name,
vm_folder_ref,
vm_import_spec,
file_size)
start_transfer(context,
read_handle,
file_size,
write_file_handle=write_handle)
image_transfer(read_handle, write_handle)
imported_vm_ref = write_handle.get_imported_vm()
@ -458,11 +418,7 @@ def fetch_image_ova(context, instance, session, vm_name, ds_name,
vm_folder_ref,
vm_import_spec,
file_size)
start_transfer(context,
extracted,
file_size,
write_file_handle=write_handle)
extracted.close()
image_transfer(extracted, write_handle)
LOG.info(_LI("Downloaded OVA image file %(image_ref)s"),
{'image_ref': instance.image_ref}, instance=instance)
imported_vm_ref = write_handle.get_imported_vm()
@ -506,13 +462,13 @@ def upload_image_stream_optimized(context, image_id, instance, session,
'vmware_disktype': 'streamOptimized',
'owner_id': instance.project_id}}
# Passing 0 as the file size since data size to be transferred cannot be
# predetermined.
start_transfer(context,
read_handle,
0,
image_id=image_id,
image_meta=image_metadata)
updater = loopingcall.FixedIntervalLoopingCall(read_handle.update_progress)
try:
updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
IMAGE_API.update(context, image_id, image_metadata, data=read_handle)
finally:
updater.stop()
read_handle.close()
LOG.debug("Uploaded image %s to the Glance image server", image_id,
instance=instance)

View File

@ -1,195 +0,0 @@
# Copyright (c) 2012 VMware, Inc.
# Copyright (c) 2011 Citrix Systems, Inc.
# Copyright 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Utility classes for defining the time saving transfer of data from the reader
to the write using a LightQueue as a Pipe between the reader and the writer.
"""
from eventlet import event
from eventlet import greenthread
from eventlet import queue
from oslo_log import log as logging
from nova import exception
from nova.i18n import _, _LE
from nova import image
from nova import utils
LOG = logging.getLogger(__name__)
IMAGE_API = image.API()
IO_THREAD_SLEEP_TIME = .01
GLANCE_POLL_INTERVAL = 5
CHUNK_SIZE = 64 * 1024 # default chunk size for image transfer
class ThreadSafePipe(queue.LightQueue):
"""The pipe to hold the data which the reader writes to and the writer
reads from.
"""
def __init__(self, maxsize, transfer_size):
queue.LightQueue.__init__(self, maxsize)
self.transfer_size = transfer_size
self.transferred = 0
def read(self, chunk_size):
"""Read data from the pipe.
Chunksize if ignored for we have ensured
that the data chunks written to the pipe by readers is the same as the
chunks asked for by the Writer.
"""
if self.transfer_size == 0 or self.transferred < self.transfer_size:
data_item = self.get()
self.transferred += len(data_item)
return data_item
else:
return ""
def write(self, data):
"""Put a data item in the pipe."""
self.put(data)
def seek(self, offset, whence=0):
"""Set the file's current position at the offset."""
pass
def tell(self):
"""Get size of the file to be read."""
return self.transfer_size
def close(self):
"""A place-holder to maintain consistency."""
pass
class GlanceWriteThread(object):
"""Ensures that image data is written to in the glance client and that
it is in correct ('active')state.
"""
def __init__(self, context, input, image_id,
image_meta=None):
if not image_meta:
image_meta = {}
self.context = context
self.input = input
self.image_id = image_id
self.image_meta = image_meta
self._running = False
def start(self):
self.done = event.Event()
def _inner():
"""Function to do the image data transfer through an update
and thereon checks if the state is 'active'.
"""
try:
IMAGE_API.update(self.context,
self.image_id,
self.image_meta,
data=self.input)
self._running = True
except exception.ImageNotAuthorized as exc:
self.done.send_exception(exc)
while self._running:
try:
image_meta = IMAGE_API.get(self.context,
self.image_id)
image_status = image_meta.get("status")
if image_status == "active":
self.stop()
self.done.send(True)
# If the state is killed, then raise an exception.
elif image_status == "killed":
self.stop()
msg = (_("Glance image %s is in killed state") %
self.image_id)
LOG.error(msg)
self.done.send_exception(exception.NovaException(msg))
elif image_status in ["saving", "queued"]:
greenthread.sleep(GLANCE_POLL_INTERVAL)
else:
self.stop()
msg = _("Glance image "
"%(image_id)s is in unknown state "
"- %(state)s") % {
"image_id": self.image_id,
"state": image_status}
LOG.error(msg)
self.done.send_exception(exception.NovaException(msg))
except Exception as exc:
self.stop()
self.done.send_exception(exc)
utils.spawn(_inner)
return self.done
def stop(self):
self._running = False
def wait(self):
return self.done.wait()
def close(self):
pass
class IOThread(object):
"""Class that reads chunks from the input file and writes them to the
output file till the transfer is completely done.
"""
def __init__(self, input, output):
self.input = input
self.output = output
self._running = False
self.got_exception = False
def start(self):
self.done = event.Event()
def _inner():
"""Read data from the input and write the same to the output
until the transfer completes.
"""
self._running = True
while self._running:
try:
data = self.input.read(CHUNK_SIZE)
if not data:
self.stop()
self.done.send(True)
self.output.write(data)
greenthread.sleep(IO_THREAD_SLEEP_TIME)
except Exception as exc:
self.stop()
LOG.exception(_LE('Read/Write data failed'))
self.done.send_exception(exc)
utils.spawn(_inner)
return self.done
def stop(self):
self._running = False
def wait(self):
return self.done.wait()