VMware: Refactor the image transfer
The image transfer is unnecessary complicated and buggy. When transferring streamOptimized images we have to update the progress of the NFC lease to prevent timeouts. This patch replaces the complex usage of blocking queues and threads with a simple read+write loop. It has the same performance and the code is much cleaner. The NFC lease is updated with the loopingcall utility. Closes-Bug: #1546454 Closes-Bug: #1278690 Related-Bug: #1495429 Change-Id: I96e8e0682bcc642a2a5c4b7d2851812bef60d2ff
This commit is contained in:
parent
98161f0d4c
commit
2df83abaa0
|
@ -69,12 +69,12 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
side_effect=fake_read_handle),
|
||||
mock.patch.object(rw_handles, 'FileWriteHandle',
|
||||
side_effect=fake_write_handle),
|
||||
mock.patch.object(images, 'start_transfer'),
|
||||
mock.patch.object(images, 'image_transfer'),
|
||||
mock.patch.object(images.IMAGE_API, 'get',
|
||||
return_value=image_data),
|
||||
mock.patch.object(images.IMAGE_API, 'download',
|
||||
return_value=read_iter),
|
||||
) as (glance_read, http_write, start_transfer, image_show,
|
||||
) as (glance_read, http_write, image_transfer, image_show,
|
||||
image_download):
|
||||
images.fetch_image(context, instance,
|
||||
host, port, dc_name,
|
||||
|
@ -83,10 +83,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
glance_read.assert_called_once_with(read_iter)
|
||||
http_write.assert_called_once_with(host, port, dc_name, ds_name, None,
|
||||
file_path, image_data['size'])
|
||||
start_transfer.assert_called_once_with(
|
||||
context, read_file_handle,
|
||||
image_data['size'],
|
||||
write_file_handle=write_file_handle)
|
||||
image_transfer.assert_called_once_with(read_file_handle,
|
||||
write_file_handle)
|
||||
image_download.assert_called_once_with(context, instance['image_ref'])
|
||||
image_show.assert_called_once_with(context, instance['image_ref'])
|
||||
|
||||
|
@ -118,13 +116,13 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
with test.nested(
|
||||
mock.patch.object(images.IMAGE_API, 'get'),
|
||||
mock.patch.object(images.IMAGE_API, 'download'),
|
||||
mock.patch.object(images, 'start_transfer'),
|
||||
mock.patch.object(images, 'image_transfer'),
|
||||
mock.patch.object(images, '_build_shadow_vm_config_spec'),
|
||||
mock.patch.object(session, '_call_method'),
|
||||
mock.patch.object(vm_util, 'get_vmdk_info')
|
||||
) as (mock_image_api_get,
|
||||
mock_image_api_download,
|
||||
mock_start_transfer,
|
||||
mock_image_transfer,
|
||||
mock_build_shadow_vm_config_spec,
|
||||
mock_call_method,
|
||||
mock_get_vmdk_info):
|
||||
|
@ -171,8 +169,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
|
||||
mock_tar_open.assert_called_once_with(mode='r|',
|
||||
fileobj=mock_read_handle)
|
||||
mock_start_transfer.assert_called_once_with(context,
|
||||
mock_read_handle, 512, write_file_handle=mock_write_handle)
|
||||
mock_image_transfer.assert_called_once_with(mock_read_handle,
|
||||
mock_write_handle)
|
||||
mock_get_vmdk_info.assert_called_once_with(
|
||||
session, mock.sentinel.vm_ref, 'fake-vm')
|
||||
mock_call_method.assert_called_once_with(
|
||||
|
@ -189,13 +187,13 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
with test.nested(
|
||||
mock.patch.object(images.IMAGE_API, 'get'),
|
||||
mock.patch.object(images.IMAGE_API, 'download'),
|
||||
mock.patch.object(images, 'start_transfer'),
|
||||
mock.patch.object(images, 'image_transfer'),
|
||||
mock.patch.object(images, '_build_shadow_vm_config_spec'),
|
||||
mock.patch.object(session, '_call_method'),
|
||||
mock.patch.object(vm_util, 'get_vmdk_info')
|
||||
) as (mock_image_api_get,
|
||||
mock_image_api_download,
|
||||
mock_start_transfer,
|
||||
mock_image_transfer,
|
||||
mock_build_shadow_vm_config_spec,
|
||||
mock_call_method,
|
||||
mock_get_vmdk_info):
|
||||
|
@ -221,9 +219,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
context, instance, session, 'fake-vm', 'fake-datastore',
|
||||
vm_folder_ref, res_pool_ref)
|
||||
|
||||
mock_start_transfer.assert_called_once_with(context,
|
||||
mock_read_handle, 512, write_file_handle=mock_write_handle)
|
||||
|
||||
mock_image_transfer.assert_called_once_with(mock_read_handle,
|
||||
mock_write_handle)
|
||||
mock_call_method.assert_called_once_with(
|
||||
session.vim, "UnregisterVM", mock.sentinel.vm_ref)
|
||||
mock_get_vmdk_info.assert_called_once_with(
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
# Copyright (c) 2014 VMware, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from nova import exception
|
||||
from nova import test
|
||||
from nova.virt.vmwareapi import io_util
|
||||
|
||||
|
||||
@mock.patch.object(io_util, 'IMAGE_API')
|
||||
class GlanceWriteThreadTestCase(test.NoDBTestCase):
|
||||
|
||||
def test_start_image_update_service_exception(self, mocked):
|
||||
mocked.update.side_effect = exception.ImageNotAuthorized(
|
||||
image_id='image')
|
||||
write_thread = io_util.GlanceWriteThread(
|
||||
None, None, image_id=None)
|
||||
write_thread.start()
|
||||
self.assertRaises(exception.ImageNotAuthorized, write_thread.wait)
|
||||
write_thread.stop()
|
||||
write_thread.close()
|
|
@ -23,17 +23,17 @@ import tarfile
|
|||
from lxml import etree
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import strutils
|
||||
from oslo_utils import units
|
||||
from oslo_vmware import rw_handles
|
||||
import six
|
||||
|
||||
from nova import exception
|
||||
from nova.i18n import _, _LE, _LI
|
||||
from nova.i18n import _, _LI
|
||||
from nova import image
|
||||
from nova.objects import fields
|
||||
from nova.virt.vmwareapi import constants
|
||||
from nova.virt.vmwareapi import io_util
|
||||
from nova.virt.vmwareapi import vm_util
|
||||
|
||||
# NOTE(mdbooth): We use use_linked_clone below, but don't have to import it
|
||||
|
@ -47,6 +47,8 @@ LOG = logging.getLogger(__name__)
|
|||
IMAGE_API = image.API()
|
||||
|
||||
QUEUE_BUFFER_SIZE = 10
|
||||
NFC_LEASE_UPDATE_PERIOD = 60 # update NFC lease every 60sec.
|
||||
CHUNK_SIZE = 64 * units.Ki # default chunk size for image transfer
|
||||
|
||||
|
||||
class VMwareImage(object):
|
||||
|
@ -191,60 +193,22 @@ def get_vsphere_location(context, image_id):
|
|||
return None
|
||||
|
||||
|
||||
def start_transfer(context, read_file_handle, data_size,
|
||||
write_file_handle=None, image_id=None, image_meta=None):
|
||||
"""Start the data transfer from the reader to the writer.
|
||||
Reader writes to the pipe and the writer reads from the pipe. This means
|
||||
that the total transfer time boils down to the slower of the read/write
|
||||
and not the addition of the two times.
|
||||
"""
|
||||
|
||||
if not image_meta:
|
||||
image_meta = {}
|
||||
|
||||
# The pipe that acts as an intermediate store of data for reader to write
|
||||
# to and writer to grab from.
|
||||
thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, data_size)
|
||||
# The read thread. In case of glance it is the instance of the
|
||||
# GlanceFileRead class. The glance client read returns an iterator
|
||||
# and this class wraps that iterator to provide datachunks in calls
|
||||
# to read.
|
||||
read_thread = io_util.IOThread(read_file_handle, thread_safe_pipe)
|
||||
|
||||
# In case of Glance - VMware transfer, we just need a handle to the
|
||||
# HTTP Connection that is to send transfer data to the VMware datastore.
|
||||
if write_file_handle:
|
||||
write_thread = io_util.IOThread(thread_safe_pipe, write_file_handle)
|
||||
# In case of VMware - Glance transfer, we relinquish VMware HTTP file read
|
||||
# handle to Glance Client instance, but to be sure of the transfer we need
|
||||
# to be sure of the status of the image on glance changing to active.
|
||||
# The GlanceWriteThread handles the same for us.
|
||||
elif image_id:
|
||||
write_thread = io_util.GlanceWriteThread(context, thread_safe_pipe,
|
||||
image_id, image_meta)
|
||||
# Start the read and write threads.
|
||||
read_event = read_thread.start()
|
||||
write_event = write_thread.start()
|
||||
def image_transfer(read_handle, write_handle):
|
||||
# write_handle could be an NFC lease, so we need to periodically
|
||||
# update its progress
|
||||
update_cb = getattr(write_handle, 'update_progress', lambda: None)
|
||||
updater = loopingcall.FixedIntervalLoopingCall(update_cb)
|
||||
try:
|
||||
# Wait on the read and write events to signal their end
|
||||
read_event.wait()
|
||||
write_event.wait()
|
||||
except Exception as exc:
|
||||
# In case of any of the reads or writes raising an exception,
|
||||
# stop the threads so that we un-necessarily don't keep the other one
|
||||
# waiting.
|
||||
read_thread.stop()
|
||||
write_thread.stop()
|
||||
|
||||
# Log and raise the exception.
|
||||
LOG.exception(_LE('Transfer data failed'))
|
||||
raise exception.NovaException(exc)
|
||||
updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
|
||||
while True:
|
||||
data = read_handle.read(CHUNK_SIZE)
|
||||
if not data:
|
||||
break
|
||||
write_handle.write(data)
|
||||
finally:
|
||||
# No matter what, try closing the read and write handles, if it so
|
||||
# applies.
|
||||
read_file_handle.close()
|
||||
if write_file_handle:
|
||||
write_file_handle.close()
|
||||
updater.stop()
|
||||
read_handle.close()
|
||||
write_handle.close()
|
||||
|
||||
|
||||
def upload_iso_to_datastore(iso_path, instance, **kwargs):
|
||||
|
@ -289,8 +253,7 @@ def fetch_image(context, instance, host, port, dc_name, ds_name, file_path,
|
|||
read_file_handle = rw_handles.ImageReadHandle(read_iter)
|
||||
write_file_handle = rw_handles.FileWriteHandle(
|
||||
host, port, dc_name, ds_name, cookies, file_path, file_size)
|
||||
start_transfer(context, read_file_handle, file_size,
|
||||
write_file_handle=write_file_handle)
|
||||
image_transfer(read_file_handle, write_file_handle)
|
||||
LOG.debug("Downloaded image file data %(image_ref)s to "
|
||||
"%(upload_name)s on the data store "
|
||||
"%(data_store_name)s",
|
||||
|
@ -390,10 +353,7 @@ def fetch_image_stream_optimized(context, instance, session, vm_name,
|
|||
vm_folder_ref,
|
||||
vm_import_spec,
|
||||
file_size)
|
||||
start_transfer(context,
|
||||
read_handle,
|
||||
file_size,
|
||||
write_file_handle=write_handle)
|
||||
image_transfer(read_handle, write_handle)
|
||||
|
||||
imported_vm_ref = write_handle.get_imported_vm()
|
||||
|
||||
|
@ -458,11 +418,7 @@ def fetch_image_ova(context, instance, session, vm_name, ds_name,
|
|||
vm_folder_ref,
|
||||
vm_import_spec,
|
||||
file_size)
|
||||
start_transfer(context,
|
||||
extracted,
|
||||
file_size,
|
||||
write_file_handle=write_handle)
|
||||
extracted.close()
|
||||
image_transfer(extracted, write_handle)
|
||||
LOG.info(_LI("Downloaded OVA image file %(image_ref)s"),
|
||||
{'image_ref': instance.image_ref}, instance=instance)
|
||||
imported_vm_ref = write_handle.get_imported_vm()
|
||||
|
@ -506,13 +462,13 @@ def upload_image_stream_optimized(context, image_id, instance, session,
|
|||
'vmware_disktype': 'streamOptimized',
|
||||
'owner_id': instance.project_id}}
|
||||
|
||||
# Passing 0 as the file size since data size to be transferred cannot be
|
||||
# predetermined.
|
||||
start_transfer(context,
|
||||
read_handle,
|
||||
0,
|
||||
image_id=image_id,
|
||||
image_meta=image_metadata)
|
||||
updater = loopingcall.FixedIntervalLoopingCall(read_handle.update_progress)
|
||||
try:
|
||||
updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
|
||||
IMAGE_API.update(context, image_id, image_metadata, data=read_handle)
|
||||
finally:
|
||||
updater.stop()
|
||||
read_handle.close()
|
||||
|
||||
LOG.debug("Uploaded image %s to the Glance image server", image_id,
|
||||
instance=instance)
|
||||
|
|
|
@ -1,195 +0,0 @@
|
|||
# Copyright (c) 2012 VMware, Inc.
|
||||
# Copyright (c) 2011 Citrix Systems, Inc.
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Utility classes for defining the time saving transfer of data from the reader
|
||||
to the write using a LightQueue as a Pipe between the reader and the writer.
|
||||
"""
|
||||
|
||||
from eventlet import event
|
||||
from eventlet import greenthread
|
||||
from eventlet import queue
|
||||
from oslo_log import log as logging
|
||||
|
||||
from nova import exception
|
||||
from nova.i18n import _, _LE
|
||||
from nova import image
|
||||
from nova import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
IMAGE_API = image.API()
|
||||
|
||||
IO_THREAD_SLEEP_TIME = .01
|
||||
GLANCE_POLL_INTERVAL = 5
|
||||
CHUNK_SIZE = 64 * 1024 # default chunk size for image transfer
|
||||
|
||||
|
||||
class ThreadSafePipe(queue.LightQueue):
|
||||
"""The pipe to hold the data which the reader writes to and the writer
|
||||
reads from.
|
||||
"""
|
||||
|
||||
def __init__(self, maxsize, transfer_size):
|
||||
queue.LightQueue.__init__(self, maxsize)
|
||||
self.transfer_size = transfer_size
|
||||
self.transferred = 0
|
||||
|
||||
def read(self, chunk_size):
|
||||
"""Read data from the pipe.
|
||||
|
||||
Chunksize if ignored for we have ensured
|
||||
that the data chunks written to the pipe by readers is the same as the
|
||||
chunks asked for by the Writer.
|
||||
"""
|
||||
if self.transfer_size == 0 or self.transferred < self.transfer_size:
|
||||
data_item = self.get()
|
||||
self.transferred += len(data_item)
|
||||
return data_item
|
||||
else:
|
||||
return ""
|
||||
|
||||
def write(self, data):
|
||||
"""Put a data item in the pipe."""
|
||||
self.put(data)
|
||||
|
||||
def seek(self, offset, whence=0):
|
||||
"""Set the file's current position at the offset."""
|
||||
pass
|
||||
|
||||
def tell(self):
|
||||
"""Get size of the file to be read."""
|
||||
return self.transfer_size
|
||||
|
||||
def close(self):
|
||||
"""A place-holder to maintain consistency."""
|
||||
pass
|
||||
|
||||
|
||||
class GlanceWriteThread(object):
|
||||
"""Ensures that image data is written to in the glance client and that
|
||||
it is in correct ('active')state.
|
||||
"""
|
||||
|
||||
def __init__(self, context, input, image_id,
|
||||
image_meta=None):
|
||||
if not image_meta:
|
||||
image_meta = {}
|
||||
|
||||
self.context = context
|
||||
self.input = input
|
||||
self.image_id = image_id
|
||||
self.image_meta = image_meta
|
||||
self._running = False
|
||||
|
||||
def start(self):
|
||||
self.done = event.Event()
|
||||
|
||||
def _inner():
|
||||
"""Function to do the image data transfer through an update
|
||||
and thereon checks if the state is 'active'.
|
||||
"""
|
||||
try:
|
||||
IMAGE_API.update(self.context,
|
||||
self.image_id,
|
||||
self.image_meta,
|
||||
data=self.input)
|
||||
self._running = True
|
||||
except exception.ImageNotAuthorized as exc:
|
||||
self.done.send_exception(exc)
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
image_meta = IMAGE_API.get(self.context,
|
||||
self.image_id)
|
||||
image_status = image_meta.get("status")
|
||||
if image_status == "active":
|
||||
self.stop()
|
||||
self.done.send(True)
|
||||
# If the state is killed, then raise an exception.
|
||||
elif image_status == "killed":
|
||||
self.stop()
|
||||
msg = (_("Glance image %s is in killed state") %
|
||||
self.image_id)
|
||||
LOG.error(msg)
|
||||
self.done.send_exception(exception.NovaException(msg))
|
||||
elif image_status in ["saving", "queued"]:
|
||||
greenthread.sleep(GLANCE_POLL_INTERVAL)
|
||||
else:
|
||||
self.stop()
|
||||
msg = _("Glance image "
|
||||
"%(image_id)s is in unknown state "
|
||||
"- %(state)s") % {
|
||||
"image_id": self.image_id,
|
||||
"state": image_status}
|
||||
LOG.error(msg)
|
||||
self.done.send_exception(exception.NovaException(msg))
|
||||
except Exception as exc:
|
||||
self.stop()
|
||||
self.done.send_exception(exc)
|
||||
|
||||
utils.spawn(_inner)
|
||||
return self.done
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
|
||||
def wait(self):
|
||||
return self.done.wait()
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class IOThread(object):
|
||||
"""Class that reads chunks from the input file and writes them to the
|
||||
output file till the transfer is completely done.
|
||||
"""
|
||||
|
||||
def __init__(self, input, output):
|
||||
self.input = input
|
||||
self.output = output
|
||||
self._running = False
|
||||
self.got_exception = False
|
||||
|
||||
def start(self):
|
||||
self.done = event.Event()
|
||||
|
||||
def _inner():
|
||||
"""Read data from the input and write the same to the output
|
||||
until the transfer completes.
|
||||
"""
|
||||
self._running = True
|
||||
while self._running:
|
||||
try:
|
||||
data = self.input.read(CHUNK_SIZE)
|
||||
if not data:
|
||||
self.stop()
|
||||
self.done.send(True)
|
||||
self.output.write(data)
|
||||
greenthread.sleep(IO_THREAD_SLEEP_TIME)
|
||||
except Exception as exc:
|
||||
self.stop()
|
||||
LOG.exception(_LE('Read/Write data failed'))
|
||||
self.done.send_exception(exc)
|
||||
|
||||
utils.spawn(_inner)
|
||||
return self.done
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
|
||||
def wait(self):
|
||||
return self.done.wait()
|
Loading…
Reference in New Issue