Remove deprecated exception classes

These classes were deprecated in Stein and marked for removal in Ussuri.
By removing these classes, we fix pep8 issues (catching-non-exception)
we started seeing at the gate with the release of astroid 2.4.0.

(cherry picked from commit 0056b5175f)

Fix E741 pep8 errors

E741 ambiguous variable name 'l'

Change 'l' to another variable in affected code.

Change-Id: Idd176e40ccf2a79832a5c99140bd30e5e1f9c0d8
(cherry picked from commit 9a1d6d3585)
This commit is contained in:
Carlos Goncalves 2020-04-27 18:20:20 +02:00 committed by Brian Haley
parent 0e5372fb3a
commit 0636cd1b43
27 changed files with 105 additions and 154 deletions

View File

@ -58,8 +58,8 @@ class Wrapped(object):
self.stream = stream_
self.hash = hashlib.md5() # nosec
def read(self, l):
block = self.stream.read(l)
def read(self, line):
block = self.stream.read(line)
if block:
self.hash.update(block)
return block

View File

@ -352,8 +352,8 @@ class HaproxyAmphoraLoadBalancerDriver(
# See how many non-UDP listeners we have left
non_udp_listener_count = len([
1 for l in listener.load_balancer.listeners
if l.protocol != consts.PROTOCOL_UDP])
1 for li in listener.load_balancer.listeners
if li.protocol != consts.PROTOCOL_UDP])
if non_udp_listener_count > 0:
# We have other listeners, so just update is fine.
# TODO(rm_work): This is a little inefficient since this duplicates

View File

