Merge "Adds KMS and AVMA licensing automation"

This commit is contained in:
Jenkins 2017-02-23 01:45:40 +00:00 committed by Gerrit Code Review
commit 48592a8c24
10 changed files with 737 additions and 86 deletions

View File

@ -69,6 +69,19 @@ class GlobalOptions(conf_base.Options):
cfg.BoolOpt(
'activate_windows', default=False,
help='Activates Windows automatically'),
cfg.BoolOpt(
'set_kms_product_key', default=False,
help='Sets the KMS product key for this operating system'),
cfg.BoolOpt(
'set_avma_product_key', default=False,
help='Sets the AVMA product key for this operating system'),
cfg.StrOpt(
'kms_host', default=None,
help='The KMS host address in form <host>[:<port>], '
'e.g: "kmshost:1688"'),
cfg.BoolOpt(
'log_licensing_info', default=True,
help='Logs the operating system licensing information'),
cfg.BoolOpt(
'winrm_enable_basic_auth', default=True,
help='Enables basic authentication for the WinRM '

View File

@ -35,3 +35,6 @@ ALWAYS_CHANGE = 'always'
NEVER_CHANGE = 'no'
LOGON_PASSWORD_CHANGE_OPTIONS = [CLEAR_TEXT_INJECTED_ONLY, NEVER_CHANGE,
ALWAYS_CHANGE]
VOL_ACT_KMS = "KMS"
VOL_ACT_AVMA = "AVMA"

View File

@ -193,6 +193,12 @@ class BaseMetadataService(object):
def post_rdp_cert_thumbprint(self, thumbprint):
pass
def get_kms_host(self):
pass
def get_use_avma_licensing(self):
pass
class BaseHTTPMetadataService(BaseMetadataService):

View File

@ -107,6 +107,9 @@ class BaseOSUtils(object):
metric):
raise NotImplementedError()
def get_os_version(self):
raise NotImplementedError()
def check_os_version(self, major, minor, build=0):
raise NotImplementedError()

View File

@ -199,6 +199,10 @@ msvcrt.malloc.restype = ctypes.c_void_p
msvcrt.free.argtypes = [ctypes.c_void_p]
msvcrt.free.restype = None
ntdll.RtlGetVersion.argtypes = [
ctypes.POINTER(Win32_OSVERSIONINFOEX_W)]
ntdll.RtlGetVersion.restype = wintypes.DWORD
ntdll.RtlVerifyVersionInfo.argtypes = [
ctypes.POINTER(Win32_OSVERSIONINFOEX_W),
wintypes.DWORD, wintypes.ULARGE_INTEGER]
@ -1069,6 +1073,23 @@ class WindowsUtils(base.BaseOSUtils):
raise exception.CloudbaseInitException(
'Unable to add route: %s' % err)
def get_os_version(self):
vi = Win32_OSVERSIONINFOEX_W()
vi.dwOSVersionInfoSize = ctypes.sizeof(Win32_OSVERSIONINFOEX_W)
ret_val = ntdll.RtlGetVersion(ctypes.byref(vi))
if ret_val:
raise exception.WindowsCloudbaseInitException(
"RtlGetVersion failed with error: %s" % ret_val)
return {"major_version": vi.dwMajorVersion,
"minor_version": vi.dwMinorVersion,
"build_number": vi.dwBuildNumber,
"platform_id": vi.dwPlatformId,
"csd_version": vi.szCSDVersion,
"service_pack_major": vi.wServicePackMajor,
"service_pack_minor": vi.wServicePackMinor,
"suite_mask": vi.wSuiteMask,
"product_type": vi.wProductType}
def check_os_version(self, major, minor, build=0):
vi = Win32_OSVERSIONINFOEX_W()
vi.dwOSVersionInfoSize = ctypes.sizeof(Win32_OSVERSIONINFOEX_W)

View File

