Use MSFT_NetIPAddress in static network config

Change set_static_network_config in order to work with recent
Windows versions as well.

Partially-Implements: blueprint json-network-config
Change-Id: Ie916d658265ca2895afcdebeaa5ae82d48b90082
This commit is contained in:
Alessandro Pilotti 2018-08-26 01:13:25 +03:00
parent 518081b45b
commit ac667f7f50
5 changed files with 239 additions and 182 deletions

View File

@ -84,8 +84,8 @@ class BaseOSUtils(object):
def rename_network_adapter(self, old_name, new_name):
raise NotImplementedError()
def set_static_network_config(self, mac_address, address, netmask,
broadcast, gateway, dnsnameservers):
def set_static_network_config(self, name, address, prefix_len_or_netmask,
gateway, dnsnameservers):
raise NotImplementedError()
def create_network_team(self, team_name, mode, load_balancing_algorithm,

View File

@ -21,6 +21,7 @@ import struct
import subprocess
import time
import netaddr
from oslo_log import log as oslo_logging
import pywintypes
import six
@ -47,6 +48,8 @@ from cloudbaseinit.utils.windows import wmi_loader
wmi = wmi_loader.wmi()
LOG = oslo_logging.getLogger(__name__)
AF_INET = 2
AF_INET6 = 23
UNICAST = 1
MANUAL = 1
@ -781,24 +784,30 @@ class WindowsUtils(base.BaseOSUtils):
'Renaming interface "%(old_name)s" to "%(new_name)s" '
'failed' % {'old_name': old_name, 'new_name': new_name})
def set_static_network_config(self, mac_address, address, netmask,
broadcast, gateway, dnsnameservers):
@staticmethod
def _get_network_adapter(name):
conn = wmi.WMI(moniker='//./root/cimv2')
query = conn.query("SELECT * FROM Win32_NetworkAdapter WHERE "
"MACAddress = '{}'".format(mac_address))
query = conn.Win32_NetworkAdapter(NetConnectionID=name)
if not len(query):
raise exception.CloudbaseInitException(
"Network adapter not found")
"Network adapter not found: %s" % name)
return query[0]
adapter_config = query[0].associators(
@staticmethod
def _set_static_network_config_legacy(name, address, netmask, gateway,
dnsnameservers):
if netaddr.valid_ipv6(address):
LOG.warning("Setting IPv6 info not available on this system")
return
adapter_config = WindowsUtils._get_network_adapter(name).associators(
wmi_result_class='Win32_NetworkAdapterConfiguration')[0]
LOG.debug("Setting static IP address")
(ret_val,) = adapter_config.EnableStatic([address], [netmask])
if ret_val > 1:
raise exception.CloudbaseInitException(
"Cannot set static IP address on network adapter (%d)",
"Cannot set static IP address on network adapter: %d" %
ret_val)
reboot_required = (ret_val == 1)
@ -807,8 +816,7 @@ class WindowsUtils(base.BaseOSUtils):
(ret_val,) = adapter_config.SetGateways([gateway], [1])
if ret_val > 1:
raise exception.CloudbaseInitException(
"Cannot set gateway on network adapter (%d)",
ret_val)
"Cannot set gateway on network adapter: %d" % ret_val)
reboot_required = reboot_required or ret_val == 1
if dnsnameservers:
@ -816,62 +824,101 @@ class WindowsUtils(base.BaseOSUtils):
(ret_val,) = adapter_config.SetDNSServerSearchOrder(dnsnameservers)
if ret_val > 1:
raise exception.CloudbaseInitException(
"Cannot set DNS on network adapter (%d)",
ret_val)
"Cannot set DNS on network adapter: %d" % ret_val)
reboot_required = reboot_required or ret_val == 1
return reboot_required
def set_static_network_config_v6(self, mac_address, address6,
netmask6, gateway6):
"""Set IPv6 info for a network card."""
@staticmethod
def _fix_network_adapter_dhcp(interface_name, enable_dhcp, address_family):
interface_id = WindowsUtils._get_network_adapter(interface_name).GUID
tcpip_key = "Tcpip6" if address_family == AF_INET6 else "Tcpip"
# Get local properties by MAC identification.
adapters = network.get_adapter_addresses()
for adapter in adapters:
if mac_address == adapter["mac_address"]:
ifname = adapter["friendly_name"]
ifindex = adapter["interface_index"]
break
with winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE,
"SYSTEM\\CurrentControlSet\\services\\%(tcpip_key)s\\"
"Parameters\\Interfaces\\%(interface_id)s" %
{"tcpip_key": tcpip_key, "interface_id": interface_id},
0, winreg.KEY_SET_VALUE) as key:
winreg.SetValueEx(
key, 'EnableDHCP', 0, winreg.REG_DWORD,
1 if enable_dhcp else 0)
@staticmethod
def _set_interface_dns(interface_name, dnsnameservers):
# Import here to avoid loading errors on Windows versions where MI is
# not available
import mi
conn = wmi.WMI(moniker='//./root/standardcimv2')
# Requires Windows >= 6.2
dns_client = conn.MSFT_DnsClientServerAddress(
InterfaceAlias=interface_name)
if not len(dns_client):
raise exception.ItemNotFoundException(
'Network interface with name "%s" not found' %
interface_name)
dns_client = dns_client[0]
custom_options = [{
u'name': u'ServerAddresses',
u'value_type': mi.MI_ARRAY | mi.MI_STRING,
u'value': dnsnameservers
}]
operation_options = {u'custom_options': custom_options}
dns_client.put(operation_options=operation_options)
@staticmethod
def _set_static_network_config(name, address, prefix_len, gateway):
if netaddr.valid_ipv6(address):
family = AF_INET6
else:
raise exception.CloudbaseInitException(
"Adapter with MAC {!r} not available".format(mac_address))
family = AF_INET
# TODO(cpoieana): Extend support for other platforms.
# Currently windows8 @ ws2012 or above.
if not self.check_os_version(6, 2):
LOG.warning("Setting IPv6 info not available "
"on this system")
return
conn = wmi.WMI(moniker='//./root/StandardCimv2')
query = conn.query("SELECT * FROM MSFT_NetIPAddress "
"WHERE InterfaceAlias = '{}'".format(ifname))
netip = query[0]
# This is needed to avoid the error:
# "Inconsistent parameters PolicyStore PersistentStore and
# Dhcp Enabled"
WindowsUtils._fix_network_adapter_dhcp(name, False, family)
params = {
"InterfaceIndex": ifindex,
"InterfaceAlias": ifname,
"IPAddress": address6,
"AddressFamily": AF_INET6,
"PrefixLength": netmask6,
# Manual set type.
"Type": UNICAST,
"PrefixOrigin": MANUAL,
"SuffixOrigin": MANUAL,
"AddressState": PREFERRED_ADDR,
# No expiry.
"ValidLifetime": None,
"PreferredLifetime": None,
"SkipAsSource": False,
"DefaultGateway": gateway6,
"PolicyStore": None,
"PassThru": False,
}
LOG.debug("Setting IPv6 info for %s", ifname)
try:
netip.Create(**params)
except wmi.x_wmi as exc:
raise exception.CloudbaseInitException(exc.com_error)
conn = wmi.WMI(moniker='//./root/standardcimv2')
existing_addresses = conn.MSFT_NetIPAddress(
AddressFamily=family, InterfaceAlias=name)
for existing_address in existing_addresses:
LOG.debug(
"Removing existing IP address \"%(ip)s\" "
"from adapter \"%(name)s\"",
{"ip": existing_address.IPAddress, "name": name})
existing_address.Delete_()
existing_routes = conn.MSFT_NetRoute(
AddressFamily=family, InterfaceAlias=name)
for existing_route in existing_routes:
LOG.debug(
"Removing existing route \"%(route)s\" "
"from adapter \"%(name)s\"",
{"route": existing_route.DestinationPrefix, "name": name})
existing_route.Delete_()
conn.MSFT_NetIPAddress.create(
AddressFamily=family, InterfaceAlias=name, IPAddress=address,
PrefixLength=prefix_len, DefaultGateway=gateway)
def set_static_network_config(self, name, address, prefix_len_or_netmask,
gateway, dnsnameservers):
ip_network = netaddr.IPNetwork(
u"%s/%s" % (address, prefix_len_or_netmask))
prefix_len = ip_network.prefixlen
netmask = str(ip_network.netmask)
if self.check_os_version(6, 2):
self._set_static_network_config(
name, address, prefix_len, gateway)
if len(dnsnameservers):
self._set_interface_dns(name, dnsnameservers)
else:
return self._set_static_network_config_legacy(
name, address, netmask, gateway, dnsnameservers)
def _get_network_team_manager(self):
if self._network_team_manager:

