diff --git a/cloudbaseinit/plugins/windows/winrmlistener.py b/cloudbaseinit/plugins/windows/winrmlistener.py index 34ee337f..01b4dc50 100644 --- a/cloudbaseinit/plugins/windows/winrmlistener.py +++ b/cloudbaseinit/plugins/windows/winrmlistener.py @@ -105,7 +105,7 @@ class ConfigWinRMListenerPlugin(base.BasePlugin): def _create_self_signed_certificate(self): LOG.info("Generating self signed certificate for WinRM HTTPS listener") cert_manager = x509.CryptoAPICertManager() - cert_thumbprint = cert_manager.create_self_signed_cert( + cert_thumbprint, _ = cert_manager.create_self_signed_cert( self._cert_subject) return cert_thumbprint diff --git a/cloudbaseinit/tests/plugins/windows/test_winrmlistener.py b/cloudbaseinit/tests/plugins/windows/test_winrmlistener.py index a36e9670..22092f01 100644 --- a/cloudbaseinit/tests/plugins/windows/test_winrmlistener.py +++ b/cloudbaseinit/tests/plugins/windows/test_winrmlistener.py @@ -200,7 +200,7 @@ class ConfigWinRMListenerPluginTests(unittest.TestCase): mock_cert_mgr = mock.MagicMock() mock_CryptoAPICertManager.return_value = mock_cert_mgr mock_cert_mgr.create_self_signed_cert.return_value = \ - mock.sentinel.cert_thumbprint + mock.sentinel.cert_thumbprint, mock.sentinel.cert_str result = self._winrmlistener._create_self_signed_certificate() self.assertEqual(result, mock.sentinel.cert_thumbprint) mock_CryptoAPICertManager.assert_called_once_with() diff --git a/cloudbaseinit/tests/utils/windows/test_x509.py b/cloudbaseinit/tests/utils/windows/test_x509.py index b30f1c27..55711392 100644 --- a/cloudbaseinit/tests/utils/windows/test_x509.py +++ b/cloudbaseinit/tests/utils/windows/test_x509.py @@ -145,6 +145,8 @@ class CryptoAPICertManagerTests(unittest.TestCase): @mock.patch('cloudbaseinit.utils.windows.x509.free') @mock.patch('cloudbaseinit.utils.windows.x509.malloc') + @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager.' + '_get_cert_str') @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' '._add_system_time_interval') @mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager' @@ -190,6 +192,7 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_uuid4, mock_get_cert_thumprint, mock_generate_key, mock_add_system_time_interval, + mock_get_cert_str, mock_malloc, mock_free, certstr, certificate, enhanced_key, store_handle, context_to_store): @@ -241,8 +244,8 @@ class CryptoAPICertManagerTests(unittest.TestCase): mock_CertCreateSelfSignCertificate()) mock_free.assert_called_once_with(mock_cast()) - self.assertEqual(response, - mock_get_cert_thumprint.return_value) + self.assertEqual((mock_get_cert_thumprint.return_value, + mock_get_cert_str.return_value), response) mock_generate_key.assert_called_once_with('fake_name', True) @@ -700,3 +703,41 @@ class CryptoAPICertManagerTests(unittest.TestCase): def test_decode_pkcs7_base64_blob_decrypt_by_pointer_fails(self): self._test_decode_pkcs7_base64_blob(decrypt_by_pointer=False) + + @mock.patch('cloudbaseinit.utils.windows.cryptoapi.' + 'CryptBinaryToString') + def _test_get_cert_str(self, mock_CryptBinaryToString, works): + mock_DWORD = self._ctypes.wintypes.DWORD + mock_DWORD.return_value = mock.Mock() + mock_cert_context_p = mock.Mock() + if not all(works): + mock_CryptBinaryToString.side_effect = works + with self.assertRaises(self.x509.cryptoapi.CryptoAPIException): + self._x509_manager._get_cert_str(mock_cert_context_p) + else: + mock_create_unicode_buffer = self._ctypes.create_unicode_buffer + mock_cer_str = mock.Mock() + mock_create_unicode_buffer.return_value = mock_cer_str + result = self._x509_manager._get_cert_str(mock_cert_context_p) + self.assertEqual(result, mock_cer_str.value) + mock_DWORD.assert_called_once_with(0) + calls = [mock.call(mock_cert_context_p.contents.pbCertEncoded, + mock_cert_context_p.contents.cbCertEncoded, + self.x509.cryptoapi.CRYPT_STRING_BASE64, + None, self._ctypes.byref.return_value)] + if all(works): + calls += [mock.call(mock_cert_context_p.contents.pbCertEncoded, + mock_cert_context_p.contents.cbCertEncoded, + self.x509.cryptoapi.CRYPT_STRING_BASE64, + mock_create_unicode_buffer.return_value, + self._ctypes.byref.return_value)] + self.assertTrue(calls, mock_CryptBinaryToString.calls) + + def test_get_cert_str_fails(self): + self._test_get_cert_str(works=[False, False]) + + def test_get_cert_str_fails_2(self): + self._test_get_cert_str(works=[False, True]) + + def test_get_cert_str(self): + self._test_get_cert_str(works=[True, True]) diff --git a/cloudbaseinit/utils/windows/x509.py b/cloudbaseinit/utils/windows/x509.py index 4556087d..ce6d75b2 100644 --- a/cloudbaseinit/utils/windows/x509.py +++ b/cloudbaseinit/utils/windows/x509.py @@ -236,7 +236,8 @@ class CryptoAPICertManager(object): cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING, None): raise cryptoapi.CryptoAPIException() - return self._get_cert_thumprint(cert_context_p) + return (self._get_cert_thumprint(cert_context_p), + self._get_cert_str(cert_context_p)) finally: if store_handle: @@ -246,6 +247,26 @@ class CryptoAPICertManager(object): if subject_encoded: free(subject_encoded) + def _get_cert_str(self, cert_context_p): + ch_cer_str = wintypes.DWORD(0) + if not cryptoapi.CryptBinaryToString( + cert_context_p.contents.pbCertEncoded, + cert_context_p.contents.cbCertEncoded, + cryptoapi.CRYPT_STRING_BASE64, + None, ctypes.byref(ch_cer_str)): + raise cryptoapi.CryptoAPIException() + + cer_str = ctypes.create_unicode_buffer(ch_cer_str.value) + if not cryptoapi.CryptBinaryToString( + cert_context_p.contents.pbCertEncoded, + cert_context_p.contents.cbCertEncoded, + cryptoapi.CRYPT_STRING_BASE64, + cer_str, + ctypes.byref(ch_cer_str)): + raise cryptoapi.CryptoAPIException() + + return cer_str.value + def _get_cert_base64(self, cert_data): """Remove certificate header and footer and also new lines.""" # It's assured that the certificate is already a string.