@ -49,10 +49,10 @@ class NoopManager(object):
def update(self, loadbalancer):
LOG.debug("Amphora %s no-op, update listener %s, vip %s",
self.__class__.__name__,
tuple(l.protocol_port for l in loadbalancer.listeners),
tuple(li.protocol_port for li in loadbalancer.listeners),
loadbalancer.vip.ip_address)
self.amphoraconfig[
(tuple(l.protocol_port for l in loadbalancer.listeners),
(tuple(li.protocol_port for li in loadbalancer.listeners),
loadbalancer.vip.ip_address)] = (loadbalancer.listeners,
loadbalancer.vip,
'active')

View File

@ -1,41 +0,0 @@
# Copyright 2018 Rackspace, US Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warnings
from debtcollector import moves
from octavia_lib.api.drivers import exceptions as lib_exceptions
warnings.simplefilter('default', DeprecationWarning)
DriverError = moves.moved_class(lib_exceptions.DriverError, 'DriverError',
__name__, version='Stein', removal_version='U')
NotImplementedError = moves.moved_class(
lib_exceptions.NotImplementedError, 'NotImplementedError', __name__,
version='Stein', removal_version='U')
UnsupportedOptionError = moves.moved_class(
lib_exceptions.UnsupportedOptionError, 'UnsupportedOptionError', __name__,
version='Stein', removal_version='U')
UpdateStatusError = moves.moved_class(
lib_exceptions.UpdateStatusError, 'UpdateStatusError', __name__,
version='Stein', removal_version='U')
UpdateStatisticsError = moves.moved_class(
lib_exceptions.UpdateStatisticsError, 'UpdateStatisticsError', __name__,
version='Stein', removal_version='U')

View File

@ -22,7 +22,6 @@ from oslo_log import log as logging
from oslo_utils import excutils
from stevedore import driver as stevedore_driver
from octavia.api.drivers import exceptions as driver_exceptions
from octavia.common import data_models
from octavia.common import exceptions
from octavia.common.tls_utils import cert_parser
@ -50,14 +49,12 @@ def call_provider(provider, driver_method, *args, **kwargs):
try:
return driver_method(*args, **kwargs)
except (driver_exceptions.DriverError, lib_exceptions.DriverError) as e:
except lib_exceptions.DriverError as e:
LOG.exception("Provider '%s' raised a driver error: %s",
provider, e.operator_fault_string)
raise exceptions.ProviderDriverError(prov=provider,
user_msg=e.user_fault_string)
except (driver_exceptions.NotImplementedError,
lib_exceptions.NotImplementedError,
NotImplementedError) as e:
except (lib_exceptions.NotImplementedError, NotImplementedError) as e:
op_fault_string = (
e.operator_fault_string
if hasattr(e, "operator_fault_string")
@ -70,8 +67,7 @@ def call_provider(provider, driver_method, *args, **kwargs):
provider, op_fault_string)
raise exceptions.ProviderNotImplementedError(
prov=provider, user_msg=usr_fault_string)
except (driver_exceptions.UnsupportedOptionError,
lib_exceptions.UnsupportedOptionError) as e:
except lib_exceptions.UnsupportedOptionError as e:
LOG.info("Provider '%s' raised an unsupported option error: "
"%s", provider, e.operator_fault_string)
raise exceptions.ProviderUnsupportedOptionError(

View File

@ -83,7 +83,7 @@ class HealthMonitorController(base.BaseController):
def _get_affected_listener_ids(self, session, hm):
"""Gets a list of all listeners this request potentially affects."""
pool = self.repositories.pool.get(session, id=hm.pool_id)
listener_ids = [l.id for l in pool.listeners]
listener_ids = [li.id for li in pool.listeners]
return listener_ids
def _test_lb_and_listener_and_pool_statuses(self, session, hm):

View File

@ -509,8 +509,8 @@ class LoadBalancersController(base.BaseController):
pools_required = set()
# Look through listeners and find any extra pools, and move them to the
# top level so they are created first.
for l in listeners:
default_pool = l.get('default_pool')
for li in listeners:
default_pool = li.get('default_pool')
pool_name = (
default_pool.get('name') if default_pool else None)
# All pools need to have a name so they can be referenced
@ -523,12 +523,12 @@ class LoadBalancersController(base.BaseController):
# as default)
if default_pool and len(default_pool) > 3:
pools.append(default_pool)
l['default_pool'] = {'name': pool_name}
li['default_pool'] = {'name': pool_name}
# Otherwise, it's a reference and we record it and move on
elif default_pool:
pools_required.add(pool_name)
# We also need to check policy redirects
for policy in l.get('l7policies'):
for policy in li.get('l7policies'):
redirect_pool = policy.get('redirect_pool')
pool_name = (
redirect_pool.get('name') if redirect_pool else None)
@ -595,8 +595,8 @@ class LoadBalancersController(base.BaseController):
# Now create all of the listeners
new_lists = []
for l in listeners:
default_pool = l.pop('default_pool', None)
for li in listeners:
default_pool = li.pop('default_pool', None)
# If there's a default pool, replace it with the ID
if default_pool:
pool_name = default_pool['name']
@ -604,11 +604,11 @@ class LoadBalancersController(base.BaseController):
if not pool_id:
raise exceptions.SingleCreateDetailsMissing(
type='Pool', name=pool_name)
l['default_pool_id'] = pool_id
l['load_balancer_id'] = db_lb.id
l['project_id'] = db_lb.project_id
li['default_pool_id'] = pool_id
li['load_balancer_id'] = db_lb.id
li['project_id'] = db_lb.project_id
new_lists.append(listener.ListenersController()._graph_create(
lock_session, l, pool_name_ids=pool_name_ids))
lock_session, li, pool_name_ids=pool_name_ids))
return new_pools, new_lists

View File

@ -90,10 +90,10 @@ class MemberController(base.BaseController):
def _get_affected_listener_ids(self, session, member=None):
"""Gets a list of all listeners this request potentially affects."""
if member:
listener_ids = [l.id for l in member.pool.listeners]
listener_ids = [li.id for li in member.pool.listeners]
else:
pool = self._get_db_pool(session, self.pool_id)
listener_ids = [l.id for l in pool.listeners]
listener_ids = [li.id for li in pool.listeners]
return listener_ids
def _test_lb_and_listener_and_pool_statuses(self, session, member=None):

