From c19184765dea779999e9efdb2a7aa71da4a5cf5b Mon Sep 17 00:00:00 2001 From: Javeme Date: Wed, 17 Feb 2016 20:00:36 +0800 Subject: [PATCH] Refactor VmdkWriteHandle and VmdkReadHandle Add a class VmdkHandle as the base class of VmdkWriteHandle and VmdkReadHandle, and add related methods into it such as: def update_progress() def _release_lease() Change-Id: I29c48773842c1006b8499aad054e3901074d6195 Depends-On: Ib5b388f7eeb9f26de66c308a89f70c85ba6dc7a9 --- oslo_vmware/rw_handles.py | 387 +++++++++++++-------------- oslo_vmware/tests/test_rw_handles.py | 75 +++--- 2 files changed, 228 insertions(+), 234 deletions(-) diff --git a/oslo_vmware/rw_handles.py b/oslo_vmware/rw_handles.py index 36cec854..e81adf06 100644 --- a/oslo_vmware/rw_handles.py +++ b/oslo_vmware/rw_handles.py @@ -59,8 +59,6 @@ class FileHandle(object): """ self._eof = False self._file_handle = file_handle - self._last_logged_progress = 0 - self._last_progress_udpate = 0 def _create_connection(self, url, method, cacerts=False, ssl_thumbprint=None): @@ -192,51 +190,6 @@ class FileHandle(object): return '%s://[%s]:%d' % (scheme, host, port) return '%s://%s:%d' % (scheme, host, port) - def _fix_esx_url(self, url, host, port): - """Fix netloc in the case of an ESX host. - - In the case of an ESX host, the netloc is set to '*' in the URL - returned in HttpNfcLeaseInfo. It should be replaced with host name - or IP address. - """ - urlp = urlparse.urlparse(url) - if urlp.netloc == '*': - scheme, netloc, path, params, query, fragment = urlp - if netutils.is_valid_ipv6(host): - netloc = '[%s]:%d' % (host, port) - else: - netloc = "%s:%d" % (host, port) - url = urlparse.urlunparse((scheme, - netloc, - path, - params, - query, - fragment)) - return url - - def _find_vmdk_url(self, lease_info, host, port): - """Find the URL corresponding to a VMDK file in lease info.""" - url = None - ssl_thumbprint = None - for deviceUrl in lease_info.deviceUrl: - if deviceUrl.disk: - url = self._fix_esx_url(deviceUrl.url, host, port) - ssl_thumbprint = deviceUrl.sslThumbprint - break - if not url: - excep_msg = _("Could not retrieve VMDK URL from lease info.") - LOG.error(excep_msg) - raise exceptions.VimException(excep_msg) - LOG.debug("Found VMDK URL: %s from lease info.", url) - return url, ssl_thumbprint - - def _log_progress(self, progress): - """Log data transfer progress.""" - if (progress == 100 or (progress - self._last_logged_progress >= - MIN_PROGRESS_DIFF_TO_LOG)): - LOG.debug("Data transfer progress is %d%%.", progress) - self._last_logged_progress = progress - class FileWriteHandle(FileHandle): """Write handle for a file in VMware server.""" @@ -308,7 +261,170 @@ class FileWriteHandle(FileHandle): return "File write handle for %s" % self._url -class VmdkWriteHandle(FileHandle): +class VmdkHandle(FileHandle): + """VMDK handle based on HttpNfcLease.""" + + def __init__(self, session, lease, url, file_handle): + self._session = session + self._lease = lease + self._url = url + self._last_logged_progress = 0 + self._last_progress_udpate = 0 + + super(VmdkHandle, self).__init__(file_handle) + + def _log_progress(self, progress): + """Log data transfer progress.""" + if (progress == 100 or (progress - self._last_logged_progress >= + MIN_PROGRESS_DIFF_TO_LOG)): + LOG.debug("Data transfer progress is %d%%.", progress) + self._last_logged_progress = progress + + def _get_progress(self): + """Get current progress for updating progress to lease.""" + pass + + def update_progress(self): + """Updates progress to lease. + + This call back to the lease is essential to keep the lease alive + across long running write/read operations. + + :raises: VimException, VimFaultException, VimAttributeException, + VimSessionOverLoadException, VimConnectionException + """ + now = time.time() + if (now - self._last_progress_udpate < MIN_UPDATE_INTERVAL): + return + self._last_progress_udpate = now + progress = int(self._get_progress()) + self._log_progress(progress) + + try: + self._session.invoke_api(self._session.vim, + 'HttpNfcLeaseProgress', + self._lease, + percent=progress) + except exceptions.VimException: + with excutils.save_and_reraise_exception(): + LOG.exception(_LE("Error occurred while updating the " + "write/read progress of VMDK file " + "with URL = %s."), + self._url) + + def _release_lease(self): + """Release the lease + + :raises: VimException, VimFaultException, VimAttributeException, + VimSessionOverLoadException, VimConnectionException + """ + LOG.debug("Getting lease state for %s.", self._url) + + state = self._session.invoke_api(vim_util, + 'get_object_property', + self._session.vim, + self._lease, + 'state') + LOG.debug("Lease for %(url)s is in state: %(state)s.", + {'url': self._url, + 'state': state}) + if state == 'ready': + LOG.debug("Releasing lease for %s.", self._url) + self._session.invoke_api(self._session.vim, + 'HttpNfcLeaseComplete', + self._lease) + else: + LOG.debug("Lease for %(url)s is in state: %(state)s; no " + "need to release.", + {'url': self._url, + 'state': state}) + + @staticmethod + def _create_import_vapp_lease(session, rp_ref, import_spec, vm_folder_ref): + """Create and wait for HttpNfcLease lease for vApp import.""" + LOG.debug("Creating HttpNfcLease lease for vApp import into resource" + " pool: %s.", + rp_ref) + lease = session.invoke_api(session.vim, + 'ImportVApp', + rp_ref, + spec=import_spec, + folder=vm_folder_ref) + LOG.debug("Lease: %(lease)s obtained for vApp import into resource" + " pool %(rp_ref)s.", + {'lease': lease, + 'rp_ref': rp_ref}) + session.wait_for_lease_ready(lease) + + LOG.debug("Invoking VIM API for reading info of lease: %s.", lease) + lease_info = session.invoke_api(vim_util, + 'get_object_property', + session.vim, + lease, + 'info') + return lease, lease_info + + @staticmethod + def _create_export_vm_lease(session, vm_ref): + """Create and wait for HttpNfcLease lease for VM export.""" + LOG.debug("Creating HttpNfcLease lease for exporting VM: %s.", + vm_ref) + lease = session.invoke_api(session.vim, 'ExportVm', vm_ref) + LOG.debug("Lease: %(lease)s obtained for exporting VM: %(vm_ref)s.", + {'lease': lease, + 'vm_ref': vm_ref}) + session.wait_for_lease_ready(lease) + + LOG.debug("Invoking VIM API for reading info of lease: %s.", lease) + lease_info = session.invoke_api(vim_util, + 'get_object_property', + session.vim, + lease, + 'info') + return lease, lease_info + + @staticmethod + def _fix_esx_url(url, host, port): + """Fix netloc in the case of an ESX host. + + In the case of an ESX host, the netloc is set to '*' in the URL + returned in HttpNfcLeaseInfo. It should be replaced with host name + or IP address. + """ + urlp = urlparse.urlparse(url) + if urlp.netloc == '*': + scheme, netloc, path, params, query, fragment = urlp + if netutils.is_valid_ipv6(host): + netloc = '[%s]:%d' % (host, port) + else: + netloc = "%s:%d" % (host, port) + url = urlparse.urlunparse((scheme, + netloc, + path, + params, + query, + fragment)) + return url + + @staticmethod + def _find_vmdk_url(lease_info, host, port): + """Find the URL corresponding to a VMDK file in lease info.""" + url = None + ssl_thumbprint = None + for deviceUrl in lease_info.deviceUrl: + if deviceUrl.disk: + url = VmdkHandle._fix_esx_url(deviceUrl.url, host, port) + ssl_thumbprint = deviceUrl.sslThumbprint + break + if not url: + excep_msg = _("Could not retrieve VMDK URL from lease info.") + LOG.error(excep_msg) + raise exceptions.VimException(excep_msg) + LOG.debug("Found VMDK URL: %s from lease info.", url) + return url, ssl_thumbprint + + +class VmdkWriteHandle(VmdkHandle): """VMDK write handle based on HttpNfcLease. This class creates a vApp in the specified resource pool and uploads the @@ -332,25 +448,17 @@ class VmdkWriteHandle(FileHandle): VimSessionOverLoadException, VimConnectionException, ValueError """ - self._session = session self._vmdk_size = vmdk_size self._bytes_written = 0 # Get lease and its info for vApp import - self._lease = self._create_and_wait_for_lease(session, - rp_ref, - import_spec, - vm_folder_ref) - LOG.debug("Invoking VIM API for reading info of lease: %s.", - self._lease) - lease_info = session.invoke_api(vim_util, - 'get_object_property', - session.vim, - self._lease, - 'info') + lease, lease_info = self._create_import_vapp_lease(session, + rp_ref, + import_spec, + vm_folder_ref) # Find VMDK URL where data is to be written - self._url, thumbprint = self._find_vmdk_url(lease_info, host, port) + url, thumbprint = self._find_vmdk_url(lease_info, host, port) self._vm_ref = lease_info.entity cookies = session.vim.client.options.transport.cookiejar @@ -364,36 +472,18 @@ class VmdkWriteHandle(FileHandle): else: raise ValueError('http_method must be either PUT or POST') self._conn = self._create_write_connection(http_method, - self._url, + url, vmdk_size, cookies=cookies, overwrite=overwrite, content_type=content_type, ssl_thumbprint=thumbprint) - FileHandle.__init__(self, self._conn) + super(VmdkWriteHandle, self).__init__(session, lease, url, self._conn) def get_imported_vm(self): """"Get managed object reference of the VM created for import.""" return self._vm_ref - def _create_and_wait_for_lease(self, session, rp_ref, import_spec, - vm_folder_ref): - """Create and wait for HttpNfcLease lease for vApp import.""" - LOG.debug("Creating HttpNfcLease lease for vApp import into resource" - " pool: %s.", - rp_ref) - lease = session.invoke_api(session.vim, - 'ImportVApp', - rp_ref, - spec=import_spec, - folder=vm_folder_ref) - LOG.debug("Lease: %(lease)s obtained for vApp import into resource" - " pool %(rp_ref)s.", - {'lease': lease, - 'rp_ref': rp_ref}) - session.wait_for_lease_ready(lease) - return lease - def write(self, data): """Write data to the file. @@ -417,61 +507,14 @@ class VmdkWriteHandle(FileHandle): LOG.exception(excep_msg) raise exceptions.VimException(excep_msg, excep) - # TODO(vbala) Move this method to FileHandle. - def update_progress(self): - """Updates progress to lease. - - This call back to the lease is essential to keep the lease alive - across long running write operations. - - :raises: VimException, VimFaultException, VimAttributeException, - VimSessionOverLoadException, VimConnectionException - """ - now = time.time() - if (now - self._last_progress_udpate < MIN_UPDATE_INTERVAL): - return - self._last_progress_udpate = now - progress = int(float(self._bytes_written) / self._vmdk_size * 100) - self._log_progress(progress) - - try: - self._session.invoke_api(self._session.vim, - 'HttpNfcLeaseProgress', - self._lease, - percent=progress) - except exceptions.VimException: - with excutils.save_and_reraise_exception(): - LOG.exception(_LE("Error occurred while updating the " - "write progress of VMDK file with " - "URL = %s."), - self._url) - def close(self): """Releases the lease and close the connection. - :raises: VimException, VimFaultException, VimAttributeException, - VimSessionOverLoadException, VimConnectionException + :raises: VimAttributeException, VimSessionOverLoadException, + VimConnectionException """ - LOG.debug("Getting lease state for %s.", self._url) try: - state = self._session.invoke_api(vim_util, - 'get_object_property', - self._session.vim, - self._lease, - 'state') - LOG.debug("Lease for %(url)s is in state: %(state)s.", - {'url': self._url, - 'state': state}) - if state == 'ready': - LOG.debug("Releasing lease for %s.", self._url) - self._session.invoke_api(self._session.vim, - 'HttpNfcLeaseComplete', - self._lease) - else: - LOG.debug("Lease for %(url)s is in state: %(state)s; no " - "need to release.", - {'url': self._url, - 'state': state}) + self._release_lease() except exceptions.VimException: LOG.warning(_LW("Error occurred while releasing the lease " "for %s."), @@ -480,11 +523,14 @@ class VmdkWriteHandle(FileHandle): super(VmdkWriteHandle, self).close() LOG.debug("Closed VMDK write handle for %s.", self._url) + def _get_progress(self): + return float(self._bytes_written) / self._vmdk_size * 100 + def __str__(self): return "VMDK write handle for %s" % self._url -class VmdkReadHandle(FileHandle): +class VmdkReadHandle(VmdkHandle): """VMDK read handle based on HttpNfcLease.""" def __init__(self, session, host, port, vm_ref, vmdk_path, @@ -505,38 +551,19 @@ class VmdkReadHandle(FileHandle): :raises: VimException, VimFaultException, VimAttributeException, VimSessionOverLoadException, VimConnectionException """ - self._session = session self._vmdk_size = vmdk_size self._bytes_read = 0 # Obtain lease for VM export - self._lease = self._create_and_wait_for_lease(session, vm_ref) - LOG.debug("Invoking VIM API for reading info of lease: %s.", - self._lease) - lease_info = session.invoke_api(vim_util, - 'get_object_property', - session.vim, - self._lease, - 'info') + lease, lease_info = self._create_export_vm_lease(session, vm_ref) # find URL of the VMDK file to be read and open connection - self._url, thumbprint = self._find_vmdk_url(lease_info, host, port) + url, thumbprint = self._find_vmdk_url(lease_info, host, port) cookies = session.vim.client.options.transport.cookiejar - self._conn = self._create_read_connection(self._url, + self._conn = self._create_read_connection(url, cookies=cookies, ssl_thumbprint=thumbprint) - FileHandle.__init__(self, self._conn) - - def _create_and_wait_for_lease(self, session, vm_ref): - """Create and wait for HttpNfcLease lease for VM export.""" - LOG.debug("Creating HttpNfcLease lease for exporting VM: %s.", - vm_ref) - lease = session.invoke_api(session.vim, 'ExportVm', vm_ref) - LOG.debug("Lease: %(lease)s obtained for exporting VM: %(vm_ref)s.", - {'lease': lease, - 'vm_ref': vm_ref}) - session.wait_for_lease_ready(lease) - return lease + super(VmdkReadHandle, self).__init__(session, lease, url, self._conn) def read(self, chunk_size): """Read a chunk of data from the VMDK file. @@ -558,59 +585,14 @@ class VmdkReadHandle(FileHandle): LOG.exception(excep_msg) raise exceptions.VimException(excep_msg, excep) - def update_progress(self): - """Updates progress to lease. - - This call back to the lease is essential to keep the lease alive - across long running read operations. - - :raises: VimException, VimFaultException, VimAttributeException, - VimSessionOverLoadException, VimConnectionException - """ - now = time.time() - if (now - self._last_progress_udpate < MIN_UPDATE_INTERVAL): - return - self._last_progress_udpate = now - progress = int(float(self._bytes_read) / self._vmdk_size * 100) - self._log_progress(progress) - - try: - self._session.invoke_api(self._session.vim, - 'HttpNfcLeaseProgress', - self._lease, - percent=progress) - except exceptions.VimException: - with excutils.save_and_reraise_exception(): - LOG.exception(_LE("Error occurred while updating the " - "read progress of VMDK file with URL = %s."), - self._url) - def close(self): """Releases the lease and close the connection. :raises: VimException, VimFaultException, VimAttributeException, VimSessionOverLoadException, VimConnectionException """ - LOG.debug("Getting lease state for %s.", self._url) try: - state = self._session.invoke_api(vim_util, - 'get_object_property', - self._session.vim, - self._lease, - 'state') - LOG.debug("Lease for %(url)s is in state: %(state)s.", - {'url': self._url, - 'state': state}) - if state == 'ready': - LOG.debug("Releasing lease for %s.", self._url) - self._session.invoke_api(self._session.vim, - 'HttpNfcLeaseComplete', - self._lease) - else: - LOG.debug("Lease for %(url)s is in state: %(state)s; no " - "need to release.", - {'url': self._url, - 'state': state}) + self._release_lease() except exceptions.VimException: LOG.warning(_LW("Error occurred while releasing the lease " "for %s."), @@ -621,6 +603,9 @@ class VmdkReadHandle(FileHandle): super(VmdkReadHandle, self).close() LOG.debug("Closed VMDK read handle for %s.", self._url) + def _get_progress(self): + return float(self._bytes_read) / self._vmdk_size * 100 + def __str__(self): return "VMDK read handle for %s" % self._url diff --git a/oslo_vmware/tests/test_rw_handles.py b/oslo_vmware/tests/test_rw_handles.py index 0dd633ab..ba33ce54 100644 --- a/oslo_vmware/tests/test_rw_handles.py +++ b/oslo_vmware/tests/test_rw_handles.py @@ -37,23 +37,6 @@ class FileHandleTest(base.TestCase): vmw_http_file.close() file_handle.close.assert_called_once_with() - def test_find_vmdk_url(self): - device_url_0 = mock.Mock() - device_url_0.disk = False - device_url_1 = mock.Mock() - device_url_1.disk = True - device_url_1.url = 'https://*/ds1/vm1.vmdk' - device_url_1.sslThumbprint = '11:22:33:44:55' - lease_info = mock.Mock() - lease_info.deviceUrl = [device_url_0, device_url_1] - host = '10.1.2.3' - port = 443 - exp_url = 'https://%s:%d/ds1/vm1.vmdk' % (host, port) - vmw_http_file = rw_handles.FileHandle(None) - url, thumbprint = vmw_http_file._find_vmdk_url(lease_info, host, port) - self.assertEqual(exp_url, url) - self.assertEqual('11:22:33:44:55', thumbprint) - @mock.patch('urllib3.connection.HTTPConnection') def test_create_connection_http(self, http_conn): conn = mock.Mock() @@ -140,6 +123,48 @@ class FileWriteHandleTest(base.TestCase): self._conn.close.assert_called_once_with() +class VmdkHandleTest(base.TestCase): + """Tests for VmdkHandle.""" + + def test_find_vmdk_url(self): + device_url_0 = mock.Mock() + device_url_0.disk = False + device_url_1 = mock.Mock() + device_url_1.disk = True + device_url_1.url = 'https://*/ds1/vm1.vmdk' + device_url_1.sslThumbprint = '11:22:33:44:55' + lease_info = mock.Mock() + lease_info.deviceUrl = [device_url_0, device_url_1] + host = '10.1.2.3' + port = 443 + exp_url = 'https://%s:%d/ds1/vm1.vmdk' % (host, port) + vmw_http_file = rw_handles.VmdkHandle(None, None, None, None) + url, thumbprint = vmw_http_file._find_vmdk_url(lease_info, host, port) + self.assertEqual(exp_url, url) + self.assertEqual('11:22:33:44:55', thumbprint) + + def test_update_progress(self): + session = mock.Mock() + lease = mock.Mock() + handle = rw_handles.VmdkHandle(session, lease, 'fake-url', None) + handle._get_progress = mock.Mock(return_value=50) + + handle.update_progress() + + session.invoke_api.assert_called_once_with(session.vim, + 'HttpNfcLeaseProgress', + lease, percent=50) + + def test_update_progress_with_error(self): + session = mock.Mock() + handle = rw_handles.VmdkHandle(session, None, 'fake-url', None) + + handle._get_progress = mock.Mock(return_value=0) + session.invoke_api.side_effect = exceptions.VimException(None) + + self.assertRaises(exceptions.VimException, handle.update_progress) + + class VmdkWriteHandleTest(base.TestCase): """Tests for VmdkWriteHandle.""" @@ -220,14 +245,6 @@ class VmdkWriteHandleTest(base.TestCase): handle.write([1] * data_size) handle.update_progress() - def test_update_progress_with_error(self): - session = self._create_mock_session(True, 10) - handle = rw_handles.VmdkWriteHandle(session, '10.1.2.3', 443, - 'rp-1', 'folder-1', None, - 100) - session.invoke_api.side_effect = exceptions.VimException(None) - self.assertRaises(exceptions.VimException, handle.update_progress) - def test_close(self): session = self._create_mock_session() handle = rw_handles.VmdkWriteHandle(session, '10.1.2.3', 443, @@ -316,14 +333,6 @@ class VmdkReadHandleTest(base.TestCase): handle.update_progress() self.assertEqual('fake-data', data) - def test_update_progress_with_error(self): - session = self._create_mock_session(True, 10) - handle = rw_handles.VmdkReadHandle(session, '10.1.2.3', 443, - 'vm-1', '[ds] disk1.vmdk', - 100) - session.invoke_api.side_effect = exceptions.VimException(None) - self.assertRaises(exceptions.VimException, handle.update_progress) - def test_close(self): session = self._create_mock_session() handle = rw_handles.VmdkReadHandle(session, '10.1.2.3', 443,