Merge "pyupgrade changes for Python3.8+ (3)"

This commit is contained in:
Zuul 2024-04-15 09:22:31 +00:00 committed by Gerrit Code Review
commit 314f2b60dc
21 changed files with 80 additions and 80 deletions

View File

@ -61,7 +61,7 @@ class BarbicanCert(cert.Cert):
return None
class BarbicanAuth(object, metaclass=abc.ABCMeta):
class BarbicanAuth(metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_barbican_client(self, project_id):
"""Creates a Barbican client object.

View File

@ -16,7 +16,7 @@
import abc
class Cert(object, metaclass=abc.ABCMeta):
class Cert(metaclass=abc.ABCMeta):
"""Base class to represent all certificates."""
@abc.abstractmethod

View File

@ -19,7 +19,7 @@ Certificate Generator API
import abc
class CertGenerator(object, metaclass=abc.ABCMeta):
class CertGenerator(metaclass=abc.ABCMeta):
"""Base Cert Generator Interface
A Certificate Generator is responsible for generating private keys,

View File

@ -47,9 +47,9 @@ class LocalCertGenerator(cert_gen.CertGenerator):
try:
with open(CONF.certificates.ca_certificate, 'rb') as fp:
fp.read()
except IOError as e:
except OSError as e:
raise exceptions.CertificateGenerationException(
msg="Failed to load CA Certificate {0}."
msg="Failed to load CA Certificate {}."
.format(CONF.certificates.ca_certificate)
) from e
if not ca_key:
@ -57,9 +57,9 @@ class LocalCertGenerator(cert_gen.CertGenerator):
try:
with open(CONF.certificates.ca_private_key, 'rb') as fp:
fp.read()
except IOError as e:
except OSError as e:
raise exceptions.CertificateGenerationException(
msg="Failed to load CA Private Key {0}."
msg="Failed to load CA Private Key {}."
.format(CONF.certificates.ca_private_key)
) from e
if not ca_key_pass:

View File

@ -19,7 +19,7 @@ Certificate manager API
import abc
class CertManager(object, metaclass=abc.ABCMeta):
class CertManager(metaclass=abc.ABCMeta):
"""Base Cert Manager Interface
A Cert Manager is responsible for managing certificates for TLS.

View File

@ -57,20 +57,20 @@ class LocalCertManager(cert_mgr.CertManager):
LOG.info("Storing certificate data on the local filesystem.")
try:
filename_certificate = "{0}.crt".format(filename_base)
filename_certificate = f"{filename_base}.crt"
flags = os.O_WRONLY | os.O_CREAT
mode = stat.S_IRUSR | stat.S_IWUSR # mode 0600
with os.fdopen(os.open(
filename_certificate, flags, mode), 'w') as cert_file:
cert_file.write(certificate)
filename_private_key = "{0}.key".format(filename_base)
filename_private_key = f"{filename_base}.key"
with os.fdopen(os.open(
filename_private_key, flags, mode), 'w') as key_file:
key_file.write(private_key)
if intermediates:
filename_intermediates = "{0}.int".format(filename_base)
filename_intermediates = f"{filename_base}.int"
if isinstance(intermediates, bytes):
intermediates = intermediates.decode('utf-8')
with os.fdopen(os.open(
@ -78,14 +78,14 @@ class LocalCertManager(cert_mgr.CertManager):
int_file.write(intermediates)
if private_key_passphrase:
filename_pkp = "{0}.pass".format(filename_base)
filename_pkp = f"{filename_base}.pass"
if isinstance(private_key_passphrase, bytes):
private_key_passphrase = private_key_passphrase.decode(
'utf-8')
with os.fdopen(os.open(
filename_pkp, flags, mode), 'w') as pass_file:
pass_file.write(private_key_passphrase)
except IOError as ioe:
except OSError as ioe:
LOG.error("Failed to store certificate.")
raise exceptions.CertificateStorageException(message=ioe.message)
@ -106,10 +106,10 @@ class LocalCertManager(cert_mgr.CertManager):
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
filename_certificate = "{0}.crt".format(filename_base)
filename_private_key = "{0}.key".format(filename_base)
filename_intermediates = "{0}.int".format(filename_base)
filename_pkp = "{0}.pass".format(filename_base)
filename_certificate = f"{filename_base}.crt"
filename_private_key = f"{filename_base}.key"
filename_intermediates = f"{filename_base}.int"
filename_pkp = f"{filename_base}.pass"
cert_data = {}
@ -117,14 +117,14 @@ class LocalCertManager(cert_mgr.CertManager):
try:
with os.fdopen(os.open(filename_certificate, flags)) as cert_file:
cert_data['certificate'] = cert_file.read()
except IOError as e:
except OSError as e:
LOG.error("Failed to read certificate for %s.", cert_ref)
raise exceptions.CertificateStorageException(
msg="Certificate could not be read.") from e
try:
with os.fdopen(os.open(filename_private_key, flags)) as key_file:
cert_data['private_key'] = key_file.read()
except IOError as e:
except OSError as e:
LOG.error("Failed to read private key for %s", cert_ref)
raise exceptions.CertificateStorageException(
msg="Private Key could not be read.") from e
@ -134,13 +134,13 @@ class LocalCertManager(cert_mgr.CertManager):
cert_data['intermediates'] = int_file.read()
cert_data['intermediates'] = list(
cert_parser.get_intermediates_pems(cert_data['intermediates']))
except IOError:
except OSError:
pass
try:
with os.fdopen(os.open(filename_pkp, flags)) as pass_file:
cert_data['private_key_passphrase'] = pass_file.read()
except IOError:
except OSError:
pass
return local_common.LocalCert(**cert_data)
@ -159,17 +159,17 @@ class LocalCertManager(cert_mgr.CertManager):
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
filename_certificate = "{0}.crt".format(filename_base)
filename_private_key = "{0}.key".format(filename_base)
filename_intermediates = "{0}.int".format(filename_base)
filename_pkp = "{0}.pass".format(filename_base)
filename_certificate = f"{filename_base}.crt"
filename_private_key = f"{filename_base}.key"
filename_intermediates = f"{filename_base}.int"
filename_pkp = f"{filename_base}.pass"
try:
os.remove(filename_certificate)
os.remove(filename_private_key)
os.remove(filename_intermediates)
os.remove(filename_pkp)
except IOError as ioe:
except OSError as ioe:
LOG.error("Failed to delete certificate %s", cert_ref)
raise exceptions.CertificateStorageException(message=ioe.message)
@ -196,7 +196,7 @@ class LocalCertManager(cert_mgr.CertManager):
filename_base = os.path.join(CONF.certificates.storage_path,
secret_ref)
filename_secret = "{0}.crt".format(filename_base)
filename_secret = f"{filename_base}.crt"
secret_data = None
@ -204,7 +204,7 @@ class LocalCertManager(cert_mgr.CertManager):
try:
with os.fdopen(os.open(filename_secret, flags)) as secret_file:
secret_data = secret_file.read()
except IOError as e:
except OSError as e:
LOG.error("Failed to read secret for %s.", secret_ref)
raise exceptions.CertificateRetrievalException(
ref=secret_ref) from e

View File

@ -58,7 +58,7 @@ def _process_wrapper(exit_event, proc_name, function, agent_name=None):
process_title = 'octavia-driver-agent - {} -- {}'.format(
proc_name, agent_name)
else:
process_title = 'octavia-driver-agent - {}'.format(proc_name)
process_title = f'octavia-driver-agent - {proc_name}'
setproctitle.setproctitle(process_title)
while not exit_event.is_set():
try:

View File

@ -186,7 +186,7 @@ def sctp_health_check(ip_address, port, timeout=2):
data = _sctp_build_init_packet(src_port, port, tag)
print("Sending INIT packet to {}:{}".format(ip_address, port))
print(f"Sending INIT packet to {ip_address}:{port}")
s.send(data)
start = time.time()
@ -211,13 +211,13 @@ def sctp_health_check(ip_address, port, timeout=2):
print("Received ABORT")
ret = 1
else: # Others: unknown error
print("Received {} Type chunk".format(response_type))
print(f"Received {response_type} Type chunk")
send_abort = True
ret = 3
break
else:
print("Timeout after {} seconds.".format(timeout))
print(f"Timeout after {timeout} seconds.")
# Timeout
ret = 2
@ -260,5 +260,5 @@ def main():
ret = sctp_health_check(destination, port, timeout=default_timeout)
sys.exit(ret)
else:
print("Unsupported protocol '{}'".format(protocol))
print(f"Unsupported protocol '{protocol}'")
sys.exit(1)

View File

@ -34,7 +34,7 @@ def interfaces_find(interface_controller, name):
if name in all_interfaces:
return [all_interfaces[name]]
msg = "Could not find interface '{}'.".format(name)
msg = f"Could not find interface '{name}'."
raise InterfaceException(msg=msg)
@ -61,7 +61,7 @@ def interface_cmd(interface_name, action):
action_fn = interface_controller.down
else:
raise InterfaceException(
msg="Unknown action '{}'".format(action))
msg=f"Unknown action '{action}'")
interfaces = interfaces_find(interface_controller,
interface_name)
@ -76,11 +76,11 @@ def main():
action = sys.argv[-2]
interface_name = sys.argv[-1]
except IndexError:
print("usage: {} [up|down] <interface>".format(sys.argv[0]))
print(f"usage: {sys.argv[0]} [up|down] <interface>")
sys.exit(1)
try:
interface_cmd(interface_name, action)
except Exception as e:
print("Error: {}".format(e))
print(f"Error: {e}")
sys.exit(2)

View File

@ -75,7 +75,7 @@ class FilteredJob(Job):
# filter out private information from details
cls_name = type(self).__name__
details = _details_filter(self.details)
return "%s: %s (priority=%s, uuid=%s, details=%s)" % (
return "{}: {} (priority={}, uuid={}, details={})".format(
cls_name, self.name, self.priority,
self.uuid, details)
@ -90,7 +90,7 @@ class JobDetailsFilter(log.logging.Filter):
return True
class BaseTaskFlowEngine(object):
class BaseTaskFlowEngine:
"""This is the task flow engine
Use this engine to start/load flows in the
@ -164,7 +164,7 @@ class RedisDynamicLoggingConductor(DynamicLoggingConductor):
return listeners
class TaskFlowServiceController(object):
class TaskFlowServiceController:
def __init__(self, driver):
self.driver = driver

View File

@ -31,7 +31,7 @@ NOVA_VERSION = '2.15'
CINDER_VERSION = '3'
class NovaAuth(object):
class NovaAuth:
nova_client = None
@classmethod
@ -70,7 +70,7 @@ class NovaAuth(object):
return cls.nova_client
class NeutronAuth(object):
class NeutronAuth:
neutron_client = None
@classmethod
@ -127,7 +127,7 @@ class NeutronAuth(object):
return conn.network
class GlanceAuth(object):
class GlanceAuth:
glance_client = None
@classmethod
@ -166,7 +166,7 @@ class GlanceAuth(object):
return cls.glance_client
class CinderAuth(object):
class CinderAuth:
cinder_client = None
@classmethod

View File

@ -773,22 +773,22 @@ RULE_API_READ_QUOTA = 'rule:load-balancer:read-quota'
RULE_API_READ_QUOTA_GLOBAL = 'rule:load-balancer:read-quota-global'
RULE_API_WRITE_QUOTA = 'rule:load-balancer:write-quota'
RBAC_LOADBALANCER = '{}:loadbalancer:'.format(LOADBALANCER_API)
RBAC_LISTENER = '{}:listener:'.format(LOADBALANCER_API)
RBAC_POOL = '{}:pool:'.format(LOADBALANCER_API)
RBAC_MEMBER = '{}:member:'.format(LOADBALANCER_API)
RBAC_HEALTHMONITOR = '{}:healthmonitor:'.format(LOADBALANCER_API)
RBAC_L7POLICY = '{}:l7policy:'.format(LOADBALANCER_API)
RBAC_L7RULE = '{}:l7rule:'.format(LOADBALANCER_API)
RBAC_QUOTA = '{}:quota:'.format(LOADBALANCER_API)
RBAC_AMPHORA = '{}:amphora:'.format(LOADBALANCER_API)
RBAC_PROVIDER = '{}:provider:'.format(LOADBALANCER_API)
RBAC_PROVIDER_FLAVOR = '{}:provider-flavor:'.format(LOADBALANCER_API)
RBAC_LOADBALANCER = f'{LOADBALANCER_API}:loadbalancer:'
RBAC_LISTENER = f'{LOADBALANCER_API}:listener:'
RBAC_POOL = f'{LOADBALANCER_API}:pool:'
RBAC_MEMBER = f'{LOADBALANCER_API}:member:'
RBAC_HEALTHMONITOR = f'{LOADBALANCER_API}:healthmonitor:'
RBAC_L7POLICY = f'{LOADBALANCER_API}:l7policy:'
RBAC_L7RULE = f'{LOADBALANCER_API}:l7rule:'
RBAC_QUOTA = f'{LOADBALANCER_API}:quota:'
RBAC_AMPHORA = f'{LOADBALANCER_API}:amphora:'
RBAC_PROVIDER = f'{LOADBALANCER_API}:provider:'
RBAC_PROVIDER_FLAVOR = f'{LOADBALANCER_API}:provider-flavor:'
RBAC_PROVIDER_AVAILABILITY_ZONE = '{}:provider-availability-zone:'.format(
LOADBALANCER_API)
RBAC_FLAVOR = '{}:flavor:'.format(LOADBALANCER_API)
RBAC_FLAVOR_PROFILE = '{}:flavor-profile:'.format(LOADBALANCER_API)
RBAC_AVAILABILITY_ZONE = '{}:availability-zone:'.format(LOADBALANCER_API)
RBAC_FLAVOR = f'{LOADBALANCER_API}:flavor:'
RBAC_FLAVOR_PROFILE = f'{LOADBALANCER_API}:flavor-profile:'
RBAC_AVAILABILITY_ZONE = f'{LOADBALANCER_API}:availability-zone:'
RBAC_AVAILABILITY_ZONE_PROFILE = '{}:availability-zone-profile:'.format(
LOADBALANCER_API)

View File

@ -26,7 +26,7 @@ from octavia.common import constants
LOG = logging.getLogger(__name__)
class BaseDataModel(object):
class BaseDataModel:
def _to_dict(self, value, calling_classes=None, recurse=False):
calling_classes = calling_classes or []
# We need to have json convertible data for storing it in
@ -228,7 +228,7 @@ class ListenerStatistics(BaseDataModel):
self.total_connections += other.total_connections
else:
raise TypeError( # noqa: O342
"unsupported operand type(s) for +=: '{0}' and '{1}'".format(
"unsupported operand type(s) for +=: '{}' and '{}'".format(
type(self), type(other)))
return self

View File

@ -60,7 +60,7 @@ CONF = cfg.CONF
JINJA_ENV = None
class JinjaTemplater(object):
class JinjaTemplater:
def __init__(self,
base_amp_path=None,
@ -175,9 +175,9 @@ class JinjaTemplater(object):
tls_certs,
feature_compatibility)
if not socket_path:
socket_path = '%s/%s.sock' % (self.base_amp_path,
listeners[0].load_balancer.id)
state_file_path = '%s/%s/servers-state' % (
socket_path = '{}/{}.sock'.format(self.base_amp_path,
listeners[0].load_balancer.id)
state_file_path = '{}/{}/servers-state'.format(
self.base_amp_path,
listeners[0].load_balancer.id) if feature_compatibility.get(
constants.SERVER_STATE_FILE) else ''
@ -341,7 +341,7 @@ class JinjaTemplater(object):
if listener.tls_certificate_id:
ret_value['crt_list_filename'] = os.path.join(
CONF.haproxy_amphora.base_cert_dir,
loadbalancer.id, '{}.pem'.format(listener.id))
loadbalancer.id, f'{listener.id}.pem')
if tls_certs is not None:
if listener.client_ca_tls_certificate_id:

View File

@ -25,7 +25,7 @@ TEMPLATES_DIR = (os.path.dirname(os.path.realpath(__file__)) +
constants.LOGGING_TEMPLATES + '/')
class LoggingJinjaTemplater(object):
class LoggingJinjaTemplater:
def __init__(self, logging_templates=None):
self.logging_templates = logging_templates or TEMPLATES_DIR

View File

@ -47,7 +47,7 @@ KEEPALIVED_LVS_TEMPLATE = os.path.abspath(
JINJA_ENV = None
class LvsJinjaTemplater(object):
class LvsJinjaTemplater:
def __init__(self, base_amp_path=None, keepalivedlvs_template=None):
"""Keepalived LVS configuration generation

View File

@ -25,7 +25,7 @@ TEMPLATES_DIR = (os.path.dirname(os.path.realpath(__file__)) +
constants.TEMPLATES + '/')
class UserDataJinjaCfg(object):
class UserDataJinjaCfg:
def __init__(self):
template_loader = jinja2.FileSystemLoader(searchpath=os.path.dirname(

View File

@ -26,7 +26,7 @@ _NOAUTH_PATHS = ['/', '/load-balancer/', '/healthcheck',
'/load-balancer/healthcheck']
class KeystoneSession(object):
class KeystoneSession:
def __init__(self, section=constants.SERVICE_AUTH):
self._session = None

View File

@ -21,7 +21,7 @@ from octavia.db import repositories as repo
LOG = logging.getLogger(__name__)
class StatsMixin(object):
class StatsMixin:
def __init__(self):
super().__init__()

View File

@ -104,8 +104,8 @@ def ip_port_str(ip_address, port):
"""Return IP port as string representation depending on address family."""
ip = ipaddress.ip_address(ip_address)
if ip.version == 4:
return "{ip}:{port}".format(ip=ip, port=port)
return "[{ip}]:{port}".format(ip=ip, port=port)
return f"{ip}:{port}"
return f"[{ip}]:{port}"
def netmask_to_prefix(netmask):
@ -166,7 +166,7 @@ def expand_expected_codes(codes):
return retval
class exception_logger(object):
class exception_logger:
"""Wrap a function and log raised exception
:param logger: the logger to log the exception default is LOG.exception

View File

@ -198,26 +198,26 @@ def validate_l7rule_ssl_types(l7rule):
# key and value are not allowed
if req_key:
# log error or raise
msg = 'L7rule type {0} does not use the "key" field.'.format(
msg = 'L7rule type {} does not use the "key" field.'.format(
rule_type)
elif req_value.lower() != 'true':
msg = 'L7rule value {0} is not a boolean True string.'.format(
msg = 'L7rule value {} is not a boolean True string.'.format(
req_value)
elif compare_type != constants.L7RULE_COMPARE_TYPE_EQUAL_TO:
msg = 'L7rule type {0} only supports the {1} compare type.'.format(
msg = 'L7rule type {} only supports the {} compare type.'.format(
rule_type, constants.L7RULE_COMPARE_TYPE_EQUAL_TO)
if rule_type == constants.L7RULE_TYPE_SSL_VERIFY_RESULT:
if req_key:
# log or raise req_key not used
msg = 'L7rule type {0} does not use the "key" field.'.format(
msg = 'L7rule type {} does not use the "key" field.'.format(
rule_type)
elif not req_value.isdigit() or int(req_value) < 0:
# log or raise req_value must be int
msg = 'L7rule type {0} needs a int value, which is >= 0'.format(
msg = 'L7rule type {} needs a int value, which is >= 0'.format(
rule_type)
elif compare_type != constants.L7RULE_COMPARE_TYPE_EQUAL_TO:
msg = 'L7rule type {0} only supports the {1} compare type.'.format(
msg = 'L7rule type {} only supports the {} compare type.'.format(
rule_type, constants.L7RULE_COMPARE_TYPE_EQUAL_TO)
if rule_type == constants.L7RULE_TYPE_SSL_DN_FIELD:
@ -227,7 +227,7 @@ def validate_l7rule_ssl_types(l7rule):
if not req_key or not req_value:
# log or raise key and value must be specified.
msg = 'L7rule type {0} needs to specify a key and a value.'.format(
msg = 'L7rule type {} needs to specify a key and a value.'.format(
rule_type)
# log or raise the key must be split by '-'
elif not dn_regex.match(req_key):