Merge "Limit Failover Cluster WMI provider usage"

This commit is contained in:
Zuul 2018-10-26 10:56:48 +00:00 committed by Gerrit Code Review
commit 728ddeb4f4
8 changed files with 949 additions and 291 deletions

View File

@ -232,6 +232,10 @@ class ClusterException(OSWinException):
pass
class ClusterObjectNotFound(NotFound, ClusterException):
pass
class ClusterWin32Exception(ClusterException, Win32Exception):
pass

View File

@ -47,7 +47,7 @@ class BaseTestCase(base.BaseTestCase):
def _patch_autospec_classes(self):
for class_type in self._autospec_classes:
mocked_class = mock.Mock(autospec=class_type)
mocked_class = mock.MagicMock(autospec=class_type)
patcher = mock.patch(
'.'.join([class_type.__module__, class_type.__name__]),
mocked_class)

View File

@ -155,6 +155,20 @@ class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase):
self.assertEqual(self._mock_run.return_value, handle)
def test_open_cluster_enum(self):
handle = self._clusapi_utils.open_cluster_enum(
mock.sentinel.cluster_handle,
mock.sentinel.object_type)
self._mock_run.assert_called_once_with(
self._clusapi.ClusterOpenEnumEx,
mock.sentinel.cluster_handle,
mock.sentinel.object_type,
None,
**self._clusapi_utils._open_handle_check_flags)
self.assertEqual(self._mock_run.return_value, handle)
def test_open_cluster_group(self):
self._mock_ctypes()
@ -185,6 +199,21 @@ class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase):
self.assertEqual(self._mock_run.return_value, handle)
def test_open_cluster_resource(self):
self._mock_ctypes()
handle = self._clusapi_utils.open_cluster_resource(
mock.sentinel.cluster_handle,
mock.sentinel.resource_name)
self._mock_run.assert_called_once_with(
self._clusapi.OpenClusterResource,
mock.sentinel.cluster_handle,
self._ctypes.c_wchar_p(mock.sentinel.resource_name),
**self._clusapi_utils._open_handle_check_flags)
self.assertEqual(self._mock_run.return_value, handle)
def test_close_cluster(self):
self._clusapi_utils.close_cluster(mock.sentinel.handle)
self._clusapi.CloseCluster.assert_called_once_with(
@ -200,6 +229,36 @@ class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase):
self._clusapi.CloseClusterNode.assert_called_once_with(
mock.sentinel.handle)
def test_close_cluster_resource(self):
self._clusapi_utils.close_cluster_resource(mock.sentinel.handle)
self._clusapi.CloseClusterResource.assert_called_once_with(
mock.sentinel.handle)
def test_close_cluster_enum(self):
self._clusapi_utils.close_cluster_enum(mock.sentinel.handle)
self._clusapi.ClusterCloseEnumEx.assert_called_once_with(
mock.sentinel.handle)
def test_online_cluster_group(self):
self._clusapi_utils.online_cluster_group(mock.sentinel.group_handle,
mock.sentinel.dest_handle)
self._mock_run.assert_called_once_with(
self._clusapi.OnlineClusterGroup,
mock.sentinel.group_handle,
mock.sentinel.dest_handle)
def test_destroy_cluster_group(self):
self._clusapi_utils.destroy_cluster_group(mock.sentinel.group_handle)
self._mock_run.assert_called_once_with(
self._clusapi.DestroyClusterGroup,
mock.sentinel.group_handle)
def test_offline_cluster_group(self):
self._clusapi_utils.offline_cluster_group(mock.sentinel.group_handle)
self._mock_run.assert_called_once_with(
self._clusapi.OfflineClusterGroup,
mock.sentinel.group_handle)
@ddt.data(0, w_const.ERROR_IO_PENDING)
def test_cancel_cluster_group_operation(self, cancel_ret_val):
self._mock_run.return_value = cancel_ret_val
@ -323,6 +382,9 @@ class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase):
fake_filter_flags = 4
fake_clus_obj_name = 'fake-changed-clus-object'
fake_event_buff = 'fake-event-buff'
fake_obj_type = 'fake-object-type'
fake_obj_id = 'fake-obj-id'
fake_parent_id = 'fake-parent-id'
notif_key = ctypes.c_ulong(fake_notif_key)
requested_buff_sz = 1024
@ -344,16 +406,31 @@ class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase):
buff_sz = ctypes.cast(
p_buff_sz,
wintypes.PDWORD).contents
obj_type_sz = ctypes.cast(
p_obj_type_sz,
wintypes.PDWORD).contents
obj_id_sz = ctypes.cast(
p_obj_id_buff_sz,
wintypes.PDWORD).contents
parent_id_buff_sz = ctypes.cast(
p_parent_id_buff_sz,
wintypes.PDWORD).contents
# We'll just request the tested method to pass us
# a buffer this large.
if (buff_sz.value < requested_buff_sz or
obj_name_buff_sz.value < requested_buff_sz):
obj_name_buff_sz.value < requested_buff_sz or
parent_id_buff_sz.value < requested_buff_sz or
obj_type_sz.value < requested_buff_sz or
obj_id_sz.value < requested_buff_sz):
buff_sz.value = requested_buff_sz
obj_name_buff_sz.value = requested_buff_sz
parent_id_buff_sz.value = requested_buff_sz
obj_type_sz.value = requested_buff_sz
obj_id_sz.value = requested_buff_sz
raise exceptions.ClusterWin32Exception(
error_code=w_const.ERROR_MORE_DATA,
func_name='GetClusterNotify',
func_name='GetClusterNotifyV2',
error_message='error more data')
pp_notif_key = ctypes.cast(pp_notif_key, ctypes.c_void_p)
@ -366,23 +443,23 @@ class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase):
filter_and_type.dwObjectType = fake_notif_type
filter_and_type.FilterFlags = fake_filter_flags
obj_name_buff = ctypes.cast(
p_obj_name_buff,
ctypes.POINTER(
ctypes.c_wchar *
(requested_buff_sz // ctypes.sizeof(ctypes.c_wchar))))
obj_name_buff = obj_name_buff.contents
ctypes.memset(obj_name_buff, 0, obj_name_buff_sz.value)
obj_name_buff.value = fake_clus_obj_name
def set_wchar_buff(p_wchar_buff, wchar_buff_sz, value):
wchar_buff = ctypes.cast(
p_wchar_buff,
ctypes.POINTER(
ctypes.c_wchar *
(wchar_buff_sz // ctypes.sizeof(ctypes.c_wchar))))
wchar_buff = wchar_buff.contents
ctypes.memset(wchar_buff, 0, wchar_buff_sz)
wchar_buff.value = value
return wchar_buff
buff = ctypes.cast(
p_buff,
ctypes.POINTER(
ctypes.c_wchar *
(requested_buff_sz // ctypes.sizeof(ctypes.c_wchar))))
buff = buff.contents
ctypes.memset(buff, 0, buff_sz.value)
buff.value = fake_event_buff
set_wchar_buff(p_obj_name_buff, requested_buff_sz,
fake_clus_obj_name)
set_wchar_buff(p_buff, requested_buff_sz, fake_event_buff)
set_wchar_buff(p_parent_id_buff, requested_buff_sz, fake_parent_id)
set_wchar_buff(p_obj_type, requested_buff_sz, fake_obj_type)
set_wchar_buff(p_obj_id_buff, requested_buff_sz, fake_obj_id)
self.assertEqual(mock.sentinel.timeout_ms, timeout_ms)
@ -399,8 +476,11 @@ class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase):
event['buff'] = w_event_buff.split('\x00')[0]
expected_event = dict(cluster_object_name=fake_clus_obj_name,
object_id=fake_obj_id,
object_type=fake_notif_type,
object_type_str=fake_obj_type,
filter_flags=fake_filter_flags,
parent_id=fake_parent_id,
buff=fake_event_buff,
buff_sz=requested_buff_sz,
notif_key=fake_notif_key)
@ -419,7 +499,11 @@ class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase):
w_const.CLUSREG_NAME_GRP_STATUS_INFORMATION,
w_const.CLUSPROP_SYNTAX_LIST_VALUE_ULARGE_INTEGER,
ctypes.c_ulonglong(w_const.
CLUSGRP_STATUS_WAITING_IN_QUEUE_FOR_MOVE)) # noqa
CLUSGRP_STATUS_WAITING_IN_QUEUE_FOR_MOVE)), # noqa
self._clusapi_utils.get_property_list_entry(
w_const.CLUSREG_NAME_GRP_TYPE,
w_const.CLUSPROP_SYNTAX_LIST_VALUE_DWORD,
ctypes.c_ulonglong(w_const.ClusGroupTypeVirtualMachine)),
]
prop_list = self._clusapi_utils.get_property_list(prop_entries)
@ -527,3 +611,180 @@ class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase):
self.assertEqual(
w_const.CLUSGRP_STATUS_WAITING_IN_QUEUE_FOR_MOVE,
status_info)
def test_get_cluster_group_type(self):
prop_list = self._get_fake_prop_list()
status_info = self._clusapi_utils.get_cluster_group_type(
ctypes.byref(prop_list), ctypes.sizeof(prop_list))
self.assertEqual(
w_const.ClusGroupTypeVirtualMachine,
status_info)
def test_cluster_get_enum_count(self):
ret_val = self._clusapi_utils.cluster_get_enum_count(
mock.sentinel.enum_handle)
self.assertEqual(self._mock_run.return_value, ret_val)
self._mock_run.assert_called_once_with(
self._clusapi.ClusterGetEnumCountEx,
mock.sentinel.enum_handle,
error_on_nonzero_ret_val=False,
ret_val_is_err_code=False)
def test_cluster_enum(self):
obj_id = 'fake_obj_id'
obj_id_wchar_p = ctypes.c_wchar_p(obj_id)
requested_buff_sz = 1024
def fake_cluster_enum(func, enum_handle, index, buff_p, buff_sz_p,
ignored_error_codes=tuple()):
self.assertEqual(self._clusapi.ClusterEnumEx, func)
self.assertEqual(mock.sentinel.enum_handle, enum_handle)
self.assertEqual(mock.sentinel.index, index)
buff_sz = ctypes.cast(
buff_sz_p,
wintypes.PDWORD).contents
# We'll just request the tested method to pass us
# a buffer this large.
if (buff_sz.value < requested_buff_sz):
buff_sz.value = requested_buff_sz
if w_const.ERROR_MORE_DATA not in ignored_error_codes:
raise exceptions.ClusterWin32Exception(
error_code=w_const.ERROR_MORE_DATA)
return
item = ctypes.cast(
buff_p,
clusapi_def.PCLUSTER_ENUM_ITEM).contents
item.lpszId = obj_id_wchar_p
item.cbId = len(obj_id)
self._mock_run.side_effect = fake_cluster_enum
item = self._clusapi_utils.cluster_enum(
mock.sentinel.enum_handle, mock.sentinel.index)
self.assertEqual(obj_id, item.lpszId)
@ddt.ddt
class TestClusterContextManager(test_base.OsWinBaseTestCase):
_autospec_classes = [_clusapi_utils.ClusApiUtils]
def setUp(self):
super(TestClusterContextManager, self).setUp()
self._cmgr = _clusapi_utils.ClusterContextManager()
self._clusapi_utils = self._cmgr._clusapi_utils
@ddt.data(None, mock.sentinel.cluster_name)
def test_open_cluster(self, cluster_name):
with self._cmgr.open_cluster(cluster_name) as f:
self._clusapi_utils.open_cluster.assert_called_once_with(
cluster_name)
self.assertEqual(f, self._clusapi_utils.open_cluster.return_value)
self._clusapi_utils.close_cluster.assert_called_once_with(
self._clusapi_utils.open_cluster.return_value)
def test_open_cluster_group(self):
with self._cmgr.open_cluster_group(mock.sentinel.group_name) as f:
self._clusapi_utils.open_cluster.assert_called_once_with(None)
self._clusapi_utils.open_cluster_group.assert_called_once_with(
self._clusapi_utils.open_cluster.return_value,
mock.sentinel.group_name)
self.assertEqual(
f,
self._clusapi_utils.open_cluster_group.return_value)
self._clusapi_utils.close_cluster_group.assert_called_once_with(
self._clusapi_utils.open_cluster_group.return_value)
self._clusapi_utils.close_cluster.assert_called_once_with(
self._clusapi_utils.open_cluster.return_value)
def test_open_missing_cluster_group(self):
exc = exceptions.ClusterWin32Exception(
func_name='OpenClusterGroup',
message='expected exception',
error_code=w_const.ERROR_GROUP_NOT_FOUND)
self._clusapi_utils.open_cluster_group.side_effect = exc
self.assertRaises(
exceptions.ClusterObjectNotFound,
self._cmgr.open_cluster_group(mock.sentinel.group_name).__enter__)
def test_open_cluster_group_with_handle(self):
with self._cmgr.open_cluster_group(
mock.sentinel.group_name,
cluster_handle=mock.sentinel.cluster_handle) as f:
self._clusapi_utils.open_cluster.assert_not_called()
self._clusapi_utils.open_cluster_group.assert_called_once_with(
mock.sentinel.cluster_handle, mock.sentinel.group_name)
self.assertEqual(
f,
self._clusapi_utils.open_cluster_group.return_value)
self._clusapi_utils.close_cluster_group.assert_called_once_with(
self._clusapi_utils.open_cluster_group.return_value)
# If we pass our own handle, we don't want the tested method to
# close it.
self._clusapi_utils.close_cluster.assert_not_called()
def test_open_cluster_resource(self):
with self._cmgr.open_cluster_resource(mock.sentinel.res_name) as f:
self._clusapi_utils.open_cluster.assert_called_once_with(None)
self._clusapi_utils.open_cluster_resource.assert_called_once_with(
self._clusapi_utils.open_cluster.return_value,
mock.sentinel.res_name)
self.assertEqual(
f,
self._clusapi_utils.open_cluster_resource.return_value)
self._clusapi_utils.close_cluster_resource.assert_called_once_with(
self._clusapi_utils.open_cluster_resource.return_value)
self._clusapi_utils.close_cluster.assert_called_once_with(
self._clusapi_utils.open_cluster.return_value)
def test_open_cluster_node(self):
with self._cmgr.open_cluster_node(mock.sentinel.node_name) as f:
self._clusapi_utils.open_cluster.assert_called_once_with(None)
self._clusapi_utils.open_cluster_node.assert_called_once_with(
self._clusapi_utils.open_cluster.return_value,
mock.sentinel.node_name)
self.assertEqual(
f,
self._clusapi_utils.open_cluster_node.return_value)
self._clusapi_utils.close_cluster_node.assert_called_once_with(
self._clusapi_utils.open_cluster_node.return_value)
self._clusapi_utils.close_cluster.assert_called_once_with(
self._clusapi_utils.open_cluster.return_value)
def test_open_cluster_enum(self):
with self._cmgr.open_cluster_enum(mock.sentinel.object_type) as f:
self._clusapi_utils.open_cluster.assert_called_once_with(None)
self._clusapi_utils.open_cluster_enum.assert_called_once_with(
self._clusapi_utils.open_cluster.return_value,
mock.sentinel.object_type)
self.assertEqual(
f,
self._clusapi_utils.open_cluster_enum.return_value)
self._clusapi_utils.close_cluster_enum.assert_called_once_with(
self._clusapi_utils.open_cluster_enum.return_value)
self._clusapi_utils.close_cluster.assert_called_once_with(
self._clusapi_utils.open_cluster.return_value)
def test_invalid_handle_type(self):
self.assertRaises(exceptions.Invalid,
self._cmgr._open(handle_type=None).__enter__)
self.assertRaises(exceptions.Invalid,
self._cmgr._close, mock.sentinel.handle,
handle_type=None)

View File

@ -34,6 +34,7 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
_autospec_classes = [
clusterutils._clusapi_utils.ClusApiUtils,
clusterutils._clusapi_utils.ClusterContextManager
]
_FAKE_RES_NAME = "fake_res_name"
@ -48,6 +49,11 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
self._clusterutils._conn_cluster = mock.MagicMock()
self._clusterutils._cluster = mock.MagicMock()
self._clusapi = self._clusterutils._clusapi_utils
self._cmgr = self._clusterutils._cmgr
def _cmgr_val(self, cmgr):
# Return the value that a mocked context manager would yield.
return cmgr.return_value.__enter__.return_value
def test_init_hyperv_conn(self):
fake_cluster_name = "fake_cluster"
@ -79,34 +85,33 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
self.assertEqual(mock.sentinel.fake_node_name,
self._clusterutils.get_node_name())
def test_get_cluster_nodes(self):
fake_node1 = mock.MagicMock(Dependent=mock.sentinel.cluster_node1)
fake_node2 = mock.MagicMock(Dependent=mock.sentinel.cluster_node2)
node_list = [fake_node1, fake_node2]
expected = [mock.sentinel.cluster_node1, mock.sentinel.cluster_node2]
fake_class = self._clusterutils._conn_cluster.MSCluster_ClusterToNode
fake_class.return_value = node_list
@mock.patch.object(clusterutils.ClusterUtils, 'cluster_enum')
def test_get_cluster_nodes(self, mock_cluster_enum):
expected = mock_cluster_enum.return_value
self.assertEqual(expected, self._clusterutils._get_cluster_nodes())
def test_get_vm_groups(self):
vm_gr1 = mock.MagicMock(GroupType=self._clusterutils._VM_GROUP_TYPE)
vm_gr2 = mock.MagicMock()
vm_gr3 = mock.MagicMock(GroupType=self._clusterutils._VM_GROUP_TYPE)
mock_cluster_enum.assert_called_once_with(w_const.CLUSTER_ENUM_NODE)
fake_assoc1 = mock.MagicMock(PartComponent=vm_gr1)
fake_assoc2 = mock.MagicMock(PartComponent=vm_gr2)
fake_assoc3 = mock.MagicMock(PartComponent=vm_gr3)
@mock.patch.object(clusterutils.ClusterUtils, 'cluster_enum')
@mock.patch.object(clusterutils.ClusterUtils, 'get_cluster_group_type')
def test_get_vm_groups(self, mock_get_type, mock_cluster_enum):
mock_groups = [mock.MagicMock(), mock.MagicMock(), mock.MagicMock()]
group_types = [w_const.ClusGroupTypeVirtualMachine,
w_const.ClusGroupTypeVirtualMachine,
mock.sentinel.some_other_group_type]
assoc_list = [fake_assoc1, fake_assoc2, fake_assoc3]
fake_conn = self._clusterutils._conn_cluster
fake_conn.MSCluster_ClusterToResourceGroup.return_value = assoc_list
mock_cluster_enum.return_value = mock_groups
mock_get_type.side_effect = group_types
exp = mock_groups[:-1]
res = list(self._clusterutils._get_vm_groups())
self.assertIn(vm_gr1, res)
self.assertNotIn(vm_gr2, res)
self.assertIn(vm_gr3, res)
self.assertEqual(exp, res)
mock_cluster_enum.assert_called_once_with(w_const.CLUSTER_ENUM_GROUP)
mock_get_type.assert_has_calls(
[mock.call(r['name']) for r in mock_groups])
@mock.patch.object(clusterutils.ClusterUtils,
'_lookup_vm_group')
@ -134,32 +139,6 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
self._clusterutils._conn_cluster.MSCluster_ResourceGroup,
self._FAKE_VM_NAME)
@mock.patch.object(clusterutils.ClusterUtils,
'_lookup_vm')
def test_lookup_vm_check(self, mock_lookup_vm):
mock_lookup_vm.return_value = mock.sentinel.fake_vm
ret = self._clusterutils._lookup_vm_check(
self._FAKE_VM_NAME)
self.assertEqual(mock.sentinel.fake_vm, ret)
@mock.patch.object(clusterutils.ClusterUtils,
'_lookup_vm')
def test_lookup_vm_check_no_vm(self, mock_lookup_vm):
mock_lookup_vm.return_value = None
self.assertRaises(exceptions.HyperVVMNotFoundException,
self._clusterutils._lookup_vm_check,
self._FAKE_VM_NAME)
@mock.patch.object(clusterutils.ClusterUtils,
'_lookup_res')
def test_lookup_vm(self, mock_lookup_res):
self._clusterutils._lookup_vm(self._FAKE_VM_NAME)
mock_lookup_res.assert_called_once_with(
self._clusterutils._conn_cluster.MSCluster_Resource,
self._clusterutils._VM_BASE_NAME % self._FAKE_VM_NAME)
def test_lookup_res_no_res(self):
res_list = []
resource_source = mock.MagicMock()
@ -199,36 +178,40 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
@mock.patch.object(clusterutils.ClusterUtils,
'_get_cluster_nodes')
def test_get_cluster_node_names(self, mock_get_cluster_nodes):
cluster_nodes = [mock.Mock(Name='node1'),
mock.Mock(Name='node2')]
cluster_nodes = [dict(name='node1'),
dict(name='node2')]
mock_get_cluster_nodes.return_value = cluster_nodes
ret = self._clusterutils.get_cluster_node_names()
self.assertItemsEqual(['node1', 'node2'], ret)
@mock.patch.object(clusterutils.ClusterUtils,
'_lookup_vm_group_check')
def test_get_vm_host(self, mock_lookup_vm_group_check):
@mock.patch.object(clusterutils.ClusterUtils, '_get_cluster_group_state')
def test_get_vm_host(self, mock_get_state):
# Refresh the helpers. Closures are a bit difficult to mock.
owner_node = "fake_owner_node"
vm = mock.Mock(OwnerNode=owner_node)
mock_lookup_vm_group_check.return_value = vm
mock_get_state.return_value = dict(owner_node=owner_node)
self.assertEqual(
owner_node,
self._clusterutils.get_vm_host(self._FAKE_VM_NAME))
self._clusterutils.get_vm_host(mock.sentinel.vm_name))
self._cmgr.open_cluster_group.assert_called_once_with(
mock.sentinel.vm_name)
mock_get_state.assert_called_once_with(
self._cmgr_val(self._cmgr.open_cluster_group))
@mock.patch.object(clusterutils.ClusterUtils, '_get_vm_groups')
def test_list_instances(self, mock_get_vm_groups):
mock_get_vm_groups.return_value = [mock.Mock(Name='vm1'),
mock.Mock(Name='vm2')]
mock_get_vm_groups.return_value = [dict(name='vm1'),
dict(name='vm2')]
ret = self._clusterutils.list_instances()
self.assertItemsEqual(['vm1', 'vm2'], ret)
@mock.patch.object(clusterutils.ClusterUtils, '_get_vm_groups')
def test_list_instance_uuids(self, mock_get_vm_groups):
mock_get_vm_groups.return_value = [mock.Mock(Id='uuid1'),
mock.Mock(Id='uuid2')]
mock_get_vm_groups.return_value = [dict(id='uuid1'),
dict(id='uuid2')]
ret = self._clusterutils.list_instance_uuids()
self.assertItemsEqual(['uuid1', 'uuid2'], ret)
@ -257,21 +240,21 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
self._clusterutils._FAILBACK_WINDOW_MAX)
vm_group.put.assert_called_once_with()
@mock.patch.object(clusterutils.ClusterUtils, '_lookup_vm_check')
def test_bring_online(self, mock_lookup_vm_check):
vm = mock.MagicMock()
mock_lookup_vm_check.return_value = vm
def test_bring_online(self):
self._clusterutils.bring_online(mock.sentinel.vm_name)
self._clusterutils.bring_online(self._FAKE_VM_NAME)
vm.BringOnline.assert_called_once_with()
self._cmgr.open_cluster_group.assert_called_once_with(
mock.sentinel.vm_name)
self._clusapi.online_cluster_group.assert_called_once_with(
self._cmgr_val(self._cmgr.open_cluster_group))
@mock.patch.object(clusterutils.ClusterUtils, '_lookup_vm')
def test_take_offline(self, mock_lookup_vm):
vm = mock.MagicMock()
mock_lookup_vm.return_value = vm
def test_take_offline(self):
self._clusterutils.take_offline(mock.sentinel.vm_name)
self._clusterutils.take_offline(self._FAKE_VM_NAME)
vm.TakeOffline.assert_called_once_with()
self._cmgr.open_cluster_group.assert_called_once_with(
mock.sentinel.vm_name)
self._clusapi.offline_cluster_group.assert_called_once_with(
self._cmgr_val(self._cmgr.open_cluster_group))
@mock.patch.object(clusterutils.ClusterUtils, '_lookup_vm_group')
def test_delete(self, mock_lookup_vm_group):
@ -282,18 +265,41 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
vm.DestroyGroup.assert_called_once_with(
self._clusterutils._DESTROY_GROUP)
@mock.patch.object(clusterutils.ClusterUtils, '_lookup_vm')
def test_vm_exists_true(self, mock_lookup_vm):
vm = mock.MagicMock()
mock_lookup_vm.return_value = vm
def test_cluster_enum(self):
cluster_objects = [mock.Mock(), mock.Mock()]
self.assertTrue(self._clusterutils.vm_exists(self._FAKE_VM_NAME))
self._clusapi.cluster_get_enum_count.return_value = len(
cluster_objects)
self._clusapi.cluster_enum.side_effect = cluster_objects
@mock.patch.object(clusterutils.ClusterUtils, '_lookup_vm')
def test_vm_exists_false(self, mock_lookup_vm):
mock_lookup_vm.return_value = None
exp_ret_val = [dict(version=item.dwVersion,
type=item.dwType,
id=item.lpszId,
name=item.lpszName) for item in cluster_objects]
ret_val = list(self._clusterutils.cluster_enum(mock.sentinel.obj_type))
self.assertFalse(self._clusterutils.vm_exists(self._FAKE_VM_NAME))
self.assertEqual(exp_ret_val, ret_val)
enum_handle = self._cmgr_val(self._cmgr.open_cluster_enum)
self._cmgr.open_cluster_enum.assert_called_once_with(
mock.sentinel.obj_type)
self._clusapi.cluster_get_enum_count.assert_called_once_with(
enum_handle)
self._clusapi.cluster_enum.assert_has_calls(
[mock.call(enum_handle, idx)
for idx in range(len(cluster_objects))])
@ddt.data(True, False)
def test_vm_exists(self, exists):
self._cmgr.open_cluster_resource.side_effect = (
None if exists else exceptions.ClusterObjectNotFound('test'))
self.assertEqual(
exists,
self._clusterutils.vm_exists(self._FAKE_VM_NAME))
self._cmgr.open_cluster_resource.assert_called_once_with(
self._FAKE_RESOURCEGROUP_NAME)
@mock.patch.object(clusterutils.ClusterUtils, '_migrate_vm')
def test_live_migrate_vm(self, mock_migrate_vm):
@ -354,15 +360,15 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
w_const.CLUSAPI_GROUP_MOVE_QUEUE_ENABLED |
w_const.CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START)
exp_clus_h = self._clusapi.open_cluster.return_value
exp_clus_node_h = self._clusapi.open_cluster_node.return_value
exp_clus_group_h = self._clusapi.open_cluster_group.return_value
exp_clus_h = self._cmgr_val(self._cmgr.open_cluster)
exp_clus_node_h = self._cmgr_val(self._cmgr.open_cluster_node)
exp_clus_group_h = self._cmgr_val(self._cmgr.open_cluster_group)
self._clusapi.open_cluster.assert_called_once_with()
self._clusapi.open_cluster_group.assert_called_once_with(
exp_clus_h, self._FAKE_VM_NAME)
self._clusapi.open_cluster_node.assert_called_once_with(
exp_clus_h, self._FAKE_HOST)
self._cmgr.open_cluster.assert_called_once_with()
self._cmgr.open_cluster_group.assert_called_once_with(
self._FAKE_VM_NAME, cluster_handle=exp_clus_h)
self._cmgr.open_cluster_node.assert_called_once_with(
self._FAKE_HOST, cluster_handle=exp_clus_h)
self._clusapi.move_cluster_group.assert_called_once_with(
exp_clus_group_h, exp_clus_node_h, expected_migrate_flags,
@ -385,12 +391,6 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
constants.CLUSTER_GROUP_ONLINE,
self._FAKE_HOST)
self._clusapi.close_cluster_group.assert_called_once_with(
exp_clus_group_h)
self._clusapi.close_cluster_node.assert_called_once_with(
exp_clus_node_h)
self._clusapi.close_cluster.assert_called_once_with(exp_clus_h)
@mock.patch.object(clusterutils.ClusterUtils,
'_cancel_cluster_group_migration')
@mock.patch.object(clusterutils.ClusterUtils,
@ -409,7 +409,7 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
group_name=self._FAKE_VM_NAME,
time_elapsed=10)
mock_wait_group.side_effect = timeout_exc
mock_listener = mock_listener_cls.return_value.__enter__.return_value
mock_listener = self._cmgr_val(mock_listener_cls)
mock_validate_migr.side_effect = (
(None, ) if finished_after_cancel
else exceptions.ClusterGroupMigrationFailed(
@ -432,7 +432,7 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
self._clusterutils._migrate_vm,
*migrate_args)
exp_clus_group_h = self._clusapi.open_cluster_group.return_value
exp_clus_group_h = self._cmgr_val(self._cmgr.open_cluster_group)
mock_cancel_migr.assert_called_once_with(
mock_listener, self._FAKE_VM_NAME, exp_clus_group_h,
mock.sentinel.exp_state, mock.sentinel.timeout)
@ -478,8 +478,8 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
def test_cancel_cluster_group_migration_public(self, mock_listener_cls,
mock_cancel_migr):
exp_clus_h = self._clusapi.open_cluster.return_value
exp_clus_group_h = self._clusapi.open_cluster_group.return_value
exp_clus_h = self._cmgr_val(self._cmgr.open_cluster)
exp_clus_group_h = self._cmgr_val(self._cmgr.open_cluster_group)
mock_listener = mock_listener_cls.return_value
mock_listener.__enter__.return_value = mock_listener
@ -489,9 +489,9 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
mock.sentinel.expected_state,
mock.sentinel.timeout)
self._clusapi.open_cluster.assert_called_once_with()
self._clusapi.open_cluster_group.assert_called_once_with(
exp_clus_h, mock.sentinel.group_name)
self._cmgr.open_cluster.assert_called_once_with()
self._cmgr.open_cluster_group.assert_called_once_with(
mock.sentinel.group_name, cluster_handle=exp_clus_h)
mock_listener.__enter__.assert_called_once_with()
mock_listener_cls.assert_called_once_with(exp_clus_h,
@ -503,10 +503,6 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
mock.sentinel.expected_state,
mock.sentinel.timeout)
self._clusapi.close_cluster.assert_called_once_with(exp_clus_h)
self._clusapi.close_cluster_group.assert_called_once_with(
exp_clus_group_h)
@mock.patch.object(clusterutils.ClusterUtils,
'_get_cluster_group_state')
@mock.patch.object(clusterutils.ClusterUtils,
@ -680,6 +676,39 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
mock_listener.get.assert_called_once_with(None)
@mock.patch.object(clusterutils.ClusterUtils, '_get_cluster_nodes')
def get_cluster_node_name(self, mock_get_nodes):
fake_node = dict(id=mock.sentinel.vm_id,
name=mock.sentinel.vm_name)
mock_get_nodes.return_value([fake_node])
self.assertEqual(
mock.sentinel.vm_name,
self._clusterutils.get_cluster_node_name(mock.sentinel.vm_id))
self.assertRaises(
exceptions.NotFound,
self._clusterutils.get_cluster_node_name(mock.sentinel.missing_id))
@mock.patch('ctypes.byref')
def test_get_cluster_group_type(self, mock_byref):
mock_byref.side_effect = lambda x: ('byref', x)
self._clusapi.cluster_group_control.return_value = (
mock.sentinel.buff, mock.sentinel.buff_sz)
ret_val = self._clusterutils.get_cluster_group_type(
mock.sentinel.group_name)
self.assertEqual(
self._clusapi.get_cluster_group_type.return_value,
ret_val)
self._cmgr.open_cluster_group.assert_called_once_with(
mock.sentinel.group_name)
self._clusapi.cluster_group_control.assert_called_once_with(
self._cmgr_val(self._cmgr.open_cluster_group),
w_const.CLUSCTL_GROUP_GET_RO_COMMON_PROPERTIES)
self._clusapi.get_cluster_group_type.assert_called_once_with(
mock_byref(mock.sentinel.buff), mock.sentinel.buff_sz)
@mock.patch.object(clusterutils.ClusterUtils,
'_get_cluster_group_state')
@mock.patch.object(clusterutils.ClusterUtils,
@ -687,8 +716,7 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
def test_get_cluster_group_state_info(self, mock_is_migr_queued,
mock_get_gr_state):
exp_clus_h = self._clusapi.open_cluster.return_value
exp_clus_group_h = self._clusapi.open_cluster_group.return_value
exp_clus_group_h = self._cmgr_val(self._cmgr.open_cluster_group)
mock_get_gr_state.return_value = dict(
state=mock.sentinel.state,
@ -703,17 +731,12 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
self.assertEqual(exp_sts_info, sts_info)
self._clusapi.open_cluster.assert_called_once_with()
self._clusapi.open_cluster_group.assert_called_once_with(
exp_clus_h, mock.sentinel.group_name)
self._cmgr.open_cluster_group.assert_called_once_with(
mock.sentinel.group_name)
mock_get_gr_state.assert_called_once_with(exp_clus_group_h)
mock_is_migr_queued.assert_called_once_with(mock.sentinel.status_info)
self._clusapi.close_cluster.assert_called_once_with(exp_clus_h)
self._clusapi.close_cluster_group.assert_called_once_with(
exp_clus_group_h)
@mock.patch('ctypes.byref')
def test_get_cluster_group_state(self, mock_byref):
mock_byref.side_effect = lambda x: ('byref', x)
@ -802,16 +825,48 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
mock_time.sleep.assert_called_once_with(
constants.DEFAULT_WMI_EVENT_TIMEOUT_MS / 1000)
@mock.patch.object(clusterutils._ClusterGroupOwnerChangeListener, 'get')
@mock.patch.object(clusterutils.ClusterUtils, 'get_cluster_node_name')
@mock.patch.object(clusterutils.ClusterUtils, 'get_cluster_group_type')
@mock.patch.object(clusterutils, 'time')
def test_get_vm_owner_change_listener_v2(self, mock_time, mock_get_type,
mock_get_node_name,
mock_get_event):
mock_get_type.side_effect = [
w_const.ClusGroupTypeVirtualMachine,
mock.sentinel.other_type]
mock_events = [mock.MagicMock(), mock.MagicMock()]
mock_get_event.side_effect = (
mock_events + [exceptions.OSWinException, KeyboardInterrupt])
callback = mock.Mock()
listener = self._clusterutils.get_vm_owner_change_listener_v2()
self.assertRaises(KeyboardInterrupt,
listener,
callback)
callback.assert_called_once_with(
mock_events[0]['cluster_object_name'],
mock_get_node_name.return_value)
mock_get_node_name.assert_called_once_with(mock_events[0]['parent_id'])
mock_get_type.assert_any_call(mock_events[0]['cluster_object_name'])
mock_time.sleep.assert_called_once_with(
constants.DEFAULT_WMI_EVENT_TIMEOUT_MS / 1000)
class ClusterEventListenerTestCase(test_base.OsWinBaseTestCase):
@mock.patch.object(clusterutils._ClusterEventListener, '_setup')
def setUp(self, mock_setup):
super(ClusterEventListenerTestCase, self).setUp()
self._setup_listener()
def _setup_listener(self, stop_on_error=True):
self._listener = clusterutils._ClusterEventListener(
mock.sentinel.cluster_handle,
mock.sentinel.notif_filters_list)
stop_on_error=stop_on_error)
self._listener._running = True
self._listener._clusapi_utils = mock.Mock()
self._clusapi = self._listener._clusapi_utils
@ -871,8 +926,6 @@ class ClusterEventListenerTestCase(test_base.OsWinBaseTestCase):
mock.sentinel.notif_key)
def test_signal_stopped(self):
self._listener._running = True
self._listener._signal_stopped()
self.assertFalse(self._listener._running)
@ -895,7 +948,6 @@ class ClusterEventListenerTestCase(test_base.OsWinBaseTestCase):
events = [mock.sentinel.ignored_event, mock.sentinel.retrieved_event]
self._clusapi.get_cluster_notify_v2.side_effect = events
self._listener._running = True
self._listener._notif_port_h = mock.sentinel.notif_port_h
def fake_process_event(event):
@ -919,8 +971,6 @@ class ClusterEventListenerTestCase(test_base.OsWinBaseTestCase):
timeout_ms=-1)
def test_listen_exception(self):
self._listener._running = True
self._clusapi.get_cluster_notify_v2.side_effect = (
test_base.TestingException)
@ -928,8 +978,21 @@ class ClusterEventListenerTestCase(test_base.OsWinBaseTestCase):
self.assertFalse(self._listener._running)
@mock.patch.object(clusterutils._ClusterEventListener, '_setup')
@mock.patch.object(clusterutils.time, 'sleep')
def test_listen_ignore_exception(self, mock_sleep, mock_setup):
self._setup_listener(stop_on_error=False)
self._clusapi.get_cluster_notify_v2.side_effect = (
test_base.TestingException,
KeyboardInterrupt)
self.assertRaises(KeyboardInterrupt, self._listener._listen)
self.assertTrue(self._listener._running)
mock_sleep.assert_called_once_with(
self._listener._error_sleep_interval)
def test_get_event(self):
self._listener._running = True
self._listener._event_queue = mock.Mock()
event = self._listener.get(timeout=mock.sentinel.timeout)
@ -939,6 +1002,7 @@ class ClusterEventListenerTestCase(test_base.OsWinBaseTestCase):
timeout=mock.sentinel.timeout)
def test_get_event_listener_stopped(self):
self._listener._running = False
self.assertRaises(exceptions.OSWinException,
self._listener.get,
timeout=1)