@ -12,15 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_log import log as oslo_logging
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception
from cloudbaseinit import constant
from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.plugins.common import base
from cloudbaseinit.utils.windows import licensing
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
@ -28,27 +26,55 @@ LOG = oslo_logging.getLogger(__name__)
class WindowsLicensingPlugin(base.BasePlugin):
def _run_slmgr(self, osutils, args):
if osutils.check_sysnative_dir_exists():
cscript_dir = osutils.get_sysnative_dir()
def _set_product_key(self, service, manager):
if not CONF.set_kms_product_key and not CONF.set_avma_product_key:
return
description, license_family, is_current = manager.get_kms_product()
if is_current:
LOG.info('Product "%s" is already the current one, no need to set '
'a product key', description)
else:
cscript_dir = osutils.get_system32_dir()
use_avma = service.get_use_avma_licensing()
if use_avma is None:
use_avma = CONF.set_avma_product_key
LOG.debug("Use AVMA: %s", use_avma)
# Not SYSNATIVE, as it is already executed by a x64 process
slmgr_dir = osutils.get_system32_dir()
product_key = None
if use_avma:
product_key = manager.get_volume_activation_product_key(
license_family, constant.VOL_ACT_AVMA)
if not product_key:
LOG.error("AVMA product key not found for this OS")
cscript_path = os.path.join(cscript_dir, "cscript.exe")
slmgr_path = os.path.join(slmgr_dir, "slmgr.vbs")
if not product_key and CONF.set_kms_product_key:
product_key = manager.get_volume_activation_product_key(
license_family, constant.VOL_ACT_KMS)
if not product_key:
LOG.error("KMS product key not found for this OS")
(out, err, exit_code) = osutils.execute_process(
[cscript_path, slmgr_path] + args, shell=False, decode_output=True)
if product_key:
LOG.info("Setting product key: %s", product_key)
manager.set_product_key(product_key)
if exit_code:
raise exception.CloudbaseInitException(
'slmgr.vbs failed with error code %(exit_code)s.\n'
'Output: %(out)s\nError: %(err)s' % {'exit_code': exit_code,
'out': out, 'err': err})
return out
def _set_kms_host(self, service, manager):
kms_host = service.get_kms_host() or CONF.kms_host
if kms_host:
LOG.info("Setting KMS host: %s", kms_host)
manager.set_kms_host(*kms_host.split(':'))
def _activate_windows(self, service, manager):
if CONF.activate_windows:
# note(alexpilotti): KMS clients activate themselves
# so this could be skipped if a KMS host is set
LOG.info("Activating Windows")
activation_result = manager.activate_windows()
LOG.debug("Activation result:\n%s" % activation_result)
def _log_licensing_info(self, manager):
if CONF.log_licensing_info:
license_info = manager.get_licensing_info()
LOG.info('Microsoft Windows license info:\n%s' % license_info)
def execute(self, service, shared_data):
osutils = osutils_factory.get_os_utils()
@ -57,12 +83,18 @@ class WindowsLicensingPlugin(base.BasePlugin):
LOG.info("Licensing info and activation are not available on "
"Nano Server")
else:
license_info = self._run_slmgr(osutils, ['/dlv'])
LOG.info('Microsoft Windows license info:\n%s' % license_info)
manager = licensing.get_licensing_manager()
if CONF.activate_windows:
LOG.info("Activating Windows")
activation_result = self._run_slmgr(osutils, ['/ato'])
LOG.debug("Activation result:\n%s" % activation_result)
eval_end_date = manager.is_eval()
if eval_end_date:
LOG.info("Evaluation license, skipping activation. "
"Evaluation end date: %s", eval_end_date)
else:
self._set_product_key(service, manager)
self._set_kms_host(service, manager)
self._activate_windows(service, manager)
manager.refresh_status()
self._log_licensing_info(manager)
return base.PLUGIN_EXECUTION_DONE, False

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import importlib
import unittest
try:
@ -20,90 +20,138 @@ try:
except ImportError:
import mock
from cloudbaseinit import exception
from cloudbaseinit.plugins.common import base
from cloudbaseinit.plugins.windows import licensing
from cloudbaseinit.tests import testutils
MODPATH = "cloudbaseinit.plugins.windows.licensing"
class WindowsLicensingPluginTests(unittest.TestCase):
def setUp(self):
self._wmi_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules', {
'wmi': self._wmi_mock})
self.snatcher = testutils.LogSnatcher(MODPATH)
self._module_patcher.start()
licensing = importlib.import_module(MODPATH)
self._licensing = licensing.WindowsLicensingPlugin()
def _test_run_slmgr(self, sysnative, exit_code):
mock_osutils = mock.MagicMock()
get_system32_dir_calls = [mock.call()]
cscript_path = os.path.join('cscrypt path', "cscript.exe")
slmgr_path = os.path.join('slmgr path', "slmgr.vbs")
def tearDown(self):
self._module_patcher.stop()
mock_osutils.check_sysnative_dir_exists.return_value = sysnative
mock_osutils.get_sysnative_dir.return_value = 'cscrypt path'
if not sysnative:
mock_osutils.get_system32_dir.side_effect = ['cscrypt path',
'slmgr path']
@testutils.ConfPatcher('set_kms_product_key', True)
@testutils.ConfPatcher('set_avma_product_key', True)
def _test_set_product_key(self, description=None,
license_family=None, is_current=None):
mock_service = mock.Mock()
mock_manager = mock.Mock()
fake_key = mock.sentinel.key
mock_service.get_use_avma_licensing.return_value = None
mock_manager.get_kms_product.return_value = (description,
license_family,
is_current)
mock_manager.get_volume_activation_product_key.return_value = fake_key
with self.snatcher:
self._licensing._set_product_key(mock_service, mock_manager)
mock_manager.get_kms_product.assert_called_once_with()
if is_current:
expected_logs = ['Product "%s" is already the current one, '
'no need to set a product key' % description]
self.assertEqual(self.snatcher.output, expected_logs)
return
else:
mock_osutils.get_system32_dir.return_value = 'slmgr path'
mock_osutils.execute_process.return_value = ('fake output', None,
exit_code)
mock_service.get_use_avma_licensing.assert_called_once_with()
(mock_manager.get_volume_activation_product_key.
assert_called_once_with(None, 'AVMA'))
mock_manager.set_product_key.assert_called_once_with(fake_key)
if exit_code:
self.assertRaises(exception.CloudbaseInitException,
self._licensing._run_slmgr,
mock_osutils, ['fake args'])
else:
response = self._licensing._run_slmgr(osutils=mock_osutils,
args=['fake args'])
self.assertEqual('fake output', response)
def test_set_product_key(self):
self._test_set_product_key()
mock_osutils.check_sysnative_dir_exists.assert_called_once_with()
if sysnative:
mock_osutils.get_sysnative_dir.assert_called_once_with()
else:
get_system32_dir_calls.append(mock.call())
def test_set_product_key_is_current(self):
self._test_set_product_key(is_current=True)
mock_osutils.execute_process.assert_called_once_with(
[cscript_path, slmgr_path, 'fake args'],
shell=False, decode_output=True)
self.assertEqual(get_system32_dir_calls,
mock_osutils.get_system32_dir.call_args_list)
def test_set_kms_host(self):
mock_service = mock.Mock()
mock_manager = mock.Mock()
mock_host = "127.0.0.1:1688"
expected_host_call = mock_host.split(':')
mock_service.get_kms_host.return_value = mock_host
expected_logs = ["Setting KMS host: %s" % mock_host]
with self.snatcher:
self._licensing._set_kms_host(mock_service, mock_manager)
self.assertEqual(self.snatcher.output, expected_logs)
mock_manager.set_kms_host.assert_called_once_with(*expected_host_call)
def test_run_slmgr_sysnative(self):
self._test_run_slmgr(sysnative=True, exit_code=None)
def test_run_slmgr_not_sysnative(self):
self._test_run_slmgr(sysnative=False, exit_code=None)
def test_run_slmgr_exit_code(self):
self._test_run_slmgr(sysnative=True, exit_code='fake exit code')
def test_activate_windows(self):
activate_result = mock.Mock()
mock_service = mock.Mock()
mock_manager = mock.Mock()
mock_manager.activate_windows.return_value = activate_result
expected_logs = [
"Activating Windows",
"Activation result:\n%s" % activate_result]
with testutils.ConfPatcher('activate_windows', True):
with self.snatcher:
self._licensing._activate_windows(mock_service, mock_manager)
self.assertEqual(self.snatcher.output, expected_logs)
mock_manager.activate_windows.assert_called_once_with()
@mock.patch(MODPATH + ".WindowsLicensingPlugin._activate_windows")
@mock.patch(MODPATH + ".WindowsLicensingPlugin._set_kms_host")
@mock.patch(MODPATH + ".WindowsLicensingPlugin._set_product_key")
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
@mock.patch('cloudbaseinit.plugins.windows.licensing'
'.WindowsLicensingPlugin._run_slmgr')
def _test_execute(self, mock_run_slmgr, mock_get_os_utils,
activate_windows=None, nano=False):
@mock.patch('cloudbaseinit.utils.windows.licensing.get_licensing_manager')
def _test_execute(self, mock_get_licensing_manager,
mock_get_os_utils,
mock_set_product_key,
mock_set_kms_host,
mock_activate_windows,
nano=False, is_eval=True):
mock_service = mock.Mock()
mock_manager = mock.Mock()
mock_get_licensing_manager.return_value = mock_manager
mock_osutils = mock.MagicMock()
mock_osutils.is_nano_server.return_value = nano
run_slmgr_calls = [mock.call(mock_osutils, ['/dlv'])]
mock_get_os_utils.return_value = mock_osutils
with testutils.ConfPatcher('activate_windows', activate_windows):
response = self._licensing.execute(service=None, shared_data=None)
mock_manager.is_eval.return_value = is_eval
mock_manager.get_licensing_info.return_value = "fake"
expected_logs = []
with self.snatcher:
response = self._licensing.execute(service=mock_service,
shared_data=None)
mock_get_os_utils.assert_called_once_with()
if nano:
expected_logs = ["Licensing info and activation are "
"not available on Nano Server"]
self.assertEqual(self.snatcher.output, expected_logs)
return # no activation available
if activate_windows:
run_slmgr_calls.append(mock.call(mock_osutils, ['/ato']))
else:
if not is_eval:
mock_set_product_key.assert_called_once_with(mock_service,
mock_manager)
mock_set_kms_host.assert_called_once_with(mock_service,
mock_manager)
mock_activate_windows.assert_called_once_with(mock_service,
mock_manager)
else:
expected_logs.append("Evaluation license, skipping activation"
". Evaluation end date: %s" % is_eval)
expected_logs.append('Microsoft Windows license info:\nfake')
mock_manager.get_licensing_info.assert_called_once_with()
self.assertEqual(run_slmgr_calls, mock_run_slmgr.call_args_list)
self.assertEqual((base.PLUGIN_EXECUTION_DONE, False), response)
self.assertEqual(self.snatcher.output, expected_logs)
def test_execute_activate_windows_true(self):
self._test_execute(activate_windows=True)
def test_execute_activate_windows_false(self):
self._test_execute(activate_windows=False)
def test_execute_activate_windows_nano(self):
def test_execute_nano(self):
self._test_execute(nano=True)
def test_execute_is_evaluated(self):
self._test_execute()
def test_execute(self):
self._test_execute(is_eval=False)

