Refactor the image transfer
The image transfer is unnecessary complicated and buggy. This patch replaces the complex usage of blocking queues and threads with a simple read+write loop. It has the same performance and the code is much cleaner. The NFC lease is updated with the loopingcall utility. Change-Id: I2b04173cd23c59162056360c03d419efbce77ba1 Closes-Bug: #1384840
This commit is contained in:
parent
7b04a56d7f
commit
2f9af244de
|
@ -17,16 +17,14 @@
|
||||||
Functions and classes for image transfer between ESX/VC & image service.
|
Functions and classes for image transfer between ESX/VC & image service.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import errno
|
|
||||||
import logging
|
import logging
|
||||||
import tarfile
|
import tarfile
|
||||||
|
|
||||||
from eventlet import event
|
|
||||||
from eventlet import greenthread
|
|
||||||
from eventlet import queue
|
|
||||||
from eventlet import timeout
|
from eventlet import timeout
|
||||||
|
|
||||||
|
from oslo_utils import units
|
||||||
from oslo_vmware._i18n import _
|
from oslo_vmware._i18n import _
|
||||||
|
from oslo_vmware.common import loopingcall
|
||||||
from oslo_vmware import constants
|
from oslo_vmware import constants
|
||||||
from oslo_vmware import exceptions
|
from oslo_vmware import exceptions
|
||||||
from oslo_vmware import image_util
|
from oslo_vmware import image_util
|
||||||
|
@ -37,363 +35,40 @@ from oslo_vmware import vim_util
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
IMAGE_SERVICE_POLL_INTERVAL = 5
|
NFC_LEASE_UPDATE_PERIOD = 60 # update NFC lease every 60sec.
|
||||||
FILE_READ_WRITE_TASK_SLEEP_TIME = 0.01
|
CHUNK_SIZE = 64 * units.Ki # default chunk size for image transfer
|
||||||
BLOCKING_QUEUE_SIZE = 10
|
|
||||||
|
|
||||||
|
|
||||||
class BlockingQueue(queue.LightQueue):
|
def _start_transfer(read_handle, write_handle, timeout_secs):
|
||||||
"""Producer-Consumer queue to share data between reader/writer threads."""
|
# write_handle could be an NFC lease, so we need to periodically
|
||||||
|
# update its progress
|
||||||
def __init__(self, max_size, max_transfer_size):
|
update_cb = getattr(write_handle, 'update_progress', lambda: None)
|
||||||
"""Initializes the queue with the given parameters.
|
updater = loopingcall.FixedIntervalLoopingCall(update_cb)
|
||||||
|
|
||||||
:param max_size: maximum queue size; if max_size is less than zero or
|
|
||||||
None, the queue size is infinite.
|
|
||||||
:param max_transfer_size: maximum amount of data that can be
|
|
||||||
_transferred using this queue
|
|
||||||
"""
|
|
||||||
queue.LightQueue.__init__(self, max_size)
|
|
||||||
self._max_transfer_size = max_transfer_size
|
|
||||||
self._transferred = 0
|
|
||||||
|
|
||||||
def read(self, chunk_size):
|
|
||||||
"""Read data from the queue.
|
|
||||||
|
|
||||||
This method blocks until data is available. The input chunk size is
|
|
||||||
ignored since we have ensured that the data chunks written to the pipe
|
|
||||||
by the image reader thread is the same as the chunks asked for by the
|
|
||||||
image writer thread.
|
|
||||||
"""
|
|
||||||
if (self._max_transfer_size is 0 or
|
|
||||||
self._transferred < self._max_transfer_size):
|
|
||||||
data_item = self.get()
|
|
||||||
self._transferred += len(data_item)
|
|
||||||
return data_item
|
|
||||||
else:
|
|
||||||
LOG.debug("Completed transfer of size %s.", self._transferred)
|
|
||||||
return ""
|
|
||||||
|
|
||||||
def write(self, data):
|
|
||||||
"""Write data into the queue.
|
|
||||||
|
|
||||||
:param data: data to be written
|
|
||||||
"""
|
|
||||||
self.put(data)
|
|
||||||
|
|
||||||
# Below methods are provided in order to enable treating the queue
|
|
||||||
# as a file handle.
|
|
||||||
|
|
||||||
def seek(self, offset, whence=0):
|
|
||||||
"""Set the file's current position at the offset.
|
|
||||||
|
|
||||||
This method throws IOError since seek cannot be supported for a pipe.
|
|
||||||
"""
|
|
||||||
raise IOError(errno.ESPIPE, "Illegal seek")
|
|
||||||
|
|
||||||
def tell(self):
|
|
||||||
"""Get the current file position."""
|
|
||||||
return self._transferred
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return "blocking queue"
|
|
||||||
|
|
||||||
|
|
||||||
class ImageWriter(object):
|
|
||||||
"""Class to write the image to the image service from an input file."""
|
|
||||||
|
|
||||||
def __init__(self, context, input_file, image_service, image_id,
|
|
||||||
image_meta=None):
|
|
||||||
"""Initializes the image writer instance with given parameters.
|
|
||||||
|
|
||||||
:param context: write context needed by the image service
|
|
||||||
:param input_file: file to read the image data from
|
|
||||||
:param image_service: handle to image service
|
|
||||||
:param image_id: ID of the image in the image service
|
|
||||||
:param image_meta: image meta-data
|
|
||||||
"""
|
|
||||||
if not image_meta:
|
|
||||||
image_meta = {}
|
|
||||||
|
|
||||||
self._context = context
|
|
||||||
self._input_file = input_file
|
|
||||||
self._image_service = image_service
|
|
||||||
self._image_id = image_id
|
|
||||||
self._image_meta = image_meta
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
"""Start the image write task.
|
|
||||||
|
|
||||||
:returns: the event indicating the status of the write task
|
|
||||||
"""
|
|
||||||
self._done = event.Event()
|
|
||||||
|
|
||||||
def _inner():
|
|
||||||
"""Task performing the image write operation.
|
|
||||||
|
|
||||||
This method performs image data transfer through an update call.
|
|
||||||
After the update, it waits until the image state becomes
|
|
||||||
'active', 'killed' or unknown. If the final state is not 'active'
|
|
||||||
an instance of ImageTransferException is thrown.
|
|
||||||
|
|
||||||
:raises: ImageTransferException
|
|
||||||
"""
|
|
||||||
LOG.debug("Calling image service update on image: %(image)s "
|
|
||||||
"with meta: %(meta)s",
|
|
||||||
{'image': self._image_id,
|
|
||||||
'meta': self._image_meta})
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._image_service.update(self._context,
|
|
||||||
self._image_id,
|
|
||||||
self._image_meta,
|
|
||||||
data=self._input_file)
|
|
||||||
self._running = True
|
|
||||||
while self._running:
|
|
||||||
LOG.debug("Retrieving status of image: %s.",
|
|
||||||
self._image_id)
|
|
||||||
image_meta = self._image_service.show(self._context,
|
|
||||||
self._image_id)
|
|
||||||
image_status = image_meta.get('status')
|
|
||||||
if image_status == 'active':
|
|
||||||
self.stop()
|
|
||||||
LOG.debug("Image: %s is now active.",
|
|
||||||
self._image_id)
|
|
||||||
self._done.send(True)
|
|
||||||
elif image_status == 'killed':
|
|
||||||
self.stop()
|
|
||||||
excep_msg = (_("Image: %s is in killed state.") %
|
|
||||||
self._image_id)
|
|
||||||
LOG.error(excep_msg)
|
|
||||||
excep = exceptions.ImageTransferException(excep_msg)
|
|
||||||
self._done.send_exception(excep)
|
|
||||||
elif image_status in ['saving', 'queued']:
|
|
||||||
LOG.debug("Image: %(image)s is in %(state)s state; "
|
|
||||||
"sleeping for %(sleep)d seconds.",
|
|
||||||
{'image': self._image_id,
|
|
||||||
'state': image_status,
|
|
||||||
'sleep': IMAGE_SERVICE_POLL_INTERVAL})
|
|
||||||
greenthread.sleep(IMAGE_SERVICE_POLL_INTERVAL)
|
|
||||||
else:
|
|
||||||
self.stop()
|
|
||||||
excep_msg = (_("Image: %(image)s is in unknown "
|
|
||||||
"state: %(state)s.") %
|
|
||||||
{'image': self._image_id,
|
|
||||||
'state': image_status})
|
|
||||||
LOG.error(excep_msg)
|
|
||||||
excep = exceptions.ImageTransferException(excep_msg)
|
|
||||||
self._done.send_exception(excep)
|
|
||||||
except Exception as excep:
|
|
||||||
self.stop()
|
|
||||||
excep_msg = (_("Error occurred while writing image: %s") %
|
|
||||||
self._image_id)
|
|
||||||
LOG.exception(excep_msg)
|
|
||||||
excep = exceptions.ImageTransferException(excep_msg, excep)
|
|
||||||
self._done.send_exception(excep)
|
|
||||||
|
|
||||||
LOG.debug("Starting image write task for image: %(image)s with"
|
|
||||||
" source: %(source)s.",
|
|
||||||
{'source': self._input_file,
|
|
||||||
'image': self._image_id})
|
|
||||||
greenthread.spawn(_inner)
|
|
||||||
return self._done
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
"""Stop the image writing task."""
|
|
||||||
LOG.debug("Stopping the writing task for image: %s.",
|
|
||||||
self._image_id)
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
"""Wait for the image writer task to complete.
|
|
||||||
|
|
||||||
This method returns True if the writer thread completes successfully.
|
|
||||||
In case of error, it raises ImageTransferException.
|
|
||||||
|
|
||||||
:raises ImageTransferException
|
|
||||||
"""
|
|
||||||
return self._done.wait()
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
"""This is a NOP."""
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
string = "Image Writer <source = %s, dest = %s>" % (self._input_file,
|
|
||||||
self._image_id)
|
|
||||||
return string
|
|
||||||
|
|
||||||
|
|
||||||
class FileReadWriteTask(object):
|
|
||||||
"""Task which reads data from the input file and writes to the output file.
|
|
||||||
|
|
||||||
This class defines the task which copies the given input file to the given
|
|
||||||
output file. The copy operation involves reading chunks of data from the
|
|
||||||
input file and writing the same to the output file.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, input_file, output_file):
|
|
||||||
"""Initializes the read-write task with the given input parameters.
|
|
||||||
|
|
||||||
:param input_file: the input file handle
|
|
||||||
:param output_file: the output file handle
|
|
||||||
"""
|
|
||||||
self._input_file = input_file
|
|
||||||
self._output_file = output_file
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
def start(self):
|
|
||||||
"""Start the file read - file write task.
|
|
||||||
|
|
||||||
:returns: the event indicating the status of the read-write task
|
|
||||||
"""
|
|
||||||
self._done = event.Event()
|
|
||||||
|
|
||||||
def _inner():
|
|
||||||
"""Task performing the file read-write operation."""
|
|
||||||
self._running = True
|
|
||||||
while self._running:
|
|
||||||
try:
|
|
||||||
data = self._input_file.read(rw_handles.READ_CHUNKSIZE)
|
|
||||||
if not data:
|
|
||||||
LOG.debug("File read-write task is done.")
|
|
||||||
self.stop()
|
|
||||||
self._done.send(True)
|
|
||||||
self._output_file.write(data)
|
|
||||||
|
|
||||||
# update lease progress if applicable
|
|
||||||
if hasattr(self._input_file, "update_progress"):
|
|
||||||
self._input_file.update_progress()
|
|
||||||
if hasattr(self._output_file, "update_progress"):
|
|
||||||
self._output_file.update_progress()
|
|
||||||
|
|
||||||
greenthread.sleep(FILE_READ_WRITE_TASK_SLEEP_TIME)
|
|
||||||
except Exception as excep:
|
|
||||||
self.stop()
|
|
||||||
excep_msg = _("Error occurred during file read-write "
|
|
||||||
"task.")
|
|
||||||
LOG.exception(excep_msg)
|
|
||||||
excep = exceptions.ImageTransferException(excep_msg, excep)
|
|
||||||
self._done.send_exception(excep)
|
|
||||||
|
|
||||||
LOG.debug("Starting file read-write task with source: %(source)s "
|
|
||||||
"and destination: %(dest)s.",
|
|
||||||
{'source': self._input_file,
|
|
||||||
'dest': self._output_file})
|
|
||||||
greenthread.spawn(_inner)
|
|
||||||
return self._done
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
"""Stop the read-write task."""
|
|
||||||
LOG.debug("Stopping the file read-write task.")
|
|
||||||
self._running = False
|
|
||||||
|
|
||||||
def wait(self):
|
|
||||||
"""Wait for the file read-write task to complete.
|
|
||||||
|
|
||||||
This method returns True if the read-write thread completes
|
|
||||||
successfully. In case of error, it raises ImageTransferException.
|
|
||||||
|
|
||||||
:raises: ImageTransferException
|
|
||||||
"""
|
|
||||||
return self._done.wait()
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
string = ("File Read-Write Task <source = %s, dest = %s>" %
|
|
||||||
(self._input_file, self._output_file))
|
|
||||||
return string
|
|
||||||
|
|
||||||
|
|
||||||
# Functions to perform image transfer between VMware servers and image service.
|
|
||||||
|
|
||||||
|
|
||||||
def _start_transfer(context, timeout_secs, read_file_handle, max_data_size,
|
|
||||||
write_file_handle=None, image_service=None, image_id=None,
|
|
||||||
image_meta=None):
|
|
||||||
"""Start the image transfer.
|
|
||||||
|
|
||||||
The image reader reads the data from the image source and writes to the
|
|
||||||
blocking queue. The image source is always a file handle (VmdkReadHandle
|
|
||||||
or ImageReadHandle); therefore, a FileReadWriteTask is created for this
|
|
||||||
transfer. The image writer reads the data from the blocking queue and
|
|
||||||
writes it to the image destination. The image destination is either a
|
|
||||||
file or VMDK in VMware datastore or an image in the image service.
|
|
||||||
|
|
||||||
If the destination is a file or VMDK in VMware datastore, the method
|
|
||||||
creates a FileReadWriteTask which reads from the blocking queue and
|
|
||||||
writes to either FileWriteHandle or VmdkWriteHandle. In the case of
|
|
||||||
image service as the destination, an instance of ImageWriter task is
|
|
||||||
created which reads from the blocking queue and writes to the image
|
|
||||||
service.
|
|
||||||
|
|
||||||
:param context: write context needed for the image service
|
|
||||||
:param timeout_secs: time in seconds to wait for the transfer to complete
|
|
||||||
:param read_file_handle: handle to read data from
|
|
||||||
:param max_data_size: maximum transfer size
|
|
||||||
:param write_file_handle: handle to write data to; if this is None, then
|
|
||||||
param image_service and param image_id should
|
|
||||||
be set.
|
|
||||||
:param image_service: image service handle
|
|
||||||
:param image_id: ID of the image in the image service
|
|
||||||
:param image_meta: image meta-data
|
|
||||||
:raises: ImageTransferException, ValueError
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Create the blocking queue
|
|
||||||
blocking_queue = BlockingQueue(BLOCKING_QUEUE_SIZE, max_data_size)
|
|
||||||
|
|
||||||
# Create the image reader
|
|
||||||
reader = FileReadWriteTask(read_file_handle, blocking_queue)
|
|
||||||
|
|
||||||
# Create the image writer
|
|
||||||
if write_file_handle:
|
|
||||||
# File or VMDK in VMware datastore is the image destination
|
|
||||||
writer = FileReadWriteTask(blocking_queue, write_file_handle)
|
|
||||||
elif image_service and image_id:
|
|
||||||
# Image service image is the destination
|
|
||||||
writer = ImageWriter(context,
|
|
||||||
blocking_queue,
|
|
||||||
image_service,
|
|
||||||
image_id,
|
|
||||||
image_meta)
|
|
||||||
else:
|
|
||||||
excep_msg = _("No image destination given.")
|
|
||||||
LOG.error(excep_msg)
|
|
||||||
raise ValueError(excep_msg)
|
|
||||||
|
|
||||||
# Start the reader and writer
|
|
||||||
LOG.debug("Starting image transfer with reader: %(reader)s and writer: "
|
|
||||||
"%(writer)s",
|
|
||||||
{'reader': reader,
|
|
||||||
'writer': writer})
|
|
||||||
reader.start()
|
|
||||||
writer.start()
|
|
||||||
timer = timeout.Timeout(timeout_secs)
|
timer = timeout.Timeout(timeout_secs)
|
||||||
try:
|
try:
|
||||||
# Wait for the reader and writer to complete
|
updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
|
||||||
reader.wait()
|
while True:
|
||||||
writer.wait()
|
data = read_handle.read(CHUNK_SIZE)
|
||||||
except (timeout.Timeout, exceptions.ImageTransferException) as excep:
|
if not data:
|
||||||
excep_msg = (_("Error occurred during image transfer with reader: "
|
break
|
||||||
"%(reader)s and writer: %(writer)s") %
|
write_handle.write(data)
|
||||||
{'reader': reader,
|
except timeout.Timeout as excep:
|
||||||
'writer': writer})
|
msg = (_('Timeout, read_handle: "%(src)s", write_handle: "%(dest)s"') %
|
||||||
LOG.exception(excep_msg)
|
{'src': read_handle,
|
||||||
reader.stop()
|
'dest': write_handle})
|
||||||
writer.stop()
|
LOG.exception(msg)
|
||||||
|
raise exceptions.ImageTransferException(msg, excep)
|
||||||
if isinstance(excep, exceptions.ImageTransferException):
|
except Exception as excep:
|
||||||
raise
|
msg = (_('Error, read_handle: "%(src)s", write_handle: "%(dest)s"') %
|
||||||
raise exceptions.ImageTransferException(excep_msg, excep)
|
{'src': read_handle,
|
||||||
|
'dest': write_handle})
|
||||||
|
LOG.exception(msg)
|
||||||
|
raise exceptions.ImageTransferException(msg, excep)
|
||||||
finally:
|
finally:
|
||||||
timer.cancel()
|
timer.cancel()
|
||||||
read_file_handle.close()
|
updater.stop()
|
||||||
if write_file_handle:
|
read_handle.close()
|
||||||
write_file_handle.close()
|
write_handle.close()
|
||||||
|
|
||||||
|
|
||||||
def download_image(image, image_meta, session, datastore, rel_path,
|
def download_image(image, image_meta, session, datastore, rel_path,
|
||||||
|
@ -427,8 +102,7 @@ def download_image(image, image_meta, session, datastore, rel_path,
|
||||||
conn.write = conn.send
|
conn.write = conn.send
|
||||||
|
|
||||||
read_handle = rw_handles.ImageReadHandle(image)
|
read_handle = rw_handles.ImageReadHandle(image)
|
||||||
_start_transfer(None, timeout_secs, read_handle, image_size,
|
_start_transfer(read_handle, conn, timeout_secs)
|
||||||
write_file_handle=conn)
|
|
||||||
|
|
||||||
|
|
||||||
def download_flat_image(context, timeout_secs, image_service, image_id,
|
def download_flat_image(context, timeout_secs, image_service, image_id,
|
||||||
|
@ -458,11 +132,7 @@ def download_flat_image(context, timeout_secs, image_service, image_id,
|
||||||
kwargs.get('file_path'),
|
kwargs.get('file_path'),
|
||||||
file_size,
|
file_size,
|
||||||
cacerts=kwargs.get('cacerts'))
|
cacerts=kwargs.get('cacerts'))
|
||||||
_start_transfer(context,
|
_start_transfer(read_handle, write_handle, timeout_secs)
|
||||||
timeout_secs,
|
|
||||||
read_handle,
|
|
||||||
file_size,
|
|
||||||
write_file_handle=write_handle)
|
|
||||||
LOG.debug("Downloaded image: %s from image service as a flat file.",
|
LOG.debug("Downloaded image: %s from image service as a flat file.",
|
||||||
image_id)
|
image_id)
|
||||||
|
|
||||||
|
@ -490,11 +160,7 @@ def download_stream_optimized_data(context, timeout_secs, read_handle,
|
||||||
kwargs.get('vm_folder'),
|
kwargs.get('vm_folder'),
|
||||||
kwargs.get('vm_import_spec'),
|
kwargs.get('vm_import_spec'),
|
||||||
file_size)
|
file_size)
|
||||||
_start_transfer(context,
|
_start_transfer(read_handle, write_handle, timeout_secs)
|
||||||
timeout_secs,
|
|
||||||
read_handle,
|
|
||||||
file_size,
|
|
||||||
write_file_handle=write_handle)
|
|
||||||
return write_handle.get_imported_vm()
|
return write_handle.get_imported_vm()
|
||||||
|
|
||||||
|
|
||||||
|
@ -578,8 +244,7 @@ def copy_stream_optimized_disk(
|
||||||
kwargs.get('vm'),
|
kwargs.get('vm'),
|
||||||
kwargs.get('vmdk_file_path'),
|
kwargs.get('vmdk_file_path'),
|
||||||
file_size)
|
file_size)
|
||||||
_start_transfer(context, timeout_secs, read_handle, file_size,
|
_start_transfer(read_handle, write_handle, timeout_secs)
|
||||||
write_file_handle=write_handle)
|
|
||||||
LOG.debug("Downloaded virtual disk: %s.", vmdk_file_path)
|
LOG.debug("Downloaded virtual disk: %s.", vmdk_file_path)
|
||||||
|
|
||||||
|
|
||||||
|
@ -623,14 +288,12 @@ def upload_image(context, timeout_secs, image_service, image_id, owner_id,
|
||||||
kwargs.get('image_version'),
|
kwargs.get('image_version'),
|
||||||
'vmware_disktype': 'streamOptimized',
|
'vmware_disktype': 'streamOptimized',
|
||||||
'owner_id': owner_id}}
|
'owner_id': owner_id}}
|
||||||
|
updater = loopingcall.FixedIntervalLoopingCall(read_handle.update_progress)
|
||||||
# Passing 0 as the file size since data size to be transferred cannot be
|
try:
|
||||||
# predetermined.
|
updater.start(interval=NFC_LEASE_UPDATE_PERIOD)
|
||||||
_start_transfer(context,
|
image_service.update(context, image_id, image_metadata,
|
||||||
timeout_secs,
|
data=read_handle)
|
||||||
read_handle,
|
finally:
|
||||||
0,
|
updater.stop()
|
||||||
image_service=image_service,
|
read_handle.close()
|
||||||
image_id=image_id,
|
|
||||||
image_meta=image_metadata)
|
|
||||||
LOG.debug("Uploaded image: %s.", image_id)
|
LOG.debug("Uploaded image: %s.", image_id)
|
||||||
|
|
|
@ -17,280 +17,23 @@
|
||||||
Unit tests for functions and classes for image transfer.
|
Unit tests for functions and classes for image transfer.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import math
|
|
||||||
|
|
||||||
from eventlet import greenthread
|
|
||||||
from eventlet import timeout
|
|
||||||
import mock
|
import mock
|
||||||
|
import six
|
||||||
|
|
||||||
from oslo_vmware import exceptions
|
from oslo_vmware import exceptions
|
||||||
from oslo_vmware import image_transfer
|
from oslo_vmware import image_transfer
|
||||||
from oslo_vmware import rw_handles
|
|
||||||
from oslo_vmware.tests import base
|
from oslo_vmware.tests import base
|
||||||
|
|
||||||
|
|
||||||
class BlockingQueueTest(base.TestCase):
|
|
||||||
"""Tests for BlockingQueue."""
|
|
||||||
|
|
||||||
def test_read(self):
|
|
||||||
max_size = 10
|
|
||||||
chunk_size = 10
|
|
||||||
max_transfer_size = 30
|
|
||||||
queue = image_transfer.BlockingQueue(max_size, max_transfer_size)
|
|
||||||
|
|
||||||
def get_side_effect():
|
|
||||||
return [1] * chunk_size
|
|
||||||
|
|
||||||
queue.get = mock.Mock(side_effect=get_side_effect)
|
|
||||||
while True:
|
|
||||||
data_item = queue.read(chunk_size)
|
|
||||||
if not data_item:
|
|
||||||
break
|
|
||||||
|
|
||||||
self.assertEqual(max_transfer_size, queue._transferred)
|
|
||||||
exp_calls = [mock.call()] * int(math.ceil(float(max_transfer_size) /
|
|
||||||
chunk_size))
|
|
||||||
self.assertEqual(exp_calls, queue.get.call_args_list)
|
|
||||||
|
|
||||||
def test_write(self):
|
|
||||||
queue = image_transfer.BlockingQueue(10, 30)
|
|
||||||
queue.put = mock.Mock()
|
|
||||||
write_count = 10
|
|
||||||
for _ in range(0, write_count):
|
|
||||||
queue.write([1])
|
|
||||||
exp_calls = [mock.call([1])] * write_count
|
|
||||||
self.assertEqual(exp_calls, queue.put.call_args_list)
|
|
||||||
|
|
||||||
def test_seek(self):
|
|
||||||
queue = image_transfer.BlockingQueue(10, 30)
|
|
||||||
self.assertRaises(IOError, queue.seek, 5)
|
|
||||||
|
|
||||||
def test_tell(self):
|
|
||||||
queue = image_transfer.BlockingQueue(10, 30)
|
|
||||||
self.assertEqual(0, queue.tell())
|
|
||||||
queue.get = mock.Mock(return_value=[1] * 10)
|
|
||||||
queue.read(10)
|
|
||||||
self.assertEqual(10, queue.tell())
|
|
||||||
|
|
||||||
|
|
||||||
class ImageWriterTest(base.TestCase):
|
|
||||||
"""Tests for ImageWriter class."""
|
|
||||||
|
|
||||||
def _create_image_writer(self):
|
|
||||||
self.image_service = mock.Mock()
|
|
||||||
self.context = mock.Mock()
|
|
||||||
self.input_file = mock.Mock()
|
|
||||||
self.image_id = mock.Mock()
|
|
||||||
return image_transfer.ImageWriter(self.context, self.input_file,
|
|
||||||
self.image_service, self.image_id)
|
|
||||||
|
|
||||||
@mock.patch.object(greenthread, 'sleep')
|
|
||||||
def test_start(self, mock_sleep):
|
|
||||||
writer = self._create_image_writer()
|
|
||||||
status_list = ['queued', 'saving', 'active']
|
|
||||||
|
|
||||||
def image_service_show_side_effect(context, image_id):
|
|
||||||
status = status_list.pop(0)
|
|
||||||
return {'status': status}
|
|
||||||
|
|
||||||
self.image_service.show.side_effect = image_service_show_side_effect
|
|
||||||
exp_calls = [mock.call(self.context, self.image_id)] * len(status_list)
|
|
||||||
writer.start()
|
|
||||||
self.assertTrue(writer.wait())
|
|
||||||
self.image_service.update.assert_called_once_with(self.context,
|
|
||||||
self.image_id, {},
|
|
||||||
data=self.input_file)
|
|
||||||
self.assertEqual(exp_calls, self.image_service.show.call_args_list)
|
|
||||||
|
|
||||||
def test_start_with_killed_status(self):
|
|
||||||
writer = self._create_image_writer()
|
|
||||||
|
|
||||||
def image_service_show_side_effect(_context, _image_id):
|
|
||||||
return {'status': 'killed'}
|
|
||||||
|
|
||||||
self.image_service.show.side_effect = image_service_show_side_effect
|
|
||||||
writer.start()
|
|
||||||
self.assertRaises(exceptions.ImageTransferException,
|
|
||||||
writer.wait)
|
|
||||||
self.image_service.update.assert_called_once_with(self.context,
|
|
||||||
self.image_id, {},
|
|
||||||
data=self.input_file)
|
|
||||||
self.image_service.show.assert_called_once_with(self.context,
|
|
||||||
self.image_id)
|
|
||||||
|
|
||||||
def test_start_with_unknown_status(self):
|
|
||||||
writer = self._create_image_writer()
|
|
||||||
|
|
||||||
def image_service_show_side_effect(_context, _image_id):
|
|
||||||
return {'status': 'unknown'}
|
|
||||||
|
|
||||||
self.image_service.show.side_effect = image_service_show_side_effect
|
|
||||||
writer.start()
|
|
||||||
self.assertRaises(exceptions.ImageTransferException,
|
|
||||||
writer.wait)
|
|
||||||
self.image_service.update.assert_called_once_with(self.context,
|
|
||||||
self.image_id, {},
|
|
||||||
data=self.input_file)
|
|
||||||
self.image_service.show.assert_called_once_with(self.context,
|
|
||||||
self.image_id)
|
|
||||||
|
|
||||||
def test_start_with_image_service_show_exception(self):
|
|
||||||
writer = self._create_image_writer()
|
|
||||||
self.image_service.show.side_effect = RuntimeError()
|
|
||||||
writer.start()
|
|
||||||
self.assertRaises(exceptions.ImageTransferException, writer.wait)
|
|
||||||
self.image_service.update.assert_called_once_with(self.context,
|
|
||||||
self.image_id, {},
|
|
||||||
data=self.input_file)
|
|
||||||
self.image_service.show.assert_called_once_with(self.context,
|
|
||||||
self.image_id)
|
|
||||||
|
|
||||||
|
|
||||||
class FileReadWriteTaskTest(base.TestCase):
|
|
||||||
"""Tests for FileReadWriteTask class."""
|
|
||||||
|
|
||||||
def test_start(self):
|
|
||||||
data_items = [[1] * 10, [1] * 20, [1] * 5, []]
|
|
||||||
|
|
||||||
def input_file_read_side_effect(arg):
|
|
||||||
self.assertEqual(arg, rw_handles.READ_CHUNKSIZE)
|
|
||||||
data = data_items[input_file_read_side_effect.i]
|
|
||||||
input_file_read_side_effect.i += 1
|
|
||||||
return data
|
|
||||||
|
|
||||||
input_file_read_side_effect.i = 0
|
|
||||||
input_file = mock.Mock()
|
|
||||||
input_file.read.side_effect = input_file_read_side_effect
|
|
||||||
output_file = mock.Mock()
|
|
||||||
rw_task = image_transfer.FileReadWriteTask(input_file, output_file)
|
|
||||||
rw_task.start()
|
|
||||||
self.assertTrue(rw_task.wait())
|
|
||||||
self.assertEqual(len(data_items), input_file.read.call_count)
|
|
||||||
|
|
||||||
exp_calls = []
|
|
||||||
for i in range(0, len(data_items)):
|
|
||||||
exp_calls.append(mock.call(data_items[i]))
|
|
||||||
self.assertEqual(exp_calls, output_file.write.call_args_list)
|
|
||||||
|
|
||||||
self.assertEqual(len(data_items),
|
|
||||||
input_file.update_progress.call_count)
|
|
||||||
self.assertEqual(len(data_items),
|
|
||||||
output_file.update_progress.call_count)
|
|
||||||
|
|
||||||
def test_start_with_read_exception(self):
|
|
||||||
input_file = mock.Mock()
|
|
||||||
input_file.read.side_effect = RuntimeError()
|
|
||||||
output_file = mock.Mock()
|
|
||||||
rw_task = image_transfer.FileReadWriteTask(input_file, output_file)
|
|
||||||
rw_task.start()
|
|
||||||
self.assertRaises(exceptions.ImageTransferException, rw_task.wait)
|
|
||||||
input_file.read.assert_called_once_with(rw_handles.READ_CHUNKSIZE)
|
|
||||||
|
|
||||||
|
|
||||||
class ImageTransferUtilityTest(base.TestCase):
|
class ImageTransferUtilityTest(base.TestCase):
|
||||||
"""Tests for image_transfer utility methods."""
|
"""Tests for image_transfer utility methods."""
|
||||||
|
|
||||||
@mock.patch.object(timeout, 'Timeout')
|
def test_start_transfer(self):
|
||||||
@mock.patch.object(image_transfer, 'ImageWriter')
|
data = 'image-data-here'
|
||||||
@mock.patch.object(image_transfer, 'FileReadWriteTask')
|
read_handle = six.StringIO(data)
|
||||||
@mock.patch.object(image_transfer, 'BlockingQueue')
|
write_handle = mock.Mock()
|
||||||
def test_start_transfer(self, fake_BlockingQueue, fake_FileReadWriteTask,
|
image_transfer._start_transfer(read_handle, write_handle, None)
|
||||||
fake_ImageWriter, fake_Timeout):
|
write_handle.write.assert_called_once_with(data)
|
||||||
|
|
||||||
context = mock.Mock()
|
|
||||||
read_file_handle = mock.Mock()
|
|
||||||
read_file_handle.close = mock.Mock()
|
|
||||||
image_service = mock.Mock()
|
|
||||||
image_id = mock.Mock()
|
|
||||||
blocking_queue = mock.Mock()
|
|
||||||
|
|
||||||
write_file_handle1 = mock.Mock()
|
|
||||||
write_file_handle1.close = mock.Mock()
|
|
||||||
write_file_handle2 = None
|
|
||||||
write_file_handles = [write_file_handle1, write_file_handle2]
|
|
||||||
|
|
||||||
timeout_secs = 10
|
|
||||||
blocking_queue_size = 10
|
|
||||||
image_meta = {}
|
|
||||||
max_data_size = 30
|
|
||||||
|
|
||||||
fake_BlockingQueue.return_value = blocking_queue
|
|
||||||
fake_timer = mock.Mock()
|
|
||||||
fake_timer.cancel = mock.Mock()
|
|
||||||
fake_Timeout.return_value = fake_timer
|
|
||||||
|
|
||||||
for write_file_handle in write_file_handles:
|
|
||||||
image_transfer._start_transfer(context,
|
|
||||||
timeout_secs,
|
|
||||||
read_file_handle,
|
|
||||||
max_data_size,
|
|
||||||
write_file_handle=write_file_handle,
|
|
||||||
image_service=image_service,
|
|
||||||
image_id=image_id,
|
|
||||||
image_meta=image_meta)
|
|
||||||
|
|
||||||
exp_calls = [mock.call(blocking_queue_size,
|
|
||||||
max_data_size)] * len(write_file_handles)
|
|
||||||
self.assertEqual(exp_calls,
|
|
||||||
fake_BlockingQueue.call_args_list)
|
|
||||||
|
|
||||||
exp_calls2 = [mock.call(read_file_handle, blocking_queue),
|
|
||||||
mock.call(blocking_queue, write_file_handle1),
|
|
||||||
mock.call(read_file_handle, blocking_queue)]
|
|
||||||
self.assertEqual(exp_calls2,
|
|
||||||
fake_FileReadWriteTask.call_args_list)
|
|
||||||
|
|
||||||
exp_calls3 = mock.call(context, blocking_queue, image_service,
|
|
||||||
image_id, image_meta)
|
|
||||||
self.assertEqual(exp_calls3,
|
|
||||||
fake_ImageWriter.call_args)
|
|
||||||
|
|
||||||
exp_calls4 = [mock.call(timeout_secs)] * len(write_file_handles)
|
|
||||||
self.assertEqual(exp_calls4,
|
|
||||||
fake_Timeout.call_args_list)
|
|
||||||
|
|
||||||
self.assertEqual(len(write_file_handles),
|
|
||||||
fake_timer.cancel.call_count)
|
|
||||||
|
|
||||||
self.assertEqual(len(write_file_handles),
|
|
||||||
read_file_handle.close.call_count)
|
|
||||||
|
|
||||||
write_file_handle1.close.assert_called_once_with()
|
|
||||||
|
|
||||||
@mock.patch.object(image_transfer, 'FileReadWriteTask')
|
|
||||||
@mock.patch.object(image_transfer, 'BlockingQueue')
|
|
||||||
def test_start_transfer_with_no_image_destination(self, fake_BlockingQueue,
|
|
||||||
fake_FileReadWriteTask):
|
|
||||||
|
|
||||||
context = mock.Mock()
|
|
||||||
read_file_handle = mock.Mock()
|
|
||||||
write_file_handle = None
|
|
||||||
image_service = None
|
|
||||||
image_id = None
|
|
||||||
timeout_secs = 10
|
|
||||||
image_meta = {}
|
|
||||||
blocking_queue_size = 10
|
|
||||||
max_data_size = 30
|
|
||||||
blocking_queue = mock.Mock()
|
|
||||||
|
|
||||||
fake_BlockingQueue.return_value = blocking_queue
|
|
||||||
|
|
||||||
self.assertRaises(ValueError,
|
|
||||||
image_transfer._start_transfer,
|
|
||||||
context,
|
|
||||||
timeout_secs,
|
|
||||||
read_file_handle,
|
|
||||||
max_data_size,
|
|
||||||
write_file_handle=write_file_handle,
|
|
||||||
image_service=image_service,
|
|
||||||
image_id=image_id,
|
|
||||||
image_meta=image_meta)
|
|
||||||
|
|
||||||
fake_BlockingQueue.assert_called_once_with(blocking_queue_size,
|
|
||||||
max_data_size)
|
|
||||||
|
|
||||||
fake_FileReadWriteTask.assert_called_once_with(read_file_handle,
|
|
||||||
blocking_queue)
|
|
||||||
|
|
||||||
@mock.patch('oslo_vmware.rw_handles.FileWriteHandle')
|
@mock.patch('oslo_vmware.rw_handles.FileWriteHandle')
|
||||||
@mock.patch('oslo_vmware.rw_handles.ImageReadHandle')
|
@mock.patch('oslo_vmware.rw_handles.ImageReadHandle')
|
||||||
|
@ -349,11 +92,9 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||||
cacerts=None)
|
cacerts=None)
|
||||||
|
|
||||||
fake_transfer.assert_called_once_with(
|
fake_transfer.assert_called_once_with(
|
||||||
context,
|
|
||||||
timeout_secs,
|
|
||||||
fake_ImageReadHandle,
|
fake_ImageReadHandle,
|
||||||
image_size,
|
fake_FileWriteHandle,
|
||||||
write_file_handle=fake_FileWriteHandle)
|
timeout_secs)
|
||||||
|
|
||||||
@mock.patch('oslo_vmware.rw_handles.VmdkWriteHandle')
|
@mock.patch('oslo_vmware.rw_handles.VmdkWriteHandle')
|
||||||
@mock.patch.object(image_transfer, '_start_transfer')
|
@mock.patch.object(image_transfer, '_start_transfer')
|
||||||
|
@ -396,12 +137,9 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||||
vm_import_spec,
|
vm_import_spec,
|
||||||
image_size)
|
image_size)
|
||||||
|
|
||||||
fake_transfer.assert_called_once_with(
|
fake_transfer.assert_called_once_with(read_handle,
|
||||||
context,
|
fake_VmdkWriteHandle,
|
||||||
timeout_secs,
|
timeout_secs)
|
||||||
read_handle,
|
|
||||||
image_size,
|
|
||||||
write_file_handle=fake_VmdkWriteHandle)
|
|
||||||
|
|
||||||
fake_VmdkWriteHandle.get_imported_vm.assert_called_once_with()
|
fake_VmdkWriteHandle.get_imported_vm.assert_called_once_with()
|
||||||
|
|
||||||
|
@ -573,9 +311,8 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||||
|
|
||||||
vmdk_read_handle.assert_called_once_with(
|
vmdk_read_handle.assert_called_once_with(
|
||||||
session, host, port, vm, vmdk_file_path, vmdk_size)
|
session, host, port, vm, vmdk_file_path, vmdk_size)
|
||||||
start_transfer.assert_called_once_with(
|
start_transfer.assert_called_once_with(read_handle, write_handle,
|
||||||
context, timeout, read_handle, vmdk_size,
|
timeout)
|
||||||
write_file_handle=write_handle)
|
|
||||||
|
|
||||||
@mock.patch('oslo_vmware.rw_handles.VmdkReadHandle')
|
@mock.patch('oslo_vmware.rw_handles.VmdkReadHandle')
|
||||||
@mock.patch.object(image_transfer, '_start_transfer')
|
@mock.patch.object(image_transfer, '_start_transfer')
|
||||||
|
@ -601,7 +338,7 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||||
image_name = 'fake_image'
|
image_name = 'fake_image'
|
||||||
image_version = 1
|
image_version = 1
|
||||||
|
|
||||||
fake_VmdkReadHandle = 'fake_VmdkReadHandle'
|
fake_VmdkReadHandle = mock.Mock()
|
||||||
fake_rw_handles_VmdkReadHandle.return_value = fake_VmdkReadHandle
|
fake_rw_handles_VmdkReadHandle.return_value = fake_VmdkReadHandle
|
||||||
|
|
||||||
image_transfer.upload_image(context,
|
image_transfer.upload_image(context,
|
||||||
|
@ -633,10 +370,7 @@ class ImageTransferUtilityTest(base.TestCase):
|
||||||
'vmware_disktype': 'streamOptimized',
|
'vmware_disktype': 'streamOptimized',
|
||||||
'owner_id': owner_id}}
|
'owner_id': owner_id}}
|
||||||
|
|
||||||
fake_transfer.assert_called_once_with(context,
|
image_service.update.assert_called_once_with(context,
|
||||||
timeout_secs,
|
image_id,
|
||||||
fake_VmdkReadHandle,
|
image_metadata,
|
||||||
0,
|
data=fake_VmdkReadHandle)
|
||||||
image_service=image_service,
|
|
||||||
image_id=image_id,
|
|
||||||
image_meta=image_metadata)
|
|
||||||
|
|
Loading…
Reference in New Issue