Add typing information to remaining octavia code

... and fix issues reported by mypy.

Apart from helping mypy find possible bugs, type annotations have
other benefits, for instance:

- IDEs such as PyCharm and VS Code can use annotations to provide
  better code completion, highlight errors and make coding more
  efficient.
- They serve as documentation of the code and make it easier to
  understand. The annotation syntax is easy to read.
- Sphinx-generated module documentation can use this information
  (without the need for additional type info in comments or
  docstrings).

The syntax used here is supported by Python 3.6 and later, so it
should not create issues when backporting patches in the future.
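
For example, container and optional types are written with typing
aliases rather than the builtin generics that require newer Python
versions (a minimal sketch mirroring the style used in this patch):

    import typing as tp

    # Python 3.6 compatible variable annotations (PEP 526) using the
    # typing module:
    cert_data: tp.Dict[str, tp.Union[str, list]] = {}
    event_type: tp.Optional[str] = None

    # The builtin-generic equivalent would require Python 3.9+
    # (and 3.10+ for the "|" union syntax):
    # cert_data: dict[str, str | list] = {}
    # event_type: str | None = None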

Partial-Bug: #2017974
Change-Id: Iafda1d9d4245ae9db625f4ace1a6fcba8f3949ec
Tom Weininger 2024-01-10 09:36:10 +01:00
parent 2b86b31b58
commit c38c458413
22 changed files with 138 additions and 100 deletions

View File

@ -14,6 +14,7 @@
# under the License.
import os
import stat
import typing as tp
import uuid
from oslo_config import cfg
@ -87,7 +88,7 @@ class LocalCertManager(cert_mgr.CertManager):
pass_file.write(private_key_passphrase)
except OSError as ioe:
LOG.error("Failed to store certificate.")
raise exceptions.CertificateStorageException(message=ioe.message)
raise exceptions.CertificateStorageException(message=ioe.strerror)
return cert_ref
@ -111,7 +112,7 @@ class LocalCertManager(cert_mgr.CertManager):
filename_intermediates = f"{filename_base}.int"
filename_pkp = f"{filename_base}.pass"
cert_data = {}
cert_data: tp.Dict[str, tp.Union[str, list]] = {}
flags = os.O_RDONLY
try:
@ -171,7 +172,7 @@ class LocalCertManager(cert_mgr.CertManager):
os.remove(filename_pkp)
except OSError as ioe:
LOG.error("Failed to delete certificate %s", cert_ref)
raise exceptions.CertificateStorageException(message=ioe.message)
raise exceptions.CertificateStorageException(message=ioe.strerror)
def set_acls(self, context, cert_ref):
# There is no security on this store, because it's really dumb

View File

@ -45,7 +45,8 @@ class NoopCertManager(cert_mgr.CertManager):
return self._local_cert
def store_cert(self, context, certificate, private_key, intermediates=None,
private_key_passphrase=None, **kwargs) -> cert.Cert:
private_key_passphrase=None, expiration=None,
name=None) -> cert.Cert:
"""Stores (i.e., registers) a cert with the cert manager.
This method stores the specified cert to the filesystem and returns
@ -82,8 +83,8 @@ class NoopCertManager(cert_mgr.CertManager):
return local.LocalCert(**cert_data)
def get_cert(self, context, cert_ref, check_only=True, **kwargs) -> (
cert.Cert):
def get_cert(self, context, cert_ref, resource_ref=None, check_only=True,
service_name=None) -> cert.Cert:
LOG.debug('Driver %s no-op, get_cert with cert_ref %s',
self.__class__.__name__, cert_ref)
return self.local_cert

View File

@ -30,7 +30,7 @@ from octavia import version
CONF = cfg.CONF
HM_SENDER_CMD_QUEUE = multiproc.Queue()
HM_SENDER_CMD_QUEUE: multiproc.queues.Queue = multiproc.Queue()
class AmphoraAgent(gunicorn.app.base.BaseApplication):

View File

@ -112,7 +112,8 @@ def main():
def process_cleanup(*args, **kwargs):
LOG.info("Health Manager exiting due to signal")
exit_event.set()
os.kill(hm_health_check_proc.pid, signal.SIGINT)
if hm_health_check_proc.pid is not None:
os.kill(hm_health_check_proc.pid, signal.SIGINT)
hm_health_check_proc.join()
hm_listener_proc.join()

View File

@ -26,6 +26,7 @@ import sys
import threading
import time
import traceback
import typing as tp
import urllib.request
import psutil
@ -45,7 +46,9 @@ EXIT_EVENT = threading.Event()
# If tuple[1] is None, the key will be replaced in the HELP string.
# tuple[2]: If not None, includes a dictionary of additional substitutions.
# tuple[2] substitutions happen prior to the key replacement in tuple[0].
METRIC_MAP = {
METRIC_MAP: tp.Dict[str,
tp.Tuple[str, tp.Optional[str],
tp.Optional[tp.Dict[str, str]]]] = {
# Load balancer metrics
"haproxy_process_pool_allocated_bytes ":
("octavia_memory_pool_allocated_bytes ",
@ -290,7 +293,7 @@ METRIC_MAP = {
"haproxy_backend_last_session_seconds ":
("octavia_pool_last_session_seconds ",
"# HELP octavia_pool_last_session_seconds Number of seconds since "
"last session assigned to a member.\n", None, None),
"last session assigned to a member.\n", None),
"haproxy_backend_last_session_seconds{":
("octavia_pool_last_session_seconds{", None, {"proxy=": "pool="}),
"haproxy_backend_current_queue ":
@ -740,7 +743,8 @@ class PrometheusProxy(SimpleHTTPRequestHandler):
continue
match = next((x for x in METRIC_KEYS if x in line), False)
if match:
map_tuple = METRIC_MAP[match]
match_str = tp.cast(str, match)
map_tuple = METRIC_MAP[match_str]
if map_tuple[1] and "HELP" in line:
metrics_buffer += map_tuple[1]
else:
@ -748,7 +752,7 @@ class PrometheusProxy(SimpleHTTPRequestHandler):
for key in map_tuple[2].keys():
line = line.replace(key,
map_tuple[2][key])
metrics_buffer += line.replace(match,
metrics_buffer += line.replace(match_str,
map_tuple[0])
elif PRINT_REJECTED:
print("REJECTED: %s" % line)

View File

@ -17,6 +17,7 @@ import concurrent.futures
import datetime
import functools
import time
import typing as tp
from oslo_config import cfg
from oslo_log import log
@ -54,7 +55,7 @@ LOG.logger.addFilter(retryMaskFilter)
def _details_filter(obj):
if isinstance(obj, dict):
ret = {}
ret: tp.Dict[str, tp.Union[str, list]] = {}
for key in obj:
if (key in ('certificate', 'private_key', 'passphrase') and
isinstance(obj[key], str)):

View File

@ -612,7 +612,7 @@ task_flow_opts = [
'persistence backend when job completed.'),
]
core_cli_opts = []
core_cli_opts: list = []
certificate_opts = [
cfg.StrOpt('cert_manager',

View File

@ -28,6 +28,7 @@ LOG = logging.getLogger(__name__)
class BaseDataModel:
def _to_dict(self, value, calling_classes=None, recurse=False):
ret: tp.Union[str, list, None]
calling_classes = calling_classes or []
# We need to have json convertible data for storing it in
# persistence jobboard backend.

View File

@ -206,16 +206,18 @@ class JinjaTemplater:
'enable_prometheus': enable_prometheus,
'require_insecure_fork': require_insecure_fork,
}
try:
# Enable cpu-pinning only if the amphora TuneD profile is active
if "amphora" in amp_details["active_tuned_profiles"].split():
jinja_dict["cpu_count"] = int(amp_details["cpu_count"])
except (KeyError, TypeError):
pass
if amp_details is not None:
try:
# Enable cpu-pinning only if the amphora TuneD profile
# is active
if "amphora" in amp_details["active_tuned_profiles"].split():
jinja_dict["cpu_count"] = int(amp_details["cpu_count"])
except (KeyError, TypeError):
pass
if term_https_listener:
try:
mem = amp_details["memory"]
mem = amp_details["memory"] # type: ignore
# Account for 32 KB per established connection for each
# pair of HAProxy network sockets. Use 1024 as fallback
# because that is what ulimit -n typically returns.

View File

@ -15,6 +15,7 @@
import base64
import hashlib
import typing as tp
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import serialization
@ -193,7 +194,7 @@ def _read_pem_blocks(data):
state = stSpam
if isinstance(data, bytes):
data = data.decode('utf-8')
certLines = []
certLines: tp.List[str] = []
for certLine in data.replace('\r', '').split('\n'):
if not certLine:
continue
@ -255,7 +256,7 @@ def get_host_names(certificate):
"""
if isinstance(certificate, str):
certificate = certificate.encode('utf-8')
host_names = {'cn': None, 'dns_names': []}
host_names: tp.Dict[str, tp.Any] = {'cn': None, 'dns_names': []}
try:
cert = x509.load_pem_x509_certificate(certificate,
backends.default_backend())

View File

@ -23,6 +23,7 @@ import hashlib
import ipaddress
import re
import socket
import typing as tp
from oslo_config import cfg
from oslo_log import log as logging
@ -152,7 +153,7 @@ def expand_expected_codes(codes):
200-204 -> 200, 201, 202, 204
200, 203 -> 200, 203
"""
retval = set()
retval: tp.Set[str] = set()
codes = re.split(', *', codes)
for code in codes:
if not code:

View File

@ -21,6 +21,7 @@ Defined here so these can also be used at deeper levels than the API.
import ipaddress
import re
import typing as tp
from oslo_config import cfg
from rfc3986 import uri_reference
@ -33,8 +34,10 @@ from octavia.common import exceptions
from octavia.common import utils
from octavia.i18n import _
if tp.TYPE_CHECKING:
from octavia.api.v2.types.listener import ListenerPUT
CONF = cfg.CONF
_ListenerPUT = 'octavia.api.v2.types.listener.ListenerPUT'
def url(url, require_scheme=True):
@ -190,7 +193,7 @@ def validate_l7rule_ssl_types(l7rule):
rule_type = None if l7rule.type == wtypes.Unset else l7rule.type
req_key = None if l7rule.key == wtypes.Unset else l7rule.key
req_value = None if l7rule.value == wtypes.Unset else l7rule.value
req_value = "" if l7rule.value == wtypes.Unset else l7rule.value
compare_type = (None if l7rule.compare_type == wtypes.Unset else
l7rule.compare_type)
msg = None
@ -551,10 +554,10 @@ def check_hsts_options(listener: dict):
'the TERMINATED_HTTPS protocol.'))
def check_hsts_options_put(listener: _ListenerPUT,
def check_hsts_options_put(listener: "ListenerPUT",
db_listener: data_models.Listener):
hsts_disabled = all(obj.hsts_max_age in [None, wtypes.Unset] for obj
in (db_listener, listener))
hsts_disabled = (db_listener.hsts_max_age is None and
listener.hsts_max_age in (None, wtypes.Unset))
if ((listener.hsts_include_subdomains or listener.hsts_preload) and
hsts_disabled):
raise exceptions.ValidationException(
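
The tp.TYPE_CHECKING guard above makes the ListenerPUT import visible
to mypy and IDEs only, while the annotation itself is written as a
string so nothing extra is imported at runtime. A minimal sketch of
the idiom (the module path below is a placeholder, not Octavia's real
layout):

    import typing as tp

    if tp.TYPE_CHECKING:
        # Evaluated by the type checker only, never at runtime.
        from myproject.api_types import ListenerPUT

    def check_hsts_options_put(listener: "ListenerPUT") -> None:
        # The quoted annotation is resolved lazily by the type
        # checker, so no runtime import of ListenerPUT is needed.
        ...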

View File

@ -25,7 +25,7 @@ from octavia.network import data_models as network_models
LOG = logging.getLogger(__name__)
NoopServerGroup = namedtuple('ServerGroup', ['id'])
NoopServerGroup = namedtuple('NoopServerGroup', ['id'])
class NoopManager:

View File

@ -332,8 +332,8 @@ class VirtualMachineManager(compute_base.ComputeBase):
for interface in interfaces:
if interface.id == port_id:
return interface
except Exception as e:
raise exceptions.ComputeUnknownException(exc=str(e))
except Exception as ex:
raise exceptions.ComputeUnknownException(exc=str(ex))
raise exceptions.ComputePortInUseException(port=port_id)

View File

@ -16,6 +16,7 @@
from concurrent import futures
import functools
import time
import typing as tp
from oslo_config import cfg
from oslo_db import exception as db_exc
@ -25,6 +26,7 @@ from oslo_utils import excutils
from octavia.common import constants
from octavia.controller.worker.v2 import controller_worker as cw2
from octavia.db import api as db_api
from octavia.db import models
from octavia.db import repositories as repo
CONF = cfg.CONF
@ -83,6 +85,14 @@ class HealthManager:
'failover_cancelled': 0,
}
futs = []
amp_health: tp.Optional[models.Amphora]
def _db_error_handler():
nonlocal amp_health
if lock_session:
lock_session.rollback()
amp_health = None
while not self.dead.is_set():
amp_health = None
lock_session = None
@ -105,16 +115,13 @@ class HealthManager:
lock_session.commit()
except db_exc.DBDeadlock:
LOG.debug('Database reports deadlock. Skipping.')
lock_session.rollback()
amp_health = None
_db_error_handler()
except db_exc.RetryRequest:
LOG.debug('Database is requesting a retry. Skipping.')
lock_session.rollback()
amp_health = None
_db_error_handler()
except db_exc.DBConnectionError:
db_api.wait_for_connection(self.dead)
lock_session.rollback()
amp_health = None
_db_error_handler()
if not self.dead.is_set():
# amphora heartbeat timestamps should also be outdated
# while DB is unavailable and soon after DB comes back
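
The bare annotation amp_health: tp.Optional[models.Amphora] declares
the variable's type up front, so mypy can check it consistently both
in the polling loop and in the nested _db_error_handler() that resets
it via nonlocal. A minimal standalone sketch of that pattern (names
and the session object are simplified stand-ins):

    import typing as tp

    class FakeSession:
        def rollback(self) -> None:
            print("rolled back")

    def poll_once(lock_session: tp.Optional[FakeSession]) -> None:
        # Bare annotation: declares the type before any assignment.
        amp_health: tp.Optional[str]

        def _db_error_handler() -> None:
            # nonlocal rebinds the variable in the enclosing scope
            # instead of creating a new local inside the helper.
            nonlocal amp_health
            if lock_session is not None:
                lock_session.rollback()
            amp_health = None

        try:
            amp_health = "amphora-1"
            raise RuntimeError("simulated DB deadlock")
        except RuntimeError:
            _db_error_handler()
        assert amp_health is None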

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import typing as tp
from octavia_lib.common import constants as lib_consts
from oslo_config import cfg
@ -1026,7 +1027,7 @@ class ControllerWorker:
lb_amp_count = 1
az_metadata = {}
flavor_dict = {}
flavor_dict: tp.Dict[str, str] = {}
lb_id = None
vip_dict = {}
additional_vip_dicts = []

View File

@ -14,8 +14,7 @@
#
import copy
from typing import List
from typing import Optional
import typing as tp
from cryptography import fernet
from oslo_config import cfg
@ -576,7 +575,8 @@ class AmphoraIndexVRRPUpdate(BaseAmphoraTask):
"""Task to update the VRRP configuration of an amphora."""
def execute(self, loadbalancer_id, amphorae_network_config, amphora_index,
amphorae, amphorae_status: dict, amp_vrrp_int: Optional[str],
amphorae, amphorae_status: dict,
amp_vrrp_int: tp.Optional[str],
new_amphora_id: str, timeout_dict=None):
"""Execute update_vrrp_conf."""
# Note, we don't want this to cause a revert as it may be used
@ -732,9 +732,9 @@ class AmphoraeGetConnectivityStatus(BaseAmphoraTask):
load balancers
"""
def execute(self, amphorae: List[dict], new_amphora_id: str,
def execute(self, amphorae: tp.List[dict], new_amphora_id: str,
timeout_dict=None):
amphorae_status = {}
amphorae_status: tp.Dict[str, tp.Dict[str, bool]] = {}
for amphora in amphorae:
amphora_id = amphora[constants.ID]
@ -765,8 +765,8 @@ class AmphoraeGetConnectivityStatus(BaseAmphoraTask):
class SetAmphoraFirewallRules(BaseAmphoraTask):
"""Task to push updated firewall ruls to an amphora."""
def execute(self, amphorae: List[dict], amphora_index: int,
amphora_firewall_rules: List[dict], amphorae_status: dict,
def execute(self, amphorae: tp.List[dict], amphora_index: int,
amphora_firewall_rules: tp.List[dict], amphorae_status: dict,
timeout_dict=None):
if (amphora_firewall_rules and

View File

@ -11,6 +11,8 @@
# under the License.
#
import typing as tp
from oslo_log import log as logging
from taskflow import task
@ -22,7 +24,7 @@ LOG = logging.getLogger(__name__)
class BaseNotificationTask(task.Task):
event_type = None
event_type: tp.Optional[str] = None
def __init__(self, **kwargs):
super().__init__(**kwargs)

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as tp
from oslo_db.sqlalchemy import models
from oslo_utils import strutils
from oslo_utils import uuidutils
@ -113,7 +115,7 @@ class OctaviaBase(models.ModelBase):
@staticmethod
def apply_filter(query, model, filters):
translated_filters = {}
child_map = {}
child_map: dict = {}
# Convert enabled to proper type
if 'enabled' in filters:
filters['enabled'] = strutils.bool_from_string(
@ -136,10 +138,10 @@ class OctaviaBase(models.ModelBase):
return query
def __repr__(self):
params = sorted(
params_list: tp.List[tp.Tuple[str, tp.Any]] = sorted(
(k, getattr(self, k)) for k in self.__mapper__.columns.keys()
)
params = ", ".join(f"{k}={v!r}" for k, v in params)
params = ", ".join(f"{k}={v!r}" for k, v in params_list)
return f"{self.__class__.__name__}({params})"
@ -170,6 +172,8 @@ class TagMixin:
The class must realize the specified db relationship as well.
"""
_tags: list
id: str
@property
def tags(self):
@ -192,7 +196,7 @@ class TagMixin:
BASE = declarative_base(cls=OctaviaBase)
class Tags(BASE):
class Tags(BASE): # type: ignore
__tablename__ = "tags"
resource_id = sa.Column(sa.String(36), primary_key=True)
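
The class-level annotations _tags: list and id: str declare attributes
that TagMixin expects its concrete host model to provide, which lets
mypy check code that reaches them through the mixin. A minimal sketch
of the pattern (plain classes, no SQLAlchemy):

    import typing as tp

    class TagMixin:
        # Declared for the type checker only; the concrete model
        # actually defines these attributes.
        _tags: list
        id: str

        @property
        def tags(self) -> tp.List[str]:
            return list(self._tags)

    class FakeMember(TagMixin):
        def __init__(self) -> None:
            self.id = "member-1"
            self._tags = ["blue", "green"]

    print(FakeMember().tags)  # ['blue', 'green']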

View File

@ -19,7 +19,7 @@ reference
"""
import datetime
from typing import Optional
import typing as tp
from oslo_config import cfg
from oslo_db import api as oslo_db_api
@ -42,6 +42,7 @@ from octavia.common import exceptions
from octavia.common import utils
from octavia.common import validate
from octavia.db import api as db_api
from octavia.db import base_models
from octavia.db import models
CONF = cfg.CONF
@ -50,7 +51,7 @@ LOG = logging.getLogger(__name__)
class BaseRepository:
model_class = None
model_class: base_models.BASE = None
def count(self, session, **filters):
"""Retrieves a count of entities from the database.
@ -120,7 +121,8 @@ class BaseRepository:
:param session: A Sql Alchemy database session.
:param filters: Filters to decide which entity should be retrieved.
:returns: octavia.common.data_model
:raises AssertionError: If no data base object matching given
filters can be found.
"""
deleted = filters.pop('show_deleted', True)
model = session.query(self.model_class).filter_by(**filters)
@ -135,10 +137,7 @@ class BaseRepository:
model = model.first()
if not model:
return None
return model.to_data_model()
return None if model is None else model.to_data_model()
def get_all(self, session, pagination_helper=None,
query_options=None, **filters):
@ -235,8 +234,9 @@ class Repositories:
self.availability_zone = AvailabilityZoneRepository()
self.availability_zone_profile = AvailabilityZoneProfileRepository()
def create_load_balancer_and_vip(self, session, lb_dict, vip_dict,
additional_vip_dicts=None):
def create_load_balancer_and_vip(
self, session, lb_dict, vip_dict, additional_vip_dicts=None) -> (
data_models.LoadBalancer):
"""Inserts load balancer and vip entities into the database.
Inserts load balancer and vip entities into the database in one
@ -246,7 +246,6 @@ class Repositories:
:param lb_dict: Dictionary representation of a load balancer
:param vip_dict: Dictionary representation of a vip
:param additional_vip_dicts: Dict representations of additional vips
:returns: octavia.common.data_models.LoadBalancer
"""
additional_vip_dicts = additional_vip_dicts or []
if not lb_dict.get('id'):
@ -1345,9 +1344,9 @@ class AmphoraRepository(BaseRepository):
"load_balancer.provisioning_status != :deleted;").bindparams(
amp_id=amphora_id, deleted=consts.DELETED))
lb = {}
listeners = {}
pools = {}
lb: dict = {}
listeners: dict = {}
pools: dict = {}
for row in rows.mappings():
if not lb:
lb['id'] = row['id']
@ -1534,12 +1533,11 @@ class AmphoraHealthRepository(BaseRepository):
# In this case, the amphora is expired.
return amphora_model is None
def get_stale_amphora(self,
lock_session: Session) -> Optional[models.Amphora]:
def get_stale_amphora(
self, lock_session: Session) -> tp.Optional[models.Amphora]:
"""Retrieves a stale amphora from the health manager database.
:param lock_session: A Sql Alchemy database autocommit session.
:returns: [octavia.common.data_model]
"""
timeout = CONF.health_manager.heartbeat_timeout
expired_time = datetime.datetime.utcnow() - datetime.timedelta(
@ -1606,7 +1604,7 @@ class AmphoraHealthRepository(BaseRepository):
return amp_health.to_data_model()
def update_failover_stopped(self, lock_session: Session,
expired_time: datetime) -> None:
expired_time: datetime.datetime) -> None:
"""Updates the status of amps that are FAILOVER_STOPPED."""
# Update any FAILOVER_STOPPED amphora that are no longer stale
# back to ALLOCATED.
@ -1944,6 +1942,7 @@ class QuotasRepository(BaseRepository):
class _GetALLExceptDELETEDIdMixin:
model_class: base_models.BASE
def get_all(self, session, pagination_helper=None,
query_options=None, **filters):
@ -2065,8 +2064,8 @@ class AvailabilityZoneRepository(_GetALLExceptDELETEDIdMixin, BaseRepository):
:param serial_session: A Sql Alchemy database transaction session.
:param filters: Filters to decide which entity should be deleted.
:returns: None
:raises: odb_exceptions.DBReferenceError
:raises: sqlalchemy.orm.exc.NoResultFound
:raises odb_exceptions.DBReferenceError:
:raises sqlalchemy.orm.exc.NoResultFound:
"""
(serial_session.query(models.LoadBalancer).
filter(models.LoadBalancer.availability_zone == filters[consts.NAME]).

View File

@ -13,6 +13,7 @@
# under the License.
import ipaddress
import time
import typing as tp
from novaclient import exceptions as nova_client_exceptions
from octavia_lib.common import constants as lib_consts
@ -589,7 +590,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
except Exception:
pass
try:
aap_update = {
aap_update: tp.Dict[str, list] = {
constants.ALLOWED_ADDRESS_PAIRS: []
}
self.network_proxy.update_port(interface.port_id,
@ -780,7 +781,7 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
def get_network_configs(self, loadbalancer, amphora=None):
vip_subnet = self.get_subnet(loadbalancer.vip.subnet_id)
vip_port = self.get_port(loadbalancer.vip.port_id)
amp_configs = {}
amp_configs: tp.Dict[str, n_data_models.AmphoraNetworkConfig] = {}
if amphora:
self._get_amp_net_configs(amphora, amp_configs,
vip_subnet, vip_port,

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as tp
from oslo_log import log as logging
from oslo_utils import uuidutils
@ -22,7 +24,7 @@ from octavia.network import data_models as network_models
LOG = logging.getLogger(__name__)
_NOOP_MANAGER_VARS = {
_NOOP_MANAGER_VARS: tp.Dict[str, tp.Optional[dict]] = {
'networks': {},
'subnets': {},
'ports': {},
@ -130,12 +132,12 @@ class NoopManager:
fixed_ips=[],
port_id=uuidutils.generate_uuid()
)
_NOOP_MANAGER_VARS['ports'][interface.port_id] = (
_NOOP_MANAGER_VARS['ports'][interface.port_id] = ( # type: ignore
network_models.Port(
id=interface.port_id,
network_id=network_id))
_NOOP_MANAGER_VARS['interfaces'][(network_id, compute_id)] = (
interface)
_NOOP_MANAGER_VARS['interfaces'][ # type: ignore
(network_id, compute_id)] = interface
return interface
def unplug_network(self, compute_id, network_id):
@ -144,14 +146,16 @@ class NoopManager:
self.__class__.__name__, compute_id, network_id)
self.networkconfigconfig[(compute_id, network_id)] = (
compute_id, network_id, 'unplug_network')
_NOOP_MANAGER_VARS['interfaces'].pop((network_id, compute_id), None)
_NOOP_MANAGER_VARS['interfaces'].pop( # type: ignore
(network_id, compute_id), None)
def get_plugged_networks(self, compute_id):
LOG.debug("Network %s no-op, get_plugged_networks amphora_id %s",
self.__class__.__name__, compute_id)
self.networkconfigconfig[compute_id] = (
compute_id, 'get_plugged_networks')
return [pn for pn in _NOOP_MANAGER_VARS['interfaces'].values()
return [pn for pn in
_NOOP_MANAGER_VARS['interfaces'].values() # type: ignore
if pn.compute_id == compute_id]
def update_vip(self, loadbalancer, for_delete=False):
@ -165,8 +169,8 @@ class NoopManager:
LOG.debug("Network %s no-op, get_network network_id %s",
self.__class__.__name__, network_id)
self.networkconfigconfig[network_id] = (network_id, 'get_network')
if network_id in _NOOP_MANAGER_VARS['networks']:
return _NOOP_MANAGER_VARS['networks'][network_id]
if network_id in _NOOP_MANAGER_VARS['networks']: # type: ignore
return _NOOP_MANAGER_VARS['networks'][network_id] # type: ignore
network = network_models.Network(id=network_id,
port_security_enabled=True)
@ -196,11 +200,12 @@ class NoopManager:
subnet = network_models.Subnet(id=uuidutils.generate_uuid(),
network_id=self.network.id)
self.known_subnets[subnet.id] = subnet
_NOOP_MANAGER_VARS['subnets'][subnet.id] = subnet
_NOOP_MANAGER_VARS['subnets'][subnet.id] = ( # type: ignore
subnet)
yield subnet.id
network.subnets = ItIsInsideMe(network, self)
_NOOP_MANAGER_VARS['networks'][network_id] = network
_NOOP_MANAGER_VARS['networks'][network_id] = network # type: ignore
_NOOP_MANAGER_VARS['current_network'] = network_id
return network
@ -208,24 +213,24 @@ class NoopManager:
LOG.debug("Subnet %s no-op, get_subnet subnet_id %s",
self.__class__.__name__, subnet_id)
self.networkconfigconfig[subnet_id] = (subnet_id, 'get_subnet')
if subnet_id in _NOOP_MANAGER_VARS['subnets']:
return _NOOP_MANAGER_VARS['subnets'][subnet_id]
if subnet_id in _NOOP_MANAGER_VARS['subnets']: # type: ignore
return _NOOP_MANAGER_VARS['subnets'][subnet_id] # type: ignore
subnet = network_models.Subnet(
id=subnet_id,
network_id=_NOOP_MANAGER_VARS['current_network'])
_NOOP_MANAGER_VARS['subnets'][subnet_id] = subnet
_NOOP_MANAGER_VARS['subnets'][subnet_id] = subnet # type: ignore
return subnet
def get_port(self, port_id):
LOG.debug("Port %s no-op, get_port port_id %s",
self.__class__.__name__, port_id)
self.networkconfigconfig[port_id] = (port_id, 'get_port')
if port_id in _NOOP_MANAGER_VARS['ports']:
return _NOOP_MANAGER_VARS['ports'][port_id]
if port_id in _NOOP_MANAGER_VARS['ports']: # type: ignore
return _NOOP_MANAGER_VARS['ports'][port_id] # type: ignore
port = network_models.Port(id=port_id)
_NOOP_MANAGER_VARS['ports'][port_id] = port
_NOOP_MANAGER_VARS['ports'][port_id] = port # type: ignore
return port
def get_network_by_name(self, network_name):
@ -233,14 +238,15 @@ class NoopManager:
self.__class__.__name__, network_name)
self.networkconfigconfig[network_name] = (network_name,
'get_network_by_name')
by_name = {n.name: n for n in _NOOP_MANAGER_VARS['networks'].values()}
by_name = {n.name: n for n in
_NOOP_MANAGER_VARS['networks'].values()} # type: ignore
if network_name in by_name:
return by_name[network_name]
network = network_models.Network(id=uuidutils.generate_uuid(),
port_security_enabled=True,
name=network_name)
_NOOP_MANAGER_VARS['networks'][network.id] = network
_NOOP_MANAGER_VARS['networks'][network.id] = network # type: ignore
_NOOP_MANAGER_VARS['current_network'] = network.id
return network
@ -249,7 +255,8 @@ class NoopManager:
self.__class__.__name__, subnet_name)
self.networkconfigconfig[subnet_name] = (subnet_name,
'get_subnet_by_name')
by_name = {s.name: s for s in _NOOP_MANAGER_VARS['subnets'].values()}
by_name = {s.name: s for s in
_NOOP_MANAGER_VARS['subnets'].values()} # type: ignore
if subnet_name in by_name:
return by_name[subnet_name]
@ -257,20 +264,21 @@ class NoopManager:
id=uuidutils.generate_uuid(),
name=subnet_name,
network_id=_NOOP_MANAGER_VARS['current_network'])
_NOOP_MANAGER_VARS['subnets'][subnet.id] = subnet
_NOOP_MANAGER_VARS['subnets'][subnet.id] = subnet # type: ignore
return subnet
def get_port_by_name(self, port_name):
LOG.debug("Port %s no-op, get_port_by_name port_name %s",
self.__class__.__name__, port_name)
self.networkconfigconfig[port_name] = (port_name, 'get_port_by_name')
by_name = {p.name: p for p in _NOOP_MANAGER_VARS['ports'].values()}
by_name = {p.name: p for p in
_NOOP_MANAGER_VARS['ports'].values()} # type: ignore
if port_name in by_name:
return by_name[port_name]
port = network_models.Port(id=uuidutils.generate_uuid(),
name=port_name)
_NOOP_MANAGER_VARS['ports'][port.id] = port
_NOOP_MANAGER_VARS['ports'][port.id] = port # type: ignore
return port
def get_port_by_net_id_device_id(self, network_id, device_id):
@ -279,15 +287,15 @@ class NoopManager:
self.__class__.__name__, network_id, device_id)
self.networkconfigconfig[(network_id, device_id)] = (
network_id, device_id, 'get_port_by_net_id_device_id')
by_net_dev_id = {(p.network_id, p.device_id): p
for p in _NOOP_MANAGER_VARS['ports'].values()}
by_net_dev_id = {(p.network_id, p.device_id): p for p in
_NOOP_MANAGER_VARS['ports'].values()} # type: ignore
if (network_id, device_id) in by_net_dev_id:
return by_net_dev_id[(network_id, device_id)]
port = network_models.Port(id=uuidutils.generate_uuid(),
network_id=network_id,
device_id=device_id)
_NOOP_MANAGER_VARS['ports'][port.id] = port
_NOOP_MANAGER_VARS['ports'][port.id] = port # type: ignore
return port
def get_security_group(self, sg_name):
@ -328,7 +336,7 @@ class NoopManager:
vip_subnet = self.get_subnet(loadbalancer.vip.subnet_id)
vip_port = self.get_port(loadbalancer.vip.port_id)
amp_configs = {}
amp_configs: tp.Dict[str, network_models.AmphoraNetworkConfig] = {}
if amphora:
self._get_amp_net_configs(amphora, amp_configs,
vip_subnet, vip_port)
@ -427,7 +435,7 @@ class NoopManager:
port = network_models.Port(id=port_id,
network_id=uuidutils.generate_uuid())
_NOOP_MANAGER_VARS['ports'][port.id] = port
_NOOP_MANAGER_VARS['ports'][port.id] = port # type: ignore
return port
def unplug_fixed_ip(self, port_id, subnet_id):
@ -437,7 +445,7 @@ class NoopManager:
self.networkconfigconfig[(port_id, subnet_id)] = (
port_id, subnet_id, 'unplug_fixed_ip')
return _NOOP_MANAGER_VARS['ports'].pop(port_id, None)
return _NOOP_MANAGER_VARS['ports'].pop(port_id, None) # type: ignore
class NoopNetworkDriver(driver_base.AbstractNetworkDriver):