Add certificates plugin for Windows

Install all the certificates provided by the metadata.

Change-Id: Ida2550a10fa043e40b194db5d0db10692e716edf
Implements: add-certificates-plugin
Co-Authored-By: Paula Madalina Crismaru <pcrismaru@cloudbasesolutions.com>
This commit is contained in:
Alessandro Pilotti 2017-02-24 13:55:20 +02:00 committed by Paula Madalina Crismaru
parent 34fb5a065f
commit 341930eb91
9 changed files with 791 additions and 24 deletions

View File

@ -43,3 +43,6 @@ LOGON_PASSWORD_CHANGE_OPTIONS = [CLEAR_TEXT_INJECTED_ONLY, NEVER_CHANGE,
VOL_ACT_KMS = "KMS"
VOL_ACT_AVMA = "AVMA"
CERT_LOCATION_LOCAL_MACHINE = "LocalMachine"
CERT_LOCATION_CURRENT_USER = "CurrentUser"

View File

@ -157,6 +157,9 @@ class BaseMetadataService(object):
def get_winrm_listeners_configuration(self):
pass
def get_server_certs(self):
pass
def get_client_auth_certs(self):
pass

View File

@ -0,0 +1,66 @@
# Copyright (c) 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as oslo_logging
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import constant
from cloudbaseinit import exception
from cloudbaseinit.plugins.common import base
from cloudbaseinit.utils.windows import x509
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
class ServerCertificatesPlugin(base.BasePlugin):
@staticmethod
def _use_machine_keyset(store_location):
if store_location == constant.CERT_LOCATION_LOCAL_MACHINE:
return True
elif store_location == constant.CERT_LOCATION_CURRENT_USER:
return False
else:
raise exception.ItemNotFoundException(
"Unsupported certificate store location: %s" %
store_location)
def execute(self, service, shared_data):
certs_info = service.get_server_certs()
if certs_info is None:
LOG.info("The metadata service does not provide server "
"certificates")
else:
cert_mgr = x509.CryptoAPICertManager()
for cert_info in service.get_server_certs():
cert_name = cert_info.get("name")
store_location = cert_info.get("store_location")
store_name = cert_info.get("store_name")
pfx_data = cert_info.get("pfx_data")
machine_keyset = self._use_machine_keyset(store_location)
pfx_password = None
LOG.info("Importing PFX certificate %(cert_name)s in store "
"%(store_location)s, %(store_name)s",
{"cert_name": cert_name,
"store_location": store_location,
"store_name": store_name})
cert_mgr.import_pfx_certificate(
pfx_data, pfx_password, machine_keyset, store_name)
return base.PLUGIN_EXECUTION_DONE, False
def get_os_requirements(self):
return 'win32', (5, 2)

View File

@ -0,0 +1,132 @@
# Copyright 2017 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import importlib
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit import constant
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils
class ServerCertificatesPluginTests(unittest.TestCase):
def setUp(self):
module_path = 'cloudbaseinit.plugins.windows.certificates'
self.snatcher = testutils.LogSnatcher(module_path)
self._ctypes_mock = mock.MagicMock()
self._comtypes_mock = mock.MagicMock()
self._ctypes_mock_ = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules',
{'comtypes': self._comtypes_mock,
'ctypes': self._ctypes_mock,
'ctypes.windll': self._ctypes_mock_})
self._module_patcher.start()
self.addCleanup(self._module_patcher.stop)
self.cert_module = importlib.import_module(module_path)
self._cert = self.cert_module.ServerCertificatesPlugin()
@mock.patch.object(constant, 'CERT_LOCATION_LOCAL_MACHINE',
mock.sentinel.CERT_LOCATION_LOCAL_MACHINE,
create=True)
@mock.patch.object(constant, 'CERT_LOCATION_CURRENT_USER',
mock.sentinel.CERT_LOCATION_CURRENT_USER,
create=True)
def _test_use_machine_keyset(self, store_location):
if store_location == constant.CERT_LOCATION_LOCAL_MACHINE:
expected_result = True
elif store_location == constant.CERT_LOCATION_CURRENT_USER:
expected_result = False
else:
ex = exception.ItemNotFoundException(
"Unsupported certificate store location: %s" %
store_location)
with self.assertRaises(exception.ItemNotFoundException) as exc:
(self._cert._use_machine_keyset(store_location))
self.assertEqual(str(ex), str(exc.exception))
return
result = (self._cert._use_machine_keyset(store_location))
self.assertEqual(result, expected_result)
def test_use_keyset_current_user(self):
store_location = mock.sentinel.CERT_LOCATION_CURRENT_USER
self._test_use_machine_keyset(store_location)
def test_use_keyset_local_machine(self):
store_location = mock.sentinel.CERT_LOCATION_LOCAL_MACHINE
self._test_use_machine_keyset(store_location)
def test_use_keyset_except(self):
self._test_use_machine_keyset(None)
def test_get_os_requirements(self):
result = self._cert.get_os_requirements()
self.assertEqual(result, ('win32', (5, 2)))
@mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager')
def _test_execute(self, mock_crypto_manager, certs_info=None):
mock_service = mock.Mock()
mock_service.get_server_certs.return_value = certs_info
self._cert._use_machine_keyset = mock.Mock()
if certs_info is None:
expected_logging = [
"The metadata service does not provide server "
"certificates"
]
call_count = 0
else:
call_count = len(certs_info)
cert_info = certs_info[0]
cert_name = cert_info.get("name")
store_location = cert_info.get("store_location")
store_name = cert_info.get("store_name")
expected_logging = [
"Importing PFX certificate {cert_name} in store "
"{store_location}, {store_name}".format(
cert_name=cert_name,
store_location=store_location,
store_name=store_name)
] * call_count
with self.snatcher:
result = self._cert.execute(
mock_service, mock.sentinel.shared_data)
self.assertEqual(expected_logging, self.snatcher.output)
self.assertEqual(result,
(self.cert_module.base.PLUGIN_EXECUTION_DONE, False))
self.assertEqual(mock_crypto_manager.return_value.
import_pfx_certificate.call_count, call_count)
self.assertEqual(self._cert._use_machine_keyset.call_count, call_count)
def test_execute_no_certs(self):
self._test_execute()
def test_execute(self):
cert_info = {
"name": "fake_name",
"store_location": "fake store_location",
"store_name": "fake store_name",
"pfx_data": "fake pfx_data"
}
certs_info = [cert_info] * 5
self._test_execute(certs_info=certs_info)

