Switch to cryptography from pycrypto

PyCrypto isn't active developed for quite a while, cryptography is
recommended instead. This patch does this migration, but still keeps
pycrytpo as a fallback solution.

Random generation is also migrated to os.urandom as the cryptography
document suggests:

Closes-Bug: #1749574

Change-Id: I5c0c1a238023c116af5a84d899e629f1c7c3513f
Co-Authored-By: Fan Zhang <zh.f@outlook.com>
Signed-off-by: Zhao Chao <zhaochao1984@gmail.com>
This commit is contained in:
Zhao Chao 2018-04-11 12:39:05 +08:00 committed by Fan Zhang
parent e96c0093cd
commit 46a031e765
5 changed files with 47 additions and 25 deletions

View File

@ -27,5 +27,6 @@ osprofiler>=0.3.0
oslo.concurrency>=1.8.0 # Apache-2.0
enum34;python_version=='2.7' or python_version=='2.6' or python_version=='3.3' # BSD
cryptography>=2.1.4 # BSD/Apache-2.0
pycrypto>=2.6 # Public Domain
xmltodict>=0.10.1 # MIT

View File

@ -26,5 +26,6 @@ oslo.utils>=1.1.0
enum34;python_version=='2.7' or python_version=='2.6' or python_version=='3.3' # BSD
cryptography>=2.1.4 # BSD/Apache-2.0
pycrypto>=2.6 # Public Domain
xmltodict>=0.10.1 # MIT

View File

@ -44,6 +44,7 @@ oslo.log>=3.36.0 # Apache-2.0
oslo.db>=4.27.0 # Apache-2.0
enum34>=1.0.4;python_version=='2.7' or python_version=='2.6' or python_version=='3.3' # BSD
xmltodict>=0.10.1 # MIT
cryptography>=2.1.4 # BSD/Apache-2.0
pycrypto>=2.6 # Public Domain
oslo.policy>=1.30.0 # Apache-2.0
diskimage-builder!=1.6.0,!=1.7.0,!=1.7.1,>=1.1.2 # Apache-2.0

View File

@ -16,18 +16,41 @@
# Encryption/decryption handling
from Crypto.Cipher import AES
from Crypto import Random
import hashlib
import os
from oslo_utils import encodeutils
import random
import six
import string
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers import modes
from trove.common import stream_codecs
def _get_cipher(key, iv):
_CRYPT_BACKEND = default_backend()
return Cipher(algorithms.AES(key), modes.CBC(iv),
def _encrypt(key, iv, data):
encryptor = _get_cipher(key, iv).encryptor()
return encryptor.update(data) + encryptor.finalize()
def _decrypt(key, iv, data):
decryptor = _get_cipher(key, iv).decryptor()
return decryptor.update(data) + decryptor.finalize()
def encode_data(data):
@ -42,7 +65,7 @@ def decode_data(data):
# Pad the data string to an multiple of pad_size
def pad_for_encryption(data, pad_size=IV_BIT_COUNT):
def pad_for_encryption(data, pad_size=IV_BYTE_COUNT):
pad_count = pad_size - (len(data) % pad_size)
return data + six.int2byte(pad_count) * pad_count
@ -52,24 +75,22 @@ def unpad_after_decryption(data):
return data[:len(data) - six.indexbytes(data, -1)]
def encrypt_data(data, key, iv_bit_count=IV_BIT_COUNT):
def encrypt_data(data, key, iv_byte_count=IV_BYTE_COUNT):
data = encodeutils.to_utf8(data)
key = encodeutils.to_utf8(key)
md5_key = hashlib.md5(key).hexdigest()
iv = Random.new().read(iv_bit_count)
iv = iv[:iv_bit_count]
aes = AES.new(md5_key, AES.MODE_CBC, iv)
data = pad_for_encryption(data, iv_bit_count)
encrypted = aes.encrypt(data)
md5_key = encodeutils.safe_encode(hashlib.md5(key).hexdigest())
iv = os.urandom(iv_byte_count)
iv = iv[:iv_byte_count]
data = pad_for_encryption(data, iv_byte_count)
encrypted = _encrypt(md5_key, bytes(iv), data)
return iv + encrypted
def decrypt_data(data, key, iv_bit_count=IV_BIT_COUNT):
def decrypt_data(data, key, iv_byte_count=IV_BYTE_COUNT):
key = encodeutils.to_utf8(key)
md5_key = hashlib.md5(key).hexdigest()
iv = data[:iv_bit_count]
aes = AES.new(md5_key, AES.MODE_CBC, bytes(iv))
decrypted = aes.decrypt(bytes(data[iv_bit_count:]))
md5_key = encodeutils.safe_encode(hashlib.md5(key).hexdigest())
iv = data[:iv_byte_count]
decrypted = _decrypt(md5_key, bytes(iv), bytes(data[iv_byte_count:]))
return unpad_after_decryption(decrypted)

View File

@ -14,8 +14,8 @@
# under the License.
from Crypto import Random
import mock
import os
import six
from trove.common import crypto_utils
@ -31,7 +31,7 @@ class TestEncryptUtils(trove_testtools.TestCase):
super(TestEncryptUtils, self).tearDown()
def test_encode_decode_string(self):
random_data = bytearray(Random.new().read(12))
random_data = bytearray(os.urandom(12))
data = [b'abc', b'numbers01234', b'\x00\xFF\x00\xFF\xFF\x00',
random_data, u'Unicode:\u20ac']
@ -47,8 +47,8 @@ class TestEncryptUtils(trove_testtools.TestCase):
for size in range(1, 100):
data_str = b'a' * size
padded_str = crypto_utils.pad_for_encryption(
data_str, crypto_utils.IV_BIT_COUNT)
self.assertEqual(0, len(padded_str) % crypto_utils.IV_BIT_COUNT,
data_str, crypto_utils.IV_BYTE_COUNT)
self.assertEqual(0, len(padded_str) % crypto_utils.IV_BYTE_COUNT,
"Padding not successful")
unpadded_str = crypto_utils.unpad_after_decryption(padded_str)
self.assertEqual(data_str, unpadded_str,
@ -57,7 +57,7 @@ class TestEncryptUtils(trove_testtools.TestCase):
def test_encryp_decrypt(self):
key = 'my_secure_key'
for size in range(1, 100):
orig_data = Random.new().read(size)
orig_data = os.urandom(size)
orig_encoded = crypto_utils.encode_data(orig_data)
encrypted = crypto_utils.encrypt_data(orig_encoded, key)
encoded = crypto_utils.encode_data(encrypted)
@ -71,11 +71,9 @@ class TestEncryptUtils(trove_testtools.TestCase):
def test_encrypt(self):
# test encrypt() with an hardcoded IV
key = 'my_secure_key'
salt = b'x' * crypto_utils.IV_BIT_COUNT
with mock.patch('Crypto.Random.new') as mock_random:
mock_random.return_value.read.return_value = salt
salt = b'x' * crypto_utils.IV_BYTE_COUNT
with mock.patch('os.urandom', return_value=salt):
for orig_data, expected in (
# byte string
(b'Hello World!',