View File

@ -0,0 +1,231 @@
# Copyright (c) 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
MODPATH = "cloudbaseinit.utils.windows.licensing"
class LicensingTest(unittest.TestCase):
def setUp(self):
self._wmi_mock = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules', {
'wmi': self._wmi_mock})
self.snatcher = testutils.LogSnatcher(MODPATH)
self._module_patcher.start()
self.licensing = importlib.import_module(MODPATH)
self._licensing = self.licensing.LicensingManager()
self._licensing_v2 = self.licensing.LicensingManagerV2()
def tearDown(self):
self._module_patcher.stop()
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def _test_run_slmgr(self, mock_get_os_utils, ret_val=0,
sysnative=True):
mock_args = [mock.sentinel.args]
mock_outval = b"fake-out"
mock_cscriptdir = r"fake\cscript\dir"
mock_osutils = mock.Mock()
mock_osutils.get_sysnative_dir.return_value = mock_cscriptdir
mock_osutils.get_system32_dir.return_value = mock_cscriptdir
mock_get_os_utils.return_value = mock_osutils
mock_osutils.get_system32_dir.return_value = r"fakedir"
mock_osutils.check_sysnative_dir_exists.return_value = sysnative
mock_osutils.execute_process.return_value = (
mock_outval, mock.sentinel.err, ret_val)
if ret_val:
self.assertRaises(exception.CloudbaseInitException,
self._licensing._run_slmgr, mock_args)
else:
res_out = self._licensing._run_slmgr(mock_args)
self.assertEqual(res_out, "fake-out")
self.assertEqual(mock_osutils.execute_process.call_count, 1)
def test_run_slmgr_sys_native(self):
self._test_run_slmgr()
def test_run_slmgr_system32(self):
self._test_run_slmgr(sysnative=False)
def test_run_slmgr_fail(self):
self._test_run_slmgr(ret_val=1)
@mock.patch(MODPATH + ".LicensingManager._run_slmgr")
def test_get_licensing_info(self, mock_run_slmgr):
mock_out = mock.sentinel.out_val
mock_run_slmgr.return_value = mock_out
res = self._licensing.get_licensing_info()
mock_run_slmgr.assert_called_once_with(['/dlv'])
self.assertEqual(res, mock_out)
@mock.patch(MODPATH + ".LicensingManager._run_slmgr")
def test_activate_windows(self, mock_run_slmgr):
mock_out = mock.sentinel.out_val
mock_run_slmgr.return_value = mock_out
res = self._licensing.activate_windows()
mock_run_slmgr.assert_called_once_with(['/ato'])
self.assertEqual(res, mock_out)
@mock.patch(MODPATH + ".LicensingManager._run_slmgr")
def test_set_kms_host(self, mock_run_slmgr):
mock_out = mock.sentinel.out_val
mock_kms = mock.sentinel.kms_host
mock_run_slmgr.return_value = mock_out
res = self._licensing.set_kms_host(mock_kms)
expected_host = "%s:%s" % (mock_kms, self.licensing.DEFAULT_KMS_PORT)
mock_run_slmgr.assert_called_once_with(['/skms', expected_host])
self.assertEqual(res, mock_out)
@mock.patch(MODPATH + ".LicensingManager._run_slmgr")
def test_set_kms_auto_discovery(self, mock_run_slmgr):
mock_out = mock.sentinel.out_val
mock_run_slmgr.return_value = mock_out
res = self._licensing.set_kms_auto_discovery()
mock_run_slmgr.assert_called_once_with(['/ckms'])
self.assertEqual(res, mock_out)
@mock.patch(MODPATH + ".LicensingManager._run_slmgr")
def test_set_product_key(self, mock_run_slmgr):
mock_out = mock.sentinel.out_val
mock_product_key = mock.sentinel.product_key
mock_run_slmgr.return_value = mock_out
res = self._licensing.set_product_key(mock_product_key)
mock_run_slmgr.assert_called_once_with(['/ipk', mock_product_key])
self.assertEqual(res, mock_out)
def test_is_eval_v1(self):
with self.assertRaises(NotImplementedError):
self._licensing.is_eval()
def test_get_kms_product_v1(self):
with self.assertRaises(NotImplementedError):
self._licensing.get_kms_product()
def test_get_volume_activation_product_key_v1(self):
with self.assertRaises(NotImplementedError):
self._licensing.get_volume_activation_product_key('fake')
def test_get_service(self):
mock_result = mock.Mock()
conn = self._wmi_mock.WMI
conn.SoftwareLicensingService.return_value = [mock_result]
self._licensing_v2._get_service()
self.assertIsNotNone(self._licensing_v2._service)
@mock.patch(MODPATH + '.LicensingManagerV2._get_service')
def test_set_product_key_v2(self, mock_get_service):
mock_product_key = mock.Mock()
mock_service = mock.Mock()
mock_get_service.return_value = mock_service
self._licensing_v2.set_product_key(mock_product_key)
mock_get_service.assert_called_once_with()
mock_service.InstallProductKey.assert_called_once_with(
mock_product_key)
@mock.patch(MODPATH + '.LicensingManagerV2._get_service')
def test_set_kms_auto_discovery_v2(self, mock_get_service):
mock_service = mock.Mock()
mock_get_service.return_value = mock_service
self._licensing_v2.set_kms_auto_discovery()
mock_get_service.assert_called_once_with()
mock_service.ClearKeyManagementServiceMachine.assert_called_once_with()
mock_service.ClearKeyManagementServicePort.assert_called_once_with()
@mock.patch(MODPATH + '.LicensingManagerV2._get_service')
def test_set_kms_host_v2(self, mock_get_service):
mock_service = mock.Mock()
mock_host = mock.sentinel.host
mock_port = mock.sentinel.port
mock_get_service.return_value = mock_service
self._licensing_v2.set_kms_host(mock_host, mock_port)
mock_get_service.assert_called_once_with()
mock_service.SetKeyManagementServiceMachine.assert_called_once_with(
mock_host)
mock_service.SetKeyManagementServicePort.assert_called_once_with(
mock_port)
@mock.patch(MODPATH + '.LicensingManagerV2._get_service')
def test_refresh_status_v2(self, mock_get_service):
mock_service = mock.Mock()
mock_get_service.return_value = mock_service
self._licensing_v2.refresh_status()
mock_get_service.assert_called_once_with()
mock_service.RefreshLicenseStatus.assert_called_once_with()
def test_is_current_product(self):
mock_product = mock.Mock()
mock_product.PartialProductKey = "fake-key"
res = self._licensing_v2._is_current_product(mock_product)
self.assertTrue(res)
def test_get_products(self):
mock_result = mock.Mock()
conn = self._wmi_mock.WMI
conn.query.return_value = mock_result
res = self._licensing_v2._get_products()
self.assertEqual(res, self._licensing_v2._products)
@mock.patch(MODPATH + ".LicensingManagerV2._get_products")
def test_is_eval(self, mock_get_products):
mock_product = mock.Mock()
mock_product.ApplicationId = self.licensing.WINDOWS_APP_ID
mock_product.Description = u"TIMEBASED_EVAL"
mock_product.EvaluationEndDate = "fake"
mock_get_products.return_value = [mock_product]
res = self._licensing_v2.is_eval()
self.assertEqual(res, "fake")
@mock.patch(MODPATH + ".LicensingManagerV2._get_products")
def _test_get_kms_product(self, mock_get_products, products=()):
mock_get_products.return_value = products
if not products:
self.assertRaises(exception.ItemNotFoundException,
self._licensing_v2.get_kms_product)
return
res = self._licensing_v2.get_kms_product()
self.assertIsNotNone(res)
def test_get_kms_product_no_keys(self):
self._test_get_kms_product()
def test_get_kms_product(self):
mock_product = mock.Mock()
mock_product.ApplicationId = self.licensing.WINDOWS_APP_ID
mock_product.Description = u"VOLUME_KMSCLIENT"
self._test_get_kms_product(products=[mock_product])
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def test_get_volume_activation_product_key(self, mock_get_os_utils):
mock_os_version = {'major_version': 10, 'minor_version': 0}
expected_key = "WC2BQ-8NRM3-FDDYY-2BFGV-KHKQY"
mock_osutils = mock.Mock()
mock_get_os_utils.return_value = mock_osutils
mock_osutils.get_os_version.return_value = mock_os_version
res = self._licensing_v2.get_volume_activation_product_key(
license_family="ServerStandard")
self.assertEqual(res, expected_key)

