# Listing header: nova/nova/tests/virt/vmwareapi/test_ds_util.py
# (549 lines, 22 KiB, Python)

# Copyright (c) 2014 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import re
import mock
from oslo.utils import units
from oslo.vmware import exceptions as vexc
from testtools import matchers
from nova import exception
from nova.i18n import _
from nova import test
from nova.tests.virt.vmwareapi import fake
from nova.virt.vmwareapi import ds_util
class DsUtilTestCase(test.NoDBTestCase):
    """Tests for the module-level helpers in ds_util.

    Each test drives ds_util through a fake.FakeSession, usually with the
    session's ``_call_method`` and/or ``_wait_for_task`` patched.
    """

    def setUp(self):
        """Create the fake session, set api_retry_count and reset fakes."""
        super(DsUtilTestCase, self).setUp()
        self.session = fake.FakeSession()
        self.flags(api_retry_count=1, group='vmware')
        fake.reset()

    def tearDown(self):
        """Reset the fake module state after the base class teardown."""
        super(DsUtilTestCase, self).tearDown()
        fake.reset()

    def test_file_delete(self):
        """file_delete issues DeleteDatastoreFile_Task and waits on it."""
        def fake_call_method(module, method, *args, **kwargs):
            self.assertEqual('DeleteDatastoreFile_Task', method)
            name = kwargs.get('name')
            self.assertEqual('[ds] fake/path', name)
            datacenter = kwargs.get('datacenter')
            self.assertEqual('fake-dc-ref', datacenter)
            return 'fake_delete_task'

        # Nested ``with`` statements are used instead of contextlib.nested,
        # which is deprecated in Python 2.7 and absent from Python 3.
        with mock.patch.object(self.session,
                               '_wait_for_task') as _wait_for_task:
            with mock.patch.object(self.session, '_call_method',
                                   fake_call_method):
                ds_path = ds_util.DatastorePath('ds', 'fake/path')
                ds_util.file_delete(self.session,
                                    ds_path, 'fake-dc-ref')
                _wait_for_task.assert_has_calls([
                    mock.call('fake_delete_task')])

    def test_file_move(self):
        """file_move issues MoveDatastoreFile_Task and waits on it."""
        def fake_call_method(module, method, *args, **kwargs):
            self.assertEqual('MoveDatastoreFile_Task', method)
            sourceName = kwargs.get('sourceName')
            self.assertEqual('[ds] tmp/src', sourceName)
            destinationName = kwargs.get('destinationName')
            self.assertEqual('[ds] base/dst', destinationName)
            sourceDatacenter = kwargs.get('sourceDatacenter')
            self.assertEqual('fake-dc-ref', sourceDatacenter)
            destinationDatacenter = kwargs.get('destinationDatacenter')
            self.assertEqual('fake-dc-ref', destinationDatacenter)
            return 'fake_move_task'

        with mock.patch.object(self.session,
                               '_wait_for_task') as _wait_for_task:
            with mock.patch.object(self.session, '_call_method',
                                   fake_call_method):
                src_ds_path = ds_util.DatastorePath('ds', 'tmp/src')
                dst_ds_path = ds_util.DatastorePath('ds', 'base/dst')
                ds_util.file_move(self.session,
                                  'fake-dc-ref', src_ds_path, dst_ds_path)
                _wait_for_task.assert_has_calls([
                    mock.call('fake_move_task')])

    def test_mkdir(self):
        """mkdir calls MakeDirectory with createParentDirectories set."""
        def fake_call_method(module, method, *args, **kwargs):
            self.assertEqual('MakeDirectory', method)
            name = kwargs.get('name')
            self.assertEqual('[ds] fake/path', name)
            datacenter = kwargs.get('datacenter')
            self.assertEqual('fake-dc-ref', datacenter)
            createParentDirectories = kwargs.get('createParentDirectories')
            self.assertTrue(createParentDirectories)

        with mock.patch.object(self.session, '_call_method',
                               fake_call_method):
            ds_path = ds_util.DatastorePath('ds', 'fake/path')
            ds_util.mkdir(self.session, ds_path, 'fake-dc-ref')

    def test_file_exists(self):
        """file_exists is true when the search task result lists the file."""
        def fake_call_method(module, method, *args, **kwargs):
            if method == 'SearchDatastore_Task':
                ds_browser = args[0]
                self.assertEqual('fake-browser', ds_browser)
                datastorePath = kwargs.get('datastorePath')
                self.assertEqual('[ds] fake/path', datastorePath)
                return 'fake_exists_task'

            # Should never get here
            self.fail()

        def fake_wait_for_task(task_ref):
            if task_ref == 'fake_exists_task':
                result_file = fake.DataObject()
                result_file.path = 'fake-file'

                result = fake.DataObject()
                result.file = [result_file]
                result.path = '[ds] fake/path'

                task_info = fake.DataObject()
                task_info.result = result

                return task_info

            # Should never get here
            self.fail()

        with mock.patch.object(self.session, '_call_method',
                               fake_call_method):
            with mock.patch.object(self.session, '_wait_for_task',
                                   fake_wait_for_task):
                ds_path = ds_util.DatastorePath('ds', 'fake/path')
                file_exists = ds_util.file_exists(
                    self.session, 'fake-browser', ds_path, 'fake-file')
                self.assertTrue(file_exists)

    def test_file_exists_fails(self):
        """file_exists is false when the task raises FileNotFoundException."""
        def fake_call_method(module, method, *args, **kwargs):
            if method == 'SearchDatastore_Task':
                return 'fake_exists_task'

            # Should never get here
            self.fail()

        def fake_wait_for_task(task_ref):
            if task_ref == 'fake_exists_task':
                raise vexc.FileNotFoundException()

            # Should never get here
            self.fail()

        with mock.patch.object(self.session, '_call_method',
                               fake_call_method):
            with mock.patch.object(self.session, '_wait_for_task',
                                   fake_wait_for_task):
                ds_path = ds_util.DatastorePath('ds', 'fake/path')
                file_exists = ds_util.file_exists(
                    self.session, 'fake-browser', ds_path, 'fake-file')
                self.assertFalse(file_exists)

    def _mock_get_datastore_calls(self, *datastores):
        """Mock vim_util calls made by get_datastore.

        :param datastores: result sets to hand back, one per retrieval call,
                           in the order given
        :returns: an un-started patcher for ``self.session._call_method``;
                  use it as a context manager
        """

        # One-element list so the nested function can rebind the iterator
        # (there is no ``nonlocal`` on Python 2).
        datastores_i = [None]

        # For the moment, at least, this list of datastores is simply passed to
        # get_properties_for_a_collection_of_objects, which we mock below. We
        # don't need to over-complicate the fake function by worrying about its
        # contents.
        fake_ds_list = ['fake-ds']

        def fake_call_method(module, method, *args, **kwargs):
            # Mock the call which returns a list of datastores for the cluster
            if (module == ds_util.vim_util and
                    method == 'get_dynamic_property' and
                    args == ('fake-cluster', 'ClusterComputeResource',
                             'datastore')):
                fake_ds_mor = fake.DataObject()
                fake_ds_mor.ManagedObjectReference = fake_ds_list
                return fake_ds_mor

            # Return the datastore result sets we were passed in, in the order
            # given
            if (module == ds_util.vim_util and
                    method == 'get_properties_for_a_collection_of_objects' and
                    args[0] == 'Datastore' and
                    args[1] == fake_ds_list):
                # Start a new iterator over given datastores
                datastores_i[0] = iter(datastores)
                # next() builtin rather than the Python 2 only .next() method
                return next(datastores_i[0])

            # Continue returning results from the current iterator; None once
            # the iterator is exhausted.
            if (module == ds_util.vim_util and
                    method == 'continue_to_get_objects'):
                return next(datastores_i[0], None)

            # Sentinel that get_datastore's use of vim has changed
            self.fail('Unexpected vim call in get_datastore: %s' % method)

        return mock.patch.object(self.session, '_call_method',
                                 side_effect=fake_call_method)

    def test_get_datastore(self):
        """get_datastore picks 'fake-ds' out of a three-entry result set."""
        fake_objects = fake.FakeRetrieveResult()
        fake_objects.add_object(fake.Datastore())
        fake_objects.add_object(fake.Datastore("fake-ds-2", 2048, 1000,
                                               False, "normal"))
        fake_objects.add_object(fake.Datastore("fake-ds-3", 4096, 2000,
                                               True, "inMaintenance"))

        with self._mock_get_datastore_calls(fake_objects):
            result = ds_util.get_datastore(self.session, 'fake-cluster')

        self.assertEqual("fake-ds", result.name)
        self.assertEqual(units.Ti, result.capacity)
        self.assertEqual(500 * units.Gi, result.freespace)

    def test_get_datastore_with_regex(self):
        """Only the datastore whose name matches the regex is returned."""
        # Test with a regex that matches with a datastore
        datastore_valid_regex = re.compile(r"^openstack.*\d$")
        fake_objects = fake.FakeRetrieveResult()
        fake_objects.add_object(fake.Datastore("openstack-ds0"))
        fake_objects.add_object(fake.Datastore("fake-ds0"))
        fake_objects.add_object(fake.Datastore("fake-ds1"))

        with self._mock_get_datastore_calls(fake_objects):
            result = ds_util.get_datastore(self.session, 'fake-cluster',
                                           datastore_valid_regex)
        self.assertEqual("openstack-ds0", result.name)

    def test_get_datastore_with_token(self):
        """Results split over two token-linked pages are all considered.

        The expected 'ds2' lives on the second page and has the most free
        space among the names matching the regex.
        """
        regex = re.compile(r"^ds.*\d$")
        fake0 = fake.FakeRetrieveResult()
        fake0.add_object(fake.Datastore("ds0", 10 * units.Gi, 5 * units.Gi))
        fake0.add_object(fake.Datastore("foo", 10 * units.Gi, 9 * units.Gi))
        # A token on the first page is what triggers the continuation call.
        # NOTE(review): inferred from the test name and the mocked
        # 'continue_to_get_objects' branch -- confirm against ds_util.
        setattr(fake0, 'token', 'token-0')
        fake1 = fake.FakeRetrieveResult()
        fake1.add_object(fake.Datastore("ds2", 10 * units.Gi, 8 * units.Gi))
        fake1.add_object(fake.Datastore("ds3", 10 * units.Gi, 1 * units.Gi))

        with self._mock_get_datastore_calls(fake0, fake1):
            result = ds_util.get_datastore(self.session, 'fake-cluster', regex)
        self.assertEqual("ds2", result.name)

    def test_get_datastore_with_list(self):
        """A whitelist regex never yields the non-listed datastore."""
        # Test with a regex containing whitelist of datastores
        datastore_valid_regex = re.compile("(openstack-ds0|openstack-ds2)")
        fake_objects = fake.FakeRetrieveResult()
        fake_objects.add_object(fake.Datastore("openstack-ds0"))
        fake_objects.add_object(fake.Datastore("openstack-ds1"))
        fake_objects.add_object(fake.Datastore("openstack-ds2"))

        with self._mock_get_datastore_calls(fake_objects):
            result = ds_util.get_datastore(self.session, 'fake-cluster',
                                           datastore_valid_regex)
        self.assertNotEqual("openstack-ds1", result.name)

    def test_get_datastore_with_regex_error(self):
        """A regex matching nothing raises DatastoreNotFound with a message."""
        # Test with a regex that has no match
        # Checks if code raises DatastoreNotFound with a specific message
        datastore_invalid_regex = re.compile("unknown-ds")
        exp_message = (_("Datastore regex %s did not match any datastores")
                       % datastore_invalid_regex.pattern)
        fake_objects = fake.FakeRetrieveResult()
        fake_objects.add_object(fake.Datastore("fake-ds0"))
        fake_objects.add_object(fake.Datastore("fake-ds1"))
        # assertRaisesRegExp would have been a good choice instead of
        # try/catch block, but it's available only from Py 2.7.
        try:
            with self._mock_get_datastore_calls(fake_objects):
                ds_util.get_datastore(self.session, 'fake-cluster',
                                      datastore_invalid_regex)
        except exception.DatastoreNotFound as e:
            self.assertEqual(exp_message, e.args[0])
        else:
            self.fail("DatastoreNotFound Exception was not raised with "
                      "message: %s" % exp_message)

    def test_get_datastore_without_datastore(self):
        """DatastoreNotFound is raised with FakeObjectRetrievalSession(None)."""
        self.assertRaises(exception.DatastoreNotFound,
                          ds_util.get_datastore,
                          fake.FakeObjectRetrievalSession(None),
                          cluster="fake-cluster")

    def test_get_datastore_inaccessible_ds(self):
        """A lone inaccessible datastore results in DatastoreNotFound."""
        data_store = fake.Datastore()
        data_store.set("summary.accessible", False)

        fake_objects = fake.FakeRetrieveResult()
        fake_objects.add_object(data_store)

        with self._mock_get_datastore_calls(fake_objects):
            self.assertRaises(exception.DatastoreNotFound,
                              ds_util.get_datastore,
                              self.session, 'fake-cluster')

    def test_get_datastore_ds_in_maintenance(self):
        """A lone datastore in maintenance results in DatastoreNotFound."""
        data_store = fake.Datastore()
        data_store.set("summary.maintenanceMode", "inMaintenance")

        fake_objects = fake.FakeRetrieveResult()
        fake_objects.add_object(data_store)

        with self._mock_get_datastore_calls(fake_objects):
            self.assertRaises(exception.DatastoreNotFound,
                              ds_util.get_datastore,
                              self.session, 'fake-cluster')

    def test_get_datastore_no_host_in_cluster(self):
        """Empty-string results from every call raise DatastoreNotFound."""
        def fake_call_method(module, method, *args, **kwargs):
            return ''

        with mock.patch.object(self.session, '_call_method',
                               fake_call_method):
            self.assertRaises(exception.DatastoreNotFound,
                              ds_util.get_datastore,
                              self.session, 'fake-cluster')

    def _test_is_datastore_valid(self, accessible=True,
                                 maintenance_mode="normal",
                                 type="VMFS",
                                 datastore_regex=None):
        """Run ds_util._is_datastore_valid on a synthetic property dict.

        :param accessible: value stored under "summary.accessible"
        :param maintenance_mode: value stored under "summary.maintenanceMode"
        :param type: value stored under "summary.type" (the name shadows the
                     builtin but is kept so keyword callers keep working)
        :param datastore_regex: compiled regex handed through, or None
        :returns: whatever _is_datastore_valid returns for the "ds-1" entry
        """
        propdict = {
            "summary.accessible": accessible,
            "summary.maintenanceMode": maintenance_mode,
            "summary.type": type,
            "summary.name": "ds-1",
        }

        return ds_util._is_datastore_valid(propdict, datastore_regex)

    def test_is_datastore_valid(self):
        """Every type in ALLOWED_DATASTORE_TYPES is accepted."""
        for ds_type in ds_util.ALLOWED_DATASTORE_TYPES:
            self.assertTrue(self._test_is_datastore_valid(True,
                                                          "normal",
                                                          ds_type))

    def test_is_datastore_valid_inaccessible_ds(self):
        """An inaccessible datastore is rejected."""
        self.assertFalse(self._test_is_datastore_valid(False,
                                                       "normal",
                                                       "VMFS"))

    def test_is_datastore_valid_ds_in_maintenance(self):
        """A datastore in maintenance mode is rejected."""
        self.assertFalse(self._test_is_datastore_valid(True,
                                                       "inMaintenance",
                                                       "VMFS"))

    def test_is_datastore_valid_ds_type_invalid(self):
        """The "vfat" datastore type is rejected."""
        self.assertFalse(self._test_is_datastore_valid(True,
                                                       "normal",
                                                       "vfat"))

    def test_is_datastore_valid_not_matching_regex(self):
        """A regex that does not match the name "ds-1" causes rejection."""
        datastore_regex = re.compile("ds-2")
        self.assertFalse(self._test_is_datastore_valid(True,
                                                       "normal",
                                                       "VMFS",
                                                       datastore_regex))

    def test_is_datastore_valid_matching_regex(self):
        """A regex that matches the name "ds-1" is accepted."""
        datastore_regex = re.compile("ds-1")
        self.assertTrue(self._test_is_datastore_valid(True,
                                                      "normal",
                                                      "VMFS",
                                                      datastore_regex))