View File

@ -13,8 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import ctypes
from os_win._i18n import _
from os_win import constants
from os_win import exceptions
from os_win.utils import win32utils
@ -115,6 +117,14 @@ class ClusApiUtils(object):
**self._open_handle_check_flags)
return handle
def open_cluster_enum(self, cluster_handle, object_type):
return self._run_and_check_output(
clusapi.ClusterOpenEnumEx,
cluster_handle,
object_type,
None, # pOptions, reserved for future use.
**self._open_handle_check_flags)
def open_cluster_group(self, cluster_handle, group_name):
handle = self._run_and_check_output(clusapi.OpenClusterGroup,
cluster_handle,
@ -129,6 +139,13 @@ class ClusApiUtils(object):
**self._open_handle_check_flags)
return handle
def open_cluster_resource(self, cluster_handle, resource_name):
handle = self._run_and_check_output(clusapi.OpenClusterResource,
cluster_handle,
ctypes.c_wchar_p(resource_name),
**self._open_handle_check_flags)
return handle
def close_cluster(self, cluster_handle):
# This function will always return 'True'. Closing the cluster
# handle will also invalidate handles opened using it.
@ -142,6 +159,25 @@ class ClusApiUtils(object):
def close_cluster_node(self, node_handle):
clusapi.CloseClusterNode(node_handle)
def close_cluster_resource(self, resource_handle):
clusapi.CloseClusterResource(resource_handle)
def close_cluster_enum(self, enum_handle):
clusapi.ClusterCloseEnumEx(enum_handle)
def online_cluster_group(self, group_handle, destination_node_handle=None):
self._run_and_check_output(clusapi.OnlineClusterGroup,
group_handle,
destination_node_handle)
def destroy_cluster_group(self, group_handle):
self._run_and_check_output(clusapi.DestroyClusterGroup,
group_handle)
def offline_cluster_group(self, group_handle):
self._run_and_check_output(clusapi.OfflineClusterGroup,
group_handle)
def cancel_cluster_group_operation(self, group_handle):
"""Requests a pending move operation to be canceled.
@ -239,6 +275,9 @@ class ClusApiUtils(object):
def get_cluster_notify_v2(self, notif_port_h, timeout_ms):
filter_and_type = clusapi_def.NOTIFY_FILTER_AND_TYPE()
obj_name_buff_sz = ctypes.c_ulong(w_const.MAX_PATH)
obj_type_buff_sz = ctypes.c_ulong(w_const.MAX_PATH)
obj_id_buff_sz = ctypes.c_ulong(w_const.MAX_PATH)
parent_id_buff_sz = ctypes.c_ulong(w_const.MAX_PATH)
notif_key_p = wintypes.PDWORD()
buff_sz = ctypes.c_ulong(w_const.MAX_PATH)
@ -246,33 +285,53 @@ class ClusApiUtils(object):
# on the event type and filter flags.
buff = (wintypes.BYTE * buff_sz.value)()
obj_name_buff = (ctypes.c_wchar * obj_name_buff_sz.value)()
obj_type_buff = (ctypes.c_wchar * obj_type_buff_sz.value)()
obj_id_buff = (ctypes.c_wchar * obj_id_buff_sz.value)()
parent_id_buff = (ctypes.c_wchar * parent_id_buff_sz.value)()
def get_args(buff, obj_name_buff):
return (clusapi.GetClusterNotifyV2,
notif_port_h,
ctypes.byref(notif_key_p),
ctypes.byref(filter_and_type),
buff,
ctypes.byref(buff_sz),
None, # object id
None, # object id sz
None, # parent id
None, # parent id sz
obj_name_buff,
ctypes.byref(obj_name_buff_sz),
None, # object type
None, # object type sz
timeout_ms)
try:
self._run_and_check_output(*get_args(buff, obj_name_buff))
self._run_and_check_output(
clusapi.GetClusterNotifyV2,
notif_port_h,
ctypes.byref(notif_key_p),
ctypes.byref(filter_and_type),
buff,
ctypes.byref(buff_sz),
obj_id_buff,
ctypes.byref(obj_id_buff_sz),
parent_id_buff,
ctypes.byref(parent_id_buff_sz),
obj_name_buff,
ctypes.byref(obj_name_buff_sz),
obj_type_buff,
ctypes.byref(obj_type_buff_sz),
timeout_ms)
except exceptions.ClusterWin32Exception as ex:
if ex.error_code == w_const.ERROR_MORE_DATA:
# This function will specify the buffer sizes it needs using
# the references we pass.
buff = (wintypes.BYTE * buff_sz.value)()
obj_name_buff = (ctypes.c_wchar * obj_name_buff_sz.value)()
parent_id_buff = (ctypes.c_wchar * parent_id_buff_sz.value)()
obj_type_buff = (ctypes.c_wchar * obj_type_buff_sz.value)()
obj_id_buff = (ctypes.c_wchar * obj_id_buff_sz.value)()
self._run_and_check_output(*get_args(buff, obj_name_buff))
self._run_and_check_output(
clusapi.GetClusterNotifyV2,
notif_port_h,
ctypes.byref(notif_key_p),
ctypes.byref(filter_and_type),
buff,
ctypes.byref(buff_sz),
obj_id_buff,
ctypes.byref(obj_id_buff_sz),
parent_id_buff,
ctypes.byref(parent_id_buff_sz),
obj_name_buff,
ctypes.byref(obj_name_buff_sz),
obj_type_buff,
ctypes.byref(obj_type_buff_sz),
timeout_ms)
else:
raise
@ -281,8 +340,11 @@ class ClusApiUtils(object):
# the notification port.
notif_key = notif_key_p.contents.value
event = {'cluster_object_name': obj_name_buff.value,
'object_id': obj_id_buff.value,
'object_type': filter_and_type.dwObjectType,
'object_type_str': obj_type_buff.value,
'filter_flags': filter_and_type.FilterFlags,
'parent_id': parent_id_buff.value,
'buff': buff,
'buff_sz': buff_sz.value,
'notif_key': notif_key}
@ -354,17 +416,154 @@ class ClusApiUtils(object):
return out_buff, out_buff_sz.value
def get_cluster_group_status_info(self, prop_list_p, prop_list_sz):
def get_prop_list_entry_value(self, prop_list_p, prop_list_sz,
entry_name, entry_type, entry_syntax):
prop_entry = self.get_prop_list_entry_p(
prop_list_p, prop_list_sz,
w_const.CLUSREG_NAME_GRP_STATUS_INFORMATION)
prop_list_p, prop_list_sz, entry_name)
if (prop_entry['length'] != ctypes.sizeof(ctypes.c_ulonglong) or
prop_entry['syntax'] !=
w_const.CLUSPROP_SYNTAX_LIST_VALUE_ULARGE_INTEGER):
if (prop_entry['length'] != ctypes.sizeof(entry_type) or
prop_entry['syntax'] != entry_syntax):
raise exceptions.ClusterPropertyListParsingError()
status_info_p = prop_entry['val_p']
status_info = ctypes.c_ulonglong.from_address(
status_info_p.value).value
return status_info
return entry_type.from_address(prop_entry['val_p'].value).value
def get_cluster_group_status_info(self, prop_list_p, prop_list_sz):
return self.get_prop_list_entry_value(
prop_list_p, prop_list_sz,
w_const.CLUSREG_NAME_GRP_STATUS_INFORMATION,
ctypes.c_ulonglong,
w_const.CLUSPROP_SYNTAX_LIST_VALUE_ULARGE_INTEGER)
def get_cluster_group_type(self, prop_list_p, prop_list_sz):
return self.get_prop_list_entry_value(
prop_list_p, prop_list_sz,
w_const.CLUSREG_NAME_GRP_TYPE,
wintypes.DWORD,
w_const.CLUSPROP_SYNTAX_LIST_VALUE_DWORD)
def cluster_get_enum_count(self, enum_handle):
return self._run_and_check_output(
clusapi.ClusterGetEnumCountEx,
enum_handle,
error_on_nonzero_ret_val=False,
ret_val_is_err_code=False)
def cluster_enum(self, enum_handle, index):
item_sz = wintypes.DWORD(0)
self._run_and_check_output(
clusapi.ClusterEnumEx,
enum_handle,
index,
None,
ctypes.byref(item_sz),
ignored_error_codes=[w_const.ERROR_MORE_DATA])
item_buff = (ctypes.c_ubyte * item_sz.value)()
self._run_and_check_output(
clusapi.ClusterEnumEx,
enum_handle,
index,
ctypes.byref(item_buff),
ctypes.byref(item_sz))
return ctypes.cast(item_buff,
clusapi_def.PCLUSTER_ENUM_ITEM).contents
class ClusterContextManager(object):
_CLUSTER_HANDLE = 0
_NODE_HANDLE = 1
_GROUP_HANDLE = 2
_RESOURCE_HANDLE = 3
_ENUM_HANDLE = 4
_HANDLE_TYPES = [
_CLUSTER_HANDLE, _NODE_HANDLE, _GROUP_HANDLE, _RESOURCE_HANDLE,
_ENUM_HANDLE
]
def __init__(self):
self._clusapi_utils = ClusApiUtils()
def open_cluster(self, cluster_name=None):
return self._open(cluster_name, self._CLUSTER_HANDLE)
def open_cluster_group(self, group_name, cluster_handle=None):
return self._open(group_name, self._GROUP_HANDLE, cluster_handle)
def open_cluster_resource(self, resource_name, cluster_handle=None):
return self._open(resource_name, self._RESOURCE_HANDLE, cluster_handle)
def open_cluster_node(self, node_name, cluster_handle=None):
return self._open(node_name, self._NODE_HANDLE, cluster_handle)
def open_cluster_enum(self, object_type, cluster_handle=None):
return self._open(object_type, self._ENUM_HANDLE, cluster_handle)
def _check_handle_type(self, handle_type):
if handle_type not in self._HANDLE_TYPES:
err_msg = _("Invalid cluster handle type: %(handle_type)s. "
"Allowed handle types: %(allowed_types)s.")
raise exceptions.Invalid(
err_msg % dict(handle_type=handle_type,
allowed_types=self._HANDLE_TYPES))
def _close(self, handle, handle_type):
self._check_handle_type(handle_type)
if not handle:
return
cutils = self._clusapi_utils
helper_map = {
self._CLUSTER_HANDLE: cutils.close_cluster,
self._RESOURCE_HANDLE: cutils.close_cluster_resource,
self._GROUP_HANDLE: cutils.close_cluster_group,
self._NODE_HANDLE: cutils.close_cluster_node,
self._ENUM_HANDLE: cutils.close_cluster_enum,
}
helper_map[handle_type](handle)
@contextlib.contextmanager
def _open(self, name=None, handle_type=_CLUSTER_HANDLE,
cluster_handle=None):
self._check_handle_type(handle_type)
ext_cluster_handle = cluster_handle is not None
handle = None
try:
# We accept a cluster handle, avoiding opening it again.
if not cluster_handle:
cluster_name = (name if handle_type == self._CLUSTER_HANDLE
else None)
cluster_handle = self._clusapi_utils.open_cluster(cluster_name)
cutils = self._clusapi_utils
helper_map = {
self._CLUSTER_HANDLE: lambda x, y: x,
self._RESOURCE_HANDLE: cutils.open_cluster_resource,
self._GROUP_HANDLE: cutils.open_cluster_group,
self._NODE_HANDLE: cutils.open_cluster_node,
self._ENUM_HANDLE: cutils.open_cluster_enum,
}
handle = helper_map[handle_type](cluster_handle, name)
yield handle
except exceptions.ClusterWin32Exception as win32_ex:
if win32_ex.error_code in w_const.CLUSTER_NOT_FOUND_ERROR_CODES:
err_msg = _("Could not find the specified cluster object. "
"Object type: %(obj_type)s. "
"Object name: %(name)s.")
raise exceptions.ClusterObjectNotFound(
err_msg % dict(obj_type=handle_type,
name=name))
else:
raise
finally:
if handle_type != self._CLUSTER_HANDLE:
self._close(handle, handle_type)
if not ext_cluster_handle:
self._close(cluster_handle, self._CLUSTER_HANDLE)

View File

@ -49,7 +49,6 @@ class ClusterUtils(baseutils.BaseUtils):
_VM_BASE_NAME = 'Virtual Machine %s'
_VM_TYPE = 'Virtual Machine'
_VM_GROUP_TYPE = 111
_MS_CLUSTER_NAMESPACE = '//%s/root/MSCluster'
@ -66,6 +65,7 @@ class ClusterUtils(baseutils.BaseUtils):
def __init__(self, host='.'):
self._instance_name_regex = re.compile('Virtual Machine (.*)')
self._clusapi_utils = _clusapi_utils.ClusApiUtils()
self._cmgr = _clusapi_utils.ClusterContextManager()
if sys.platform == 'win32':
self._init_hyperv_conn(host)
@ -96,7 +96,7 @@ class ClusterUtils(baseutils.BaseUtils):
return self._conn_cluster.watch_for(raw_wql=raw_query)
def check_cluster_state(self):
if len(self._get_cluster_nodes()) < 1:
if len(list(self._get_cluster_nodes())) < 1:
raise exceptions.HyperVClusterException(
_("Not enough cluster nodes."))
@ -104,17 +104,13 @@ class ClusterUtils(baseutils.BaseUtils):
return self._this_node
def _get_cluster_nodes(self):
cluster_assoc = self._conn_cluster.MSCluster_ClusterToNode(
Antecedent=self._cluster.path_())
return [x.Dependent for x in cluster_assoc]
return self.cluster_enum(w_const.CLUSTER_ENUM_NODE)
def _get_vm_groups(self):
assocs = self._conn_cluster.MSCluster_ClusterToResourceGroup(
GroupComponent=self._cluster.path_())
resources = [a.PartComponent for a in assocs]
return (r for r in resources if
hasattr(r, 'GroupType') and
r.GroupType == self._VM_GROUP_TYPE)
for r in self.cluster_enum(w_const.CLUSTER_ENUM_GROUP):
group_type = self.get_cluster_group_type(r['name'])
if group_type == w_const.ClusGroupTypeVirtualMachine:
yield r
def _lookup_vm_group_check(self, vm_name):
vm = self._lookup_vm_group(vm_name)
@ -126,16 +122,6 @@ class ClusterUtils(baseutils.BaseUtils):
return self._lookup_res(self._conn_cluster.MSCluster_ResourceGroup,
vm_name)
def _lookup_vm_check(self, vm_name):
vm = self._lookup_vm(vm_name)
if not vm:
raise exceptions.HyperVVMNotFoundException(vm_name=vm_name)
return vm
def _lookup_vm(self, vm_name):
vm_name = self._VM_BASE_NAME % vm_name
return self._lookup_res(self._conn_cluster.MSCluster_Resource, vm_name)
def _lookup_res(self, resource_source, res_name):
res = resource_source(Name=res_name)
n = len(res)
@ -149,16 +135,18 @@ class ClusterUtils(baseutils.BaseUtils):
def get_cluster_node_names(self):
nodes = self._get_cluster_nodes()
return [n.Name for n in nodes]
return [n['name'] for n in nodes]
def get_vm_host(self, vm_name):
return self._lookup_vm_group_check(vm_name).OwnerNode
with self._cmgr.open_cluster_group(vm_name) as group_handle:
state_info = self._get_cluster_group_state(group_handle)
return state_info['owner_node']
def list_instances(self):
return [r.Name for r in self._get_vm_groups()]
return [r['name'] for r in self._get_vm_groups()]
def list_instance_uuids(self):
return [r.Id for r in self._get_vm_groups()]
return [r['id'] for r in self._get_vm_groups()]
def add_vm_to_cluster(self, vm_name, max_failover_count=1,
failover_period=6, auto_failback=True):
@ -190,19 +178,41 @@ class ClusterUtils(baseutils.BaseUtils):
vm_group.put()
def bring_online(self, vm_name):
vm = self._lookup_vm_check(vm_name)
vm.BringOnline()
with self._cmgr.open_cluster_group(vm_name) as group_handle:
self._clusapi_utils.online_cluster_group(group_handle)
def take_offline(self, vm_name):
vm = self._lookup_vm_check(vm_name)
vm.TakeOffline()
with self._cmgr.open_cluster_group(vm_name) as group_handle:
self._clusapi_utils.offline_cluster_group(group_handle)
def delete(self, vm_name):
# We're sticking with WMI, for now. Destroying VM cluster groups using
# clusapi's DestroyClusterGroup function acts strange. VMs get
# recreated asyncronuously and put in suspended state,
# breaking everything.
vm = self._lookup_vm_group_check(vm_name)
vm.DestroyGroup(self._DESTROY_GROUP)
def cluster_enum(self, object_type):
with self._cmgr.open_cluster_enum(object_type) as enum_handle:
object_count = self._clusapi_utils.cluster_get_enum_count(
enum_handle)
for idx in range(object_count):
item = self._clusapi_utils.cluster_enum(enum_handle, idx)
item_dict = dict(version=item.dwVersion,
type=item.dwType,
id=item.lpszId,
name=item.lpszName)
yield item_dict
def vm_exists(self, vm_name):
return self._lookup_vm(vm_name) is not None
res_name = self._VM_BASE_NAME % vm_name
try:
with self._cmgr.open_cluster_resource(res_name):
return True
except exceptions.ClusterObjectNotFound:
return False
def live_migrate_vm(self, vm_name, new_host, timeout=None):
self._migrate_vm(vm_name, new_host, self._LIVE_MIGRATION_TYPE,
@ -227,62 +237,51 @@ class ClusterUtils(baseutils.BaseUtils):
w_const.CLUSAPI_GROUP_MOVE_QUEUE_ENABLED |
w_const.CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START)
cluster_handle = None
group_handle = None
dest_node_handle = None
with self._cmgr.open_cluster() as cluster_handle, \
self._cmgr.open_cluster_group(
vm_name,
cluster_handle=cluster_handle) as group_handle, \
self._cmgr.open_cluster_node(
new_host,
cluster_handle=cluster_handle) as dest_node_handle, \
_ClusterGroupStateChangeListener(cluster_handle,
vm_name) as listener:
self._clusapi_utils.move_cluster_group(group_handle,
dest_node_handle,
flags,
prop_list)
try:
self._wait_for_cluster_group_migration(
listener,
vm_name,
group_handle,
exp_state_after_migr,
timeout)
except exceptions.ClusterGroupMigrationTimeOut:
with excutils.save_and_reraise_exception() as ctxt:
self._cancel_cluster_group_migration(
listener, vm_name, group_handle,
exp_state_after_migr, timeout)
try:
cluster_handle = self._clusapi_utils.open_cluster()
group_handle = self._clusapi_utils.open_cluster_group(
cluster_handle, vm_name)
dest_node_handle = self._clusapi_utils.open_cluster_node(
cluster_handle, new_host)
with _ClusterGroupStateChangeListener(cluster_handle,
vm_name) as listener:
self._clusapi_utils.move_cluster_group(group_handle,
dest_node_handle,
flags,
prop_list)
try:
self._wait_for_cluster_group_migration(
listener,
vm_name,
group_handle,
exp_state_after_migr,
timeout)
except exceptions.ClusterGroupMigrationTimeOut:
with excutils.save_and_reraise_exception() as ctxt:
self._cancel_cluster_group_migration(
listener, vm_name, group_handle,
exp_state_after_migr, timeout)
# This is rather unlikely to happen but we're
# covering it out.
try:
self._validate_migration(group_handle,
vm_name,
exp_state_after_migr,
new_host)
LOG.warning(
'Cluster group migration completed '
'successfuly after cancel attempt. '
'Suppressing timeout exception.')
ctxt.reraise = False
except exceptions.ClusterGroupMigrationFailed:
pass
else:
self._validate_migration(group_handle,
vm_name,
exp_state_after_migr,
new_host)
finally:
if group_handle:
self._clusapi_utils.close_cluster_group(group_handle)
if dest_node_handle:
self._clusapi_utils.close_cluster_node(dest_node_handle)
if cluster_handle:
self._clusapi_utils.close_cluster(cluster_handle)
# This is rather unlikely to happen but we're
# covering it out.
try:
self._validate_migration(group_handle,
vm_name,
exp_state_after_migr,
new_host)
LOG.warning(
'Cluster group migration completed '
'successfuly after cancel attempt. '
'Suppressing timeout exception.')
ctxt.reraise = False
except exceptions.ClusterGroupMigrationFailed:
pass
else:
self._validate_migration(group_handle,
vm_name,
exp_state_after_migr,
new_host)
def _validate_migration(self, group_handle, group_name,
expected_state, expected_node):
@ -301,24 +300,15 @@ class ClusterUtils(baseutils.BaseUtils):
def cancel_cluster_group_migration(self, group_name, expected_state,
timeout=None):
cluster_handle = None
group_handle = None
try:
cluster_handle = self._clusapi_utils.open_cluster()
group_handle = self._clusapi_utils.open_cluster_group(
cluster_handle, group_name)
with _ClusterGroupStateChangeListener(cluster_handle,
group_name) as listener:
self._cancel_cluster_group_migration(
listener, group_name, group_handle,
expected_state, timeout)
finally:
if group_handle:
self._clusapi_utils.close_cluster_group(group_handle)
if cluster_handle:
self._clusapi_utils.close_cluster(cluster_handle)
with self._cmgr.open_cluster() as cluster_handle, \
self._cmgr.open_cluster_group(
group_name,
cluster_handle=cluster_handle) as group_handle, \
_ClusterGroupStateChangeListener(cluster_handle,
group_name) as listener:
self._cancel_cluster_group_migration(
listener, group_name, group_handle,
expected_state, timeout)
def _cancel_cluster_group_migration(self, event_listener,
group_name, group_handle,
@ -416,20 +406,28 @@ class ClusterUtils(baseutils.BaseUtils):
group_name=group_name,
time_elapsed=time.time() - time_start)
def get_cluster_node_name(self, node_id):
for node in self._get_cluster_nodes():
if node['id'] == node_id:
return node['name']
err_msg = _("Could not find any cluster node with id: %s.")
raise exceptions.NotFound(err_msg % node_id)
def get_cluster_group_type(self, group_name):
with self._cmgr.open_cluster_group(group_name) as group_handle:
buff, buff_sz = self._clusapi_utils.cluster_group_control(
group_handle, w_const.CLUSCTL_GROUP_GET_RO_COMMON_PROPERTIES)
return self._clusapi_utils.get_cluster_group_type(
ctypes.byref(buff), buff_sz)
def get_cluster_group_state_info(self, group_name):
"""Gets cluster group state info.
:return: a dict containing the following keys:
['state', 'migration_queued', 'owner_node']
"""
cluster_handle = None
group_handle = None
try:
cluster_handle = self._clusapi_utils.open_cluster()
group_handle = self._clusapi_utils.open_cluster_group(
cluster_handle, group_name)
with self._cmgr.open_cluster_group(group_name) as group_handle:
state_info = self._get_cluster_group_state(group_handle)
migration_queued = self._is_migration_queued(
state_info['status_info'])
@ -437,11 +435,6 @@ class ClusterUtils(baseutils.BaseUtils):
return dict(owner_node=state_info['owner_node'],
state=state_info['state'],
migration_queued=migration_queued)
finally:
if group_handle:
self._clusapi_utils.close_cluster_group(group_handle)
if cluster_handle:
self._clusapi_utils.close_cluster(cluster_handle)
def _get_cluster_group_state(self, group_handle):
state_info = self._clusapi_utils.get_cluster_group_state(group_handle)
@ -519,6 +512,30 @@ class ClusterUtils(baseutils.BaseUtils):
return listener
def get_vm_owner_change_listener_v2(self):
def listener(callback):
cluster_handle = self._clusapi_utils.open_cluster()
_listener = _ClusterGroupOwnerChangeListener(cluster_handle)
while True:
try:
event = _listener.get()
group_name = event['cluster_object_name']
group_type = self.get_cluster_group_type(group_name)
if group_type != w_const.ClusGroupTypeVirtualMachine:
continue
new_node_id = event['parent_id']
new_node_name = self.get_cluster_node_name(new_node_id)
callback(group_name, new_node_name)
except Exception:
LOG.exception("The VM cluster group owner change "
"event listener encountered an "
"unexpected exception.")
time.sleep(constants.DEFAULT_WMI_EVENT_TIMEOUT_MS / 1000)
return listener
# At the moment, those event listeners are not meant to be used outside
# os-win, mostly because of the underlying API limitations.
@ -527,10 +544,12 @@ class _ClusterEventListener(object):
_notif_port_h = None
_cluster_handle = None
_running = False
_stop_on_error = True
_error_sleep_interval = 2
def __init__(self, cluster_handle, notif_filters_list):
def __init__(self, cluster_handle, stop_on_error=True):
self._cluster_handle = cluster_handle
self._notif_filters_list = notif_filters_list
self._stop_on_error = stop_on_error
self._clusapi_utils = _clusapi_utils.ClusApiUtils()
self._event_queue = queue.Queue()
@ -607,9 +626,13 @@ class _ClusterEventListener(object):
except Exception:
if self._running:
LOG.exception(
"Unexpected exception in event listener loop. "
"The cluster event listener will now close.")
self._signal_stopped()
"Unexpected exception in event listener loop.")
if self._stop_on_error:
LOG.warning(
"The cluster event listener will now close.")
self._signal_stopped()
else:
time.sleep(self._error_sleep_interval)
def _process_event(self, event):
return event
@ -640,11 +663,11 @@ class _ClusterGroupStateChangeListener(_ClusterEventListener):
filter_flags=w_const.CLUSTER_CHANGE_GROUP_COMMON_PROPERTY_V2,
notif_key=_NOTIF_KEY_GROUP_COMMON_PROP)]
def __init__(self, cluster_handle, group_name=None):
def __init__(self, cluster_handle, group_name=None, **kwargs):
self._group_name = group_name
super(_ClusterGroupStateChangeListener, self).__init__(
cluster_handle, self._notif_filters_list)
cluster_handle, **kwargs)
def _process_event(self, event):
group_name = event['cluster_object_name']
@ -674,3 +697,10 @@ class _ClusterGroupStateChangeListener(_ClusterEventListener):
# At the moment, we only care about the 'StatusInformation'
# common property.
pass
class _ClusterGroupOwnerChangeListener(_ClusterEventListener):
_notif_filters_list = [
dict(object_type=w_const.CLUSTER_OBJECT_TYPE_GROUP,
filter_flags=w_const.CLUSTER_CHANGE_GROUP_OWNER_NODE_V2)
]

View File

@ -28,6 +28,32 @@ ERROR_MORE_DATA = 234
ERROR_WAIT_TIMEOUT = 258
ERROR_IO_PENDING = 997
ERROR_NOT_FOUND = 1168
# Cluster errors
ERROR_DEPENDENCY_NOT_FOUND = 5002
ERROR_RESOURCE_NOT_FOUND = 5007
ERROR_GROUP_NOT_FOUND = 5013
ERROR_CLUSTERLOG_CHKPOINT_NOT_FOUND = 5032
ERROR_CLUSTER_NODE_NOT_FOUND = 5042
ERROR_CLUSTER_LOCAL_NODE_NOT_FOUND = 5043
ERROR_CLUSTER_NETWORK_NOT_FOUND = 5045
ERROR_CLUSTER_NETINTERFACE_NOT_FOUND = 5047
ERROR_CLUSTER_RESOURCE_TYPE_NOT_FOUND = 5078
ERROR_CLUSTER_RESNAME_NOT_FOUND = 5080
ERROR_QUORUM_DISK_NOT_FOUND = 5086
ERROR_CLUSTER_QUORUMLOG_NOT_FOUND = 5891
ERROR_CLUSTER_NETWORK_NOT_FOUND_FOR_IP = 5894
CLUSTER_NOT_FOUND_ERROR_CODES = [
ERROR_DEPENDENCY_NOT_FOUND, ERROR_RESOURCE_NOT_FOUND,
ERROR_GROUP_NOT_FOUND, ERROR_CLUSTERLOG_CHKPOINT_NOT_FOUND,
ERROR_CLUSTER_NODE_NOT_FOUND, ERROR_CLUSTER_LOCAL_NODE_NOT_FOUND,
ERROR_CLUSTER_NETWORK_NOT_FOUND, ERROR_CLUSTER_NETINTERFACE_NOT_FOUND,
ERROR_CLUSTER_RESOURCE_TYPE_NOT_FOUND, ERROR_CLUSTER_RESNAME_NOT_FOUND,
ERROR_QUORUM_DISK_NOT_FOUND, ERROR_CLUSTER_QUORUMLOG_NOT_FOUND,
ERROR_CLUSTER_NETWORK_NOT_FOUND_FOR_IP,
]
ERROR_INVALID_STATE = 5023
ERROR_VHD_INVALID_TYPE = 0xC03A001B
@ -107,16 +133,30 @@ CLUSTER_OBJECT_TYPE_GROUP = 2
CLUSTER_CHANGE_GROUP_COMMON_PROPERTY_V2 = 2
CLUSTER_CHANGE_GROUP_STATE_V2 = 8
CLUSTER_CHANGE_GROUP_OWNER_NODE_V2 = 0x00000010
CLUSGRP_STATUS_WAITING_IN_QUEUE_FOR_MOVE = 4
CLUS_RESTYPE_NAME_VM = "Virtual Machine"
CLUS_RESTYPE_NAME_VM_CONFIG = "Virtual Machine Configuration"
CLUSREG_NAME_GRP_TYPE = "GroupType"
CLUSREG_NAME_GRP_STATUS_INFORMATION = 'StatusInformation'
CLUSCTL_GROUP_GET_RO_COMMON_PROPERTIES = 0x3000055
ClusGroupTypeVirtualMachine = 111
CLUSTER_ENUM_NODE = 0x00000001
CLUSTER_ENUM_RESTYPE = 0x00000002
CLUSTER_ENUM_RESOURCE = 0x00000004
CLUSTER_ENUM_GROUP = 0x00000008
CLUSTER_ENUM_NETWORK = 0x00000010
CLUSTER_ENUM_NETINTERFACE = 0x00000020
CLUSTER_ENUM_SHARED_VOLUME_GROUP = 0x20000000
CLUSTER_ENUM_SHARED_VOLUME_RESOURCE = 0x40000000
CLUSTER_ENUM_INTERNAL_NETWORK = 0x80000000
# iscsidsc.h
# ----------
ISCSI_ANY_INITIATOR_PORT = wintypes.ULONG(-1).value

View File

@ -27,6 +27,18 @@ class NOTIFY_FILTER_AND_TYPE(ctypes.Structure):
]
class CLUSTER_ENUM_ITEM(ctypes.Structure):
_fields_ = [
('dwVersion', wintypes.DWORD),
('dwType', wintypes.DWORD),
('cbId', wintypes.DWORD),
('lpszId', wintypes.LPWSTR),
('cbName', wintypes.DWORD),
('lpszName', wintypes.LPWSTR)
]
PCLUSTER_ENUM_ITEM = ctypes.POINTER(CLUSTER_ENUM_ITEM)
PNOTIFY_FILTER_AND_TYPE = ctypes.POINTER(NOTIFY_FILTER_AND_TYPE)
@ -49,9 +61,28 @@ def register():
lib_handle.CloseClusterNode.argtypes = [wintypes.HANDLE]
lib_handle.CloseClusterNode.restype = wintypes.BOOL
lib_handle.CloseClusterResource.argtypes = [wintypes.HANDLE]
lib_handle.CloseClusterResource.restype = wintypes.BOOL
lib_handle.CloseClusterNotifyPort.argtypes = [wintypes.HANDLE]
lib_handle.CloseClusterNotifyPort.restype = wintypes.BOOL
lib_handle.ClusterCloseEnumEx.argtypes = [wintypes.HANDLE]
lib_handle.ClusterCloseEnumEx.restype = wintypes.BOOL
lib_handle.ClusterEnumEx.argtypes = [
wintypes.HANDLE,
wintypes.DWORD,
wintypes.PVOID,
wintypes.LPDWORD
]
lib_handle.ClusterEnumEx.restype = wintypes.DWORD
lib_handle.ClusterGetEnumCountEx.argtypes = [
wintypes.HANDLE,
]
lib_handle.ClusterGetEnumCountEx.restype = wintypes.DWORD
lib_handle.ClusterGroupControl.argtypes = [
wintypes.HANDLE,
wintypes.HANDLE,
@ -64,6 +95,18 @@ def register():
]
lib_handle.ClusterGroupControl.restype = wintypes.DWORD
lib_handle.ClusterOpenEnumEx.argtypes = [
wintypes.HANDLE,
wintypes.DWORD,
wintypes.PVOID
]
lib_handle.ClusterOpenEnumEx.restype = wintypes.HANDLE
lib_handle.DestroyClusterGroup.argtypes = [
wintypes.HANDLE
]
lib_handle.DestroyClusterGroup.restype = wintypes.DWORD
lib_handle.GetClusterGroupState.argtypes = [
wintypes.HANDLE,
wintypes.LPWSTR,
@ -107,6 +150,17 @@ def register():
]
lib_handle.MoveClusterGroupEx.restype = wintypes.DWORD
lib_handle.OfflineClusterGroup.argtypes = [
wintypes.HANDLE
]
lib_handle.OfflineClusterGroup.restype = wintypes.DWORD
lib_handle.OnlineClusterGroup.argtypes = [
wintypes.HANDLE,
wintypes.HANDLE,
]
lib_handle.OnlineClusterGroup.restype = wintypes.DWORD
lib_handle.OpenCluster.argtypes = [wintypes.LPCWSTR]
lib_handle.OpenCluster.restype = wintypes.HANDLE
@ -121,3 +175,9 @@ def register():
wintypes.LPCWSTR
]
lib_handle.OpenClusterNode.restype = wintypes.HANDLE
lib_handle.OpenClusterResource.argtypes = [
wintypes.HANDLE,
wintypes.LPCWSTR
]
lib_handle.OpenClusterResource.restype = wintypes.HANDLE