View File

@ -0,0 +1,175 @@
# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import wmi
from oslo_log import log as oslo_logging
from cloudbaseinit import constant
from cloudbaseinit import exception
from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.utils.windows import productkeys
LOG = oslo_logging.getLogger(__name__)
WINDOWS_APP_ID = "55c92734-d682-4d71-983e-d6ec3f16059f"
DEFAULT_KMS_PORT = 1688
def get_licensing_manager():
osutils = osutils_factory.get_os_utils()
if osutils.check_os_version(6, 1):
return LicensingManagerV2()
else:
return LicensingManager()
class LicensingManager(object):
@staticmethod
def _run_slmgr(args):
osutils = osutils_factory.get_os_utils()
if osutils.check_sysnative_dir_exists():
cscript_dir = osutils.get_sysnative_dir()
else:
cscript_dir = osutils.get_system32_dir()
# Not SYSNATIVE, as it is already executed by a x64 process
slmgr_dir = osutils.get_system32_dir()
cscript_path = os.path.join(cscript_dir, "cscript.exe")
slmgr_path = os.path.join(slmgr_dir, "slmgr.vbs")
(out, err, exit_code) = osutils.execute_process(
[cscript_path, slmgr_path] + args, shell=False, decode_output=True)
if exit_code:
raise exception.CloudbaseInitException(
'slmgr.vbs failed with error code %(exit_code)s.\n'
'Output: %(out)s\nError: %(err)s' % {'exit_code': exit_code,
'out': out, 'err': err})
return out.decode(errors='replace')
def get_licensing_info(self):
return self._run_slmgr(['/dlv'])
def activate_windows(self):
return self._run_slmgr(['/ato'])
def set_kms_host(self, host, port=DEFAULT_KMS_PORT):
kms_host = "%s:%s" % (host, port)
return self._run_slmgr(['/skms', kms_host])
def set_kms_auto_discovery(self):
return self._run_slmgr(['/ckms'])
def set_product_key(self, product_key):
return self._run_slmgr(['/ipk', product_key])
def is_eval(self):
raise NotImplementedError()
def get_kms_product(self):
raise NotImplementedError()
def get_volume_activation_product_key(self, license_family,
vol_act_type=constant.VOL_ACT_KMS):
raise NotImplementedError()
def refresh_status(self):
pass
class LicensingManagerV2(LicensingManager):
def __init__(self):
self._products = None
self._service = None
def _get_service(self):
if not self._service:
conn = wmi.WMI(moniker='//./root/cimv2')
self._service = conn.SoftwareLicensingService()[0]
return self._service
def set_product_key(self, product_key):
service = self._get_service()
service.InstallProductKey(product_key)
def set_kms_auto_discovery(self):
service = self._get_service()
service.ClearKeyManagementServiceMachine()
service.ClearKeyManagementServicePort()
def set_kms_host(self, host, port=DEFAULT_KMS_PORT):
service = self._get_service()
service.SetKeyManagementServiceMachine(host)
service.SetKeyManagementServicePort(port)
def refresh_status(self):
service = self._get_service()
service.RefreshLicenseStatus()
def _get_products(self):
if not self._products:
conn = wmi.WMI(moniker='//./root/cimv2')
self._products = conn.query(
'SELECT ID, ApplicationId, PartialProductKey, '
'LicenseFamily, LicenseIsAddon, Description, '
'EvaluationEndDate, Name FROM '
'SoftwareLicensingProduct WHERE '
'LicenseIsAddon=False')
return self._products
@staticmethod
def _is_current_product(product):
return bool(product.PartialProductKey)
def is_eval(self):
def _is_eval(product):
return (u"TIMEBASED_EVAL" in product.Description or
product.EvaluationEndDate != u"16010101000000.000000-000")
for product in self._get_products():
app_id = product.ApplicationId.lower()
if (app_id == WINDOWS_APP_ID and _is_eval(product) and
self._is_current_product(product)):
return product.EvaluationEndDate
def get_kms_product(self):
def _is_kms_client(product):
# note(alexpilotti): could check for
# KeyManagementServiceProductKeyID
return u"VOLUME_KMSCLIENT" in product.Description
for product in self._get_products():
app_id = product.ApplicationId.lower()
if app_id == WINDOWS_APP_ID and _is_kms_client(product):
return (product.Description, product.LicenseFamily,
self._is_current_product(product))
raise exception.ItemNotFoundException("KMS client product not found")
def get_volume_activation_product_key(self, license_family,
vol_act_type=constant.VOL_ACT_KMS):
osutils = osutils_factory.get_os_utils()
os_version = osutils.get_os_version()
os_major = os_version["major_version"]
os_minor = os_version["minor_version"]
product_keys_map = productkeys.SKU_TO_PRODUCT_KEY_MAP.get(
(os_major, os_minor, vol_act_type), {})
return product_keys_map.get(license_family)