View File

@ -145,24 +145,27 @@ class NetworkConfigPlugin(plugin_base.BasePlugin):
if not nic:
LOG.warn("Missing details for adapter %s", mac)
continue
LOG.info("Configuring network adapter %s", mac)
name = osutils.get_network_adapter_name_by_mac_address(mac)
LOG.info("Configuring network adapter: %s", name)
reboot = osutils.set_static_network_config(
mac,
name,
nic.address,
nic.netmask,
nic.broadcast,
nic.gateway,
nic.dnsnameservers
)
reboot_required = reboot or reboot_required
# Set v6 info too if available.
if nic.address6 and nic.netmask6:
osutils.set_static_network_config_v6(
mac,
reboot = osutils.set_static_network_config(
name,
nic.address6,
nic.netmask6,
nic.gateway6
nic.gateway6,
[]
)
reboot_required = reboot or reboot_required
configured = True
for mac in macnics:
LOG.warn("Details not used for adapter %s", mac)

View File

@ -18,6 +18,8 @@ import functools
import importlib
import os
import netaddr
try:
import unittest.mock as mock
except ImportError:
@ -588,18 +590,96 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
def test_get_network_adapters_xp_2003(self):
self._test_get_network_adapters(True)
def _test_set_static_network_config(self, adapter=True, static_val=(0,),
gateway_val=(0,), dns_val=(0,)):
conn = self._wmi_mock.WMI
mac_address = '54:EE:75:19:F4:61'
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'.check_os_version')
def _test_set_static_network_config(self, mock_check_os_version,
adapter=True, static_val=(0,),
gateway_val=(0,), dns_val=(0,),
legacy=False, ipv6=False):
mock_check_os_version.return_value = not legacy
if legacy:
self._test_set_static_network_config_legacy(
adapter, static_val, gateway_val, dns_val)
else:
self._test_set_static_network_config_new(ipv6=ipv6)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'._fix_network_adapter_dhcp')
def _test_set_static_network_config_new(self,
mock_fix_network_adapter_dhcp,
ipv6):
conn = self._wmi_mock.WMI.return_value
if ipv6:
mock.sentinel.address = "2001:db8::3"
mock.sentinel.prefix_len_or_netmask = 64
else:
mock.sentinel.address = "10.10.10.10"
mock.sentinel.prefix_len_or_netmask = "255.255.255.0"
adapter = mock.Mock()
adapter.GUID = mock.sentinel.adapter_guid
conn.Win32_NetworkAdapter.return_value = [adapter]
if netaddr.valid_ipv6(mock.sentinel.address):
family = self.windows_utils.AF_INET6
else:
family = self.windows_utils.AF_INET
existing_adapter = mock.Mock()
existing_adapter.IPAddress = mock.sentinel.address
conn.MSFT_NetIPAddress.return_value = [existing_adapter]
existing_route = mock.Mock()
existing_route.DestinationPrefix = "0.0.0.0"
conn.MSFT_NetRoute.return_value = [existing_route]
dns_client = mock.Mock()
conn.MSFT_DnsClientServerAddress.return_value = [dns_client]
self._winutils.set_static_network_config(
mock.sentinel.nick_name, mock.sentinel.address,
mock.sentinel.prefix_len_or_netmask, mock.sentinel.gateway,
[mock.sentinel.dns])
mock_fix_network_adapter_dhcp.assert_called_once_with(
mock.sentinel.nick_name, False, family)
conn.MSFT_NetIPAddress.assert_called_once_with(
AddressFamily=family, InterfaceAlias=mock.sentinel.nick_name)
existing_adapter.Delete_.assert_called_once_with()
conn.MSFT_NetRoute.assert_called_once_with(
AddressFamily=family, InterfaceAlias=mock.sentinel.nick_name)
existing_route.Delete_.assert_called_once_with()
ip_network = netaddr.IPNetwork(
u"%s/%s" % (
mock.sentinel.address, mock.sentinel.prefix_len_or_netmask))
prefix_len = ip_network.prefixlen
conn.MSFT_NetIPAddress.create.assert_called_once_with(
AddressFamily=family, InterfaceAlias=mock.sentinel.nick_name,
IPAddress=mock.sentinel.address, PrefixLength=prefix_len,
DefaultGateway=mock.sentinel.gateway)
custom_options = [{
u'name': u'ServerAddresses',
u'value_type': self._mi_mock.MI_ARRAY | self._mi_mock.MI_STRING,
u'value': [mock.sentinel.dns]
}]
operation_options = {u'custom_options': custom_options}
dns_client.put.assert_called_once_with(
operation_options=operation_options)
def _test_set_static_network_config_legacy(self, adapter, static_val,
gateway_val, dns_val):
conn = self._wmi_mock.WMI.return_value
nic_name = 'fake NIC'
address = '10.10.10.10'
broadcast = '0.0.0.0'
dns_list = ['8.8.8.8']
set_static_call = functools.partial(
self._winutils.set_static_network_config,
mac_address, address, self._NETMASK,
broadcast, self._GATEWAY, dns_list
)
nic_name, address, self._NETMASK, self._GATEWAY, dns_list)
if adapter:
adapter = mock.MagicMock()
@ -617,8 +697,8 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
if ret_val in (0, 1):
expected_log.append(msg)
conn.return_value.query.return_value = adapter
adapter_config = adapter[0].associators.return_value[0]
conn.Win32_NetworkAdapter.return_value = [adapter]
adapter_config = adapter.associators.return_value[0]
adapter_config.EnableStatic.return_value = static_val
adapter_config.SetGateways.return_value = gateway_val
adapter_config.SetDNSServerSearchOrder.return_value = dns_val
@ -636,10 +716,9 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
self.assertFalse(response)
self.assertEqual(expected_log, self.snatcher.output)
select = ("SELECT * FROM Win32_NetworkAdapter WHERE "
"MACAddress = '{}'".format(mac_address))
conn.return_value.query.assert_called_once_with(select)
adapter[0].associators.assert_called_with(
conn.Win32_NetworkAdapter.assert_called_once_with(
NetConnectionID=nic_name)
adapter.associators.assert_called_with(
wmi_result_class='Win32_NetworkAdapterConfiguration')
adapter_config.EnableStatic.assert_called_with(
[address], [self._NETMASK])
@ -648,123 +727,52 @@ class TestWindowsUtils(testutils.CloudbaseInitTestBase):
adapter_config.SetDNSServerSearchOrder.assert_called_with(
dns_list)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'.check_os_version')
@mock.patch("cloudbaseinit.utils.windows.network"
".get_adapter_addresses")
def _test_set_static_network_config_v6(self, mock_get_adapter_addresses,
mock_check_os_version,
v6adapters=True, v6error=False):
friendly_name = "Ethernet0"
interface_index = "4"
mac_address = '54:EE:75:19:F4:61'
address6 = "2001:db8::3"
netmask6 = "64"
gateway6 = "2001:db8::1"
conn = self._wmi_mock.WMI
netip = conn.return_value.query.return_value[0]
if v6error:
netip.Create.side_effect = WMIError
adapter_addresses = []
if v6adapters:
adapter_addresses = [
{
"mac_address": mac_address,
"friendly_name": friendly_name,
"interface_index": interface_index
}
]
mock_get_adapter_addresses.return_value = adapter_addresses
mock_check_os_version.return_value = True
set_static_call = functools.partial(
self._winutils.set_static_network_config_v6,
mac_address, address6, netmask6, gateway6)
expected_log = []
if not mock_check_os_version.return_value:
expected_log.append("Setting IPv6 info not available "
"on this system")
if not v6adapters or v6error:
self.assertRaises(
exception.CloudbaseInitException,
set_static_call)
else:
expected_log.append("Setting IPv6 info for %s" % friendly_name)
with self.snatcher:
set_static_call()
mock_get_adapter_addresses.assert_called_once_with()
select = ("SELECT * FROM MSFT_NetIPAddress "
"WHERE InterfaceAlias = '{}'".format(friendly_name))
conn.return_value.query.assert_called_once_with(select)
params = {
"InterfaceIndex": interface_index,
"InterfaceAlias": friendly_name,
"IPAddress": address6,
"AddressFamily": self.windows_utils.AF_INET6,
"PrefixLength": netmask6,
# Manual set type.
"Type": self.windows_utils.UNICAST,
"PrefixOrigin": self.windows_utils.MANUAL,
"SuffixOrigin": self.windows_utils.MANUAL,
"AddressState": self.windows_utils.PREFERRED_ADDR,
# No expiry.
"ValidLifetime": None,
"PreferredLifetime": None,
"SkipAsSource": False,
"DefaultGateway": gateway6,
"PolicyStore": None,
"PassThru": False,
}
netip.Create.assert_called_once_with(**params)
self.assertEqual(expected_log, self.snatcher.output)
def test_set_static_network_config(self):
def test_set_static_network_config_legacy(self):
ret_val1 = (1,)
ret_val2 = (1,)
ret_val3 = (0,)
self._test_set_static_network_config(static_val=ret_val1,
gateway_val=ret_val2,
dns_val=ret_val3)
dns_val=ret_val3,
legacy=True)
def test_set_static_network_config_query_fail(self):
self._test_set_static_network_config(adapter=False)
def test_set_static_network_config_legacy_query_fail(self):
self._test_set_static_network_config(adapter=False, legacy=True)
def test_set_static_network_config_cannot_set_ip(self):
def test_set_static_network_config_legacy_cannot_set_ip(self):
ret_val1 = (2,)
self._test_set_static_network_config(static_val=ret_val1)
self._test_set_static_network_config(static_val=ret_val1, legacy=True)
def test_set_static_network_config_cannot_set_gateway(self):
def test_set_static_network_config_legacy_cannot_set_gateway(self):
ret_val1 = (1,)
ret_val2 = (2,)
self._test_set_static_network_config(static_val=ret_val1,
gateway_val=ret_val2)
gateway_val=ret_val2,
legacy=True)
def test_set_static_network_config_cannot_set_DNS(self):
def test_set_static_network_config_legacy_cannot_set_DNS(self):
ret_val1 = (1,)
ret_val2 = (1,)
ret_val3 = (2,)
self._test_set_static_network_config(static_val=ret_val1,
gateway_val=ret_val2,
dns_val=ret_val3)
dns_val=ret_val3,
legacy=True)
def test_set_static_network_config_no_reboot(self):
def test_set_static_network_config_legacy_no_reboot(self):
ret_val1 = (0,)
ret_val2 = (0,)
ret_val3 = (0,)
self._test_set_static_network_config(static_val=ret_val1,
gateway_val=ret_val2,
dns_val=ret_val3)
dns_val=ret_val3,
legacy=True)
def test_set_static_network_config_v6(self):
self._test_set_static_network_config_v6()
def test_set_static_network_config_ipv4(self):
self._test_set_static_network_config(ipv6=False)
def test_set_static_network_config_v6_no_adapters(self):
self._test_set_static_network_config_v6(v6adapters=False)
def test_set_static_network_config_v6_error(self):
self._test_set_static_network_config_v6(v6error=True)
def test_set_static_network_config_ipv6(self):
self._test_set_static_network_config(ipv6=True)
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils'
'.execute_process')