View File

@ -54,3 +54,7 @@ class TestEncoding(unittest.TestCase):
if encode:
data = data.encode()
self.assertEqual(data, content)
def test_hex_to_bytes(self):
result = encoding.hex_to_bytes("66616b652064617461")
self.assertEqual(result, b"fake data")

View File

@ -30,7 +30,9 @@ class CryptoAPICertManagerTests(unittest.TestCase):
self._ctypes = mock.MagicMock()
self._module_patcher = mock.patch.dict(
'sys.modules', {'ctypes': self._ctypes})
'sys.modules',
{'ctypes': self._ctypes,
'ctypes.windll': mock.MagicMock()})
self._module_patcher.start()
@ -122,8 +124,8 @@ class CryptoAPICertManagerTests(unittest.TestCase):
self.x509.cryptoapi.PROV_RSA_FULL,
self.x509.cryptoapi.CRYPT_MACHINE_KEYSET)
mock_CryptGenKey.assert_called_with(
mock_HANDLE(), self.x509.cryptoapi.AT_SIGNATURE,
0x08000000, mock_HANDLE())
mock_HANDLE(), self.x509.cryptoapi.AT_KEYEXCHANGE,
0x08000000, mock_byref(mock_HANDLE()))
mock_CryptDestroyKey.assert_called_once_with(
mock_HANDLE())
mock_CryptReleaseContext.assert_called_once_with(
@ -188,9 +190,9 @@ class CryptoAPICertManagerTests(unittest.TestCase):
mock_uuid4, mock_get_cert_thumprint,
mock_generate_key,
mock_add_system_time_interval,
mock_malloc, mock_free, certstr,
certificate, enhanced_key, store_handle,
context_to_store):
mock_malloc, mock_free,
certstr, certificate, enhanced_key,
store_handle, context_to_store):
mock_POINTER = self._ctypes.POINTER
mock_byref = self._ctypes.byref
@ -202,8 +204,8 @@ class CryptoAPICertManagerTests(unittest.TestCase):
mock_CertStrToName.return_value = certstr
mock_CertOpenStore.return_value = store_handle
mock_CertAddCertificateContextToStore.return_value = context_to_store
if (certstr is None or certificate is None or enhanced_key is None
or store_handle is None or context_to_store is None):
if (certstr is None or certificate is None or enhanced_key is None or
store_handle is None or context_to_store is None):
self.assertRaises(self.x509.cryptoapi.CryptoAPIException,
self._x509_manager.create_self_signed_cert,
'fake subject', 10, True,
@ -239,7 +241,8 @@ class CryptoAPICertManagerTests(unittest.TestCase):
mock_CertCreateSelfSignCertificate())
mock_free.assert_called_once_with(mock_cast())
self.assertEqual(mock_get_cert_thumprint.return_value, response)
self.assertEqual(response,
mock_get_cert_thumprint.return_value)
mock_generate_key.assert_called_once_with('fake_name', True)
@ -435,3 +438,265 @@ class CryptoAPICertManagerTests(unittest.TestCase):
def test_import_cert_CertGetNameString_fail(self):
self._test_import_cert(crypttstr=True, store_handle='fake handle',
add_enc_cert='fake encoded cert', upn_len=3)
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertAddCertificateContextToStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.CertOpenStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertFindCertificateInStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertFreeCertificateContext')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.CertCloseStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.PFXImportCertStore')
@mock.patch('ctypes.pointer')
@mock.patch('ctypes.POINTER')
@mock.patch('ctypes.cast')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.CRYPTOAPI_BLOB')
def _test_import_pfx_certificate(self, mock_blob, mock_cast, mock_POINTER,
mock_pointer, mock_import_cert_store,
mock_cert_close_store,
mock_cert_free_context,
mock_find_cert_in_store,
mock_cert_open_store, mock_add_cert_store,
import_store_handle, cert_context_p,
store_handle, machine_keyset=True,
add_cert_to_store=True):
self.x509.cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE = \
mock.sentinel.local_machine
self.x509.cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER = \
mock.sentinel.current_user
self.x509.cryptoapi.CERT_STORE_PROV_SYSTEM = \
mock.sentinel.store_prov_system
self.x509.cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING = \
mock.sentinel.cert_add_replace_existing
if import_store_handle:
import_store_handle = mock.sentinel.import_store_handle
if cert_context_p:
cert_context_p = mock.sentinel.cert_context_p
if store_handle:
store_handle = mock.sentinel.store_handle
mock_blob.return_value = mock.sentinel.pfx_blob
mock_import_cert_store.return_value = import_store_handle
mock_find_cert_in_store.return_value = cert_context_p
mock_cert_open_store.return_value = store_handle
mock_add_cert_store.return_value = add_cert_to_store
if (not import_store_handle or not cert_context_p or
not store_handle or not add_cert_to_store):
with self.assertRaises(self.x509.cryptoapi.CryptoAPIException):
self._x509_manager.import_pfx_certificate(
str(mock.sentinel.pfx_data), machine_keyset=machine_keyset)
else:
self._x509_manager.import_pfx_certificate(
str(mock.sentinel.pfx_data), machine_keyset=machine_keyset)
mock_blob.assert_called_once_with()
mock_cast.assert_called_with(
str(mock.sentinel.pfx_data), mock_POINTER.return_value)
mock_import_cert_store.assert_called_with(
mock_pointer.return_value, None, 0)
mock_pointer.assert_called_once_with(mock_blob.return_value)
if import_store_handle:
if cert_context_p:
if machine_keyset:
flags = mock.sentinel.local_machine
else:
flags = mock.sentinel.current_user
mock_cert_open_store.assert_called_once_with(
mock.sentinel.store_prov_system, 0, 0, flags,
six.text_type(self.x509.STORE_NAME_MY))
if store_handle:
mock_add_cert_store.assert_called_once_with(
mock_cert_open_store.return_value, cert_context_p,
mock.sentinel.cert_add_replace_existing, None)
call_args = []
if import_store_handle:
call_args += [mock.call(import_store_handle, 0)]
elif store_handle:
call_args += [mock.call(store_handle, 0)]
mock_cert_close_store.assert_has_calls(call_args)
if cert_context_p:
mock_cert_free_context.assert_called_once_with(cert_context_p)
def test_import_pfx_certificate_no_import_store_handle(self):
self._test_import_pfx_certificate(
import_store_handle=None, cert_context_p=None, store_handle=None)
def test_import_pfx_certificate_no_cert_context_p(self):
self._test_import_pfx_certificate(
import_store_handle=True, cert_context_p=None, store_handle=None)
def test_import_pfx_certificate_no_store_handle(self):
self._test_import_pfx_certificate(
import_store_handle=True, cert_context_p=True, store_handle=None)
def test_import_pfx_certificate_not_added(self):
self._test_import_pfx_certificate(
import_store_handle=True, cert_context_p=True, store_handle=True,
add_cert_to_store=False)
def test_import_pfx_certificate(self):
self._test_import_pfx_certificate(
import_store_handle=True, cert_context_p=True, store_handle=True,
machine_keyset=False)
def test_get_thumbprint_buffer(self):
mock_result = mock.Mock()
mock_result.contents = mock.sentinel.contents
self._ctypes.cast = mock.Mock(return_value=mock_result)
thumbprint_str = '5c5350ff'
result = self._x509_manager._get_thumbprint_buffer(
thumbprint_str)
self.assertEqual(result, mock_result.contents)
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.CertCloseStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertFindCertificateInStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.CertOpenStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.CRYPTOAPI_BLOB')
def _test_find_certificate_in_store(self, mock_blob, mock_OpenStore,
mock_FindCertificateInStore,
mock_CloseStore, machine_keyset=True,
store_handle=True,
cert_context_p=True):
self._x509_manager._get_thumbprint_buffer = mock.Mock()
(self._x509_manager._get_thumbprint_buffer.
return_value) = str(mock.sentinel.thumbprint)
mock_blob.return_value = mock.Mock()
mock_OpenStore.return_value = store_handle
mock_FindCertificateInStore.return_value = cert_context_p
if not store_handle or not cert_context_p:
with self.assertRaises(self.x509.cryptoapi.CryptoAPIException):
self._x509_manager._find_certificate_in_store(
mock.sentinel.thumbprint_str, machine_keyset)
else:
result = self._x509_manager._find_certificate_in_store(
mock.sentinel.thumbprint_str, machine_keyset)
self.assertEqual(result, cert_context_p)
self._x509_manager._get_thumbprint_buffer.assert_called_once_with(
mock.sentinel.thumbprint_str)
mock_blob.assert_called_once_with()
if store_handle:
mock_CloseStore.assert_called_once_with(store_handle, 0)
def test_find_certificate_in_store(self):
self._test_find_certificate_in_store(machine_keyset=None)
def test_find_certificate_in_store_no_store_handle(self):
self._test_find_certificate_in_store(store_handle=False)
def test_find_certificate_in_store_no_cert_context_p(self):
self._test_find_certificate_in_store(cert_context_p=False)
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertFreeCertificateContext')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertDeleteCertificateFromStore')
def _test_delete_certificate_from_store(self, mock_delete_cert,
mock_free_cert,
cert_context_p=True,
delete_cert=True):
self._x509_manager._find_certificate_in_store = mock.Mock()
(self._x509_manager._find_certificate_in_store.
return_value) = cert_context_p
mock_delete_cert.return_value = delete_cert
if not cert_context_p or not delete_cert:
with self.assertRaises(self.x509.cryptoapi.CryptoAPIException):
self._x509_manager.delete_certificate_from_store(
mock.sentinel.thumbprint_str, mock.sentinel.machine_keyset,
mock.sentinel.store_name)
else:
self._x509_manager.delete_certificate_from_store(
mock.sentinel.thumbprint_str, mock.sentinel.machine_keyset,
mock.sentinel.store_name)
self._x509_manager._find_certificate_in_store.assert_called_once_with(
mock.sentinel.thumbprint_str, mock.sentinel.machine_keyset,
mock.sentinel.store_name)
if not cert_context_p:
self.assertEqual(mock_delete_cert.call_count, 0)
else:
mock_free_cert.assert_called_once_with(cert_context_p)
def test_delete_certificate_from_store(self):
self._test_delete_certificate_from_store()
def test_delete_certificate_from_store_no_cert_context_p(self):
self._test_delete_certificate_from_store(cert_context_p=False)
def test_delete_certificate_from_store_delete_cert_failed(self):
self._test_delete_certificate_from_store(delete_cert=False)
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertFreeCertificateContext')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CRYPT_DECRYPT_MESSAGE_PARA')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertCloseStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertDeleteCertificateFromStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CryptDecryptMessage')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertAddCertificateLinkToStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CertOpenStore')
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CryptStringToBinaryW')
def _test_decode_pkcs7_base64_blob(self, mock_StringToBinary,
mock_OpenStore, mock_AddCert,
mock_Decrypt, mock_DeleteCert,
mock_CloseStore, mock_decrypt_para,
mock_FreeCert,
string_to_binary_data=True,
store_handle=True,
string_to_binary_data_value=True,
add_cert=True, decrypt_by_ref=True,
decrypt_by_pointer=True):
data = str(mock.sentinel.data)
self._x509_manager._find_certificate_in_store = mock.Mock()
mock_StringToBinary.side_effect = [
string_to_binary_data, string_to_binary_data_value]
mock_OpenStore.return_value = store_handle
mock_AddCert.return_value = add_cert
mock_Decrypt.side_effect = [decrypt_by_ref, decrypt_by_pointer]
if (string_to_binary_data and store_handle and add_cert and
string_to_binary_data_value and decrypt_by_ref and
decrypt_by_pointer):
result = self._x509_manager.decode_pkcs7_base64_blob(
data, mock.sentinel.thumbprint_str,
mock.sentinel.machine_keyset, mock.sentinel.store_name)
self.assertEqual(
result, bytes(self._ctypes.create_string_buffer.return_value))
else:
with self.assertRaises(self.x509.cryptoapi.CryptoAPIException):
self._x509_manager.decode_pkcs7_base64_blob(
data, mock.sentinel.thumbprint_str,
mock.sentinel.machine_keyset, mock.sentinel.store_name)
def test_decode_pkcs7_base64_blob(self):
self._test_decode_pkcs7_base64_blob()
def test_decode_pkcs7_base64_blob_encrypt_data_fails(self):
self._test_decode_pkcs7_base64_blob(string_to_binary_data=False)
def test_decode_pkcs7_base64_blob_no_store_handle(self):
self._test_decode_pkcs7_base64_blob(store_handle=False)
def test_decode_pkcs7_base64_blob_encrypt_data_value_fails(self):
self._test_decode_pkcs7_base64_blob(string_to_binary_data_value=False)
def test_decode_pkcs7_base64_blob_add_certificate_fails(self):
self._test_decode_pkcs7_base64_blob(add_cert=False)
def test_decode_pkcs7_base64_blob_decrypt_by_ref_fails(self):
self._test_decode_pkcs7_base64_blob(decrypt_by_ref=False)
def test_decode_pkcs7_base64_blob_decrypt_by_pointer_fails(self):
self._test_decode_pkcs7_base64_blob(decrypt_by_pointer=False)