View File

@ -83,7 +83,7 @@ class PoolsController(base.BaseController):
def _get_affected_listener_ids(self, pool):
"""Gets a list of all listeners this request potentially affects."""
listener_ids = [l.id for l in pool.listeners]
listener_ids = [li.id for li in pool.listeners]
return listener_ids
def _test_lb_and_listener_statuses(self, session, lb_id, listener_ids):

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from octavia_lib.api.drivers import exceptions as lib_exceptions
from oslo_config import cfg
from oslo_log import log as logging
from pecan import expose as pecan_expose
@ -20,7 +21,6 @@ from wsme import types as wtypes
from wsmeext import pecan as wsme_pecan
from octavia.api.drivers import driver_factory
from octavia.api.drivers import exceptions as driver_except
from octavia.api.v2.controllers import base
from octavia.api.v2.types import provider as provider_types
from octavia.common import constants
@ -89,7 +89,7 @@ class FlavorCapabilitiesController(base.BaseController):
self.driver = driver_factory.get_driver(self.provider)
try:
metadata_dict = self.driver.get_supported_flavor_metadata()
except driver_except.NotImplementedError as e:
except lib_exceptions.NotImplementedError as e:
LOG.warning('Provider %s get_supported_flavor_metadata() '
'reported: %s', self.provider, e.operator_fault_string)
raise exceptions.ProviderNotImplementedError(
@ -140,7 +140,7 @@ class AvailabilityZoneCapabilitiesController(base.BaseController):
try:
metadata_dict = (
self.driver.get_supported_availability_zone_metadata())
except driver_except.NotImplementedError as e:
except lib_exceptions.NotImplementedError as e:
LOG.warning(
'Provider %s get_supported_availability_zone_metadata() '
'reported: %s', self.provider, e.operator_fault_string)

View File

@ -1191,7 +1191,7 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
LOG.debug("Mark ACTIVE in DB for load balancer id: %s "
"and updating status for listener ids: %s", loadbalancer.id,
', '.join([l.id for l in listeners]))
', '.join([listener.id for listener in listeners]))
self.loadbalancer_repo.update(db_apis.get_session(),
loadbalancer.id,
provisioning_status=constants.ACTIVE)
@ -1210,7 +1210,8 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
LOG.warning("Reverting mark load balancer and listeners active in DB "
"for load balancer id %(LB)s and listener ids: %(list)s",
{'LB': loadbalancer.id,
'list': ', '.join([l.id for l in listeners])})
'list': ', '.join([listener.id
for listener in listeners])})
self.task_utils.mark_loadbalancer_prov_status_error(loadbalancer.id)
for listener in listeners:
self.task_utils.mark_listener_prov_status_error(listener.id)

View File

@ -247,9 +247,9 @@ class ControllerWorker(object):
load_balancer = db_listener.load_balancer
listeners = load_balancer.listeners
dict_listeners = []
for l in listeners:
for li in listeners:
dict_listeners.append(
provider_utils.db_listener_to_provider_listener(l).to_dict())
provider_utils.db_listener_to_provider_listener(li).to_dict())
provider_lb = provider_utils.db_loadbalancer_to_provider_loadbalancer(
load_balancer).to_dict()

View File

