Fix clustered vm live migration

At the moment, in order to migrate a clustered VM, we move the
corresponding cluster resource group using the MoveToNewNodeParams WMI
method. This method accepts an undocumented 'Parameters' argument,
which is then passed to the underlying MoveClusterGroupEx clusapi.dll
function.

While the latter expects a property list as described at the following
link, we were passing the desired migration type directly.
https://msdn.microsoft.com/en-us/library/aa371809(v=vs.85).aspx

This worked on Windows Server 2012R2 but is causing issues on WS 2016.
Resource groups end up hanging in a 'Pending' state.

This change addresses this issue by using clusapi.dll functions directly,
as the corresponding WMI methods are not working, throwing a generic error
even though the VMs are migrated properly.

Closes-Bug: #1618425

Change-Id: Idfcd3505cbbf2754addeba4f1ebeb880f3b9a56b
This commit is contained in:
Lucian Petrut 2016-08-26 13:18:08 +03:00
parent a5efe06b24
commit f3de712905
8 changed files with 745 additions and 24 deletions

View File

@ -92,7 +92,8 @@ def get_wrapped_function(function):
def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1,
max_sleep_time=1, exceptions=(), error_codes=()):
max_sleep_time=1, exceptions=(), error_codes=(),
pass_retry_context=False):
"""Retries invoking the decorated method in case of expected exceptions.
:param max_retry_count: The maximum number of retries performed. If 0, no
@ -109,6 +110,13 @@ def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1,
for example in case of Win32Exception. If this argument
is not passed, retries will be performed for any of the
expected exceptions.
:param pass_retry_context: Convenient way of letting a method aware of
this decorator prevent a retry from being
performed. The decorated method must accept an
argument called 'retry_context', which will
include a dict containing the 'prevent_retry'
field. If this field is set, no further retries
will be performed.
"""
if isinstance(error_codes, six.integer_types):
@ -120,6 +128,10 @@ def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1,
sleep_time = 0
time_start = time.time()
retry_context = dict(prevent_retry=False)
if pass_retry_context:
kwargs['retry_context'] = retry_context
while True:
try:
return f(*args, **kwargs)
@ -137,6 +149,7 @@ def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1,
else 'undefined')
should_retry = (
not retry_context['prevent_retry'] and
expected_err_code and
tries_left and
(time_left == 'undefined' or

View File

@ -161,3 +161,10 @@ DNS_ZONE_TRANSFER_ALLOWED_ANY_HOST = 0
DNS_ZONE_TRANSFER_ALLOWED_NAME_SERVERS = 1
DNS_ZONE_TRANSFER_ALLOWED_SECONDARY_SERVERS = 2
DNS_ZONE_TRANSFER_NOT_ALLOWED = 3
CLUSTER_GROUP_STATE_UNKNOWN = -1
CLUSTER_GROUP_ONLINE = 0
CLUSTER_GROUP_OFFLINE = 1
CLUSTER_GROUP_FAILED = 2
CLUSTER_GROUP_PARTIAL_ONLINE = 3
CLUSTER_GROUP_PENDING = 4

View File

@ -172,3 +172,18 @@ class DNSZoneAlreadyExists(DNSException):
class JobTerminateFailed(HyperVException):
msg_fmt = _("Could not terminate the requested job(s).")
class ClusterException(OSWinException):
pass
class ClusterWin32Exception(ClusterException, Win32Exception):
pass
class InvalidClusterGroupState(ClusterException):
msg_fmt = _("The cluster group %(group_name)s is in an invalid state. "
"Expected state %(expected_state)s. Expected owner node: "
"%(expected_node)s. Current group state: %(group_state)s. "
"Current owner node: %(owner_node)s.")

View File

@ -159,6 +159,32 @@ class UtilsTestCase(base.BaseTestCase):
self._test_retry_decorator_no_retry(
expected_exceptions=(IOError, AttributeError))
@mock.patch('time.sleep')
def test_retry_decorator_explicitly_avoid_retry(self, mock_sleep):
# Tests the case when there is a function aware of the retry
# decorator and explicitly requests that no retry should be
# performed.
def func_side_effect(fake_arg, retry_context):
self.assertEqual(mock.sentinel.arg, fake_arg)
self.assertEqual(retry_context, dict(prevent_retry=False))
retry_context['prevent_retry'] = True
raise exceptions.Win32Exception(message='fake_exc',
error_code=1)
fake_func, mock_side_effect = (
self._get_fake_func_with_retry_decorator(
exceptions=exceptions.Win32Exception,
side_effect=func_side_effect,
pass_retry_context=True))
self.assertRaises(exceptions.Win32Exception,
fake_func, mock.sentinel.arg)
self.assertEqual(1, mock_side_effect.call_count)
self.assertFalse(mock_sleep.called)
@mock.patch('socket.getaddrinfo')
def test_get_ips(self, mock_getaddrinfo):
ips = ['1.2.3.4', '5.6.7.8']

View File

@ -0,0 +1,269 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import ddt
import mock
from os_win import constants
from os_win import exceptions
from os_win.tests import test_base
from os_win.utils.compute import _clusapi_utils
@ddt.ddt
class ClusApiUtilsTestCase(test_base.OsWinBaseTestCase):
def setUp(self):
super(ClusApiUtilsTestCase, self).setUp()
self._clusapi = mock.patch.object(
_clusapi_utils, 'clusapi', create=True).start()
self._clusapi_utils = _clusapi_utils.ClusApiUtils()
self._run_patcher = mock.patch.object(self._clusapi_utils,
'_run_and_check_output')
self._mock_run = self._run_patcher.start()
def _mock_ctypes(self):
self._ctypes = mock.Mock()
# This is used in order to easily make assertions on the variables
# passed by reference.
self._ctypes.byref = lambda x: (x, "byref")
self._ctypes.c_wchar_p = lambda x: (x, 'c_wchar_p')
self._ctypes.sizeof = lambda x: (x, 'sizeof')
mock.patch.object(_clusapi_utils, 'ctypes', self._ctypes).start()
def test_run_and_check_output(self):
self._clusapi_utils._win32utils = mock.Mock()
self._clusapi_utils._run_and_check_output = (
self._run_patcher.temp_original)
mock_win32utils_run_and_check_output = (
self._clusapi_utils._win32utils.run_and_check_output)
ret_val = self._clusapi_utils._run_and_check_output(
mock.sentinel.func,
mock.sentinel.arg,
fake_kwarg=mock.sentinel.kwarg)
mock_win32utils_run_and_check_output.assert_called_once_with(
mock.sentinel.func,
mock.sentinel.arg,
fake_kwarg=mock.sentinel.kwarg,
failure_exc=exceptions.ClusterWin32Exception)
self.assertEqual(mock_win32utils_run_and_check_output.return_value,
ret_val)
def test_dword_align(self):
self.assertEqual(8, self._clusapi_utils._dword_align(5))
self.assertEqual(4, self._clusapi_utils._dword_align(4))
def test_get_clusprop_value_struct(self):
val_type = ctypes.c_ubyte * 3
expected_padding_sz = 1
clusprop_val_struct = self._clusapi_utils._get_clusprop_value_struct(
val_type)
expected_fields = [('syntax', _clusapi_utils.DWORD),
('length', _clusapi_utils.DWORD),
('value', val_type),
('_padding', ctypes.c_ubyte * expected_padding_sz)]
self.assertEqual(expected_fields, clusprop_val_struct._fields_)
def test_get_property_list_entry(self):
fake_prop_name = 'fake prop name'
fake_prop_syntax = 1
fake_prop_val = (ctypes.c_wchar * 10)()
fake_prop_val.value = 'fake prop'
entry = self._clusapi_utils.get_property_list_entry(
name=fake_prop_name,
syntax=fake_prop_syntax,
value=fake_prop_val)
self.assertEqual(_clusapi_utils.CLUSPROP_SYNTAX_NAME,
entry.name.syntax)
self.assertEqual(fake_prop_name,
entry.name.value)
self.assertEqual(
ctypes.sizeof(ctypes.c_wchar) * (len(fake_prop_name) + 1),
entry.name.length)
self.assertEqual(fake_prop_syntax,
entry.value.syntax)
self.assertEqual(bytearray(fake_prop_val),
bytearray(entry.value.value))
self.assertEqual(
ctypes.sizeof(fake_prop_val),
entry.value.length)
self.assertEqual(_clusapi_utils.CLUSPROP_SYNTAX_ENDMARK,
entry._endmark)
def test_get_property_list(self):
entry_0 = self._clusapi_utils.get_property_list_entry(
name='fake prop name',
syntax=1,
value=ctypes.c_uint(2))
entry_1 = self._clusapi_utils.get_property_list_entry(
name='fake prop name',
syntax=2,
value=ctypes.c_ubyte(5))
prop_list = self._clusapi_utils.get_property_list(
[entry_0, entry_1])
self.assertEqual(2, prop_list.count)
self.assertEqual(bytearray(entry_0) + bytearray(entry_1),
prop_list.entries_buff)
@ddt.data('fake cluster name', None)
def test_open_cluster(self, cluster_name):
self._mock_ctypes()
handle = self._clusapi_utils.open_cluster(cluster_name)
expected_handle_arg = (
self._ctypes.c_wchar_p(cluster_name)
if cluster_name else None)
self._mock_run.assert_called_once_with(
self._clusapi.OpenCluster,
expected_handle_arg,
**self._clusapi_utils._open_handle_check_flags)
self.assertEqual(self._mock_run.return_value, handle)
def test_open_cluster_group(self):
self._mock_ctypes()
handle = self._clusapi_utils.open_cluster_group(
mock.sentinel.cluster_handle,
mock.sentinel.group_name)
self._mock_run.assert_called_once_with(
self._clusapi.OpenClusterGroup,
mock.sentinel.cluster_handle,
self._ctypes.c_wchar_p(mock.sentinel.group_name),
**self._clusapi_utils._open_handle_check_flags)
self.assertEqual(self._mock_run.return_value, handle)
def test_open_cluster_node(self):
self._mock_ctypes()
handle = self._clusapi_utils.open_cluster_node(
mock.sentinel.cluster_handle,
mock.sentinel.node_name)
self._mock_run.assert_called_once_with(
self._clusapi.OpenClusterNode,
mock.sentinel.cluster_handle,
self._ctypes.c_wchar_p(mock.sentinel.node_name),
**self._clusapi_utils._open_handle_check_flags)
self.assertEqual(self._mock_run.return_value, handle)
def test_close_cluster(self):
self._clusapi_utils.close_cluster(mock.sentinel.handle)
self._clusapi.CloseCluster.assert_called_once_with(
mock.sentinel.handle)
def test_close_cluster_group(self):
self._clusapi_utils.close_cluster_group(mock.sentinel.handle)
self._clusapi.CloseClusterGroup.assert_called_once_with(
mock.sentinel.handle)
def test_close_cluster_node(self):
self._clusapi_utils.close_cluster_node(mock.sentinel.handle)
self._clusapi.CloseClusterNode.assert_called_once_with(
mock.sentinel.handle)
def test_cancel_cluster_group_operation(self):
self._clusapi_utils.cancel_cluster_group_operation(
mock.sentinel.group_handle)
self._mock_run.assert_called_once_with(
self._clusapi.CancelClusterGroupOperation,
mock.sentinel.group_handle,
0,
ignored_error_codes=[_clusapi_utils.ERROR_IO_PENDING])
@ddt.data(mock.sentinel.prop_list, None)
def test_move_cluster_group(self, prop_list):
self._mock_ctypes()
expected_prop_list_arg = (
self._ctypes.byref(prop_list) if prop_list else None)
expected_prop_list_sz = (
self._ctypes.sizeof(prop_list) if prop_list else 0)
self._clusapi_utils.move_cluster_group(
mock.sentinel.group_handle,
mock.sentinel.dest_node_handle,
mock.sentinel.move_flags,
prop_list)
self._mock_run.assert_called_once_with(
self._clusapi.MoveClusterGroupEx,
mock.sentinel.group_handle,
mock.sentinel.dest_node_handle,
mock.sentinel.move_flags,
expected_prop_list_arg,
expected_prop_list_sz,
ignored_error_codes=[_clusapi_utils.ERROR_IO_PENDING])
def test_get_cluster_group_state(self):
owner_node = 'fake owner node'
def fake_get_state(inst,
group_handle, node_name_buff, node_name_len,
error_ret_vals, error_on_nonzero_ret_val,
ret_val_is_err_code):
self.assertEqual(mock.sentinel.group_handle, group_handle)
# Those arguments would not normally get to the ClusApi
# function, instead being used by the helper invoking
# it and catching errors. For convenience, we validate
# those arguments at this point.
self.assertEqual([constants.CLUSTER_GROUP_STATE_UNKNOWN],
error_ret_vals)
self.assertFalse(error_on_nonzero_ret_val)
self.assertFalse(ret_val_is_err_code)
node_name_len_arg = ctypes.cast(
node_name_len,
ctypes.POINTER(_clusapi_utils.DWORD)).contents
self.assertEqual(self._clusapi_utils._MAX_NODE_NAME,
node_name_len_arg.value)
node_name_arg = ctypes.cast(
node_name_buff,
ctypes.POINTER(
ctypes.c_wchar *
self._clusapi_utils._MAX_NODE_NAME)).contents
node_name_arg.value = owner_node
return mock.sentinel.group_state
self._mock_run.side_effect = fake_get_state
state_info = self._clusapi_utils.get_cluster_group_state(
mock.sentinel.group_handle)
expected_state_info = dict(state=mock.sentinel.group_state,
owner_node=owner_node)
self.assertDictEqual(expected_state_info, state_info)

View File

@ -13,13 +13,17 @@
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from os_win import constants
from os_win import exceptions
from os_win.tests import test_base
from os_win.utils.compute import _clusapi_utils
from os_win.utils.compute import clusterutils
@ddt.ddt
class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
"""Unit tests for the Hyper-V ClusterUtilsBase class."""
@ -34,6 +38,8 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
self._clusterutils = clusterutils.ClusterUtils()
self._clusterutils._conn_cluster = mock.MagicMock()
self._clusterutils._cluster = mock.MagicMock()
self._clusterutils._clusapi_utils = mock.Mock()
self._clusapi = self._clusterutils._clusapi_utils
def test_init_hyperv_conn(self):
fake_cluster_name = "fake_cluster"
@ -277,24 +283,138 @@ class ClusterUtilsTestCase(test_base.OsWinBaseTestCase):
@mock.patch.object(clusterutils.ClusterUtils, '_migrate_vm')
def test_live_migrate_vm(self, mock_migrate_vm):
self._clusterutils.live_migrate_vm(self._FAKE_VM_NAME,
self._FAKE_HOST)
self._FAKE_HOST,
mock.sentinel.timeout)
exp_valid_transition_states = [constants.CLUSTER_GROUP_PENDING]
mock_migrate_vm.assert_called_once_with(
self._FAKE_VM_NAME, self._FAKE_HOST,
self._clusterutils._LIVE_MIGRATION_TYPE,
constants.CLUSTER_GROUP_ONLINE,
exp_valid_transition_states,
mock.sentinel.timeout)
@mock.patch.object(_clusapi_utils, 'DWORD')
@mock.patch.object(clusterutils.ClusterUtils,
'_wait_for_cluster_group_state')
@ddt.data(None, exceptions.ClusterException)
def test_migrate_vm(self, raised_exc, mock_wait_group, mock_dword):
mock_wait_group.side_effect = raised_exc
migrate_args = (self._FAKE_VM_NAME,
self._FAKE_HOST,
self._clusterutils._LIVE_MIGRATION_TYPE,
constants.CLUSTER_GROUP_ONLINE,
mock.sentinel.valid_transition_states,
mock.sentinel.timeout)
if raised_exc:
self.assertRaises(raised_exc,
self._clusterutils._migrate_vm,
*migrate_args)
else:
self._clusterutils._migrate_vm(*migrate_args)
mock_dword.assert_called_once_with(
self._clusterutils._LIVE_MIGRATION_TYPE)
@mock.patch.object(clusterutils.ClusterUtils, '_lookup_vm_group_check')
def test_migrate_vm(self, mock_lookup_vm_group_check):
vm_group = mock.MagicMock()
mock_lookup_vm_group_check.return_value = vm_group
self._clusapi.get_property_list_entry.assert_has_calls(
[mock.call(prop_name,
_clusapi_utils.CLUSPROP_SYNTAX_LIST_VALUE_DWORD,
mock_dword.return_value)
for prop_name in (_clusapi_utils.CLUSPROP_NAME_VM,
_clusapi_utils.CLUSPROP_NAME_VM_CONFIG)])
self._clusterutils._migrate_vm(
self._FAKE_VM_NAME, self._FAKE_HOST,
self._clusterutils._LIVE_MIGRATION_TYPE)
expected_prop_entries = [
self._clusapi.get_property_list_entry.return_value] * 2
self._clusapi.get_property_list.assert_called_once_with(
expected_prop_entries)
vm_group.MoveToNewNodeParams.assert_called_once_with(
self._clusterutils._IGNORE_LOCKED,
expected_migrate_flags = (
_clusapi_utils.CLUSAPI_GROUP_MOVE_RETURN_TO_SOURCE_NODE_ON_ERROR |
_clusapi_utils.CLUSAPI_GROUP_MOVE_QUEUE_ENABLED |
_clusapi_utils.CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START)
exp_clus_h = self._clusapi.open_cluster.return_value
exp_clus_node_h = self._clusapi.open_cluster_node.return_value
exp_clus_group_h = self._clusapi.open_cluster_group.return_value
self._clusapi.open_cluster.assert_called_once_with()
self._clusapi.open_cluster_group.assert_called_once_with(
exp_clus_h, self._FAKE_VM_NAME)
self._clusapi.open_cluster_node.assert_called_once_with(
exp_clus_h, self._FAKE_HOST)
self._clusapi.move_cluster_group.assert_called_once_with(
exp_clus_group_h, exp_clus_node_h, expected_migrate_flags,
self._clusapi.get_property_list.return_value)
mock_wait_group.assert_called_once_with(
self._FAKE_VM_NAME, exp_clus_group_h,
constants.CLUSTER_GROUP_ONLINE,
self._FAKE_HOST,
[self._clusterutils._LIVE_MIGRATION_TYPE])
mock.sentinel.valid_transition_states,
mock.sentinel.timeout)
self._clusapi.close_cluster_group.assert_called_once_with(
exp_clus_group_h)
self._clusapi.close_cluster_node.assert_called_once_with(
exp_clus_node_h)
self._clusapi.close_cluster.assert_called_once_with(exp_clus_h)
@mock.patch.object(clusterutils._utils, 'time')
def test_wait_for_clus_group_state_failed(self, mock_time):
desired_host = self._FAKE_HOST
desired_state = constants.CLUSTER_GROUP_ONLINE
valid_transition_states = [constants.CLUSTER_GROUP_PENDING]
group_states = [dict(owner_node='other node',
state=desired_state),
dict(owner_node=desired_host,
state=constants.CLUSTER_GROUP_PENDING),
dict(owner_node=desired_host,
state=constants.CLUSTER_GROUP_FAILED)]
self._clusapi.get_cluster_group_state.side_effect = group_states
# We don't want a timeout to be raised. We expect the tested
# function to force breaking the retry loop when the cluster
# group gets into a 'failed' state.
#
# As a precaution measure, we're still forcing a timeout at
# some point, to avoid an infinite loop if something goes wrong.
mock_time.time.side_effect = [0] * 10 + [100]
self.assertRaises(exceptions.InvalidClusterGroupState,
self._clusterutils._wait_for_cluster_group_state,
mock.sentinel.group_name,
mock.sentinel.group_handle,
desired_state,
desired_host,
valid_transition_states,
timeout=10)
self._clusapi.get_cluster_group_state.assert_has_calls(
[mock.call(mock.sentinel.group_handle)] * 3)
@mock.patch.object(clusterutils._utils, 'time')
def test_wait_for_clus_group_state_success(self, mock_time):
desired_host = self._FAKE_HOST
desired_state = constants.CLUSTER_GROUP_ONLINE
group_state = dict(owner_node=desired_host.upper(),
state=desired_state)
self._clusapi.get_cluster_group_state.return_value = group_state
self._clusterutils._wait_for_cluster_group_state(
mock.sentinel.group_name,
mock.sentinel.group_handle,
desired_state,
desired_host,
[],
timeout=10)
self._clusapi.get_cluster_group_state.assert_called_once_with(
mock.sentinel.group_handle)
@mock.patch.object(clusterutils, 'tpool')
@mock.patch.object(clusterutils, 'patcher')

View File

@ -0,0 +1,197 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import sys
if sys.platform == 'win32':
clusapi = ctypes.windll.clusapi
from os_win import constants
from os_win import exceptions
from os_win.utils import win32utils
DWORD = ctypes.c_ulong
CLUSPROP_SYNTAX_NAME = 262147
CLUSPROP_SYNTAX_ENDMARK = 0
CLUSPROP_SYNTAX_LIST_VALUE_DWORD = 65538
CLUSAPI_GROUP_MOVE_RETURN_TO_SOURCE_NODE_ON_ERROR = 2
CLUSAPI_GROUP_MOVE_QUEUE_ENABLED = 4
CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START = 8
ERROR_IO_PENDING = 997
CLUSPROP_NAME_VM = 'Virtual Machine'
CLUSPROP_NAME_VM_CONFIG = 'Virtual Machine Configuration'
class ClusApiUtils(object):
_MAX_NODE_NAME = 255
_open_handle_check_flags = dict(ret_val_is_err_code=False,
error_on_nonzero_ret_val=False,
error_ret_vals=[0, None])
def __init__(self):
self._win32utils = win32utils.Win32Utils()
def _run_and_check_output(self, *args, **kwargs):
kwargs['failure_exc'] = exceptions.ClusterWin32Exception
return self._win32utils.run_and_check_output(*args, **kwargs)
def _dword_align(self, value):
return (value + 3) & ~3
def _get_clusprop_value_struct(self, val_type):
def _get_padding():
# The cluster property entries must be 4B aligned.
val_sz = ctypes.sizeof(val_type)
return self._dword_align(val_sz) - val_sz
# For convenience, as opposed to the homonymous ClusAPI
# structure, we add the actual value as well.
class CLUSPROP_VALUE(ctypes.Structure):
_fields_ = [('syntax', DWORD),
('length', DWORD),
('value', val_type),
('_padding', ctypes.c_ubyte * _get_padding())]
return CLUSPROP_VALUE
def get_property_list_entry(self, name, syntax, value):
# The value argument must have a ctypes type.
name_len = len(name) + 1
val_sz = ctypes.sizeof(value)
class CLUSPROP_LIST_ENTRY(ctypes.Structure):
_fields_ = [
('name', self._get_clusprop_value_struct(
val_type=ctypes.c_wchar * name_len)),
('value', self._get_clusprop_value_struct(
val_type=ctypes.c_ubyte * val_sz)),
('_endmark', DWORD)
]
entry = CLUSPROP_LIST_ENTRY()
entry.name.syntax = CLUSPROP_SYNTAX_NAME
entry.name.length = name_len * ctypes.sizeof(ctypes.c_wchar)
entry.name.value = name
entry.value.syntax = syntax
entry.value.length = val_sz
entry.value.value[0:val_sz] = bytearray(value)
entry._endmark = CLUSPROP_SYNTAX_ENDMARK
return entry
def get_property_list(self, property_entries):
prop_entries_sz = sum([ctypes.sizeof(entry)
for entry in property_entries])
class CLUSPROP_LIST(ctypes.Structure):
_fields_ = [('count', DWORD),
('entries_buff', ctypes.c_ubyte * prop_entries_sz)]
prop_list = CLUSPROP_LIST(count=len(property_entries))
pos = 0
for prop_entry in property_entries:
prop_entry_sz = ctypes.sizeof(prop_entry)
prop_list.entries_buff[pos:prop_entry_sz + pos] = bytearray(
prop_entry)
pos += prop_entry_sz
return prop_list
def open_cluster(self, cluster_name=None):
"""Returns a handle for the requested cluster.
:param cluster_name: (Optional) specifies the name of the cluster
to be opened. If None, the cluster that the
local node belongs to will be opened.
"""
p_clus_name = ctypes.c_wchar_p(cluster_name) if cluster_name else None
handle = self._run_and_check_output(clusapi.OpenCluster,
p_clus_name,
**self._open_handle_check_flags)
return handle
def open_cluster_group(self, cluster_handle, group_name):
handle = self._run_and_check_output(clusapi.OpenClusterGroup,
cluster_handle,
ctypes.c_wchar_p(group_name),
**self._open_handle_check_flags)
return handle
def open_cluster_node(self, cluster_handle, node_name):
handle = self._run_and_check_output(clusapi.OpenClusterNode,
cluster_handle,
ctypes.c_wchar_p(node_name),
**self._open_handle_check_flags)
return handle
def close_cluster(self, cluster_handle):
# This function will always return 'True'. Closing the cluster
# handle will also invalidate handles opened using it.
clusapi.CloseCluster(cluster_handle)
def close_cluster_group(self, group_handle):
# TODO(lpetrut): The following functions can fail, in which case
# 'False' will be returned. We may want to handle this situation.
clusapi.CloseClusterGroup(group_handle)
def close_cluster_node(self, node_handle):
clusapi.CloseClusterNode(node_handle)
def cancel_cluster_group_operation(self, group_handle):
"""Requests a pending move operation to be canceled."""
# This only applies to move operations requested by
# MoveClusterGroup(Ex), thus it will not apply to fail overs.
self._run_and_check_output(
clusapi.CancelClusterGroupOperation,
group_handle,
0, # cancel flags (reserved for future use by MS)
ignored_error_codes=[ERROR_IO_PENDING])
def move_cluster_group(self, group_handle, destination_node_handle,
move_flags, property_list):
prop_list_p = ctypes.byref(property_list) if property_list else None
prop_list_sz = ctypes.sizeof(property_list) if property_list else 0
self._run_and_check_output(clusapi.MoveClusterGroupEx,
group_handle,
destination_node_handle,
move_flags,
prop_list_p,
prop_list_sz,
ignored_error_codes=[ERROR_IO_PENDING])
def get_cluster_group_state(self, group_handle):
node_name_len = DWORD(self._MAX_NODE_NAME)
node_name_buff = (ctypes.c_wchar * node_name_len.value)()
group_state = self._run_and_check_output(
clusapi.GetClusterGroupState,
group_handle,
ctypes.byref(node_name_buff),
ctypes.byref(node_name_len),
error_ret_vals=[constants.CLUSTER_GROUP_STATE_UNKNOWN],
error_on_nonzero_ret_val=False,
ret_val_is_err_code=False)
return {'state': group_state,
'owner_node': node_name_buff.value}

View File

@ -25,8 +25,11 @@ from eventlet import tpool
from oslo_log import log as logging
from os_win._i18n import _, _LE
from os_win import _utils
from os_win import constants
from os_win import exceptions
from os_win.utils import baseutils
from os_win.utils.compute import _clusapi_utils
LOG = logging.getLogger(__name__)
@ -55,6 +58,7 @@ class ClusterUtils(baseutils.BaseUtils):
def __init__(self, host='.'):
self._instance_name_regex = re.compile('Virtual Machine (.*)')
self._clusapi_utils = _clusapi_utils.ClusApiUtils()
if sys.platform == 'win32':
self._init_hyperv_conn(host)
@ -178,20 +182,90 @@ class ClusterUtils(baseutils.BaseUtils):
def vm_exists(self, vm_name):
return self._lookup_vm(vm_name) is not None
def live_migrate_vm(self, vm_name, new_host):
self._migrate_vm(vm_name, new_host, self._LIVE_MIGRATION_TYPE)
def live_migrate_vm(self, vm_name, new_host, timeout=None):
valid_transition_states = [constants.CLUSTER_GROUP_PENDING]
self._migrate_vm(vm_name, new_host, self._LIVE_MIGRATION_TYPE,
constants.CLUSTER_GROUP_ONLINE,
valid_transition_states,
timeout)
def _migrate_vm(self, vm_name, new_host, migration_type,
exp_state_after_migr, valid_transition_states,
timeout):
syntax = _clusapi_utils.CLUSPROP_SYNTAX_LIST_VALUE_DWORD
migr_type = _clusapi_utils.DWORD(migration_type)
prop_entries = [
self._clusapi_utils.get_property_list_entry(
_clusapi_utils.CLUSPROP_NAME_VM, syntax, migr_type),
self._clusapi_utils.get_property_list_entry(
_clusapi_utils.CLUSPROP_NAME_VM_CONFIG, syntax, migr_type)
]
prop_list = self._clusapi_utils.get_property_list(prop_entries)
flags = (
_clusapi_utils.CLUSAPI_GROUP_MOVE_RETURN_TO_SOURCE_NODE_ON_ERROR |
_clusapi_utils.CLUSAPI_GROUP_MOVE_QUEUE_ENABLED |
_clusapi_utils.CLUSAPI_GROUP_MOVE_HIGH_PRIORITY_START)
cluster_handle = None
group_handle = None
dest_node_handle = None
def _migrate_vm(self, vm_name, new_host, migration_type):
vm_group = self._lookup_vm_group_check(vm_name)
try:
vm_group.MoveToNewNodeParams(self._IGNORE_LOCKED, new_host,
[migration_type])
except Exception as e:
LOG.error(_LE('Exception during cluster live migration of '
'%(vm_name)s to %(host)s: %(exception)s'),
{'vm_name': vm_name,
'host': new_host,
'exception': e})
cluster_handle = self._clusapi_utils.open_cluster()
group_handle = self._clusapi_utils.open_cluster_group(
cluster_handle, vm_name)
dest_node_handle = self._clusapi_utils.open_cluster_node(
cluster_handle, new_host)
self._clusapi_utils.move_cluster_group(group_handle,
dest_node_handle,
flags,
prop_list)
self._wait_for_cluster_group_state(vm_name,
group_handle,
exp_state_after_migr,
new_host,
valid_transition_states,
timeout)
finally:
if group_handle:
self._clusapi_utils.close_cluster_group(group_handle)
if dest_node_handle:
self._clusapi_utils.close_cluster_node(dest_node_handle)
if cluster_handle:
self._clusapi_utils.close_cluster(cluster_handle)
def _wait_for_cluster_group_state(self, group_name, group_handle,
desired_state, desired_node,
valid_transition_states, timeout):
@_utils.retry_decorator(max_retry_count=None,
timeout=timeout,
exceptions=exceptions.InvalidClusterGroupState,
pass_retry_context=True)
def _ensure_group_state(retry_context):
state_info = self._clusapi_utils.get_cluster_group_state(
group_handle)
owner_node = state_info['owner_node']
group_state = state_info['state']
reached_desired_state = desired_state == group_state
reached_desired_node = desired_node.lower() == owner_node.lower()
if not (reached_desired_state and reached_desired_node):
valid_states = [desired_state] + valid_transition_states
valid_state = group_state in valid_states
retry_context['prevent_retry'] = not valid_state
raise exceptions.InvalidClusterGroupState(
group_name=group_name,
expected_state=desired_state,
expected_node=desired_node,
group_state=group_state,
owner_node=owner_node)
_ensure_group_state()
def monitor_vm_failover(self, callback):
"""Creates a monitor to check for new WMI MSCluster_Resource