Merge "VMware: Refactor the image transfer" into stable/mitaka
This commit is contained in:
commit
ceeec0a135
|
@ -69,12 +69,12 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
side_effect=fake_read_handle),
|
||||
mock.patch.object(rw_handles, 'FileWriteHandle',
|
||||
side_effect=fake_write_handle),
|
||||
mock.patch.object(images, 'start_transfer'),
|
||||
mock.patch.object(images, 'image_transfer'),
|
||||
mock.patch.object(images.IMAGE_API, 'get',
|
||||
return_value=image_data),
|
||||
mock.patch.object(images.IMAGE_API, 'download',
|
||||
return_value=read_iter),
|
||||
) as (glance_read, http_write, start_transfer, image_show,
|
||||
) as (glance_read, http_write, image_transfer, image_show,
|
||||
image_download):
|
||||
images.fetch_image(context, instance,
|
||||
host, port, dc_name,
|
||||
|
@ -83,10 +83,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
glance_read.assert_called_once_with(read_iter)
|
||||
http_write.assert_called_once_with(host, port, dc_name, ds_name, None,
|
||||
file_path, image_data['size'])
|
||||
start_transfer.assert_called_once_with(
|
||||
context, read_file_handle,
|
||||
image_data['size'],
|
||||
write_file_handle=write_file_handle)
|
||||
image_transfer.assert_called_once_with(read_file_handle,
|
||||
write_file_handle)
|
||||
image_download.assert_called_once_with(context, instance['image_ref'])
|
||||
image_show.assert_called_once_with(context, instance['image_ref'])
|
||||
|
||||
|
@ -118,13 +116,13 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
with test.nested(
|
||||
mock.patch.object(images.IMAGE_API, 'get'),
|
||||
mock.patch.object(images.IMAGE_API, 'download'),
|
||||
mock.patch.object(images, 'start_transfer'),
|
||||
mock.patch.object(images, 'image_transfer'),
|
||||
mock.patch.object(images, '_build_shadow_vm_config_spec'),
|
||||
mock.patch.object(session, '_call_method'),
|
||||
mock.patch.object(vm_util, 'get_vmdk_info')
|
||||
) as (mock_image_api_get,
|
||||
mock_image_api_download,
|
||||
mock_start_transfer,
|
||||
mock_image_transfer,
|
||||
mock_build_shadow_vm_config_spec,
|
||||
mock_call_method,
|
||||
mock_get_vmdk_info):
|
||||
|
@ -171,8 +169,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
|
||||
mock_tar_open.assert_called_once_with(mode='r|',
|
||||
fileobj=mock_read_handle)
|
||||
mock_start_transfer.assert_called_once_with(context,
|
||||
mock_read_handle, 512, write_file_handle=mock_write_handle)
|
||||
mock_image_transfer.assert_called_once_with(mock_read_handle,
|
||||
mock_write_handle)
|
||||
mock_get_vmdk_info.assert_called_once_with(
|
||||
session, mock.sentinel.vm_ref, 'fake-vm')
|
||||
mock_call_method.assert_called_once_with(
|
||||
|
@ -189,13 +187,13 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
with test.nested(
|
||||
mock.patch.object(images.IMAGE_API, 'get'),
|
||||
mock.patch.object(images.IMAGE_API, 'download'),
|
||||
mock.patch.object(images, 'start_transfer'),
|
||||
mock.patch.object(images, 'image_transfer'),
|
||||
mock.patch.object(images, '_build_shadow_vm_config_spec'),
|
||||
mock.patch.object(session, '_call_method'),
|
||||
mock.patch.object(vm_util, 'get_vmdk_info')
|
||||
) as (mock_image_api_get,
|
||||
mock_image_api_download,
|
||||
mock_start_transfer,
|
||||
mock_image_transfer,
|
||||
mock_build_shadow_vm_config_spec,
|
||||
mock_call_method,
|
||||
mock_get_vmdk_info):
|
||||
|
@ -221,9 +219,8 @@ class VMwareImagesTestCase(test.NoDBTestCase):
|
|||
context, instance, session, 'fake-vm', 'fake-datastore',
|
||||
vm_folder_ref, res_pool_ref)
|
||||
|
||||
mock_start_transfer.assert_called_once_with(context,
|
||||
mock_read_handle, 512, write_file_handle=mock_write_handle)
|
||||
|
||||
mock_image_transfer.assert_called_once_with(mock_read_handle,
|
||||
mock_write_handle)
|
||||
mock_call_method.assert_called_once_with(
|
||||
session.vim, "UnregisterVM", mock.sentinel.vm_ref)
|
||||
mock_get_vmdk_info.assert_called_once_with(
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
# Copyright (c) 2014 VMware, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from nova import exception
|
||||
from nova import test
|
||||
from nova.virt.vmwareapi import io_util
|
||||
|
||||
|
||||
@mock.patch.object(io_util, 'IMAGE_API')
|
||||
class GlanceWriteThreadTestCase(test.NoDBTestCase):
|
||||
|
||||
def test_start_image_update_service_exception(self, mocked):
|
||||
mocked.update.side_effect = exception.ImageNotAuthorized(
|
||||
image_id='image')
|
||||
write_thread = io_util.GlanceWriteThread(
|
||||
None, None, image_id=None)
|
||||
write_thread.start()
|
||||
self.assertRaises(exception.ImageNotAuthorized, write_thread.wait)
|
||||
write_thread.stop()
|
||||
write_thread.close()
|
|
@ -23,17 +23,17 @@ import tarfile
|
|||
from lxml import etree
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import strutils
|
||||
from oslo_utils import units
|
||||
from oslo_vmware import rw_handles
|
||||
import six
|
||||
|
||||
from nova import exception
|
||||
from nova.i18n import _, _LE, _LI
|
||||
from nova.i18n import _, _LI
|
||||
from nova import image
|
||||
from nova.objects import fields
|
||||
from nova.virt.vmwareapi import constants
|
||||
from nova.virt.vmwareapi import io_util
|
||||
from nova.virt.vmwareapi import vm_util
|
||||
|
||||
# NOTE(mdbooth): We use use_linked_clone below, but don't have to import it
|
||||
|
@ -47,6 +47,8 @@ LOG = logging.getLogger(__name__)
|
|||
IMAGE_API = image.API()
|
||||
|
||||
QUEUE_BUFFER_SIZE = 10
|
||||
NFC_LEASE_UPDATE_PERIOD = 60 # update NFC lease every 60sec.
|
||||
CHUNK_SIZE = 64 * units.Ki # default chunk size for image transfer
|
||||
|
||||
|
||||
class VMwareImage(object):
|
||||
|
@ -172,60 +174,22 @@ class VMwareImage(object):
|
|||
return cls(**props)
|
||||
|
||||
|
||||
def start_transfer(context, read_file_handle, data_size,
|
||||
write_file_handle=None, image_id=None, image_meta=None):
|
||||
"""Start the data transfer from the reader to the writer.
|
||||
Reader writes to the pipe and the writer reads from the pipe. This means
|
||||
that the total transfer time boils down to the slower of the read/write
|
||||
and not the addition of the two times.
|
||||
"""
|
||||
|
||||
if not image_meta:
|
||||
image_meta = {}
|
||||
|
||||
# The pipe that acts as an intermediate store of data for reader to write
|
||||
# to and writer to grab from.
|
||||
thread_safe_pipe = io_util.ThreadSafePipe(QUEUE_BUFFER_SIZE, data_size)
|
||||
# The read thread. In case of glance it is the instance of the
|
||||
# GlanceFileRead class. The glance client read returns an iterator
|
||||
# and this class wraps that iterator to provide datachunks in calls
|
||||
# to read.
|
||||
read_thread = io_util.IOThread(read_file_handle, thread_safe_pipe)
|
||||
|
||||
# In case of Glance - VMware transfer, we just need a handle to the
|
||||
# HTTP Connection that is to send transfer data to the VMware datastore.
|
||||
if write_file_handle:
|
||||
write_thread = io_util.IOThread(thread_safe_pipe, write_file_handle)
|
||||
# In case of VMware - Glance transfer, we relinquish VMware HTTP file read
|
||||
# handle to Glance Client instance, but to be sure of the transfer we need
|
||||
# to be sure of the status of the image on glance changing to active.
|
||||
# The GlanceWriteThread handles the same for us.
|
||||
elif image_id:
|
||||
write_thread = io_util.GlanceWriteThread(context, thread_safe_pipe,
|
||||
image_id, image_meta)
|
||||
# Start the read and write threads.
|
||||
read_event = read_thread.start()
|
||||
write_event = write_thread.start()
|
||||
def image_transfer(read_handle, write_handle):
|
||||
# write_handle could be an NFC lease, so we need to periodically
|
||||
# update its progress
|
||||
update_cb = getattr(write_handle, 'update_progress', lambda: None)
|
||||
updater = loopingcall.FixedIntervalLoopingCall(update_cb)
|
||||
try:
|
||||
# Wait on the read and write events to signal their end
|
||||
read_event.wait()
|
||||
write_event.wait()
|
||||
except Exception as exc:
|
||||
# In case of any of the reads or writes raising an exception,
|
||||
# stop the threads so that we un-necessarily don't keep the other one
|
||||
# waiting.
|
||||
read_thread.stop()
|
||||
write_thread.stop()
|
||||
|
||||
# Log and raise the exception.
|
||||
LOG.exception(_LE('Transfer data failed'))
|
||||
raise exception.NovaException(exc)
|
||||
updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
|
||||
while True:
|
||||
data = read_handle.read(CHUNK_SIZE)
|
||||
if not data:
|
||||
break
|
||||
write_handle.write(data)
|
||||
finally:
|
||||
# No matter what, try closing the read and write handles, if it so
|
||||
# applies.
|
||||
read_file_handle.close()
|
||||
if write_file_handle:
|
||||
write_file_handle.close()
|
||||
updater.stop()
|
||||
read_handle.close()
|
||||
write_handle.close()
|
||||
|
||||
|
||||
def upload_iso_to_datastore(iso_path, instance, **kwargs):
|
||||
|
@ -270,8 +234,7 @@ def fetch_image(context, instance, host, port, dc_name, ds_name, file_path,
|
|||
read_file_handle = rw_handles.ImageReadHandle(read_iter)
|
||||
write_file_handle = rw_handles.FileWriteHandle(
|
||||
host, port, dc_name, ds_name, cookies, file_path, file_size)
|
||||
start_transfer(context, read_file_handle, file_size,
|
||||
write_file_handle=write_file_handle)
|
||||
image_transfer(read_file_handle, write_file_handle)
|
||||
LOG.debug("Downloaded image file data %(image_ref)s to "
|
||||
"%(upload_name)s on the data store "
|
||||
"%(data_store_name)s",
|
||||
|
@ -371,10 +334,7 @@ def fetch_image_stream_optimized(context, instance, session, vm_name,
|
|||
vm_folder_ref,
|
||||
vm_import_spec,
|
||||
file_size)
|
||||
start_transfer(context,
|
||||
read_handle,
|
||||
file_size,
|
||||
write_file_handle=write_handle)
|
||||
image_transfer(read_handle, write_handle)
|
||||
|
||||
imported_vm_ref = write_handle.get_imported_vm()
|
||||
|
||||
|
@ -439,11 +399,7 @@ def fetch_image_ova(context, instance, session, vm_name, ds_name,
|
|||
vm_folder_ref,
|
||||
vm_import_spec,
|
||||
file_size)
|
||||
start_transfer(context,
|
||||
extracted,
|
||||
file_size,
|
||||
write_file_handle=write_handle)
|
||||
extracted.close()
|
||||
image_transfer(extracted, write_handle)
|
||||
LOG.info(_LI("Downloaded OVA image file %(image_ref)s"),
|
||||
{'image_ref': instance.image_ref}, instance=instance)
|
||||
imported_vm_ref = write_handle.get_imported_vm()
|
||||
|
@ -487,13 +443,13 @@ def upload_image_stream_optimized(context, image_id, instance, session,
|
|||
'vmware_disktype': 'streamOptimized',
|
||||
'owner_id': instance.project_id}}
|
||||
|
||||
# Passing 0 as the file size since data size to be transferred cannot be
|
||||
# predetermined.
|
||||
start_transfer(context,
|
||||
read_handle,
|
||||
0,
|
||||
image_id=image_id,
|
||||
image_meta=image_metadata)
|
||||
updater = loopingcall.FixedIntervalLoopingCall(read_handle.update_progress)
|
||||
try:
|
||||
updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
|
||||
IMAGE_API.update(context, image_id, image_metadata, data=read_handle)
|
||||
finally:
|
||||
updater.stop()
|
||||
read_handle.close()
|
||||
|
||||
LOG.debug("Uploaded image %s to the Glance image server", image_id,
|
||||
instance=instance)
|
||||
|
|
|
@ -1,195 +0,0 @@
|
|||
# Copyright (c) 2012 VMware, Inc.
|
||||
# Copyright (c) 2011 Citrix Systems, Inc.
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Utility classes for defining the time saving transfer of data from the reader
|
||||
to the write using a LightQueue as a Pipe between the reader and the writer.
|
||||
"""
|
||||
|
||||
from eventlet import event
|
||||
from eventlet import greenthread
|
||||
from eventlet import queue
|
||||
from oslo_log import log as logging
|
||||
|
||||
from nova import exception
|
||||
from nova.i18n import _, _LE
|
||||
from nova import image
|
||||
from nova import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
IMAGE_API = image.API()
|
||||
|
||||
IO_THREAD_SLEEP_TIME = .01
|
||||
GLANCE_POLL_INTERVAL = 5
|
||||
CHUNK_SIZE = 64 * 1024 # default chunk size for image transfer
|
||||
|
||||
|
||||
class ThreadSafePipe(queue.LightQueue):
|
||||
"""The pipe to hold the data which the reader writes to and the writer
|
||||
reads from.
|
||||
"""
|
||||
|
||||
def __init__(self, maxsize, transfer_size):
|
||||
queue.LightQueue.__init__(self, maxsize)
|
||||
self.transfer_size = transfer_size
|
||||
self.transferred = 0
|
||||
|
||||
def read(self, chunk_size):
|
||||
"""Read data from the pipe.
|
||||
|
||||
Chunksize if ignored for we have ensured
|
||||
that the data chunks written to the pipe by readers is the same as the
|
||||
chunks asked for by the Writer.
|
||||
"""
|
||||
if self.transfer_size == 0 or self.transferred < self.transfer_size:
|
||||
data_item = self.get()
|
||||
self.transferred += len(data_item)
|
||||
return data_item
|
||||
else:
|
||||
return ""
|
||||
|
||||
def write(self, data):
|
||||
"""Put a data item in the pipe."""
|
||||
self.put(data)
|
||||
|
||||
def seek(self, offset, whence=0):
|
||||
"""Set the file's current position at the offset."""
|
||||
pass
|
||||
|
||||
def tell(self):
|
||||
"""Get size of the file to be read."""
|
||||
return self.transfer_size
|
||||
|
||||
def close(self):
|
||||
"""A place-holder to maintain consistency."""
|
||||
pass
|
||||
|
||||
|
||||
class GlanceWriteThread(object):
|
||||
"""Ensures that image data is written to in the glance client and that
|
||||
it is in correct ('active')state.
|
||||
"""
|
||||
|
||||
def __init__(self, context, input, image_id,
|
||||
image_meta=None):
|
||||
if not image_meta:
|
||||
image_meta = {}
|
||||
|
||||
self.context = context
|
||||
self.input = input
|
||||
self.image_id = image_id
|
||||
self.image_meta = image_meta
|
||||
self._running = False
|
||||
|
||||
def start(self):
|
||||
self.done = event.Event()
|
||||
|
||||
def _inner():
|
||||
"""Function to do the image data transfer through an update
|
||||
and thereon checks if the state is 'active'.
|
||||
"""
|
||||
try:
|
||||
IMAGE_API.update(self.context,
|
||||
self.image_id,
|
||||
self.image_meta,
|
||||
data=self.input)
|
||||
self._running = True
|
||||
except exception.ImageNotAuthorized as exc:
|
||||
self.done.send_exception(exc)
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
image_meta = IMAGE_API.get(self.context,
|
||||
self.image_id)
|
||||
image_status = image_meta.get("status")
|
||||
if image_status == "active":
|
||||
self.stop()
|
||||
self.done.send(True)
|
||||
# If the state is killed, then raise an exception.
|
||||
elif image_status == "killed":
|
||||
self.stop()
|
||||
msg = (_("Glance image %s is in killed state") %
|
||||
self.image_id)
|
||||
LOG.error(msg)
|
||||
self.done.send_exception(exception.NovaException(msg))
|
||||
elif image_status in ["saving", "queued"]:
|
||||
greenthread.sleep(GLANCE_POLL_INTERVAL)
|
||||
else:
|
||||
self.stop()
|
||||
msg = _("Glance image "
|
||||
"%(image_id)s is in unknown state "
|
||||
"- %(state)s") % {
|
||||
"image_id": self.image_id,
|
||||
"state": image_status}
|
||||
LOG.error(msg)
|
||||
self.done.send_exception(exception.NovaException(msg))
|
||||
except Exception as exc:
|
||||
self.stop()
|
||||
self.done.send_exception(exc)
|
||||
|
||||
utils.spawn(_inner)
|
||||
return self.done
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
|
||||
def wait(self):
|
||||
return self.done.wait()
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class IOThread(object):
|
||||
"""Class that reads chunks from the input file and writes them to the
|
||||
output file till the transfer is completely done.
|
||||
"""
|
||||
|
||||
def __init__(self, input, output):
|
||||
self.input = input
|
||||
self.output = output
|
||||
self._running = False
|
||||
self.got_exception = False
|
||||
|
||||
def start(self):
|
||||
self.done = event.Event()
|
||||
|
||||
def _inner():
|
||||
"""Read data from the input and write the same to the output
|
||||
until the transfer completes.
|
||||
"""
|
||||
self._running = True
|
||||
while self._running:
|
||||
try:
|
||||
data = self.input.read(CHUNK_SIZE)
|
||||
if not data:
|
||||
self.stop()
|
||||
self.done.send(True)
|
||||
self.output.write(data)
|
||||
greenthread.sleep(IO_THREAD_SLEEP_TIME)
|
||||
except Exception as exc:
|
||||
self.stop()
|
||||
LOG.exception(_LE('Read/Write data failed'))
|
||||
self.done.send_exception(exc)
|
||||
|
||||
utils.spawn(_inner)
|
||||
return self.done
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
|
||||
def wait(self):
|
||||
return self.done.wait()
|
Loading…
Reference in New Issue