@ -1308,7 +1308,8 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
if lb_id:
LOG.debug("Mark ACTIVE in DB for load balancer id: %s "
"and updating status for listener ids: %s", lb_id,
', '.join([l[constants.LISTENER_ID] for l in listeners]))
', '.join([listener[constants.LISTENER_ID]
for listener in listeners]))
self.loadbalancer_repo.update(db_apis.get_session(), lb_id,
provisioning_status=constants.ACTIVE)
for listener in listeners:
@ -1329,7 +1330,8 @@ class MarkLBAndListenersActiveInDB(BaseDatabaseTask):
lb_id = listeners[0][constants.LOADBALANCER_ID]
if lb_id:
lists = ', '.join([l[constants.LISTENER_ID] for l in listeners])
lists = ', '.join([listener[constants.LISTENER_ID]
for listener in listeners])
LOG.warning("Reverting mark load balancer and listeners active in "
"DB for load balancer id %(LB)s and listener ids: "
"%(list)s", {'LB': lb_id,

View File

@ -346,13 +346,13 @@ class Pool(base_models.BASE, base_models.IdMixin, base_models.ProjectMixin,
@property
def listeners(self):
_listeners = self._default_listeners[:]
_l_ids = [l.id for l in _listeners]
_l_ids = [li.id for li in _listeners]
l7_listeners = [p.listener for p in self.l7policies
if len(p.l7rules) > 0 and p.enabled is True]
for l in l7_listeners:
if l.id not in _l_ids:
_listeners.append(l)
_l_ids.append(l.id)
for li in l7_listeners:
if li.id not in _l_ids:
_listeners.append(li)
_l_ids.append(li.id)
return _listeners

View File

@ -150,28 +150,28 @@ class AllowedAddressPairsDriver(neutron_base.BaseNeutronDriver):
security_group_id=sec_grp_id)
updated_ports = []
for l in load_balancer.listeners:
if (l.provisioning_status in [constants.PENDING_DELETE,
constants.DELETED]):
for listener in load_balancer.listeners:
if (listener.provisioning_status in [constants.PENDING_DELETE,
constants.DELETED]):
continue
protocol = constants.PROTOCOL_TCP.lower()
if l.protocol == constants.PROTOCOL_UDP:
if listener.protocol == constants.PROTOCOL_UDP:
protocol = constants.PROTOCOL_UDP.lower()
if l.allowed_cidrs:
for ac in l.allowed_cidrs:
port = (l.protocol_port, protocol, ac.cidr)
if listener.allowed_cidrs:
for ac in listener.allowed_cidrs:
port = (listener.protocol_port, protocol, ac.cidr)
updated_ports.append(port)
else:
port = (l.protocol_port, protocol, None)
port = (listener.protocol_port, protocol, None)
updated_ports.append(port)
# As the peer port will hold the tcp connection for keepalived and
# haproxy session synchronization, so here the security group rule
# should be just related with tcp protocol only.
updated_ports.append(
(l.peer_port, constants.PROTOCOL_TCP.lower(), None))
(listener.peer_port, constants.PROTOCOL_TCP.lower(), None))
# Just going to use port_range_max for now because we can assume that
# port_range_max and min will be the same since this driver is

View File

@ -396,7 +396,8 @@ class TestAmphora(base.BaseAPITest):
links = middle[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(2, len(links))
self.assertItemsEqual(['previous', 'next'], [l['rel'] for l in links])
self.assertItemsEqual(['previous', 'next'],
[link['rel'] for link in links])
def test_get_all_fields_filter(self):
amps = self.get(self.AMPHORAE_PATH, params={

View File

@ -513,7 +513,8 @@ class TestHealthMonitor(base.BaseAPITest):
links = middle[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(2, len(links))
self.assertItemsEqual(['previous', 'next'], [l['rel'] for l in links])
self.assertItemsEqual(['previous', 'next'],
[link['rel'] for link in links])
def test_get_all_fields_filter(self):
pool1 = self.create_pool(

View File

@ -442,7 +442,8 @@ class TestL7Policy(base.BaseAPITest):
links = middle[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(2, len(links))
self.assertItemsEqual(['previous', 'next'], [l['rel'] for l in links])
self.assertItemsEqual(['previous', 'next'],
[link['rel'] for link in links])
def test_get_all_fields_filter(self):
self.create_l7policy(

View File

@ -291,7 +291,8 @@ class TestL7Rule(base.BaseAPITest):
links = middle[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(2, len(links))
self.assertItemsEqual(['previous', 'next'], [l['rel'] for l in links])
self.assertItemsEqual(['previous', 'next'],
[link['rel'] for link in links])
def test_get_all_fields_filter(self):
self.create_l7rule(

View File

@ -70,9 +70,9 @@ class TestListener(base.BaseAPITest):
self.set_lb_status(lb1_id)
listeners = self.get(self.LISTENERS_PATH).json.get(self.root_tag_list)
self.assertEqual(3, len(listeners))
listener_id_ports = [(l.get('id'), l.get('protocol_port'),
l.get('tags'))
for l in listeners]
listener_id_ports = [(li.get('id'), li.get('protocol_port'),
li.get('tags'))
for li in listeners]
self.assertIn((listener1.get('id'), listener1.get('protocol_port'),
listener1.get('tags')),
listener_id_ports)
@ -125,8 +125,8 @@ class TestListener(base.BaseAPITest):
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(1, len(listeners))
listener_id_ports = [(l.get('id'), l.get('protocol_port'))
for l in listeners]
listener_id_ports = [(li.get('id'), li.get('protocol_port'))
for li in listeners]
self.assertIn((listener3.get('id'), listener3.get('protocol_port')),
listener_id_ports)
@ -172,8 +172,8 @@ class TestListener(base.BaseAPITest):
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(3, len(listeners))
listener_id_ports = [(l.get('id'), l.get('protocol_port'))
for l in listeners]
listener_id_ports = [(li.get('id'), li.get('protocol_port'))
for li in listeners]
self.assertIn((listener1.get('id'), listener1.get('protocol_port')),
listener_id_ports)
self.assertIn((listener2.get('id'), listener2.get('protocol_port')),
@ -232,8 +232,8 @@ class TestListener(base.BaseAPITest):
params={'project_id': project1_id}).json.get(self.root_tag_list)
self.assertEqual(2, len(listeners))
listener_id_ports = [(l.get('id'), l.get('protocol_port'))
for l in listeners]
listener_id_ports = [(li.get('id'), li.get('protocol_port'))
for li in listeners]
self.assertIn((listener1.get('id'), listener1.get('protocol_port')),
listener_id_ports)
self.assertIn((listener2.get('id'), listener2.get('protocol_port')),
@ -241,8 +241,8 @@ class TestListener(base.BaseAPITest):
listeners = self.get(
self.LISTENERS_PATH,
params={'project_id': project2_id}).json.get(self.root_tag_list)
listener_id_ports = [(l.get('id'), l.get('protocol_port'))
for l in listeners]
listener_id_ports = [(li.get('id'), li.get('protocol_port'))
for li in listeners]
self.assertEqual(1, len(listeners))
self.assertIn((listener3.get('id'), listener3.get('protocol_port')),
listener_id_ports)
@ -317,7 +317,8 @@ class TestListener(base.BaseAPITest):
links = middle[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(2, len(links))
self.assertItemsEqual(['previous', 'next'], [l['rel'] for l in links])
self.assertItemsEqual(['previous', 'next'],
[link['rel'] for link in links])
def test_get_all_fields_filter(self):
self.create_listener(constants.PROTOCOL_HTTP, 80,

View File

@ -16,12 +16,12 @@ import copy
import random
from unittest import mock
from octavia_lib.api.drivers import exceptions as lib_exceptions
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from sqlalchemy.orm import exc as sa_exception
from octavia.api.drivers import exceptions as provider_exceptions
from octavia.common import constants
import octavia.common.context
from octavia.common import data_models
@ -459,8 +459,7 @@ class TestLoadBalancer(base.BaseAPITest):
"create_vip_port") as mock_provider:
mock_get_network.return_value = network
mock_get_port.return_value = port
mock_provider.side_effect = (provider_exceptions.
NotImplementedError())
mock_provider.side_effect = lib_exceptions.NotImplementedError()
response = self.post(self.LBS_PATH, body)
api_lb = response.json.get(self.root_tag)
self._assert_request_matches_response(lb_json, api_lb)
@ -621,8 +620,7 @@ class TestLoadBalancer(base.BaseAPITest):
mock_get_port.return_value = port
mock_allocate_vip.side_effect = TestNeutronException(
"octavia_msg", "neutron_msg", 409)
mock_provider.side_effect = (provider_exceptions.
NotImplementedError())
mock_provider.side_effect = lib_exceptions.NotImplementedError()
response = self.post(self.LBS_PATH, body, status=409)
# Make sure the faultstring contains the neutron error and not
# the octavia error message
@ -1349,7 +1347,8 @@ class TestLoadBalancer(base.BaseAPITest):
links = middle[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(2, len(links))
self.assertItemsEqual(['previous', 'next'], [l['rel'] for l in links])
self.assertItemsEqual(['previous', 'next'],
[link['rel'] for link in links])
def test_get_all_fields_filter(self):
self.create_load_balancer(uuidutils.generate_uuid(),

View File

@ -294,7 +294,8 @@ class TestMember(base.BaseAPITest):
links = middle[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(2, len(links))
self.assertItemsEqual(['previous', 'next'], [l['rel'] for l in links])
self.assertItemsEqual(['previous', 'next'],
[link['rel'] for link in links])
def test_get_all_fields_filter(self):
self.create_member(self.pool_id, '192.0.2.1', 80, name='member1')

View File

@ -476,7 +476,8 @@ class TestPool(base.BaseAPITest):
links = middle[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(2, len(links))
self.assertItemsEqual(['previous', 'next'], [l['rel'] for l in links])
self.assertItemsEqual(['previous', 'next'],
[link['rel'] for link in links])
def test_get_all_fields_filter(self):
self.create_pool(

View File

@ -15,11 +15,11 @@
from unittest import mock
from octavia_lib.api.drivers import exceptions as lib_exceptions
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
from oslo_utils import uuidutils
from octavia.api.drivers import exceptions
from octavia.common import constants
import octavia.common.context
from octavia.tests.functional.api.v2 import base
@ -90,7 +90,7 @@ class TestFlavorCapabilities(base.BaseAPITest):
@mock.patch('octavia.api.drivers.noop_driver.driver.NoopProviderDriver.'
'get_supported_flavor_metadata')
def test_not_implemented(self, mock_get_metadata):
mock_get_metadata.side_effect = exceptions.NotImplementedError()
mock_get_metadata.side_effect = lib_exceptions.NotImplementedError()
self.get(self.FLAVOR_CAPABILITIES_PATH.format(provider='noop_driver'),
status=501)
@ -214,7 +214,7 @@ class TestAvailabilityZoneCapabilities(base.BaseAPITest):
@mock.patch('octavia.api.drivers.noop_driver.driver.NoopProviderDriver.'
'get_supported_availability_zone_metadata')
def test_not_implemented(self, mock_get_metadata):
mock_get_metadata.side_effect = exceptions.NotImplementedError()
mock_get_metadata.side_effect = lib_exceptions.NotImplementedError()
self.get(self.AVAILABILITY_ZONE_CAPABILITIES_PATH.format(
provider='noop_driver'), status=501)

View File

@ -241,8 +241,8 @@ class TestQuotas(base.BaseAPITest):
).get(self.root_tag)
quotas = self.get(self.QUOTAS_PATH).json.get(self.root_tag_list)
self.assertEqual(3, len(quotas))
quota_lb_member_quotas = [(l.get('load_balancer'), l.get('member'))
for l in quotas]
quota_lb_member_quotas = [(lb.get('load_balancer'), lb.get('member'))
for lb in quotas]
self.assertIn((quota1.get('load_balancer'), quota1.get('member')),
quota_lb_member_quotas)
self.assertIn((quota2.get('load_balancer'), quota2.get('member')),
@ -287,8 +287,8 @@ class TestQuotas(base.BaseAPITest):
quotas = quotas.json.get(self.root_tag_list)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(3, len(quotas))
quota_lb_member_quotas = [(l.get('load_balancer'), l.get('member'))
for l in quotas]
quota_lb_member_quotas = [(lb.get('load_balancer'), lb.get('member'))
for lb in quotas]
self.assertIn((quota1.get('load_balancer'), quota1.get('member')),
quota_lb_member_quotas)
self.assertIn((quota2.get('load_balancer'), quota2.get('member')),
@ -333,8 +333,8 @@ class TestQuotas(base.BaseAPITest):
quotas = quotas.json.get(self.root_tag_list)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(3, len(quotas))
quota_lb_member_quotas = [(l.get('load_balancer'), l.get('member'))
for l in quotas]
quota_lb_member_quotas = [(lb.get('load_balancer'), lb.get('member'))
for lb in quotas]
self.assertIn((quota1.get('load_balancer'), quota1.get('member')),
quota_lb_member_quotas)
self.assertIn((quota2.get('load_balancer'), quota2.get('member')),
@ -382,8 +382,8 @@ class TestQuotas(base.BaseAPITest):
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(1, len(quotas))
quota_lb_member_quotas = [(l.get('load_balancer'), l.get('member'))
for l in quotas]
quota_lb_member_quotas = [(lb.get('load_balancer'), lb.get('member'))
for lb in quotas]
self.assertIn((quota3.get('load_balancer'), quota3.get('member')),
quota_lb_member_quotas)
@ -427,8 +427,8 @@ class TestQuotas(base.BaseAPITest):
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(1, len(quotas))
quota_lb_member_quotas = [(l.get('load_balancer'), l.get('member'))
for l in quotas]
quota_lb_member_quotas = [(lb.get('load_balancer'), lb.get('member'))
for lb in quotas]
self.assertIn((quota3.get('load_balancer'), quota3.get('member')),
quota_lb_member_quotas)
@ -601,10 +601,10 @@ class TestQuotas(base.BaseAPITest):
self.assertEqual(3, len(quotas_desc))
self.assertEqual(3, len(quotas_asc))
quota_lb_member_desc = [(l.get('load_balancer'), l.get('member'))
for l in quotas_desc]
quota_lb_member_asc = [(l.get('load_balancer'), l.get('member'))
for l in quotas_asc]
quota_lb_member_desc = [(lb.get('load_balancer'), lb.get('member'))
for lb in quotas_desc]
quota_lb_member_asc = [(lb.get('load_balancer'), lb.get('member'))
for lb in quotas_asc]
self.assertEqual(quota_lb_member_asc,
list(reversed(quota_lb_member_desc)))
@ -650,7 +650,8 @@ class TestQuotas(base.BaseAPITest):
links = middle[self.root_tag_links]
self.assertEqual(1, len(objs))
self.assertEqual(2, len(links))
self.assertItemsEqual(['previous', 'next'], [l['rel'] for l in links])
self.assertItemsEqual(['previous', 'next'],
[link['rel'] for link in links])
def test_get_default_quotas(self):
response = self.get(self.QUOTA_DEFAULT_PATH.format(

View File

@ -265,7 +265,7 @@ class PoolModelTest(base.OctaviaDBTestBase, ModelTestMixin):
self.assertIsNotNone(new_pool.listeners)
self.assertIsInstance(new_pool.listeners, list)
self.assertIsInstance(new_pool.listeners[0], models.Listener)
self.assertIn(listener.id, [l.id for l in new_pool.listeners])
self.assertIn(listener.id, [li.id for li in new_pool.listeners])
class MemberModelTest(base.OctaviaDBTestBase, ModelTestMixin):
@ -787,7 +787,7 @@ class L7PolicyModelTest(base.OctaviaDBTestBase, ModelTestMixin):
new_listener = self.session.query(models.Listener).filter_by(
id=self.listener.id).first()
self.assertIsInstance(new_pool.listeners, list)
self.assertIn(new_listener.id, [l.id for l in new_pool.listeners])
self.assertIn(new_listener.id, [li.id for li in new_pool.listeners])
self.assertIsInstance(new_listener.pools, list)
self.assertIn(new_pool.id, [p.id for p in new_listener.pools])
@ -802,7 +802,7 @@ class L7PolicyModelTest(base.OctaviaDBTestBase, ModelTestMixin):
new_listener = self.session.query(models.Listener).filter_by(
id=self.listener.id).first()
self.assertIsInstance(new_pool.listeners, list)
self.assertNotIn(new_listener.id, [l.id for l in new_pool.listeners])
self.assertNotIn(new_listener.id, [li.id for li in new_pool.listeners])
self.assertIsInstance(new_listener.pools, list)
self.assertNotIn(new_pool.id, [p.id for p in new_listener.pools])
@ -818,7 +818,7 @@ class L7PolicyModelTest(base.OctaviaDBTestBase, ModelTestMixin):
new_listener = self.session.query(models.Listener).filter_by(
id=self.listener.id).first()
self.assertIsInstance(new_pool.listeners, list)
self.assertNotIn(new_listener.id, [l.id for l in new_pool.listeners])
self.assertNotIn(new_listener.id, [li.id for li in new_pool.listeners])
self.assertIsInstance(new_listener.pools, list)
self.assertNotIn(new_pool.id, [p.id for p in new_listener.pools])

View File

@ -18,7 +18,6 @@ from octavia_lib.api.drivers import data_models as driver_dm
from octavia_lib.api.drivers import exceptions as lib_exceptions
from octavia_lib.common import constants as lib_constants
from octavia.api.drivers import exceptions as driver_exceptions
from octavia.api.drivers import utils
from octavia.common import constants
from octavia.common import data_models
@ -40,21 +39,13 @@ class TestUtils(base.TestCase):
"arg1", foo="arg2")
mock_driver_method.assert_called_with("arg1", foo="arg2")
# Test driver raising different types of DriverError
mock_driver_method.side_effect = driver_exceptions.DriverError
self.assertRaises(exceptions.ProviderDriverError,
utils.call_provider, "provider_name",
mock_driver_method)
# Test driver raising DriverError
mock_driver_method.side_effect = lib_exceptions.DriverError
self.assertRaises(exceptions.ProviderDriverError,
utils.call_provider, "provider_name",
mock_driver_method)
# Test driver raising different types of NotImplementedError
mock_driver_method.side_effect = driver_exceptions.NotImplementedError
self.assertRaises(exceptions.ProviderNotImplementedError,
utils.call_provider, "provider_name",
mock_driver_method)
mock_driver_method.side_effect = NotImplementedError
self.assertRaises(exceptions.ProviderNotImplementedError,
utils.call_provider, "provider_name",
@ -64,19 +55,14 @@ class TestUtils(base.TestCase):
utils.call_provider, "provider_name",
mock_driver_method)
# Test driver raising different types of UnsupportedOptionError
mock_driver_method.side_effect = (
driver_exceptions.UnsupportedOptionError)
self.assertRaises(exceptions.ProviderUnsupportedOptionError,
utils.call_provider, "provider_name",
mock_driver_method)
# Test driver raising UnsupportedOptionError
mock_driver_method.side_effect = (
lib_exceptions.UnsupportedOptionError)
self.assertRaises(exceptions.ProviderUnsupportedOptionError,
utils.call_provider, "provider_name",
mock_driver_method)
# Test driver raising DriverError
# Test driver raising ProviderDriverError
mock_driver_method.side_effect = Exception
self.assertRaises(exceptions.ProviderDriverError,
utils.call_provider, "provider_name",