class DatastoreTestCase(test.NoDBTestCase):
    """Tests for construction and path building of ds_util.Datastore."""

    def test_ds(self):
        """Constructor arguments are readable back as attributes."""
        total = 2 * units.Gi
        free = 1 * units.Gi
        datastore = ds_util.Datastore("fake_ref", "ds_name", total, free)

        self.assertEqual('ds_name', datastore.name)
        self.assertEqual('fake_ref', datastore.ref)
        self.assertEqual(total, datastore.capacity)
        self.assertEqual(free, datastore.freespace)

    def test_ds_invalid_space(self):
        """ValueError for freespace above capacity or with no capacity."""
        invalid_sizes = (
            (1 * units.Gi, 2 * units.Gi),
            (None, 2 * units.Gi),
        )
        for capacity, freespace in invalid_sizes:
            self.assertRaises(ValueError, ds_util.Datastore,
                              "fake_ref", "ds_name", capacity, freespace)

    def test_ds_no_capacity_no_freespace(self):
        """Capacity and freespace are None when not supplied."""
        datastore = ds_util.Datastore("fake_ref", "ds_name")

        self.assertIsNone(datastore.capacity)
        self.assertIsNone(datastore.freespace)

    def test_ds_invalid(self):
        """ValueError when either the ref or the name is None."""
        for ref, name in ((None, "ds_name"), ("fake_ref", None)):
            self.assertRaises(ValueError, ds_util.Datastore, ref, name)

    def test_build_path(self):
        """build_path joins its arguments under the datastore name."""
        datastore = ds_util.Datastore("fake_ref", "ds_name")
        built = datastore.build_path("some_dir", "foo.vmdk")

        self.assertEqual('[ds_name] some_dir/foo.vmdk', str(built))