View File

@ -48,6 +48,10 @@ class TestNetworkConfigPlugin(unittest.TestCase):
mock_get_os_utils.return_value = mock_osutils
mock_osutils.get_network_adapters.return_value = network_adapters
mock_osutils.set_static_network_config.return_value = True
mock_osutils.get_network_adapter_name_by_mac_address = (
lambda mac: [n[0] for n in network_adapters if n[1] == mac][0])
network_execute = functools.partial(
self._network_plugin.execute,
mock_service, mock_shared_data
@ -66,7 +70,7 @@ class TestNetworkConfigPlugin(unittest.TestCase):
'common.networkconfig'):
ret = network_execute()
calls, calls6 = [], []
calls = []
for adapter in set(network_adapters) - set(missed_adapters):
nics = [nic for nic in (network_details +
extra_network_details)
@ -74,32 +78,27 @@ class TestNetworkConfigPlugin(unittest.TestCase):
self.assertTrue(nics) # missed_adapters should do the job
nic = nics[0]
call = mock.call(
nic.mac,
adapter[0],
nic.address,
nic.netmask,
nic.broadcast,
nic.gateway,
nic.dnsnameservers
)
call6 = mock.call(
nic.mac,
adapter[0],
nic.address6,
nic.netmask6,
nic.gateway6
nic.gateway6,
[]
)
calls.append(call)
if nic.address6 and nic.netmask6:
calls6.append(call6)
calls.append(call6)
self.assertEqual(
len(calls),
mock_osutils.set_static_network_config.call_count)
self.assertEqual(
len(calls6),
mock_osutils.set_static_network_config_v6.call_count)
mock_osutils.set_static_network_config.assert_has_calls(
calls, any_order=True)
mock_osutils.set_static_network_config_v6.assert_has_calls(
calls6, any_order=True)
reboot = len(missed_adapters) != self._count
self.assertEqual((plugin_base.PLUGIN_EXECUTION_DONE, reboot), ret)