Merge "Sub pycrypto with cryptography in simple_crypto"

This commit is contained in:
Jenkins 2017-05-16 14:30:12 +00:00 committed by Gerrit Code Review
commit 6eca39d353
2 changed files with 174 additions and 85 deletions

View File

@ -12,11 +12,13 @@
# limitations under the License.
import os
from Crypto.PublicKey import DSA
from Crypto.PublicKey import RSA
from Crypto.Util import asn1
from cryptography import fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from oslo_config import cfg
from oslo_utils import encodeutils
import six
from barbican.common import config
@ -121,36 +123,50 @@ class SimpleCryptoPlugin(c.CryptoPluginBase):
- RSA, with passphrase (supported)
- RSA, without passphrase (supported)
- DSA, without passphrase (supported)
- DSA, with passphrase (not supported)
Note: PyCrypto is not capable of serializing DSA
keys and DER formatted keys. Such keys will be
serialized to Base64 PEM to store in DB.
TODO (atiwari/reaperhulk): PyCrypto is not capable to serialize
DSA keys and DER formatted keys, later we need to pick better
crypto lib.
- DSA, with passphrase (supported)
"""
if(generate_dto.algorithm is None or generate_dto
.algorithm.lower() == 'rsa'):
private_key = RSA.generate(
generate_dto.bit_length, None, None, 65537)
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=generate_dto.bit_length,
backend=default_backend()
)
elif generate_dto.algorithm.lower() == 'dsa':
private_key = DSA.generate(generate_dto.bit_length, None, None)
private_key = dsa.generate_private_key(
key_size=generate_dto.bit_length,
backend=default_backend()
)
else:
raise c.CryptoPrivateKeyFailureException()
public_key = private_key.publickey()
public_key = private_key.public_key()
# Note (atiwari): key wrapping format PEM only supported
if generate_dto.algorithm.lower() == 'rsa':
public_key, private_key = self._wrap_key(public_key, private_key,
generate_dto.passphrase)
private_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=self._get_encryption_algorithm(
generate_dto.passphrase)
)
public_key = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
if generate_dto.algorithm.lower() == 'dsa':
if generate_dto.passphrase:
raise ValueError(u._('Passphrase not supported for DSA key'))
public_key, private_key = self._serialize_dsa_key(public_key,
private_key)
private_key = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=self._get_encryption_algorithm(
generate_dto.passphrase)
)
public_key = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
private_dto = self.encrypt(c.EncryptDTO(private_key),
kek_meta_dto,
project_id)
@ -186,29 +202,23 @@ class SimpleCryptoPlugin(c.CryptoPluginBase):
else:
return False
def _wrap_key(self, public_key, private_key,
passphrase):
pkcs = 8
key_wrap_format = 'PEM'
def _get_encryption_algorithm(self, passphrase):
"""Choose whether to use encryption or not based on passphrase
private_key = private_key.exportKey(key_wrap_format, passphrase, pkcs)
public_key = public_key.exportKey(key_wrap_format)
serialization.BestAvailableEncryption fails if passphrase is not
given or if less than one byte therefore we need to check if it is
valid or not
"""
if passphrase:
# encryption requires password in bytes format
algorithm = serialization.BestAvailableEncryption(
# default encoding is utf-8
encodeutils.safe_encode(passphrase)
)
else:
algorithm = serialization.NoEncryption()
return public_key, private_key
def _serialize_dsa_key(self, public_key, private_key):
pub_seq = asn1.DerSequence()
pub_seq[:] = [0, public_key.p, public_key.q,
public_key.g, public_key.y]
public_key = pub_seq.encode()
prv_seq = asn1.DerSequence()
prv_seq[:] = [0, private_key.p, private_key.q,
private_key.g, private_key.y, private_key.x]
private_key = prv_seq.encode()
return public_key, private_key
return algorithm
def _is_algorithm_supported(self, algorithm=None, bit_length=None):
"""check if algorithm and bit_length combination is supported."""

View File

@ -15,10 +15,9 @@
import os
from Crypto.PublicKey import DSA
from Crypto.PublicKey import RSA
from Crypto.Util import asn1
from cryptography import fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
import six
@ -246,31 +245,6 @@ class WhenTestingSimpleCryptoPlugin(utils.BaseTestCase):
"RSA", 64)
)
def test_generate_512_bit_RSA_key(self):
generate_dto = plugin.GenerateDTO('rsa', 512, None, None)
kek_meta_dto = self._get_mocked_kek_meta_dto()
self.assertRaises(ValueError,
self.plugin.generate_asymmetric,
generate_dto,
kek_meta_dto,
mock.MagicMock())
def test_generate_2048_bit_DSA_key(self):
generate_dto = plugin.GenerateDTO('dsa', 2048, None, None)
kek_meta_dto = self._get_mocked_kek_meta_dto()
self.assertRaises(ValueError, self.plugin.generate_asymmetric,
generate_dto,
kek_meta_dto,
mock.MagicMock())
def test_generate_1024_bit_DSA_key_with_passphrase(self):
generate_dto = plugin.GenerateDTO('dsa', 1024, None, 'Passphrase')
kek_meta_dto = self._get_mocked_kek_meta_dto()
self.assertRaises(ValueError, self.plugin.generate_asymmetric,
generate_dto,
kek_meta_dto,
mock.MagicMock())
def test_generate_asymmetric_1024_bit_key(self):
generate_dto = plugin.GenerateDTO('rsa', 1024, None, None)
kek_meta_dto = self._get_mocked_kek_meta_dto()
@ -290,13 +264,35 @@ class WhenTestingSimpleCryptoPlugin(utils.BaseTestCase):
public_dto.kek_meta_extended,
mock.MagicMock())
public_dto = RSA.importKey(public_dto)
private_dto = RSA.importKey(private_dto)
self.assertEqual(1023, public_dto.size())
self.assertEqual(1023, private_dto.size())
self.assertTrue(private_dto.has_private)
# check we can reload the private and public keys
private_key = serialization.load_pem_private_key(
data=private_dto,
password=None,
backend=default_backend()
)
def test_generate_1024_bit_RSA_key_in_pem(self):
public_key = serialization.load_pem_public_key(
data=public_dto,
backend=default_backend()
)
self.assertEqual(1024, private_key.key_size)
self.assertEqual(1024, public_key.key_size)
public_key = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.PKCS1
)
# get the public key from the private key we recovered to compare
recovered_key = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.PKCS1
)
self.assertTrue(public_key == recovered_key)
def test_generate_1024_bit_RSA_key_with_passphrase(self):
generate_dto = plugin.GenerateDTO('rsa', 1024, None, 'changeme')
kek_meta_dto = self._get_mocked_kek_meta_dto()
@ -305,14 +301,96 @@ class WhenTestingSimpleCryptoPlugin(utils.BaseTestCase):
kek_meta_dto,
mock.MagicMock()
)
decrypt_dto = plugin.DecryptDTO(private_dto.cypher_text)
private_dto = self.plugin.decrypt(decrypt_dto,
kek_meta_dto,
private_dto.kek_meta_extended,
mock.MagicMock())
private_dto = RSA.importKey(private_dto, 'changeme')
self.assertTrue(private_dto.has_private())
decrypt_dto = plugin.DecryptDTO(public_dto.cypher_text)
public_dto = self.plugin.decrypt(decrypt_dto,
kek_meta_dto,
public_dto.kek_meta_extended,
mock.MagicMock())
# check we can reload the private and public keys
private_key = serialization.load_pem_private_key(
data=private_dto,
password='changeme'.encode(),
backend=default_backend()
)
public_key = serialization.load_pem_public_key(
data=public_dto,
backend=default_backend()
)
self.assertEqual(1024, private_key.key_size)
self.assertEqual(1024, public_key.key_size)
public_key = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.PKCS1
)
# get the public key from the private key we recovered to compare
recovered_key = private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.PKCS1
)
self.assertTrue(public_key == recovered_key)
def test_generate_1024_bit_DSA_key_with_passphrase(self):
generate_dto = plugin.GenerateDTO('dsa', 1024, None, 'changeme')
kek_meta_dto = self._get_mocked_kek_meta_dto()
private_dto, public_dto, passwd_dto = self.plugin.generate_asymmetric(
generate_dto,
kek_meta_dto,
mock.MagicMock()
)
decrypt_dto = plugin.DecryptDTO(private_dto.cypher_text)
private_dto = self.plugin.decrypt(decrypt_dto,
kek_meta_dto,
private_dto.kek_meta_extended,
mock.MagicMock())
decrypt_dto = plugin.DecryptDTO(public_dto.cypher_text)
public_dto = self.plugin.decrypt(decrypt_dto,
kek_meta_dto,
public_dto.kek_meta_extended,
mock.MagicMock())
# check we can reload the private and public keys
private_key = serialization.load_der_private_key(
data=private_dto,
password='changeme'.encode(),
backend=default_backend()
)
public_key = serialization.load_der_public_key(
data=public_dto,
backend=default_backend()
)
self.assertEqual(1024, private_key.key_size)
self.assertEqual(1024, public_key.key_size)
public_key = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# get the public key from the private key we recovered to compare
recovered_key = private_key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
self.assertTrue(public_key == recovered_key)
def test_generate_1024_DSA_key_in_pem_and_reconstruct_key_der(self):
generate_dto = plugin.GenerateDTO('dsa', 1024, None, None)
@ -330,12 +408,13 @@ class WhenTestingSimpleCryptoPlugin(utils.BaseTestCase):
private_dto.kek_meta_extended,
mock.MagicMock())
prv_seq = asn1.DerSequence()
prv_seq.decode(private_dto)
p, q, g, y, x = prv_seq[1:]
private_key = serialization.load_der_private_key(
data=private_dto,
password=None,
backend=default_backend()
)
private_dto = DSA.construct((y, g, p, q, x))
self.assertTrue(private_dto.has_private())
self.assertEqual(1024, private_key.key_size)
def test_generate_128_bit_hmac_key(self):
secret = models.Secret()