class DatastorePathTestCase(test.NoDBTestCase):
    """Tests for building, comparing, joining and parsing DatastorePath."""

    def test_ds_path(self):
        """All accessors of a three-part path return the expected pieces."""
        path = ds_util.DatastorePath('dsname', 'a/b/c', 'file.iso')

        self.assertEqual('[dsname] a/b/c/file.iso', str(path))
        self.assertEqual('a/b/c/file.iso', path.rel_path)
        self.assertEqual('a/b/c', path.parent.rel_path)
        self.assertEqual('[dsname] a/b/c', str(path.parent))
        self.assertEqual('dsname', path.datastore)
        self.assertEqual('file.iso', path.basename)
        self.assertEqual('a/b/c', path.dirname)

    def test_ds_path_no_ds_name(self):
        """An empty or None datastore name raises ValueError."""
        components = ['a/b/c', 'file.iso']
        for ds_name in ('', None):
            self.assertRaises(
                ValueError, ds_util.DatastorePath,
                ds_name, *components)

    def test_ds_path_invalid_path_components(self):
        """A None anywhere among the path components raises ValueError."""
        invalid_component_lists = [
            [None],
            ['', None],
            ['a', None],
            ['a', None, 'b'],
            [None, ''],
            [None, 'b'],
        ]
        for components in invalid_component_lists:
            self.assertRaises(
                ValueError, ds_util.DatastorePath,
                'dsname', *components)

    def test_ds_path_no_subdir(self):
        """A bare file name, with or without an empty dir, has no dirname."""
        reference = ds_util.DatastorePath('dsname', 'x.vmdk')
        self.assertEqual('[dsname] x.vmdk', str(reference))
        self.assertEqual('', reference.dirname)
        self.assertEqual('x.vmdk', reference.basename)
        self.assertEqual('x.vmdk', reference.rel_path)

        variants = [
            ('dsname', ['', 'x.vmdk']),
            ('dsname', ['x.vmdk']),
        ]
        for ds_name, components in variants:
            candidate = ds_util.DatastorePath(ds_name, *components)
            self.assertEqual(str(reference), str(candidate))

    def test_ds_path_ds_only(self):
        """Missing or empty components give a datastore-only path."""
        reference = ds_util.DatastorePath('dsname')
        self.assertEqual('[dsname]', str(reference))
        self.assertEqual('', reference.rel_path)
        self.assertEqual('', reference.basename)
        self.assertEqual('', reference.dirname)

        variants = [
            ('dsname', []),
            ('dsname', ['']),
            ('dsname', ['', '']),
        ]
        for ds_name, components in variants:
            candidate = ds_util.DatastorePath(ds_name, *components)
            self.assertEqual(str(reference), str(candidate))
            self.assertEqual(reference.rel_path, candidate.rel_path)

    def test_ds_path_equivalence(self):
        """Different splits of the same components build equal paths."""
        reference = ds_util.DatastorePath('dsname', 'a/b/c', 'x.vmdk')

        variants = [
            ('dsname', ['a/b/c/', 'x.vmdk']),
            ('dsname', ['a/', 'b/c/', 'x.vmdk']),
            ('dsname', ['a', 'b', 'c', 'x.vmdk']),
            ('dsname', ['a/b/c', 'x.vmdk']),
        ]
        for ds_name, components in variants:
            candidate = ds_util.DatastorePath(ds_name, *components)
            self.assertEqual(str(reference), str(candidate))
            self.assertEqual(reference.datastore, candidate.datastore)
            self.assertEqual(reference.rel_path, candidate.rel_path)
            self.assertEqual(str(reference.parent), str(candidate.parent))

    def test_ds_path_non_equivalence(self):
        """Leading slashes and stray spaces yield a different path string."""
        reference = ds_util.DatastorePath('dsname', 'a/b/c', 'x.vmdk')

        variants = [
            # leading slash
            ('dsname', ['/a', 'b', 'c', 'x.vmdk']),
            ('dsname', ['/a/b/c/', 'x.vmdk']),
            ('dsname', ['a/b/c', '/x.vmdk']),
            # leading space
            ('dsname', ['a/b/c/', ' x.vmdk']),
            ('dsname', ['a/', ' b/c/', 'x.vmdk']),
            ('dsname', [' a', 'b', 'c', 'x.vmdk']),
            # trailing space
            ('dsname', ['/a/b/c/', 'x.vmdk ']),
            ('dsname', ['a/b/c/ ', 'x.vmdk']),
        ]
        for ds_name, components in variants:
            candidate = ds_util.DatastorePath(ds_name, *components)
            self.assertNotEqual(str(reference), str(candidate))

    def test_ds_path_hashable(self):
        """Two paths built from the same arguments collapse in a set."""
        first = ds_util.DatastorePath('dsname', 'path')
        second = ds_util.DatastorePath('dsname', 'path')

        # Objects with equal hashes (and equality) are stored only once, so
        # the set must end up with a single member.
        unique_paths = set([first, second])
        self.assertThat(unique_paths, matchers.HasLength(1))

    def test_equal(self):
        """Two paths built from the same arguments compare equal."""
        left = ds_util.DatastorePath('ds_name', 'a')
        right = ds_util.DatastorePath('ds_name', 'a')
        self.assertEqual(left, right)

    def test_join(self):
        """join appends components, copes with none, and rejects None."""
        base = ds_util.DatastorePath('ds_name', 'a')
        joined = base.join('b')
        self.assertEqual('[ds_name] a/b', str(joined))

        base = ds_util.DatastorePath('ds_name', 'a')
        joined = base.join()
        self.assertEqual('[ds_name] a', str(joined))

        invalid_component_lists = [
            [None],
            ['', None],
            ['a', None],
            ['a', None, 'b'],
        ]
        for components in invalid_component_lists:
            self.assertRaises(ValueError, base.join, *components)

    def test_ds_path_parse(self):
        """parse splits valid strings and raises on malformed input."""
        # (text to parse, expected datastore, expected relative path)
        valid_cases = [
            ('[dsname]', 'dsname', ''),
            ('[dsname] folder', 'dsname', 'folder'),
            ('[dsname] folder/file', 'dsname', 'folder/file'),
        ]
        for text, expected_datastore, expected_rel_path in valid_cases:
            parsed = ds_util.DatastorePath.parse(text)
            self.assertEqual(expected_datastore, parsed.datastore)
            self.assertEqual(expected_rel_path, parsed.rel_path)

        # Empty input is rejected with ValueError ...
        for empty in [None, '']:
            self.assertRaises(ValueError, ds_util.DatastorePath.parse, empty)

        # ... whereas text lacking the "[datastore]" prefix surfaces as
        # IndexError.
        for malformed in ['bad path', '/a/b/c', 'a/b/c']:
            self.assertRaises(IndexError, ds_util.DatastorePath.parse,
                              malformed)