View File

@ -0,0 +1,119 @@
# Copyright 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cloudbaseinit import constant
SKU_TO_PRODUCT_KEY_MAP = {
# KMS: https://technet.microsoft.com/en-US/jj612867.aspx
# AVMA: https://technet.microsoft.com/en-us/library/dn303421.aspx
(6, 1, constant.VOL_ACT_KMS): {
# Windows 7, Windows server 2008 R2
"Business": "FJ82H-XT6CR-J8D7P-XQJJ2-GPDD4",
"BusinessN": "MRPKT-YTG23-K7D7T-X2JMM-QY7MG",
"BusinessE": "W82YF-2Q76Y-63HXB-FGJG9-GF7QX",
"Enterprise": "33PXH-7Y6KF-2VJC9-XBBR8-HVTHH",
"EnterpriseN": "YDRBP-3D83W-TY26F-D46B2-XCKRJ",
"EnterpriseE": "C29WB-22CC8-VJ326-GHFJW-H9DH4",
"ServerComputeCluster": "FKJQ8-TMCVP-FRMR7-4WR42-3JCD7",
"ServerDatacenter": "74YFP-3QFB3-KQT8W-PMXWJ-7M648",
"ServerEnterprise": "489J6-VHDMP-X63PK-3K798-CPX3Y",
"ServerEnterpriseIA64": "GT63C-RJFQ3-4GMB6-BRFB9-CB83V",
"ServerStandard": "YC6KT-GKW9T-YTKYR-T4X34-R7VHC",
"ServerWeb": "6TPJF-RBVHG-WBW2R-86QPH-6RTM4",
},
(6, 2, constant.VOL_ACT_KMS): {
# Windows 8, Windows server 2012
"Core": "BN3D2-R7TKB-3YPBD-8DRP2-27GG4",
"CoreARM": "DXHJF-N9KQX-MFPVR-GHGQK-Y7RKV",
"CoreCountrySpecific": "4K36P-JN4VD-GDC6V-KDT89-DYFKP",
"CoreN": "8N2M2-HWPGY-7PGT9-HGDD8-GVGGY",
"CoreSingleLanguage": "2WN2H-YGCQR-KFX6K-CD6TF-84YXQ",
"Enterprise": "32JNW-9KQ84-P47T8-D8GGY-CWCK7",
"EnterpriseN": "JMNMF-RHW7P-DMY6X-RF3DR-X2BQT",
"Professional": "NG4HW-VH26C-733KW-K6F98-J8CK4",
"ProfessionalN": "XCVCF-2NXM9-723PB-MHCB7-2RYQQ",
"ProfessionalWMC": "GNBB8-YVD74-QJHX6-27H4K-8QHDG",
"ServerDatacenter": "48HP8-DN98B-MYWDG-T2DCC-8W83P",
"ServerDatacenterCore": "48HP8-DN98B-MYWDG-T2DCC-8W83P",
"ServerMultiPointStandard": "HM7DN-YVMH3-46JC3-XYTG7-CYQJJ",
"ServerMultiPointPremium": "XNH6W-2V9GX-RGJ4K-Y8X6F-QGJ2G",
"ServerStandard": "XC9B7-NBPP2-83J2H-RHMBY-92BT4",
"ServerStandardCore": "XC9B7-NBPP2-83J2H-RHMBY-92BT4",
},
(6, 3, constant.VOL_ACT_KMS): {
# Windows 8.1, Windows server 2012 R2
"CoreARM": "XYTND-K6QKT-K2MRH-66RTM-43JKP",
"ServerStandard": "D2N9P-3P6X9-2R39C-7RTCD-MDVJX",
"ServerCloudStorageCore": "3NPTF-33KPT-GGBPR-YX76B-39KDD",
"ServerCloudStorage": "3NPTF-33KPT-GGBPR-YX76B-39KDD",
"EmbeddedIndustryA": "VHXM3-NR6FT-RY6RT-CK882-KW2CJ",
"CoreN": "7B9N3-D94CG-YTVHR-QBPX3-RJP64",
"CoreSingleLanguage": "BB6NG-PQ82V-VRDPW-8XVD2-V8P66",
"ServerDatacenterCore": "W3GGN-FT8W3-Y4M27-J84CP-Q3VJ9",
"Professional": "GCRJD-8NW9H-F2CDX-CCM8D-9D6T9",
"ServerSolutionCore": "KNC87-3J2TX-XB4WP-VCPJV-M4FWM",
"ServerSolution": "KNC87-3J2TX-XB4WP-VCPJV-M4FWM",
"EmbeddedIndustryE": "FNFKF-PWTVT-9RC8H-32HB2-JB34X",
"ProfessionalN": "HMCNV-VVBFX-7HMBH-CTY9B-B4FXY",
"EmbeddedIndustry": "NMMPB-38DD4-R2823-62W8D-VXKJB",
"CoreCountrySpecific": "NCTT7-2RGK8-WMHRF-RY7YQ-JTXG3",
"ProfessionalWMC": "789NJ-TQK6T-6XTH8-J39CJ-J8D3P",
"ServerDatacenter": "W3GGN-FT8W3-Y4M27-J84CP-Q3VJ9",
"ServerStandardCore": "D2N9P-3P6X9-2R39C-7RTCD-MDVJX",
"Enterprise": "MHF9N-XY6XB-WVXMC-BTDCT-MKKG7",
"Core": "M9Q9P-WNJJT-6PXPY-DWX8H-6XWKK",
"EnterpriseN": "TT4HM-HN7YT-62K67-RGRQJ-JFFXW",
},
(6, 3, constant.VOL_ACT_AVMA): {
# Windows server 2012 R2
"ServerSolutionCore": "K2XGM-NMBT3-2R6Q8-WF2FK-P36R2",
"ServerDatacenterCore": "Y4TGP-NPTV9-HTC2H-7MGQ3-DV4TW",
"ServerStandardCore": "DBGBW-NPF86-BJVTX-K3WKJ-MTB6V",
"ServerSolution": "K2XGM-NMBT3-2R6Q8-WF2FK-P36R2",
"ServerDatacenter": "Y4TGP-NPTV9-HTC2H-7MGQ3-DV4TW",
"ServerStandard": "DBGBW-NPF86-BJVTX-K3WKJ-MTB6V",
},
(10, 0, constant.VOL_ACT_KMS): {
# Windows 10, Windows Server 2016
"Professional": "W269N-WFGWX-YVC9B-4J6C9-T83GX",
"ProfessionalN": "MH37W-N47XK-V7XM9-C7227-GCQG9",
"Enterprise": "NPPR9-FWDCX-D2C8J-H872K-2YT43",
"EnterpriseN": "DPH2V-TTNVB-4X9Q3-TJR4H-KHJW4",
"ServerDatacenterCore": "CB7KF-BWN84-R7R2Y-793K2-8XDDG",
"ServerDatacenter": "CB7KF-BWN84-R7R2Y-793K2-8XDDG",
"ServerStandardCore": "WC2BQ-8NRM3-FDDYY-2BFGV-KHKQY",
"ServerStandard": "WC2BQ-8NRM3-FDDYY-2BFGV-KHKQY",
"ServerSolutionCore": "JCKRF-N37P4-C2D82-9YXRT-4M63B",
"ServerSolution": "JCKRF-N37P4-C2D82-9YXRT-4M63B",
"ServerCloudStorage": "QN4C6-GBJD2-FB422-GHWJK-GJG2R",
"ServerCloudStorageCore": "QN4C6-GBJD2-FB422-GHWJK-GJG2R",
"ServerAzureCor": "VP34G-4NPPG-79JTQ-864T4-R3MQX",
"ServerAzureCorCore": "VP34G-4NPPG-79JTQ-864T4-R3MQX",
},
(10, 0, constant.VOL_ACT_AVMA): {
# Windows server 2016
"ServerSolutionCore": "B4YNW-62DX9-W8V6M-82649-MHBKQ",
"ServerDatacenterCore": "TMJ3Y-NTRTM-FJYXT-T22BY-CWG3J",
"ServerStandardCore": "C3RCX-M6NRP-6CXC9-TW2F2-4RHYD",
"ServerSolution": "B4YNW-62DX9-W8V6M-82649-MHBKQ",
"ServerDatacenter": "TMJ3Y-NTRTM-FJYXT-T22BY-CWG3J",
"ServerStandard": "C3RCX-M6NRP-6CXC9-TW2F2-4RHYD",
},
}