View File

@ -38,3 +38,10 @@ def write_file(target_path, data, mode='wb'):
with open(target_path, mode) as f:
f.write(data)
def hex_to_bytes(value):
if six.PY2:
return value.decode("hex")
else:
return bytes.fromhex(value)

View File

@ -94,20 +94,49 @@ class CRYPT_KEY_PROV_INFO(ctypes.Structure):
]
class CRYPT_DECRYPT_MESSAGE_PARA(ctypes.Structure):
_fields_ = [
('cbSize', wintypes.DWORD),
('dwMsgAndCertEncodingType', wintypes.DWORD),
('cCertStore', wintypes.DWORD),
('rghCertStore', ctypes.POINTER(wintypes.HANDLE)),
('dwFlags', wintypes.DWORD),
]
class CERT_KEY_CONTEXT(ctypes.Structure):
_fields_ = [
('cbSize', wintypes.DWORD),
('hNCryptKey', wintypes.HANDLE),
('dwKeySpec', wintypes.DWORD),
]
AT_KEYEXCHANGE = 1
AT_SIGNATURE = 2
CERT_NAME_UPN_TYPE = 8
CERT_SHA1_HASH_PROP_ID = 3
CERT_STORE_ADD_REPLACE_EXISTING = 3
CERT_STORE_PROV_MEMORY = wintypes.LPSTR(2)
CERT_STORE_PROV_SYSTEM = wintypes.LPSTR(10)
CERT_SYSTEM_STORE_CURRENT_USER = 65536
CERT_SYSTEM_STORE_LOCAL_MACHINE = 131072
CERT_SYSTEM_STORE_CURRENT_USER = 0x10000
CERT_SYSTEM_STORE_LOCAL_MACHINE = 0x20000
CERT_X500_NAME_STR = 3
CERT_STORE_ADD_NEW = 1
CERT_STORE_OPEN_EXISTING_FLAG = 0x4000
CERT_STORE_CREATE_NEW_FLAG = 0x2000
CRYPT_SILENT = 64
CRYPT_MACHINE_KEYSET = 32
CRYPT_NEWKEYSET = 8
CRYPT_STRING_BASE64 = 1
PKCS_7_ASN_ENCODING = 65536
PKCS_7_ASN_ENCODING = 0x10000
PROV_RSA_FULL = 1
X509_ASN_ENCODING = 1
CERT_FIND_ANY = 0
CERT_FIND_SHA1_HASH = 0x10000
CERT_KEY_PROV_INFO_PROP_ID = 2
CERT_KEY_CONTEXT_PROP_ID = 5
szOID_PKIX_KP_SERVER_AUTH = b"1.3.6.1.5.5.7.3.1"
szOID_RSA_SHA1RSA = b"1.2.840.113549.1.1.5"
@ -243,3 +272,61 @@ crypt32.CertGetCertificateContextProperty.argtypes = [
ctypes.c_void_p,
ctypes.POINTER(wintypes.DWORD)]
CertGetCertificateContextProperty = crypt32.CertGetCertificateContextProperty
crypt32.CryptBinaryToStringW.restype = wintypes.BOOL
crypt32.CryptBinaryToStringW.argtypes = [
ctypes.c_void_p,
wintypes.DWORD,
wintypes.DWORD,
wintypes.LPWSTR,
ctypes.POINTER(wintypes.DWORD)]
CryptBinaryToString = crypt32.CryptBinaryToStringW
crypt32.CryptDecryptMessage.restype = wintypes.BOOL
crypt32.CryptDecryptMessage.argtypes = [
ctypes.POINTER(CRYPT_DECRYPT_MESSAGE_PARA),
ctypes.c_void_p,
wintypes.DWORD,
ctypes.c_void_p,
ctypes.POINTER(wintypes.DWORD),
ctypes.c_void_p]
CryptDecryptMessage = crypt32.CryptDecryptMessage
crypt32.CertAddCertificateLinkToStore.restype = wintypes.BOOL
crypt32.CertAddCertificateLinkToStore.argtypes = [
wintypes.HANDLE,
ctypes.POINTER(CERT_CONTEXT),
wintypes.DWORD,
ctypes.c_void_p
]
CertAddCertificateLinkToStore = crypt32.CertAddCertificateLinkToStore
crypt32.CertFindCertificateInStore.restype = ctypes.POINTER(CERT_CONTEXT)
crypt32.CertFindCertificateInStore.argtypes = [
wintypes.HANDLE,
wintypes.DWORD,
wintypes.DWORD,
wintypes.DWORD,
ctypes.c_void_p,
ctypes.c_void_p]
CertFindCertificateInStore = crypt32.CertFindCertificateInStore
crypt32.CertSetCertificateContextProperty.restype = wintypes.BOOL
crypt32.CertSetCertificateContextProperty.argtypes = [
ctypes.POINTER(CERT_CONTEXT),
wintypes.DWORD,
wintypes.DWORD,
ctypes.c_void_p]
CertSetCertificateContextProperty = crypt32.CertSetCertificateContextProperty
crypt32.PFXImportCertStore.restype = wintypes.HANDLE
crypt32.PFXImportCertStore.argtypes = [
ctypes.POINTER(CRYPTOAPI_BLOB),
wintypes.LPCWSTR,
wintypes.DWORD]
PFXImportCertStore = crypt32.PFXImportCertStore
crypt32.CertDeleteCertificateFromStore.restype = wintypes.BOOL
crypt32.CertDeleteCertificateFromStore.argtypes = [
ctypes.POINTER(CERT_CONTEXT)]
CertDeleteCertificateFromStore = crypt32.CertDeleteCertificateFromStore

