From 627b9a8260f0959436def9373417524160188a63 Mon Sep 17 00:00:00 2001 From: Takashi Kajinami Date: Sat, 17 Feb 2024 11:57:06 +0900 Subject: [PATCH] Remove six from common module This is part of the steps to remove usage of six library, which is no longer needed since python 2 support was removed. Change-Id: I14ebd809b39079d06a8ecc8f747b6bb80d550acb --- magnum/common/cert_manager/cert_manager.py | 8 ++--- .../cert_manager/x509keypair_cert_manager.py | 5 ++-- magnum/common/exception.py | 7 ++--- magnum/common/short_id.py | 13 +++------ magnum/common/urlfetch.py | 2 +- magnum/common/utils.py | 3 +- magnum/common/x509/operations.py | 29 +++++++++---------- magnum/tests/unit/common/x509/test_sign.py | 4 +-- 8 files changed, 27 insertions(+), 44 deletions(-) diff --git a/magnum/common/cert_manager/cert_manager.py b/magnum/common/cert_manager/cert_manager.py index 639d501e53..d98746bae3 100644 --- a/magnum/common/cert_manager/cert_manager.py +++ b/magnum/common/cert_manager/cert_manager.py @@ -17,13 +17,10 @@ Certificate manager API """ import abc -import six - from magnum.common.x509 import operations -@six.add_metaclass(abc.ABCMeta) -class Cert(object): +class Cert(object, metaclass=abc.ABCMeta): """Base class to represent all certificates.""" @abc.abstractmethod @@ -52,8 +49,7 @@ class Cert(object): pass -@six.add_metaclass(abc.ABCMeta) -class CertManager(object): +class CertManager(object, metaclass=abc.ABCMeta): """Base Cert Manager Interface A Cert Manager is responsible for managing certificates for TLS. diff --git a/magnum/common/cert_manager/x509keypair_cert_manager.py b/magnum/common/cert_manager/x509keypair_cert_manager.py index 619c072a91..f3db3268ff 100644 --- a/magnum/common/cert_manager/x509keypair_cert_manager.py +++ b/magnum/common/cert_manager/x509keypair_cert_manager.py @@ -14,7 +14,6 @@ from magnum.common.cert_manager import cert_manager from magnum import objects -import six class Cert(cert_manager.Cert): @@ -59,9 +58,9 @@ class CertManager(cert_manager.CertManager): :returns: the UUID of the stored cert """ - if six.PY3 and isinstance(certificate, six.binary_type): + if isinstance(certificate, bytes): certificate = certificate.decode() - if six.PY3 and isinstance(private_key, six.binary_type): + if isinstance(private_key, bytes): private_key = private_key.decode() x509keypair = {'certificate': certificate, 'private_key': private_key, 'private_key_passphrase': private_key_passphrase, diff --git a/magnum/common/exception.py b/magnum/common/exception.py index 093bedd12d..52aa5f9561 100644 --- a/magnum/common/exception.py +++ b/magnum/common/exception.py @@ -24,7 +24,6 @@ import sys from keystoneclient import exceptions as keystone_exceptions from oslo_config import cfg from oslo_log import log as logging -import six import magnum.conf from magnum.i18n import _ @@ -104,9 +103,7 @@ class MagnumException(Exception): super(MagnumException, self).__init__(self.message) def __str__(self): - if six.PY3: - return self.message - return self.message.encode('utf-8') + return self.message def __unicode__(self): return self.message @@ -115,7 +112,7 @@ class MagnumException(Exception): if self.__class__.__name__.endswith('_Remote'): return self.args[0] else: - return six.text_type(self) + return str(self) class ObjectNotFound(MagnumException): diff --git a/magnum/common/short_id.py b/magnum/common/short_id.py index 5d0e9526ad..5746217212 100644 --- a/magnum/common/short_id.py +++ b/magnum/common/short_id.py @@ -19,8 +19,6 @@ The IDs each comprise 12 (lower-case) alphanumeric characters. import base64 import uuid -import six - from magnum.i18n import _ @@ -30,7 +28,7 @@ def _to_byte_string(value, num_bits): Padding is added at the end (i.e. after the least-significant bit) if required. """ - shifts = six.moves.xrange(num_bits - 8, -8, -8) + shifts = range(num_bits - 8, -8, -8) byte_at = lambda off: ((value >> off # noqa: E731 if off >= 0 else value << -off) & 0xff) return ''.join(chr(byte_at(offset)) for offset in shifts) @@ -41,7 +39,7 @@ def get_id(source_uuid): The supplied UUID must be a version 4 UUID object. """ - if isinstance(source_uuid, six.string_types): + if isinstance(source_uuid, str): source_uuid = uuid.UUID(source_uuid) if source_uuid.version != 4: raise ValueError(_('Invalid UUID version (%d)') % source_uuid.version) @@ -50,12 +48,9 @@ def get_id(source_uuid): # (see RFC4122, Section 4.4) random_bytes = _to_byte_string(source_uuid.time, 60) # The first 12 bytes (= 60 bits) of base32-encoded output is our data - encoded = base64.b32encode(six.b(random_bytes))[:12] + encoded = base64.b32encode(random_bytes.encode('latin-1'))[:12] - if six.PY3: - return encoded.lower().decode('utf-8') - else: - return encoded.lower() + return encoded.lower().decode('utf-8') def generate_id(): diff --git a/magnum/common/urlfetch.py b/magnum/common/urlfetch.py index e58be1434e..b4a365f125 100644 --- a/magnum/common/urlfetch.py +++ b/magnum/common/urlfetch.py @@ -12,11 +12,11 @@ # under the License. """Utility for fetching a resource (e.g. a manifest) from a URL.""" +import urllib from oslo_log import log as logging import requests from requests import exceptions -from six.moves import urllib from magnum.common import exception import magnum.conf diff --git a/magnum/common/utils.py b/magnum/common/utils.py index 5627cbf59d..a981822544 100644 --- a/magnum/common/utils.py +++ b/magnum/common/utils.py @@ -29,7 +29,6 @@ import netaddr from oslo_concurrency import processutils from oslo_log import log as logging from oslo_utils import netutils -import six from magnum.common import exception import magnum.conf @@ -164,7 +163,7 @@ def safe_rstrip(value, chars=None): :return: Stripped value. """ - if not isinstance(value, six.string_types): + if not isinstance(value, str): LOG.warning("Failed to remove trailing character. " "Returning original object. " "Supplied object is not a string: %s,", value) diff --git a/magnum/common/x509/operations.py b/magnum/common/x509/operations.py index f30e55dab8..7d8c5a8ebe 100644 --- a/magnum/common/x509/operations.py +++ b/magnum/common/x509/operations.py @@ -13,7 +13,6 @@ # under the License. import datetime -import six import uuid from cryptography.hazmat.primitives.asymmetric import rsa @@ -104,10 +103,10 @@ def _generate_certificate(issuer_name, subject_name, extensions, organization_name=None, ca_key=None, encryption_password=None, ca_key_password=None): - if not isinstance(subject_name, six.text_type): - subject_name = six.text_type(subject_name.decode('utf-8')) - if organization_name and not isinstance(organization_name, six.text_type): - organization_name = six.text_type(organization_name.decode('utf-8')) + if not isinstance(subject_name, str): + subject_name = subject_name.decode('utf-8') + if organization_name and not isinstance(organization_name, str): + organization_name = organization_name.decode('utf-8') private_key = rsa.generate_private_key( public_exponent=65537, @@ -132,8 +131,8 @@ def _generate_certificate(issuer_name, subject_name, extensions, csr = csr.sign(private_key, hashes.SHA256()) - if six.PY3 and isinstance(encryption_password, six.text_type): - encryption_password = encryption_password.encode() + if isinstance(encryption_password, str): + encryption_password = encryption_password.encode('latin-1') if encryption_password: encryption_algorithm = serialization.BestAvailableEncryption( @@ -161,10 +160,10 @@ def _generate_certificate(issuer_name, subject_name, extensions, def _load_pem_private_key(ca_key, ca_key_password=None): if not isinstance(ca_key, rsa.RSAPrivateKey): - if isinstance(ca_key, six.text_type): - ca_key = six.b(str(ca_key)) - if isinstance(ca_key_password, six.text_type): - ca_key_password = six.b(str(ca_key_password)) + if isinstance(ca_key, str): + ca_key = ca_key.encode('latin-1') + if isinstance(ca_key_password, str): + ca_key_password = ca_key_password.encode('latin-1') ca_key = serialization.load_pem_private_key( ca_key, @@ -188,11 +187,11 @@ def sign(csr, issuer_name, ca_key, ca_key_password=None, ca_key = _load_pem_private_key(ca_key, ca_key_password) - if not isinstance(issuer_name, six.text_type): - issuer_name = six.text_type(issuer_name.decode('utf-8')) + if not isinstance(issuer_name, str): + issuer_name = issuer_name.decode('utf-8') - if isinstance(csr, six.text_type): - csr = six.b(str(csr)) + if isinstance(csr, str): + csr = csr.encode('latin-1') if not isinstance(csr, x509.CertificateSigningRequest): try: csr = x509.load_pem_x509_csr(csr) diff --git a/magnum/tests/unit/common/x509/test_sign.py b/magnum/tests/unit/common/x509/test_sign.py index cc8c2f1976..f28f65f4b9 100644 --- a/magnum/tests/unit/common/x509/test_sign.py +++ b/magnum/tests/unit/common/x509/test_sign.py @@ -208,8 +208,7 @@ class TestX509(base.BaseTestCase): self.assertIsInstance(private_key, rsa.RSAPrivateKey) @mock.patch('cryptography.x509.load_pem_x509_csr') - @mock.patch('six.b') - def test_sign_with_unicode_csr(self, mock_six, mock_load_pem): + def test_sign_with_unicode_csr(self, mock_load_pem): ca_key = self._generate_private_key() private_key = self._generate_private_key() csr_obj = self._build_csr(private_key) @@ -219,7 +218,6 @@ class TestX509(base.BaseTestCase): mock_load_pem.return_value = csr_obj operations.sign(csr, self.issuer_name, ca_key, skip_validation=True) - mock_six.assert_called_once_with(csr) @mock.patch('cryptography.x509.load_pem_x509_csr') def test_sign_empty_chars(self, mock_load_pem):