Return serialized certificate on certificate creation

Return serialized certificate on certificate creation besides the
thumbprint

Change-Id: I7b00445675a1d118ef78e1bbcef550e22ae5325f
Co-Authored-By: Paula Madalina Crismaru <pcrismaru@cloudbasesolutions.com>
This commit is contained in:
Alessandro Pilotti 2017-05-17 14:21:46 +03:00 committed by Paula Madalina Crismaru
parent 341930eb91
commit 0a695a8aa3
4 changed files with 67 additions and 5 deletions

View File

@ -105,7 +105,7 @@ class ConfigWinRMListenerPlugin(base.BasePlugin):
def _create_self_signed_certificate(self):
LOG.info("Generating self signed certificate for WinRM HTTPS listener")
cert_manager = x509.CryptoAPICertManager()
cert_thumbprint = cert_manager.create_self_signed_cert(
cert_thumbprint, _ = cert_manager.create_self_signed_cert(
self._cert_subject)
return cert_thumbprint

View File

@ -200,7 +200,7 @@ class ConfigWinRMListenerPluginTests(unittest.TestCase):
mock_cert_mgr = mock.MagicMock()
mock_CryptoAPICertManager.return_value = mock_cert_mgr
mock_cert_mgr.create_self_signed_cert.return_value = \
mock.sentinel.cert_thumbprint
mock.sentinel.cert_thumbprint, mock.sentinel.cert_str
result = self._winrmlistener._create_self_signed_certificate()
self.assertEqual(result, mock.sentinel.cert_thumbprint)
mock_CryptoAPICertManager.assert_called_once_with()

View File

@ -145,6 +145,8 @@ class CryptoAPICertManagerTests(unittest.TestCase):
@mock.patch('cloudbaseinit.utils.windows.x509.free')
@mock.patch('cloudbaseinit.utils.windows.x509.malloc')
@mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager.'
'_get_cert_str')
@mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager'
'._add_system_time_interval')
@mock.patch('cloudbaseinit.utils.windows.x509.CryptoAPICertManager'
@ -190,6 +192,7 @@ class CryptoAPICertManagerTests(unittest.TestCase):
mock_uuid4, mock_get_cert_thumprint,
mock_generate_key,
mock_add_system_time_interval,
mock_get_cert_str,
mock_malloc, mock_free,
certstr, certificate, enhanced_key,
store_handle, context_to_store):
@ -241,8 +244,8 @@ class CryptoAPICertManagerTests(unittest.TestCase):
mock_CertCreateSelfSignCertificate())
mock_free.assert_called_once_with(mock_cast())
self.assertEqual(response,
mock_get_cert_thumprint.return_value)
self.assertEqual((mock_get_cert_thumprint.return_value,
mock_get_cert_str.return_value), response)
mock_generate_key.assert_called_once_with('fake_name', True)
@ -700,3 +703,41 @@ class CryptoAPICertManagerTests(unittest.TestCase):
def test_decode_pkcs7_base64_blob_decrypt_by_pointer_fails(self):
self._test_decode_pkcs7_base64_blob(decrypt_by_pointer=False)
@mock.patch('cloudbaseinit.utils.windows.cryptoapi.'
'CryptBinaryToString')
def _test_get_cert_str(self, mock_CryptBinaryToString, works):
mock_DWORD = self._ctypes.wintypes.DWORD
mock_DWORD.return_value = mock.Mock()
mock_cert_context_p = mock.Mock()
if not all(works):
mock_CryptBinaryToString.side_effect = works
with self.assertRaises(self.x509.cryptoapi.CryptoAPIException):
self._x509_manager._get_cert_str(mock_cert_context_p)
else:
mock_create_unicode_buffer = self._ctypes.create_unicode_buffer
mock_cer_str = mock.Mock()
mock_create_unicode_buffer.return_value = mock_cer_str
result = self._x509_manager._get_cert_str(mock_cert_context_p)
self.assertEqual(result, mock_cer_str.value)
mock_DWORD.assert_called_once_with(0)
calls = [mock.call(mock_cert_context_p.contents.pbCertEncoded,
mock_cert_context_p.contents.cbCertEncoded,
self.x509.cryptoapi.CRYPT_STRING_BASE64,
None, self._ctypes.byref.return_value)]
if all(works):
calls += [mock.call(mock_cert_context_p.contents.pbCertEncoded,
mock_cert_context_p.contents.cbCertEncoded,
self.x509.cryptoapi.CRYPT_STRING_BASE64,
mock_create_unicode_buffer.return_value,
self._ctypes.byref.return_value)]
self.assertTrue(calls, mock_CryptBinaryToString.calls)
def test_get_cert_str_fails(self):
self._test_get_cert_str(works=[False, False])
def test_get_cert_str_fails_2(self):
self._test_get_cert_str(works=[False, True])
def test_get_cert_str(self):
self._test_get_cert_str(works=[True, True])

View File

@ -236,7 +236,8 @@ class CryptoAPICertManager(object):
cryptoapi.CERT_STORE_ADD_REPLACE_EXISTING, None):
raise cryptoapi.CryptoAPIException()
return self._get_cert_thumprint(cert_context_p)
return (self._get_cert_thumprint(cert_context_p),
self._get_cert_str(cert_context_p))
finally:
if store_handle:
@ -246,6 +247,26 @@ class CryptoAPICertManager(object):
if subject_encoded:
free(subject_encoded)
def _get_cert_str(self, cert_context_p):
ch_cer_str = wintypes.DWORD(0)
if not cryptoapi.CryptBinaryToString(
cert_context_p.contents.pbCertEncoded,
cert_context_p.contents.cbCertEncoded,
cryptoapi.CRYPT_STRING_BASE64,
None, ctypes.byref(ch_cer_str)):
raise cryptoapi.CryptoAPIException()
cer_str = ctypes.create_unicode_buffer(ch_cer_str.value)
if not cryptoapi.CryptBinaryToString(
cert_context_p.contents.pbCertEncoded,
cert_context_p.contents.cbCertEncoded,
cryptoapi.CRYPT_STRING_BASE64,
cer_str,
ctypes.byref(ch_cer_str)):
raise cryptoapi.CryptoAPIException()
return cer_str.value
def _get_cert_base64(self, cert_data):
"""Remove certificate header and footer and also new lines."""
# It's assured that the certificate is already a string.