View File

@ -19,6 +19,7 @@ import uuid
import six
from cloudbaseinit.utils import encoding
from cloudbaseinit.utils.windows import cryptoapi
from cloudbaseinit.utils import x509constants
@ -40,6 +41,26 @@ X509_END_DATE_INTERVAL = 10 * 365 * 24 * 60 * 60 * 10000000
class CryptoAPICertManager(object):
@staticmethod
def _get_thumprint_str(thumbprint, size):
thumbprint_ar = ctypes.cast(
thumbprint,
ctypes.POINTER(ctypes.c_ubyte *
size)).contents
thumbprint_str = ""
for b in thumbprint_ar:
thumbprint_str += "%02x" % b
return thumbprint_str
@staticmethod
def _get_thumbprint_buffer(thumbprint_str):
thumbprint_bytes = encoding.hex_to_bytes(thumbprint_str)
return ctypes.cast(
ctypes.create_string_buffer(thumbprint_bytes),
ctypes.POINTER(wintypes.BYTE *
len(thumbprint_bytes))).contents
def _get_cert_thumprint(self, cert_context_p):
thumbprint = None
@ -61,15 +82,7 @@ class CryptoAPICertManager(object):
thumbprint, ctypes.byref(thumprint_len)):
raise cryptoapi.CryptoAPIException()
thumbprint_ar = ctypes.cast(
thumbprint,
ctypes.POINTER(ctypes.c_ubyte *
thumprint_len.value)).contents
thumbprint_str = ""
for b in thumbprint_ar:
thumbprint_str += "%02x" % b
return thumbprint_str
return self._get_thumprint_str(thumbprint, thumprint_len.value)
finally:
if thumbprint:
free(thumbprint)
@ -100,9 +113,12 @@ class CryptoAPICertManager(object):
# RSA 2048 bits
if not cryptoapi.CryptGenKey(crypt_prov_handle,
cryptoapi.AT_SIGNATURE,
0x08000000, key_handle):
cryptoapi.AT_KEYEXCHANGE,
0x08000000,
ctypes.byref(key_handle)):
raise cryptoapi.CryptoAPIException()
return key_handle
finally:
if key_handle:
cryptoapi.CryptDestroyKey(key_handle)
@ -171,7 +187,7 @@ class CryptoAPICertManager(object):
key_prov_info.dwProvType = cryptoapi.PROV_RSA_FULL
key_prov_info.cProvParam = None
key_prov_info.rgProvParam = None
key_prov_info.dwKeySpec = cryptoapi.AT_SIGNATURE
key_prov_info.dwKeySpec = cryptoapi.AT_KEYEXCHANGE
if machine_keyset:
key_prov_info.dwFlags = cryptoapi.CRYPT_MACHINE_KEYSET
@ -243,6 +259,190 @@ class CryptoAPICertManager(object):
cert_data = cert_data.replace(remove, "")
return cert_data
def _find_certificate_in_store(self, thumbprint_str, machine_keyset=True,
store_name=STORE_NAME_MY):
store_handle = None
thumbprint = self._get_thumbprint_buffer(thumbprint_str)
hash_blob = cryptoapi.CRYPTOAPI_BLOB()
hash_blob.cbData = len(thumbprint)
hash_blob.pbData = thumbprint
try:
flags = cryptoapi.CERT_STORE_OPEN_EXISTING_FLAG
if machine_keyset:
flags |= cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE
else:
flags |= cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER
store_handle = cryptoapi.CertOpenStore(
cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags,
six.text_type(store_name))
if not store_handle:
raise cryptoapi.CryptoAPIException()
cert_context_p = cryptoapi.CertFindCertificateInStore(
store_handle,
cryptoapi.X509_ASN_ENCODING | cryptoapi.PKCS_7_ASN_ENCODING,
0,
cryptoapi.CERT_FIND_SHA1_HASH,
ctypes.pointer(hash_blob),
None)
if not cert_context_p:
raise cryptoapi.CryptoAPIException()
return cert_context_p
finally:
if store_handle:
cryptoapi.CertCloseStore(store_handle, 0)
def delete_certificate_from_store(self, thumbprint_str,
machine_keyset=True,
store_name=STORE_NAME_MY):
cert_context_p = None
try:
cert_context_p = self._find_certificate_in_store(
thumbprint_str, machine_keyset, store_name)
if not cert_context_p:
raise cryptoapi.CryptoAPIException()
if not cryptoapi.CertDeleteCertificateFromStore(cert_context_p):
raise cryptoapi.CryptoAPIException()
finally:
if cert_context_p:
cryptoapi.CertFreeCertificateContext(cert_context_p)
def import_pfx_certificate(self, pfx_data, pfx_password=None,
machine_keyset=True, store_name=STORE_NAME_MY):
cert_context_p = None
import_store_handle = None
store_handle = None
try:
pfx_blob = cryptoapi.CRYPTOAPI_BLOB()
pfx_blob.cbData = len(pfx_data)
pfx_blob.pbData = ctypes.cast(
pfx_data, ctypes.POINTER(wintypes.BYTE))
import_store_handle = cryptoapi.PFXImportCertStore(
ctypes.pointer(pfx_blob), pfx_password, 0)
if not import_store_handle:
raise cryptoapi.CryptoAPIException()
cert_context_p = cryptoapi.CertFindCertificateInStore(
import_store_handle,
cryptoapi.X509_ASN_ENCODING | cryptoapi.PKCS_7_ASN_ENCODING,
0, cryptoapi.CERT_FIND_ANY, None, None)
if not cert_context_p:
raise cryptoapi.CryptoAPIException()
if machine_keyset:
flags = cryptoapi.CERT_SYSTEM_STORE_LOCAL_MACHINE
else:
flags = cryptoapi.CERT_SYSTEM_STORE_CURRENT_USER
store_handle = cryptoapi.CertOpenStore(
cryptoapi.CERT_STORE_PROV_SYSTEM, 0, 0, flags,
six.text_type(store_name))
if not store_handle:
raise cryptoapi.CryptoAPIException()
if not cryptoapi.CertAddCertificateContextToStore(
store_handle, cert_context_p,
cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING, None):
raise cryptoapi.CryptoAPIException()
finally:
if import_store_handle:
cryptoapi.CertCloseStore(import_store_handle, 0)
if cert_context_p:
cryptoapi.CertFreeCertificateContext(cert_context_p)
if store_handle:
cryptoapi.CertCloseStore(store_handle, 0)
def decode_pkcs7_base64_blob(self, data, thumbprint_str,
machine_keyset=True,
store_name=STORE_NAME_MY):
base64_data = data.replace('\r', '').replace('\n', '')
store_handle = None
cert_context_p = None
try:
data_encoded_len = wintypes.DWORD()
if not cryptoapi.CryptStringToBinaryW(
base64_data, len(base64_data),
cryptoapi.CRYPT_STRING_BASE64,
None, ctypes.byref(data_encoded_len),
None, None):
raise cryptoapi.CryptoAPIException()
data_encoded = ctypes.cast(
ctypes.create_string_buffer(data_encoded_len.value),
ctypes.POINTER(wintypes.BYTE))
if not cryptoapi.CryptStringToBinaryW(
base64_data, len(base64_data),
cryptoapi.CRYPT_STRING_BASE64,
data_encoded, ctypes.byref(data_encoded_len),
None, None):
raise cryptoapi.CryptoAPIException()
store_handle = cryptoapi.CertOpenStore(
cryptoapi.CERT_STORE_PROV_MEMORY,
cryptoapi.X509_ASN_ENCODING | cryptoapi.PKCS_7_ASN_ENCODING,
None, cryptoapi.CERT_STORE_CREATE_NEW_FLAG, None)
if not store_handle:
raise cryptoapi.CryptoAPIException()
cert_context_p = self._find_certificate_in_store(
thumbprint_str, machine_keyset, store_name)
if not cryptoapi.CertAddCertificateLinkToStore(
store_handle, cert_context_p,
cryptoapi.CERT_STORE_ADD_NEW, None):
raise cryptoapi.CryptoAPIException()
para = cryptoapi.CRYPT_DECRYPT_MESSAGE_PARA()
para.cbSize = ctypes.sizeof(cryptoapi.CRYPT_DECRYPT_MESSAGE_PARA)
para.dwMsgAndCertEncodingType = (cryptoapi.X509_ASN_ENCODING |
cryptoapi.PKCS_7_ASN_ENCODING)
para.cCertStore = 1
para.rghCertStore = ctypes.pointer(wintypes.HANDLE(store_handle))
para.dwFlags = cryptoapi.CRYPT_SILENT
data_decoded_len = wintypes.DWORD()
if not cryptoapi.CryptDecryptMessage(
ctypes.byref(para),
data_encoded,
data_encoded_len,
None,
ctypes.byref(data_decoded_len),
None):
raise cryptoapi.CryptoAPIException()
data_decoded_buf = ctypes.create_string_buffer(
data_decoded_len.value)
data_decoded = ctypes.cast(
data_decoded_buf, ctypes.POINTER(wintypes.BYTE))
if not cryptoapi.CryptDecryptMessage(
ctypes.pointer(para),
data_encoded,
data_encoded_len,
data_decoded,
ctypes.byref(data_decoded_len),
None):
raise cryptoapi.CryptoAPIException()
return bytes(data_decoded_buf)
finally:
if cert_context_p:
cryptoapi.CertFreeCertificateContext(cert_context_p)
if store_handle:
cryptoapi.CertCloseStore(store_handle, 0)
def import_cert(self, cert_data, machine_keyset=True,
store_name=STORE_NAME_MY):