pyupgrade changes for Python3.8+ (6)

Result of running

$ pyupgrade --py38-plus $(git ls-files | grep ".py$")

This was inspired by Nova [1]

Fixed PEP8 errors introduced by pyupgrade by running:

$ autopep8 --select=E127,E128,E501 --max-line-length 79 -r \
  --in-place octavia

and manual updates.

[1]: https://review.opendev.org/c/openstack/nova/+/896986

Change-Id: I3924de7cd8e2d242006ec4f272ece0276053e2ed
This commit is contained in:
Tom Weininger 2024-01-15 12:56:46 +01:00
parent a0360f9719
commit ba3bbfa866
57 changed files with 349 additions and 353 deletions

View File

@ -20,7 +20,7 @@ import abc
# a single amphora, or multiple amphora among other options.
class DistributorDriver(object, metaclass=abc.ABCMeta):
class DistributorDriver(metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_create_distributor_subflow(self):
"""Get a subflow to create a distributor

View File

@ -38,7 +38,7 @@ class NoopProvidesRequiresTask(task.Task):
return self.provides_dict.values()
class NoopManager(object):
class NoopManager:
def __init__(self):
super().__init__()

View File

@ -22,7 +22,7 @@ from openstack.network.v2.subnet import Subnet
from octavia.common import constants
class MockNovaInterface(object):
class MockNovaInterface:
net_id = None
port_id = None
fixed_ips = []

View File

@ -35,10 +35,10 @@ def generate_load_balancer(vip=None, amphorae=None,
additional_vips = additional_vips or []
global LB_SEED
LB_SEED += 1
lb = data_models.LoadBalancer(id='lb{0}-id'.format(LB_SEED),
lb = data_models.LoadBalancer(id=f'lb{LB_SEED}-id',
project_id='2',
name='lb{0}'.format(LB_SEED),
description='lb{0}'.format(LB_SEED),
name=f'lb{LB_SEED}',
description=f'lb{LB_SEED}',
vip=vip,
topology=topology,
amphorae=amphorae)
@ -68,9 +68,9 @@ VIP_SEED = 0
def generate_vip(load_balancer=None):
global VIP_SEED
VIP_SEED += 1
vip = data_models.Vip(ip_address='10.0.0.{0}'.format(VIP_SEED),
vip = data_models.Vip(ip_address=f'10.0.0.{VIP_SEED}',
subnet_id=ut_constants.MOCK_VIP_SUBNET_ID,
port_id='vrrp-port-{0}'.format(VIP_SEED),
port_id=f'vrrp-port-{VIP_SEED}',
load_balancer=load_balancer)
if load_balancer:
vip.load_balancer_id = load_balancer.id
@ -83,12 +83,12 @@ AMP_SEED = 0
def generate_amphora(load_balancer=None):
global AMP_SEED
AMP_SEED += 1
amp = data_models.Amphora(id='amp{0}-id'.format(AMP_SEED),
compute_id='amp{0}-compute-id'.format(AMP_SEED),
amp = data_models.Amphora(id=f'amp{AMP_SEED}-id',
compute_id=f'amp{AMP_SEED}-compute-id',
status='ACTIVE',
lb_network_ip='99.99.99.{0}'.format(AMP_SEED),
vrrp_ip='55.55.55.{0}'.format(AMP_SEED),
vrrp_port_id='vrrp_port-{0}-id'.format(AMP_SEED),
lb_network_ip=f'99.99.99.{AMP_SEED}',
vrrp_ip=f'55.55.55.{AMP_SEED}',
vrrp_port_id=f'vrrp_port-{AMP_SEED}-id',
load_balancer=load_balancer)
if load_balancer:
amp.load_balancer_id = load_balancer.id

View File

@ -23,7 +23,7 @@ from octavia.common import constants
from octavia.common import data_models
class SampleDriverDataModels(object):
class SampleDriverDataModels:
def __init__(self):
self.project_id = uuidutils.generate_uuid()

View File

@ -48,7 +48,7 @@ class OpenFixture(fixtures.Fixture):
def assert_address_lists_equal(obj, l1, l2):
obj.assertEqual(len(l1), len(l2),
"Address lists don't match: {} vs {}".format(l1, l2))
f"Address lists don't match: {l1} vs {l2}")
for a1, a2 in zip(l1, l2):
if consts.ADDRESS in a1 and consts.ADDRESS in a2:
obj.assertEqual(
@ -62,7 +62,7 @@ def assert_address_lists_equal(obj, l1, l2):
def assert_route_lists_equal(obj, l1, l2):
obj.assertEqual(len(l1), len(l2),
"Routes don't match: {} vs {}".format(l1, l2))
f"Routes don't match: {l1} vs {l2}")
for r1, r2 in zip(l1, l2):
obj.assertEqual(
ipaddress.ip_network(r1[consts.DST]),

View File

@ -343,7 +343,7 @@ class KeepalivedLvsTestCase(base.TestCase):
res = self.test_keepalivedlvs.delete_lvs_listener(self.FAKE_ID)
cmd1 = ("/usr/sbin/service "
"octavia-keepalivedlvs-{0} stop".format(self.FAKE_ID))
"octavia-keepalivedlvs-{} stop".format(self.FAKE_ID))
cmd2 = ("systemctl disable "
"octavia-keepalivedlvs-{list}".format(list=self.FAKE_ID))
calls = [
@ -362,7 +362,7 @@ class KeepalivedLvsTestCase(base.TestCase):
mock.call(
json=dict(message='UDP Listener Not Found',
details="No UDP listener with UUID: "
"{0}".format(self.FAKE_ID)), status=404),
"{}".format(self.FAKE_ID)), status=404),
mock.call(json={'message': 'OK'})
]
m_webob.Response.assert_has_calls(calls)

View File

@ -230,7 +230,7 @@ class TestServerTestCase(base.TestCase):
data='test')
self.assertEqual(400, rv.status_code)
self.assertEqual(
{'message': 'Invalid request', u'details': u'random error'},
{'message': 'Invalid request', 'details': 'random error'},
jsonutils.loads(rv.data.decode('utf-8')))
mode = stat.S_IRUSR | stat.S_IWUSR
mock_open.assert_called_with(file_name, flags, mode)
@ -497,7 +497,7 @@ class TestServerTestCase(base.TestCase):
rv = self.centos_app.delete('/' + api_server.VERSION +
'/listeners/123')
self.assertEqual(200, rv.status_code)
self.assertEqual({u'message': u'OK'},
self.assertEqual({'message': 'OK'},
jsonutils.loads(rv.data.decode('utf-8')))
mock_rmtree.assert_called_with('/var/lib/octavia/123')
@ -524,7 +524,7 @@ class TestServerTestCase(base.TestCase):
rv = self.centos_app.delete('/' + api_server.VERSION +
'/listeners/123')
self.assertEqual(200, rv.status_code)
self.assertEqual({u'message': u'OK'},
self.assertEqual({'message': 'OK'},
jsonutils.loads(rv.data.decode('utf-8')))
mock_rmtree.assert_called_with('/var/lib/octavia/123')
@ -551,7 +551,7 @@ class TestServerTestCase(base.TestCase):
rv = self.centos_app.delete('/' + api_server.VERSION +
'/listeners/123')
self.assertEqual(200, rv.status_code)
self.assertEqual({u'message': u'OK'},
self.assertEqual({'message': 'OK'},
jsonutils.loads(rv.data.decode('utf-8')))
if init_system == consts.INIT_SYSTEMD:
@ -575,7 +575,7 @@ class TestServerTestCase(base.TestCase):
rv = self.centos_app.delete('/' + api_server.VERSION +
'/listeners/123')
self.assertEqual(200, rv.status_code)
self.assertEqual({u'message': u'OK'},
self.assertEqual({'message': 'OK'},
jsonutils.loads(rv.data.decode('utf-8')))
if init_system == consts.INIT_SYSTEMD:
@ -600,7 +600,7 @@ class TestServerTestCase(base.TestCase):
rv = self.centos_app.delete('/' + api_server.VERSION +
'/listeners/123')
self.assertEqual(200, rv.status_code)
self.assertEqual({u'message': u'OK'},
self.assertEqual({'message': 'OK'},
jsonutils.loads(rv.data.decode('utf-8')))
mock_pid.assert_called_once_with('123')
mock_check_output.assert_any_call(
@ -630,7 +630,7 @@ class TestServerTestCase(base.TestCase):
rv = self.centos_app.delete('/' + api_server.VERSION +
'/listeners/123')
self.assertEqual(200, rv.status_code)
self.assertEqual({u'message': u'OK'},
self.assertEqual({'message': 'OK'},
jsonutils.loads(rv.data.decode('utf-8')))
mock_pid.assert_called_with('123')
mock_check_output.assert_any_call(
@ -1105,7 +1105,7 @@ class TestServerTestCase(base.TestCase):
mock_fdopen.assert_any_call(123, 'r+')
expected_dict = {
consts.NAME: "eth{}".format(test_int_num),
consts.NAME: f"eth{test_int_num}",
consts.ADDRESSES: [
{
consts.DHCP: True,
@ -1189,7 +1189,7 @@ class TestServerTestCase(base.TestCase):
mock_fdopen.assert_any_call(123, 'r+')
expected_dict = {
consts.NAME: "eth{}".format(test_int_num),
consts.NAME: f"eth{test_int_num}",
consts.MTU: 1450,
consts.ADDRESSES: [
{consts.ADDRESS: '10.0.0.5', consts.PREFIXLEN: 24}
@ -1259,7 +1259,7 @@ class TestServerTestCase(base.TestCase):
mock_fdopen.assert_any_call(123, 'r+')
expected_dict = {
consts.NAME: "eth{}".format(test_int_num),
consts.NAME: f"eth{test_int_num}",
consts.MTU: 1450,
consts.ADDRESSES: [
{consts.ADDRESS: '2001:0db8::2',
@ -2977,8 +2977,8 @@ class TestServerTestCase(base.TestCase):
'haproxy_count': haproxy_count,
'haproxy_version': '9.9.99-9',
'hostname': 'test-host',
'ipvsadm_version': u'2.2.22-2',
'keepalived_version': u'1.1.11-1',
'ipvsadm_version': '2.2.22-2',
'keepalived_version': '1.1.11-1',
'listeners': [listener_id],
'load': [load_1min, load_5min, load_15min],
'memory': {'buffers': Buffers,

View File

@ -54,7 +54,7 @@ class DriverAgentTest(base.OctaviaDBTestBase):
uuidutils.generate_uuid())
sqlite_db_file = '/tmp/octavia-{}.sqlite.db'.format(
uuidutils.generate_uuid())
sqlite_db_connection = 'sqlite:///{}'.format(sqlite_db_file)
sqlite_db_connection = f'sqlite:///{sqlite_db_file}'
# Note that because the driver agent is a multi-process
# agent we must use a sqlite file rather than an

View File

@ -170,7 +170,7 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
full_path = self._get_full_path(path)
param_string = ""
for k, v in params.items():
param_string += "{key}={value}&".format(key=k, value=v)
param_string += f"{k}={v}&"
if param_string:
full_path = "{path}?{params}".format(
path=full_path, params=param_string.rstrip("&"))

View File

@ -500,17 +500,17 @@ class TestAmphora(base.BaseAPITest):
amps = self.get(self.AMPHORAE_PATH, params={
'fields': ['id', 'role']}).json
for amp in amps['amphorae']:
self.assertIn(u'id', amp)
self.assertIn(u'role', amp)
self.assertNotIn(u'ha_port_id', amp)
self.assertIn('id', amp)
self.assertIn('role', amp)
self.assertNotIn('ha_port_id', amp)
def test_get_one_fields_filter(self):
amp = self.get(
self.AMPHORA_PATH.format(amphora_id=self.amp_id),
params={'fields': ['id', 'role']}).json.get(self.root_tag)
self.assertIn(u'id', amp)
self.assertIn(u'role', amp)
self.assertNotIn(u'ha_port_id', amp)
self.assertIn('id', amp)
self.assertIn('role', amp)
self.assertNotIn('ha_port_id', amp)
def test_get_all_filter(self):
self._create_additional_amp()

View File

@ -191,9 +191,9 @@ class TestAvailabilityZoneProfiles(base.BaseAPITest):
'fields': ['id', constants.PROVIDER_NAME]}
).json.get(self.root_tag)
self.assertEqual(azp.get('id'), response.get('id'))
self.assertIn(u'id', response)
self.assertIn('id', response)
self.assertIn(constants.PROVIDER_NAME, response)
self.assertNotIn(u'name', response)
self.assertNotIn('name', response)
self.assertNotIn(constants.AVAILABILITY_ZONE_DATA, response)
def test_get_authorized(self):
@ -243,15 +243,15 @@ class TestAvailabilityZoneProfiles(base.BaseAPITest):
def test_get_all(self):
fp1 = self.create_availability_zone_profile(
'test1', 'noop_driver', '{"compute_zone": "my_az_1"}')
ref_fp_1 = {u'availability_zone_data': u'{"compute_zone": "my_az_1"}',
u'id': fp1.get('id'), u'name': u'test1',
constants.PROVIDER_NAME: u'noop_driver'}
ref_fp_1 = {'availability_zone_data': '{"compute_zone": "my_az_1"}',
'id': fp1.get('id'), 'name': 'test1',
constants.PROVIDER_NAME: 'noop_driver'}
self.assertTrue(uuidutils.is_uuid_like(fp1.get('id')))
fp2 = self.create_availability_zone_profile(
'test2', 'noop_driver-alt', '{"compute_zone": "my_az_1"}')
ref_fp_2 = {u'availability_zone_data': u'{"compute_zone": "my_az_1"}',
u'id': fp2.get('id'), u'name': u'test2',
constants.PROVIDER_NAME: u'noop_driver-alt'}
ref_fp_2 = {'availability_zone_data': '{"compute_zone": "my_az_1"}',
'id': fp2.get('id'), 'name': 'test2',
constants.PROVIDER_NAME: 'noop_driver-alt'}
self.assertTrue(uuidutils.is_uuid_like(fp2.get('id')))
response = self.get(self.AZPS_PATH)
@ -273,8 +273,8 @@ class TestAvailabilityZoneProfiles(base.BaseAPITest):
api_list = response.json.get(self.root_tag_list)
self.assertEqual(2, len(api_list))
for profile in api_list:
self.assertIn(u'id', profile)
self.assertIn(u'name', profile)
self.assertIn('id', profile)
self.assertIn('name', profile)
self.assertNotIn(constants.PROVIDER_NAME, profile)
self.assertNotIn(constants.AVAILABILITY_ZONE_DATA, profile)

View File

@ -186,15 +186,15 @@ class TestAvailabilityZones(base.BaseAPITest):
self.assertEqual(az.get('name'), response.get('name'))
self.assertEqual(self.azp.get('id'),
response.get('availability_zone_profile_id'))
self.assertIn(u'availability_zone_profile_id', response)
self.assertNotIn(u'description', response)
self.assertNotIn(u'enabled', response)
self.assertIn('availability_zone_profile_id', response)
self.assertNotIn('description', response)
self.assertNotIn('enabled', response)
def test_get_one_deleted_name(self):
response = self.get(
self.AZ_PATH.format(az_name=constants.NIL_UUID), status=404)
self.assertEqual(
'Availability Zone {} not found.'.format(constants.NIL_UUID),
f'Availability Zone {constants.NIL_UUID} not found.',
response.json.get('faultstring'))
def test_get_authorized(self):
@ -249,15 +249,15 @@ class TestAvailabilityZones(base.BaseAPITest):
self.create_availability_zone(
'name1', 'description', self.azp.get('id'), True)
ref_az_1 = {
u'description': u'description', u'enabled': True,
u'availability_zone_profile_id': self.azp.get('id'),
u'name': u'name1'}
'description': 'description', 'enabled': True,
'availability_zone_profile_id': self.azp.get('id'),
'name': 'name1'}
self.create_availability_zone(
'name2', 'description', self.azp.get('id'), True)
ref_az_2 = {
u'description': u'description', u'enabled': True,
u'availability_zone_profile_id': self.azp.get('id'),
u'name': u'name2'}
'description': 'description', 'enabled': True,
'availability_zone_profile_id': self.azp.get('id'),
'name': 'name2'}
response = self.get(self.AZS_PATH)
api_list = response.json.get(self.root_tag_list)
self.assertEqual(2, len(api_list))
@ -274,10 +274,10 @@ class TestAvailabilityZones(base.BaseAPITest):
api_list = response.json.get(self.root_tag_list)
self.assertEqual(2, len(api_list))
for az in api_list:
self.assertIn(u'name', az)
self.assertNotIn(u'availability_zone_profile_id', az)
self.assertNotIn(u'description', az)
self.assertNotIn(u'enabled', az)
self.assertIn('name', az)
self.assertNotIn('availability_zone_profile_id', az)
self.assertNotIn('description', az)
self.assertNotIn('enabled', az)
def test_get_all_authorized(self):
self.create_availability_zone(
@ -355,7 +355,7 @@ class TestAvailabilityZones(base.BaseAPITest):
self.AZ_PATH.format(az_name=constants.NIL_UUID), body,
status=404)
self.assertEqual(
'Availability Zone {} not found.'.format(constants.NIL_UUID),
f'Availability Zone {constants.NIL_UUID} not found.',
response.json.get('faultstring'))
def test_update_none(self):
@ -505,7 +505,7 @@ class TestAvailabilityZones(base.BaseAPITest):
response = self.delete(
self.AZ_PATH.format(az_name=constants.NIL_UUID), status=404)
self.assertEqual(
'Availability Zone {} not found.'.format(constants.NIL_UUID),
f'Availability Zone {constants.NIL_UUID} not found.',
response.json.get('faultstring'))
def test_delete_authorized(self):

View File

@ -189,9 +189,9 @@ class TestFlavorProfiles(base.BaseAPITest):
'fields': ['id', constants.PROVIDER_NAME]}
).json.get(self.root_tag)
self.assertEqual(fp.get('id'), response.get('id'))
self.assertIn(u'id', response)
self.assertIn('id', response)
self.assertIn(constants.PROVIDER_NAME, response)
self.assertNotIn(u'name', response)
self.assertNotIn('name', response)
self.assertNotIn(constants.FLAVOR_DATA, response)
def test_get_authorized(self):
@ -241,15 +241,15 @@ class TestFlavorProfiles(base.BaseAPITest):
def test_get_all(self):
fp1 = self.create_flavor_profile('test1', 'noop_driver',
'{"image": "ubuntu"}')
ref_fp_1 = {u'flavor_data': u'{"image": "ubuntu"}',
u'id': fp1.get('id'), u'name': u'test1',
constants.PROVIDER_NAME: u'noop_driver'}
ref_fp_1 = {'flavor_data': '{"image": "ubuntu"}',
'id': fp1.get('id'), 'name': 'test1',
constants.PROVIDER_NAME: 'noop_driver'}
self.assertTrue(uuidutils.is_uuid_like(fp1.get('id')))
fp2 = self.create_flavor_profile('test2', 'noop_driver-alt',
'{"image": "ubuntu"}')
ref_fp_2 = {u'flavor_data': u'{"image": "ubuntu"}',
u'id': fp2.get('id'), u'name': u'test2',
constants.PROVIDER_NAME: u'noop_driver-alt'}
ref_fp_2 = {'flavor_data': '{"image": "ubuntu"}',
'id': fp2.get('id'), 'name': 'test2',
constants.PROVIDER_NAME: 'noop_driver-alt'}
self.assertTrue(uuidutils.is_uuid_like(fp2.get('id')))
response = self.get(self.FPS_PATH)
@ -271,8 +271,8 @@ class TestFlavorProfiles(base.BaseAPITest):
api_list = response.json.get(self.root_tag_list)
self.assertEqual(2, len(api_list))
for profile in api_list:
self.assertIn(u'id', profile)
self.assertIn(u'name', profile)
self.assertIn('id', profile)
self.assertIn('name', profile)
self.assertNotIn(constants.PROVIDER_NAME, profile)
self.assertNotIn(constants.FLAVOR_DATA, profile)

View File

@ -184,16 +184,16 @@ class TestFlavors(base.BaseAPITest):
'fields': ['id', 'flavor_profile_id']}).json.get(self.root_tag)
self.assertEqual(flavor.get('id'), response.get('id'))
self.assertEqual(self.fp.get('id'), response.get('flavor_profile_id'))
self.assertIn(u'id', response)
self.assertIn(u'flavor_profile_id', response)
self.assertNotIn(u'name', response)
self.assertNotIn(u'description', response)
self.assertNotIn(u'enabled', response)
self.assertIn('id', response)
self.assertIn('flavor_profile_id', response)
self.assertNotIn('name', response)
self.assertNotIn('description', response)
self.assertNotIn('enabled', response)
def test_get_one_deleted_id(self):
response = self.get(
self.FLAVOR_PATH.format(flavor_id=constants.NIL_UUID), status=404)
self.assertEqual('Flavor {} not found.'.format(constants.NIL_UUID),
self.assertEqual(f'Flavor {constants.NIL_UUID} not found.',
response.json.get('faultstring'))
def test_get_authorized(self):
@ -251,18 +251,18 @@ class TestFlavors(base.BaseAPITest):
True)
self.assertTrue(uuidutils.is_uuid_like(flavor1.get('id')))
ref_flavor_1 = {
u'description': u'description', u'enabled': True,
u'flavor_profile_id': self.fp.get('id'),
u'id': flavor1.get('id'),
u'name': u'name1'}
'description': 'description', 'enabled': True,
'flavor_profile_id': self.fp.get('id'),
'id': flavor1.get('id'),
'name': 'name1'}
flavor2 = self.create_flavor('name2', 'description', self.fp.get('id'),
True)
self.assertTrue(uuidutils.is_uuid_like(flavor2.get('id')))
ref_flavor_2 = {
u'description': u'description', u'enabled': True,
u'flavor_profile_id': self.fp.get('id'),
u'id': flavor2.get('id'),
u'name': u'name2'}
'description': 'description', 'enabled': True,
'flavor_profile_id': self.fp.get('id'),
'id': flavor2.get('id'),
'name': 'name2'}
response = self.get(self.FLAVORS_PATH)
api_list = response.json.get(self.root_tag_list)
self.assertEqual(2, len(api_list))
@ -281,11 +281,11 @@ class TestFlavors(base.BaseAPITest):
api_list = response.json.get(self.root_tag_list)
self.assertEqual(2, len(api_list))
for flavor in api_list:
self.assertIn(u'id', flavor)
self.assertIn(u'name', flavor)
self.assertNotIn(u'flavor_profile_id', flavor)
self.assertNotIn(u'description', flavor)
self.assertNotIn(u'enabled', flavor)
self.assertIn('id', flavor)
self.assertIn('name', flavor)
self.assertNotIn('flavor_profile_id', flavor)
self.assertNotIn('description', flavor)
self.assertNotIn('enabled', flavor)
def test_get_all_authorized(self):
flavor1 = self.create_flavor('name1', 'description', self.fp.get('id'),
@ -368,7 +368,7 @@ class TestFlavors(base.BaseAPITest):
response = self.put(
self.FLAVOR_PATH.format(flavor_id=constants.NIL_UUID), body,
status=404)
self.assertEqual('Flavor {} not found.'.format(constants.NIL_UUID),
self.assertEqual(f'Flavor {constants.NIL_UUID} not found.',
response.json.get('faultstring'))
def test_update_none(self):
@ -522,7 +522,7 @@ class TestFlavors(base.BaseAPITest):
def test_delete_deleted_id(self):
response = self.delete(
self.FLAVOR_PATH.format(flavor_id=constants.NIL_UUID), status=404)
self.assertEqual('Flavor {} not found.'.format(constants.NIL_UUID),
self.assertEqual(f'Flavor {constants.NIL_UUID} not found.',
response.json.get('faultstring'))
def test_delete_authorized(self):

View File

@ -688,9 +688,9 @@ class TestHealthMonitor(base.BaseAPITest):
hms = self.get(self.HMS_PATH, params={
'fields': ['id', 'project_id']}).json
for hm in hms['healthmonitors']:
self.assertIn(u'id', hm)
self.assertIn(u'project_id', hm)
self.assertNotIn(u'description', hm)
self.assertIn('id', hm)
self.assertIn('project_id', hm)
self.assertNotIn('description', hm)
def test_get_one_fields_filter(self):
pool1 = self.create_pool(
@ -709,9 +709,9 @@ class TestHealthMonitor(base.BaseAPITest):
hm = self.get(
self.HM_PATH.format(healthmonitor_id=hm1.get('id')),
params={'fields': ['id', 'project_id']}).json.get(self.root_tag)
self.assertIn(u'id', hm)
self.assertIn(u'project_id', hm)
self.assertNotIn(u'description', hm)
self.assertIn('id', hm)
self.assertIn('project_id', hm)
self.assertNotIn('description', hm)
def test_get_all_filter(self):
pool1 = self.create_pool(

View File

@ -523,9 +523,9 @@ class TestL7Policy(base.BaseAPITest):
l7pos = self.get(self.L7POLICIES_PATH, params={
'fields': ['id', 'project_id']}).json
for l7po in l7pos['l7policies']:
self.assertIn(u'id', l7po)
self.assertIn(u'project_id', l7po)
self.assertNotIn(u'description', l7po)
self.assertIn('id', l7po)
self.assertIn('project_id', l7po)
self.assertNotIn('description', l7po)
def test_get_one_fields_filter(self):
l7p1 = self.create_l7policy(
@ -536,9 +536,9 @@ class TestL7Policy(base.BaseAPITest):
l7po = self.get(
self.L7POLICY_PATH.format(l7policy_id=l7p1.get('id')),
params={'fields': ['id', 'project_id']}).json.get(self.root_tag)
self.assertIn(u'id', l7po)
self.assertIn(u'project_id', l7po)
self.assertNotIn(u'description', l7po)
self.assertIn('id', l7po)
self.assertIn('project_id', l7po)
self.assertNotIn('description', l7po)
def test_get_all_filter(self):
policy1 = self.create_l7policy(

View File

@ -356,9 +356,9 @@ class TestL7Rule(base.BaseAPITest):
l7rus = self.get(self.l7rules_path, params={
'fields': ['id', 'compare_type']}).json
for l7ru in l7rus['rules']:
self.assertIn(u'id', l7ru)
self.assertIn(u'compare_type', l7ru)
self.assertNotIn(u'project_id', l7ru)
self.assertIn('id', l7ru)
self.assertIn('compare_type', l7ru)
self.assertNotIn('project_id', l7ru)
def test_get_one_fields_filter(self):
l7r1 = self.create_l7rule(
@ -370,9 +370,9 @@ class TestL7Rule(base.BaseAPITest):
l7ru = self.get(
self.l7rule_path.format(l7rule_id=l7r1.get('id')),
params={'fields': ['id', 'compare_type']}).json.get(self.root_tag)
self.assertIn(u'id', l7ru)
self.assertIn(u'compare_type', l7ru)
self.assertNotIn(u'project_id', l7ru)
self.assertIn('id', l7ru)
self.assertIn('compare_type', l7ru)
self.assertNotIn('project_id', l7ru)
def test_get_all_filter(self):
ru1 = self.create_l7rule(
@ -797,7 +797,7 @@ class TestL7Rule(base.BaseAPITest):
'key': 'no-need-key'}
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn('L7rule type {0} does not use the "key" field.'.format(
self.assertIn('L7rule type {} does not use the "key" field.'.format(
constants.L7RULE_TYPE_SSL_CONN_HAS_CERT),
response.get('faultstring'))
@ -806,7 +806,7 @@ class TestL7Rule(base.BaseAPITest):
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule value {0} is not a boolean True string.'.format(
'L7rule value {} is not a boolean True string.'.format(
l7rule['value']), response.get('faultstring'))
l7rule['value'] = 'tRUe'
@ -814,7 +814,7 @@ class TestL7Rule(base.BaseAPITest):
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule type {0} only supports the {1} compare type.'.format(
'L7rule type {} only supports the {} compare type.'.format(
constants.L7RULE_TYPE_SSL_CONN_HAS_CERT,
constants.L7RULE_COMPARE_TYPE_EQUAL_TO),
response.get('faultstring'))
@ -829,14 +829,14 @@ class TestL7Rule(base.BaseAPITest):
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule type {0} does not use the "key" field.'.format(
'L7rule type {} does not use the "key" field.'.format(
l7rule['type']), response.get('faultstring'))
l7rule.pop('key')
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule type {0} needs a int value, which is >= 0'.format(
'L7rule type {} needs a int value, which is >= 0'.format(
l7rule['type']), response.get('faultstring'))
l7rule['value'] = '0'
@ -844,7 +844,7 @@ class TestL7Rule(base.BaseAPITest):
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule type {0} only supports the {1} compare type.'.format(
'L7rule type {} only supports the {} compare type.'.format(
l7rule['type'], constants.L7RULE_COMPARE_TYPE_EQUAL_TO),
response.get('faultstring'))
@ -861,7 +861,7 @@ class TestL7Rule(base.BaseAPITest):
response = req_func(first_req_arg, self._build_body(l7rule),
status=400).json
self.assertIn(
'L7rule type {0} needs to specify a key and a value.'.format(
'L7rule type {} needs to specify a key and a value.'.format(
l7rule['type']), response.get('faultstring'))
l7rule['key'] = 'NOT_SUPPORTED_DN_FIELD'

View File

@ -383,9 +383,9 @@ class TestListener(base.BaseAPITest):
lis = self.get(self.LISTENERS_PATH, params={
'fields': ['id', 'project_id']}).json
for li in lis['listeners']:
self.assertIn(u'id', li)
self.assertIn(u'project_id', li)
self.assertNotIn(u'description', li)
self.assertIn('id', li)
self.assertIn('project_id', li)
self.assertNotIn('description', li)
def test_get_one_fields_filter(self):
listener1 = self.create_listener(
@ -396,9 +396,9 @@ class TestListener(base.BaseAPITest):
li = self.get(
self.LISTENER_PATH.format(listener_id=listener1.get('id')),
params={'fields': ['id', 'project_id']}).json.get(self.root_tag)
self.assertIn(u'id', li)
self.assertIn(u'project_id', li)
self.assertNotIn(u'description', li)
self.assertIn('id', li)
self.assertIn('project_id', li)
self.assertNotIn('description', li)
def test_get_all_filter(self):
li1 = self.create_listener(constants.PROTOCOL_HTTP,
@ -699,10 +699,10 @@ class TestListener(base.BaseAPITest):
optionals.update({field[0]: 1})
fault = resp.get('faultstring')
self.assertIn(
'Invalid input for field/attribute {0}'.format(
'Invalid input for field/attribute {}'.format(
field[0]), fault)
self.assertIn(
'Value should be lower or equal to {0}'.format(
'Value should be lower or equal to {}'.format(
constants.MAX_TIMEOUT), fault)
def test_create_with_timeouts_too_low(self):
@ -717,7 +717,7 @@ class TestListener(base.BaseAPITest):
self.assertIn(
'Invalid input for field/attribute timeout_tcp_inspect', fault)
self.assertIn(
'Value should be greater or equal to {0}'.format(
'Value should be greater or equal to {}'.format(
constants.MIN_TIMEOUT), fault)
def test_create_udp_case(self):
@ -807,7 +807,7 @@ class TestListener(base.BaseAPITest):
self.assertIn(
'Certificate container references are not allowed on ', fault)
self.assertIn(
'{} protocol listeners.'.format(constants.PROTOCOL_TCP), fault)
f'{constants.PROTOCOL_TCP} protocol listeners.', fault)
def test_create_without_certs_if_terminated_https(self):
optionals = {
@ -1276,7 +1276,7 @@ class TestListener(base.BaseAPITest):
self._test_create_with_allowed_cidrs(allowed_cidrs, lb_id)
def test_create_with_bad_allowed_cidrs(self):
allowed_cidrs = [u'10.0.1.0/33', u'172.16.55.1.0/25']
allowed_cidrs = ['10.0.1.0/33', '172.16.55.1.0/25']
lb_listener = {
'protocol': constants.PROTOCOL_TCP,
'protocol_port': 80,
@ -2576,7 +2576,7 @@ class TestListener(base.BaseAPITest):
default_tls_container_ref=cert_id,
status=400)
self.assertIn(
'The selected protocol is not allowed in this deployment: {0}'
'The selected protocol is not allowed in this deployment: {}'
.format(constants.PROTOCOL_TERMINATED_HTTPS),
listener.get('faultstring'))
@ -2776,7 +2776,7 @@ class TestListener(base.BaseAPITest):
lb_listener['insert_headers'] = header
body = self._build_body(lb_listener)
listener = self.post(self.LISTENERS_PATH, body, status=400).json
self.assertIn('{0} is not a valid option for {1}'.format(
self.assertIn('{} is not a valid option for {}'.format(
[name],
'%s protocol listener.' % constants.PROTOCOL_HTTP),
listener.get('faultstring'))
@ -2829,7 +2829,7 @@ class TestListener(base.BaseAPITest):
listener_id=listener['listener'].get('id'))
update_listener = self.put(
listener_path, new_listener, status=400).json
self.assertIn('{0} is not a valid option for {1}'.format(
self.assertIn('{} is not a valid option for {}'.format(
'[\'X-Bad-Header\']', 'insert_headers'),
update_listener.get('faultstring'))

View File

@ -156,7 +156,7 @@ class TestLoadBalancer(base.BaseAPITest):
".NoopManager.get_subnet") as mock_get_subnet:
mock_get_subnet.side_effect = network_base.SubnetNotFound
response = self.post(self.LBS_PATH, body, status=400)
err_msg = 'Subnet {} not found.'.format(subnet_id)
err_msg = f'Subnet {subnet_id} not found.'
self.assertEqual(err_msg, response.json.get('faultstring'))
def test_create_with_invalid_vip_network_subnet(self):
@ -172,7 +172,7 @@ class TestLoadBalancer(base.BaseAPITest):
".NoopManager.get_network") as mock_get_network:
mock_get_network.return_value = network
response = self.post(self.LBS_PATH, body, status=400)
err_msg = 'Subnet {} not found.'.format(subnet_id)
err_msg = f'Subnet {subnet_id} not found.'
self.assertEqual(err_msg, response.json.get('faultstring'))
def test_create_with_vip_subnet_fills_network(self):
@ -1414,7 +1414,7 @@ class TestLoadBalancer(base.BaseAPITest):
name='lb3', project_id=project_id)
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
LB_PROJECT_PATH = '{}?project_id={}'.format(self.LBS_PATH, project_id)
LB_PROJECT_PATH = f'{self.LBS_PATH}?project_id={project_id}'
with mock.patch.object(octavia.common.context.RequestContext,
'project_id',
self.project_id):
@ -1567,9 +1567,9 @@ class TestLoadBalancer(base.BaseAPITest):
lbs = self.get(self.LBS_PATH, params={
'fields': ['id', 'project_id']}).json
for lb in lbs['loadbalancers']:
self.assertIn(u'id', lb)
self.assertIn(u'project_id', lb)
self.assertNotIn(u'description', lb)
self.assertIn('id', lb)
self.assertIn('project_id', lb)
self.assertNotIn('description', lb)
def test_get_one_fields_filter(self):
lb1 = self.create_load_balancer(
@ -1579,9 +1579,9 @@ class TestLoadBalancer(base.BaseAPITest):
lb = self.get(
self.LB_PATH.format(lb_id=lb1.get('id')),
params={'fields': ['id', 'project_id']}).json.get(self.root_tag)
self.assertIn(u'id', lb)
self.assertIn(u'project_id', lb)
self.assertNotIn(u'description', lb)
self.assertIn('id', lb)
self.assertIn('project_id', lb)
self.assertNotIn('description', lb)
def test_get_all_admin_state_up_filter(self):
self.create_load_balancer(uuidutils.generate_uuid(),

View File

@ -352,10 +352,10 @@ class TestMember(base.BaseAPITest):
members = self.get(self.members_path, params={
'fields': ['id', 'address']}).json
for member in members['members']:
self.assertIn(u'id', member)
self.assertIn(u'address', member)
self.assertNotIn(u'name', member)
self.assertNotIn(u'monitor_address', member)
self.assertIn('id', member)
self.assertIn('address', member)
self.assertNotIn('name', member)
self.assertNotIn('monitor_address', member)
def test_get_one_fields_filter(self):
member1 = self.create_member(
@ -365,10 +365,10 @@ class TestMember(base.BaseAPITest):
member = self.get(
self.member_path.format(member_id=member1.get('id')),
params={'fields': ['id', 'address']}).json.get(self.root_tag)
self.assertIn(u'id', member)
self.assertIn(u'address', member)
self.assertNotIn(u'name', member)
self.assertNotIn(u'monitor_address', member)
self.assertIn('id', member)
self.assertIn('address', member)
self.assertNotIn('name', member)
self.assertNotIn('monitor_address', member)
def test_get_all_filter(self):
mem1 = self.create_member(self.pool_id,
@ -689,7 +689,7 @@ class TestMember(base.BaseAPITest):
# Order matters here
provider_creates += provider_updates
mock_provider.assert_called_once_with(u'noop_driver',
mock_provider.assert_called_once_with('noop_driver',
mock_driver.member_batch_update,
self.pool_id, provider_creates)
@ -732,7 +732,7 @@ class TestMember(base.BaseAPITest):
provider_dict['name'] = None
provider_members.append(driver_dm.Member(**provider_dict))
mock_provider.assert_called_once_with(u'noop_driver',
mock_provider.assert_called_once_with('noop_driver',
mock_driver.member_batch_update,
self.pool_id, provider_members)
@ -792,7 +792,7 @@ class TestMember(base.BaseAPITest):
req_dict = [member1, member2, member5, member6]
body = {self.root_tag_list: req_dict}
path = self.MEMBERS_PATH.format(pool_id=self.pool_id)
path = "{}?additive_only=True".format(path)
path = f"{path}?additive_only=True"
self.put(path, body, status=202)
returned_members = self.get(
self.MEMBERS_PATH.format(pool_id=self.pool_id)
@ -834,7 +834,7 @@ class TestMember(base.BaseAPITest):
provider_creates += provider_updates
provider_creates += provider_ignored
mock_provider.assert_called_once_with(u'noop_driver',
mock_provider.assert_called_once_with('noop_driver',
mock_driver.member_batch_update,
self.pool_id, provider_creates)
@ -885,7 +885,7 @@ class TestMember(base.BaseAPITest):
del provider_dict['subnet_id']
provider_members.append(driver_dm.Member(**provider_dict))
mock_provider.assert_called_once_with(u'noop_driver',
mock_provider.assert_called_once_with('noop_driver',
mock_driver.member_batch_update,
self.pool_id, provider_members)
@ -971,7 +971,7 @@ class TestMember(base.BaseAPITest):
rm['protocol_port'],
rm['provisioning_status']), expected_members)
mock_provider.assert_called_once_with(u'noop_driver',
mock_provider.assert_called_once_with('noop_driver',
mock_driver.member_batch_update,
self.pool_id, provider_members)

View File

@ -553,9 +553,9 @@ class TestPool(base.BaseAPITest):
pools = self.get(self.POOLS_PATH, params={
'fields': ['id', 'project_id']}).json
for pool in pools['pools']:
self.assertIn(u'id', pool)
self.assertIn(u'project_id', pool)
self.assertNotIn(u'description', pool)
self.assertIn('id', pool)
self.assertIn('project_id', pool)
self.assertNotIn('description', pool)
def test_get_one_fields_filter(self):
pool1 = self.create_pool(
@ -568,9 +568,9 @@ class TestPool(base.BaseAPITest):
pool = self.get(
self.POOL_PATH.format(pool_id=pool1.get('id')),
params={'fields': ['id', 'project_id']}).json.get(self.root_tag)
self.assertIn(u'id', pool)
self.assertIn(u'project_id', pool)
self.assertNotIn(u'description', pool)
self.assertIn('id', pool)
self.assertIn('project_id', pool)
self.assertNotIn('description', pool)
def test_get_all_filter(self):
po1 = self.create_pool(

View File

@ -33,10 +33,10 @@ class TestProvider(base.BaseAPITest):
super().setUp()
def test_get_all_providers(self):
octavia_dict = {u'description': u'Octavia driver.',
u'name': u'octavia'}
amphora_dict = {u'description': u'Amp driver.', u'name': u'amphora'}
noop_dict = {u'description': u'NoOp driver.', u'name': u'noop_driver'}
octavia_dict = {'description': 'Octavia driver.',
'name': 'octavia'}
amphora_dict = {'description': 'Amp driver.', 'name': 'amphora'}
noop_dict = {'description': 'NoOp driver.', 'name': 'noop_driver'}
providers = self.get(self.PROVIDERS_PATH).json.get(self.root_tag_list)
self.assertEqual(4, len(providers))
self.assertIn(octavia_dict, providers)
@ -44,9 +44,9 @@ class TestProvider(base.BaseAPITest):
self.assertIn(noop_dict, providers)
def test_get_all_providers_fields(self):
octavia_dict = {u'name': u'octavia'}
amphora_dict = {u'name': u'amphora'}
noop_dict = {u'name': u'noop_driver'}
octavia_dict = {'name': 'octavia'}
amphora_dict = {'name': 'amphora'}
noop_dict = {'name': 'noop_driver'}
providers = self.get(self.PROVIDERS_PATH, params={'fields': ['name']})
providers_list = providers.json.get(self.root_tag_list)
self.assertEqual(4, len(providers_list))

View File

@ -24,7 +24,7 @@ from octavia.tests.functional.db import base
from sqlalchemy.orm import collections
class ModelTestMixin(object):
class ModelTestMixin:
FAKE_IP = '10.0.0.1'
FAKE_UUID_1 = uuidutils.generate_uuid()

View File

@ -155,36 +155,36 @@ class TestAmphoraInfo(base.TestCase):
m_count.return_value = 5
original_version = api_server.VERSION
api_server.VERSION = self.API_VERSION
expected_dict = {u'active': True,
expected_dict = {'active': True,
'active_tuned_profiles': '',
u'api_version': self.API_VERSION,
u'cpu': {u'soft_irq': u'8336',
u'system': u'52554',
u'total': 7503411,
u'user': u'252551'},
'api_version': self.API_VERSION,
'cpu': {'soft_irq': '8336',
'system': '52554',
'total': 7503411,
'user': '252551'},
'cpu_count': os.cpu_count(),
u'disk': {u'available': 109079126016,
u'used': 25718685696},
u'haproxy_count': 5,
u'haproxy_version': self.HAPROXY_VERSION,
u'hostname': u'FAKE_HOST',
u'listeners': sorted([self.FAKE_LISTENER_ID_1,
self.FAKE_LISTENER_ID_2]),
u'load': [u'0.09', u'0.11', u'0.10'],
u'memory': {u'buffers': 344792,
u'cached': 4271856,
u'free': 12685624,
u'shared': 9520,
u'slab': 534384,
u'swap_used': 0,
u'total': 21692784},
u'networks': {u'eth1': {u'network_rx': 996,
u'network_tx': 418},
u'eth2': {u'network_rx': 848,
u'network_tx': 578}},
u'packages': {},
u'topology': u'SINGLE',
u'topology_status': u'OK'}
'disk': {'available': 109079126016,
'used': 25718685696},
'haproxy_count': 5,
'haproxy_version': self.HAPROXY_VERSION,
'hostname': 'FAKE_HOST',
'listeners': sorted([self.FAKE_LISTENER_ID_1,
self.FAKE_LISTENER_ID_2]),
'load': ['0.09', '0.11', '0.10'],
'memory': {'buffers': 344792,
'cached': 4271856,
'free': 12685624,
'shared': 9520,
'slab': 534384,
'swap_used': 0,
'total': 21692784},
'networks': {'eth1': {'network_rx': 996,
'network_tx': 418},
'eth2': {'network_rx': 848,
'network_tx': 578}},
'packages': {},
'topology': 'SINGLE',
'topology_status': 'OK'}
self.useFixture(test_utils.OpenFixture('/etc/tuned/active_profile'))
actual = self.amp_info.compile_amphora_details()
self.assertEqual(expected_dict, actual.json)
@ -243,41 +243,40 @@ class TestAmphoraInfo(base.TestCase):
mock_get_lb.return_value = [self.LB_ID_1]
original_version = api_server.VERSION
api_server.VERSION = self.API_VERSION
expected_dict = {u'active': True,
expected_dict = {'active': True,
'active_tuned_profiles': '',
u'api_version': self.API_VERSION,
u'cpu': {u'soft_irq': u'8336',
u'system': u'52554',
u'total': 7503411,
u'user': u'252551'},
'api_version': self.API_VERSION,
'cpu': {'soft_irq': '8336',
'system': '52554',
'total': 7503411,
'user': '252551'},
'cpu_count': os.cpu_count(),
u'disk': {u'available': 109079126016,
u'used': 25718685696},
u'haproxy_count': 5,
u'haproxy_version': self.HAPROXY_VERSION,
u'keepalived_version': self.KEEPALIVED_VERSION,
u'ipvsadm_version': self.IPVSADM_VERSION,
u'lvs_listener_process_count': 1,
u'hostname': u'FAKE_HOST',
u'listeners': sorted(list(set(
[self.FAKE_LISTENER_ID_3,
self.FAKE_LISTENER_ID_4,
'sample_listener_id_1']))),
u'load': [u'0.09', u'0.11', u'0.10'],
u'memory': {u'buffers': 344792,
u'cached': 4271856,
u'free': 12685624,
u'shared': 9520,
u'slab': 534384,
u'swap_used': 0,
u'total': 21692784},
u'networks': {u'eth1': {u'network_rx': 996,
u'network_tx': 418},
u'eth2': {u'network_rx': 848,
u'network_tx': 578}},
u'packages': {},
u'topology': u'SINGLE',
u'topology_status': u'OK'}
'disk': {'available': 109079126016,
'used': 25718685696},
'haproxy_count': 5,
'haproxy_version': self.HAPROXY_VERSION,
'keepalived_version': self.KEEPALIVED_VERSION,
'ipvsadm_version': self.IPVSADM_VERSION,
'lvs_listener_process_count': 1,
'hostname': 'FAKE_HOST',
'listeners': sorted(list({self.FAKE_LISTENER_ID_3,
self.FAKE_LISTENER_ID_4,
'sample_listener_id_1'})),
'load': ['0.09', '0.11', '0.10'],
'memory': {'buffers': 344792,
'cached': 4271856,
'free': 12685624,
'shared': 9520,
'slab': 534384,
'swap_used': 0,
'total': 21692784},
'networks': {'eth1': {'network_rx': 996,
'network_tx': 418},
'eth2': {'network_rx': 848,
'network_tx': 578}},
'packages': {},
'topology': 'SINGLE',
'topology_status': 'OK'}
self.useFixture(test_utils.OpenFixture('/etc/tuned/active_profile'))
actual = self.amp_info.compile_amphora_details(self.lvs_driver)
self.assertEqual(expected_dict, actual.json)

View File

@ -36,7 +36,7 @@ class KeepalivedLvsTestCase(base.TestCase):
mock.call(
json=dict(message='UDP Listener Not Found',
details="No UDP listener with UUID: "
"{0}".format(self.FAKE_ID)), status=404),
"{}".format(self.FAKE_ID)), status=404),
mock.call(json={'message': 'OK'})
]
m_webob.Response.assert_has_calls(calls)

View File

@ -77,7 +77,7 @@ class ListenerTestCase(base.TestCase):
# Happy path - No VRRP
ref_command_split = ['/usr/sbin/service']
ref_command_split.append('haproxy-{}'.format(listener_id))
ref_command_split.append(f'haproxy-{listener_id}')
ref_command_split.append(consts.AMP_ACTION_START)
result = self.test_loadbalancer.start_stop_lb(
@ -90,7 +90,7 @@ class ListenerTestCase(base.TestCase):
self.assertEqual(202, result.status_code)
self.assertEqual('OK', result.json['message'])
ref_details = ('Configuration file is valid\n'
'haproxy daemon for {0} started'.format(listener_id))
'haproxy daemon for {} started'.format(listener_id))
self.assertEqual(ref_details, result.json['details'])
# Happy path - VRRP - RELOAD
@ -102,7 +102,7 @@ class ListenerTestCase(base.TestCase):
mock_check_output.reset_mock()
ref_command_split = ['/usr/sbin/service']
ref_command_split.append('haproxy-{}'.format(listener_id))
ref_command_split.append(f'haproxy-{listener_id}')
ref_command_split.append(consts.AMP_ACTION_RELOAD)
result = self.test_loadbalancer.start_stop_lb(
@ -115,8 +115,8 @@ class ListenerTestCase(base.TestCase):
consts.AMP_ACTION_RELOAD)
self.assertEqual(202, result.status_code)
self.assertEqual('OK', result.json['message'])
ref_details = ('Listener {0} {1}ed'.format(listener_id,
consts.AMP_ACTION_RELOAD))
ref_details = ('Listener {} {}ed'.format(listener_id,
consts.AMP_ACTION_RELOAD))
self.assertEqual(ref_details, result.json['details'])
# Happy path - VRRP - RELOAD - OFFLINE
@ -125,7 +125,7 @@ class ListenerTestCase(base.TestCase):
mock_check_output.reset_mock()
ref_command_split = ['/usr/sbin/service']
ref_command_split.append('haproxy-{}'.format(listener_id))
ref_command_split.append(f'haproxy-{listener_id}')
ref_command_split.append(consts.AMP_ACTION_START)
result = self.test_loadbalancer.start_stop_lb(
@ -139,7 +139,7 @@ class ListenerTestCase(base.TestCase):
self.assertEqual(202, result.status_code)
self.assertEqual('OK', result.json['message'])
ref_details = ('Configuration file is valid\n'
'haproxy daemon for {0} started'.format(listener_id))
'haproxy daemon for {} started'.format(listener_id))
self.assertEqual(ref_details, result.json['details'])
# Unhappy path - Not already running
@ -151,7 +151,7 @@ class ListenerTestCase(base.TestCase):
mock_check_output.reset_mock()
ref_command_split = ['/usr/sbin/service']
ref_command_split.append('haproxy-{}'.format(listener_id))
ref_command_split.append(f'haproxy-{listener_id}')
ref_command_split.append(consts.AMP_ACTION_START)
mock_check_output.side_effect = subprocess.CalledProcessError(
@ -165,7 +165,7 @@ class ListenerTestCase(base.TestCase):
mock_lb_exists.assert_called_once_with(listener_id)
mock_vrrp_update.assert_not_called()
self.assertEqual(500, result.status_code)
self.assertEqual('Error {}ing haproxy'.format(consts.AMP_ACTION_START),
self.assertEqual(f'Error {consts.AMP_ACTION_START}ing haproxy',
result.json['message'])
self.assertEqual('bogus', result.json['details'])
@ -175,7 +175,7 @@ class ListenerTestCase(base.TestCase):
mock_check_output.reset_mock()
ref_command_split = ['/usr/sbin/service']
ref_command_split.append('haproxy-{}'.format(listener_id))
ref_command_split.append(f'haproxy-{listener_id}')
ref_command_split.append(consts.AMP_ACTION_START)
mock_check_output.side_effect = subprocess.CalledProcessError(
@ -191,7 +191,7 @@ class ListenerTestCase(base.TestCase):
self.assertEqual(202, result.status_code)
self.assertEqual('OK', result.json['message'])
ref_details = ('Configuration file is valid\n'
'haproxy daemon for {0} started'.format(listener_id))
'haproxy daemon for {} started'.format(listener_id))
self.assertEqual(ref_details, result.json['details'])
# Invalid action

View File

@ -113,19 +113,19 @@ class TestOSUtils(base.TestCase):
@mock.patch('octavia.amphorae.backends.utils.interface_file.'
'VIPInterfaceFile')
def test_write_vip_interface_file(self, mock_vip_interface_file):
netns_interface = u'eth1234'
FIXED_IP = u'192.0.2.2'
SUBNET_CIDR = u'192.0.2.0/24'
GATEWAY = u'192.51.100.1'
DEST1 = u'198.51.100.0/24'
DEST2 = u'203.0.113.0/24'
NEXTHOP = u'192.0.2.1'
netns_interface = 'eth1234'
FIXED_IP = '192.0.2.2'
SUBNET_CIDR = '192.0.2.0/24'
GATEWAY = '192.51.100.1'
DEST1 = '198.51.100.0/24'
DEST2 = '203.0.113.0/24'
NEXTHOP = '192.0.2.1'
MTU = 1450
FIXED_IP_IPV6 = u'2001:0db8:0000:0000:0000:0000:0000:000a'
FIXED_IP_IPV6 = '2001:0db8:0000:0000:0000:0000:0000:000a'
# Subnet prefix is purposefully not 32, because that coincidentally
# matches the result of any arbitrary IPv4->prefixlen conversion
SUBNET_CIDR_IPV6 = u'2001:db8::/70'
GATEWAY_IPV6 = u'2001:0db8:0000:0000:0000:0000:0000:0001'
SUBNET_CIDR_IPV6 = '2001:db8::/70'
GATEWAY_IPV6 = '2001:0db8:0000:0000:0000:0000:0000:0001'
ip = ipaddress.ip_address(FIXED_IP)
network = ipaddress.ip_network(SUBNET_CIDR)
@ -200,15 +200,15 @@ class TestOSUtils(base.TestCase):
@mock.patch('octavia.amphorae.backends.utils.interface_file.'
'PortInterfaceFile')
def test_write_port_interface_file(self, mock_port_interface_file):
FIXED_IP = u'192.0.2.2'
NEXTHOP = u'192.0.2.1'
DEST = u'198.51.100.0/24'
FIXED_IP = '192.0.2.2'
NEXTHOP = '192.0.2.1'
DEST = '198.51.100.0/24'
host_routes = [
{'nexthop': NEXTHOP, 'destination': ipaddress.ip_network(DEST)}
]
FIXED_IP_IPV6 = u'2001:db8::2'
NEXTHOP_IPV6 = u'2001:db8::1'
DEST_IPV6 = u'2001:db8:51:100::/64'
FIXED_IP_IPV6 = '2001:db8::2'
NEXTHOP_IPV6 = '2001:db8::1'
DEST_IPV6 = '2001:db8:51:100::/64'
host_routes_ipv6 = [
{'nexthop': NEXTHOP_IPV6,
'destination': ipaddress.ip_network(DEST_IPV6)}

View File

@ -275,7 +275,7 @@ class TestUtil(base.TestCase):
pass
# Bad listener mode
fake_cfg = 'stats socket foo\nfrontend {}\nmode\n'.format(LISTENER_ID1)
fake_cfg = f'stats socket foo\nfrontend {LISTENER_ID1}\nmode\n'
self.useFixture(test_utils.OpenFixture(path, fake_cfg))
self.assertRaises(util.ParsingError, util.parse_haproxy_file,
LISTENER_ID1)

View File

@ -337,7 +337,7 @@ class TestHealthDaemon(base.TestCase):
mock_get_stats, mock_is_running):
health_daemon.COUNTERS = None
health_daemon.COUNTERS_FILE = None
lb1_stats_socket = '/var/lib/octavia/{0}/haproxy.sock'.format(LB_ID1)
lb1_stats_socket = f'/var/lib/octavia/{LB_ID1}/haproxy.sock'
mock_list_files.return_value = {LB_ID1: lb1_stats_socket}
mock_is_running.return_value = True
@ -374,7 +374,7 @@ class TestHealthDaemon(base.TestCase):
mock_is_running):
health_daemon.COUNTERS = None
health_daemon.COUNTERS_FILE = None
lb1_stats_socket = '/var/lib/octavia/{0}/haproxy.sock'.format(LB_ID1)
lb1_stats_socket = f'/var/lib/octavia/{LB_ID1}/haproxy.sock'
mock_list_files.return_value = {LB_ID1: lb1_stats_socket}
mock_is_running.return_value = False
@ -472,7 +472,7 @@ class TestHealthDaemon(base.TestCase):
mock_get_stats, mock_is_running):
health_daemon.COUNTERS = None
health_daemon.COUNTERS_FILE = None
lb1_stats_socket = '/var/lib/octavia/{0}/haproxy.sock'.format(LB_ID1)
lb1_stats_socket = f'/var/lib/octavia/{LB_ID1}/haproxy.sock'
mock_list_files.return_value = {LB_ID1: lb1_stats_socket}
mock_is_running.return_value = True

View File

@ -37,7 +37,7 @@ class TestInterface(base.TestCase):
ifaces = ('eth0', 'eth7', 'eth8')
mock_listdir.return_value = [
"{}.json".format(iface)
f"{iface}.json"
for iface in ifaces
]
mock_listdir.return_value.extend(["invalidfile"])
@ -48,7 +48,7 @@ class TestInterface(base.TestCase):
for iface in ifaces:
f = os.path.join(consts.AMP_NET_DIR_TEMPLATE,
"{}.json".format(iface))
f"{iface}.json")
self.assertIn(f, config_file_list)
# unsupported file
@ -277,7 +277,7 @@ class TestInterface(base.TestCase):
"/var/lib/dhclient/dhclient-{}.leases".format(
iface),
"-pf",
"/run/dhclient-{}.pid".format(iface),
f"/run/dhclient-{iface}.pid",
iface], stderr=-2)
@mock.patch('subprocess.check_output')
@ -294,7 +294,7 @@ class TestInterface(base.TestCase):
"/var/lib/dhclient/dhclient-{}.leases".format(
iface),
"-pf",
"/run/dhclient-{}.pid".format(iface),
f"/run/dhclient-{iface}.pid",
iface], stderr=-2)
@mock.patch('subprocess.check_output')
@ -924,13 +924,13 @@ class TestInterface(base.TestCase):
"/var/lib/dhclient/dhclient-{}.leases".format(
iface.name),
"-pf",
"/run/dhclient-{}.pid".format(iface.name),
f"/run/dhclient-{iface.name}.pid",
iface.name], stderr=-2),
mock.call(["/sbin/sysctl", "-w",
"net.ipv6.conf.{}.accept_ra=2".format(iface.name)],
f"net.ipv6.conf.{iface.name}.accept_ra=2"],
stderr=-2),
mock.call(["/sbin/sysctl", "-w",
"net.ipv6.conf.{}.autoconf=1".format(iface.name)],
f"net.ipv6.conf.{iface.name}.autoconf=1"],
stderr=-2),
mock.call(["post-up", iface.name])
])
@ -1341,13 +1341,13 @@ class TestInterface(base.TestCase):
"/var/lib/dhclient/dhclient-{}.leases".format(
iface.name),
"-pf",
"/run/dhclient-{}.pid".format(iface.name),
f"/run/dhclient-{iface.name}.pid",
iface.name], stderr=-2),
mock.call(["/sbin/sysctl", "-w",
"net.ipv6.conf.{}.accept_ra=0".format(iface.name)],
f"net.ipv6.conf.{iface.name}.accept_ra=0"],
stderr=-2),
mock.call(["/sbin/sysctl", "-w",
"net.ipv6.conf.{}.autoconf=0".format(iface.name)],
f"net.ipv6.conf.{iface.name}.autoconf=0"],
stderr=-2),
mock.call(["post-down", iface.name])
])

View File

@ -50,7 +50,7 @@ class TestNetworkNamespace(base.TestCase):
mock_cdll_obj = mock.MagicMock()
mock_cdll.return_value = mock_cdll_obj
mock_getpid.return_value = FAKE_PID
expected_current_netns = '/proc/{pid}/ns/net'.format(pid=FAKE_PID)
expected_current_netns = f'/proc/{FAKE_PID}/ns/net'
expected_target_netns = '/var/run/netns/{netns}'.format(
netns=FAKE_NETNS)
@ -72,7 +72,7 @@ class TestNetworkNamespace(base.TestCase):
mock_getpid.return_value = FAKE_PID
mock_cdll_obj = mock.MagicMock()
mock_cdll.return_value = mock_cdll_obj
expected_current_netns = '/proc/{pid}/ns/net'.format(pid=FAKE_PID)
expected_current_netns = f'/proc/{FAKE_PID}/ns/net'
expected_target_netns = '/var/run/netns/{netns}'.format(
netns=FAKE_NETNS)

View File

@ -70,7 +70,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
'api_version': API_VERSION}
self.driver.clients[
API_VERSION].get_info.return_value = {
'haproxy_version': u'1.6.3-1ubuntu0.1',
'haproxy_version': '1.6.3-1ubuntu0.1',
'api_version': API_VERSION}
self.driver.jinja_combo = mock.MagicMock()
self.driver.lvs_jinja = mock.MagicMock()
@ -356,7 +356,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
self.driver.cert_manager.get_secret.return_value = fake_secret
ref_md5 = md5(fake_secret, usedforsecurity=False).hexdigest() # nosec
ref_id = hashlib.sha1(fake_secret).hexdigest() # nosec
ref_name = '{id}.pem'.format(id=ref_id)
ref_name = f'{ref_id}.pem'
result = self.driver._process_secret(
sample_listener, sample_listener.client_ca_tls_certificate_id,
@ -419,7 +419,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
fake_pem = b'fake pem'
mock_build_pem.return_value = fake_pem
ref_md5 = md5(fake_pem, usedforsecurity=False).hexdigest() # nosec
ref_name = '{id}.pem'.format(id=pool_cert.id)
ref_name = f'{pool_cert.id}.pem'
ref_path = '{cert_dir}/{lb_id}/{name}'.format(
cert_dir=fake_cert_dir, lb_id=sample_listener.load_balancer.id,
name=ref_name)
@ -637,7 +637,7 @@ class TestHaproxyAmphoraLoadBalancerDriverTest(base.TestCase):
# Ensure that at least one call in each pair has been seen
if (dcp_calls[0] not in mock_calls and
dcp_calls[1] not in mock_calls):
raise Exception("%s not found in %s" % (dcp_calls, mock_calls))
raise Exception(f"{dcp_calls} not found in {mock_calls}")
# Now just make sure we did an update and not a delete
self.driver.clients[API_VERSION].delete_listener.assert_not_called()
@ -951,7 +951,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
@requests_mock.mock()
def test_get_api_version(self, mock_requests):
ref_api_version = {'api_version': '0.1'}
mock_requests.get('{base}/'.format(base=self.base_url),
mock_requests.get(f'{self.base_url}/',
json=ref_api_version)
result = self.driver.get_api_version(self.amp)
self.assertEqual(ref_api_version, result)
@ -960,7 +960,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
def test_get_info(self, m):
info = {"hostname": "some_hostname", "version": "some_version",
"api_version": "1.0", "uuid": FAKE_UUID_1}
m.get("{base}/info".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/info",
json=info)
information = self.driver.get_info(self.amp)
self.assertEqual(info, information)
@ -969,7 +969,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
def test_get_info_with_timeout_dict(self, m):
info = {"hostname": "some_hostname", "version": "some_version",
"api_version": "1.0", "uuid": FAKE_UUID_1}
m.get("{base}/info".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/info",
json=info)
timeout_dict = {
constants.CONN_MAX_RETRIES: 100,
@ -980,27 +980,27 @@ class TestAmphoraAPIClientTest(base.TestCase):
@requests_mock.mock()
def test_get_info_unauthorized(self, m):
m.get("{base}/info".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/info",
status_code=401)
self.assertRaises(exc.Unauthorized, self.driver.get_info, self.amp)
@requests_mock.mock()
def test_get_info_missing(self, m):
m.get("{base}/info".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/info",
status_code=404,
headers={'content-type': 'application/json'})
self.assertRaises(exc.NotFound, self.driver.get_info, self.amp)
@requests_mock.mock()
def test_get_info_server_error(self, m):
m.get("{base}/info".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/info",
status_code=500)
self.assertRaises(exc.InternalServerError, self.driver.get_info,
self.amp)
@requests_mock.mock()
def test_get_info_service_unavailable(self, m):
m.get("{base}/info".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/info",
status_code=503)
self.assertRaises(exc.ServiceUnavailable, self.driver.get_info,
self.amp)
@ -1011,34 +1011,34 @@ class TestAmphoraAPIClientTest(base.TestCase):
"api_version": "1.0", "uuid": FAKE_UUID_1,
"network_tx": "some_tx", "network_rx": "some_rx",
"active": True, "haproxy_count": 10}
m.get("{base}/details".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/details",
json=details)
amp_details = self.driver.get_details(self.amp)
self.assertEqual(details, amp_details)
@requests_mock.mock()
def test_get_details_unauthorized(self, m):
m.get("{base}/details".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/details",
status_code=401)
self.assertRaises(exc.Unauthorized, self.driver.get_details, self.amp)
@requests_mock.mock()
def test_get_details_missing(self, m):
m.get("{base}/details".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/details",
status_code=404,
headers={'content-type': 'application/json'})
self.assertRaises(exc.NotFound, self.driver.get_details, self.amp)
@requests_mock.mock()
def test_get_details_server_error(self, m):
m.get("{base}/details".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/details",
status_code=500)
self.assertRaises(exc.InternalServerError, self.driver.get_details,
self.amp)
@requests_mock.mock()
def test_get_details_service_unavailable(self, m):
m.get("{base}/details".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/details",
status_code=503)
self.assertRaises(exc.ServiceUnavailable, self.driver.get_details,
self.amp)
@ -1047,21 +1047,21 @@ class TestAmphoraAPIClientTest(base.TestCase):
def test_get_all_listeners(self, m):
listeners = [{"status": "ONLINE", "provisioning_status": "ACTIVE",
"type": "PASSIVE", "uuid": FAKE_UUID_1}]
m.get("{base}/listeners".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/listeners",
json=listeners)
all_listeners = self.driver.get_all_listeners(self.amp)
self.assertEqual(listeners, all_listeners)
@requests_mock.mock()
def test_get_all_listeners_unauthorized(self, m):
m.get("{base}/listeners".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/listeners",
status_code=401)
self.assertRaises(exc.Unauthorized, self.driver.get_all_listeners,
self.amp)
@requests_mock.mock()
def test_get_all_listeners_missing(self, m):
m.get("{base}/listeners".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/listeners",
status_code=404,
headers={'content-type': 'application/json'})
self.assertRaises(exc.NotFound, self.driver.get_all_listeners,
@ -1069,14 +1069,14 @@ class TestAmphoraAPIClientTest(base.TestCase):
@requests_mock.mock()
def test_get_all_listeners_server_error(self, m):
m.get("{base}/listeners".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/listeners",
status_code=500)
self.assertRaises(exc.InternalServerError,
self.driver.get_all_listeners, self.amp)
@requests_mock.mock()
def test_get_all_listeners_service_unavailable(self, m):
m.get("{base}/listeners".format(base=self.base_url_ver),
m.get(f"{self.base_url_ver}/listeners",
status_code=503)
self.assertRaises(exc.ServiceUnavailable,
self.driver.get_all_listeners, self.amp)
@ -1214,14 +1214,14 @@ class TestAmphoraAPIClientTest(base.TestCase):
@requests_mock.mock()
def test_update_cert_for_rotation(self, m):
m.put("{base}/certificate".format(base=self.base_url_ver))
m.put(f"{self.base_url_ver}/certificate")
resp_body = self.driver.update_cert_for_rotation(self.amp,
"some_file")
self.assertEqual(200, resp_body.status_code)
@requests_mock.mock()
def test_update_invalid_cert_for_rotation(self, m):
m.put("{base}/certificate".format(base=self.base_url_ver),
m.put(f"{self.base_url_ver}/certificate",
status_code=400)
self.assertRaises(exc.InvalidRequest,
self.driver.update_cert_for_rotation, self.amp,
@ -1229,7 +1229,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
@requests_mock.mock()
def test_update_cert_for_rotation_unauthorized(self, m):
m.put("{base}/certificate".format(base=self.base_url_ver),
m.put(f"{self.base_url_ver}/certificate",
status_code=401)
self.assertRaises(exc.Unauthorized,
self.driver.update_cert_for_rotation, self.amp,
@ -1237,7 +1237,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
@requests_mock.mock()
def test_update_cert_for_rotation_error(self, m):
m.put("{base}/certificate".format(base=self.base_url_ver),
m.put(f"{self.base_url_ver}/certificate",
status_code=500)
self.assertRaises(exc.InternalServerError,
self.driver.update_cert_for_rotation, self.amp,
@ -1245,7 +1245,7 @@ class TestAmphoraAPIClientTest(base.TestCase):
@requests_mock.mock()
def test_update_cert_for_rotation_unavailable(self, m):
m.put("{base}/certificate".format(base=self.base_url_ver),
m.put(f"{self.base_url_ver}/certificate",
status_code=503)
self.assertRaises(exc.ServiceUnavailable,
self.driver.update_cert_for_rotation, self.amp,
@ -1538,13 +1538,13 @@ class TestAmphoraAPIClientTest(base.TestCase):
@requests_mock.mock()
def test_update_agent_config(self, m):
m.put("{base}/config".format(base=self.base_url_ver))
m.put(f"{self.base_url_ver}/config")
resp_body = self.driver.update_agent_config(self.amp, "some_file")
self.assertEqual(200, resp_body.status_code)
@requests_mock.mock()
def test_update_agent_config_error(self, m):
m.put("{base}/config".format(base=self.base_url_ver), status_code=500)
m.put(f"{self.base_url_ver}/config", status_code=500)
self.assertRaises(exc.InternalServerError,
self.driver.update_agent_config, self.amp,
"some_file")

View File

@ -124,7 +124,7 @@ class TestHeartbeatUDP(base.TestCase):
}
},
"pools": {
"pool-id-1:{}".format(self.listener_id): {
f"pool-id-1:{self.listener_id}": {
"status": constants.UP,
"members": {
"member-id-1": {
@ -160,7 +160,7 @@ class TestHeartbeatUDP(base.TestCase):
}
},
"pools": {
"pool-id-1:{}".format(self.listener_id): {
f"pool-id-1:{self.listener_id}": {
"status": constants.UP,
"members": {
"member-id-1": {
@ -296,7 +296,7 @@ class TestUpdateHealthDb(base.TestCase):
FAKE_UUID_1 = uuidutils.generate_uuid()
def setUp(self):
super(TestUpdateHealthDb, self).setUp()
super().setUp()
session_patch = mock.patch('octavia.db.api.get_session')
self.addCleanup(session_patch.stop)

View File

@ -20,7 +20,7 @@ from octavia.common import constants
from octavia.tests.unit.api.common import base
class TestAvailabilityZoneProfile(object):
class TestAvailabilityZoneProfile:
_type = None

View File

@ -20,7 +20,7 @@ from octavia.api.v2.types import availability_zones as availability_zone_type
from octavia.tests.unit.api.common import base
class TestAvailabilityZone(object):
class TestAvailabilityZone:
_type = None

View File

@ -20,7 +20,7 @@ from octavia.common import constants
from octavia.tests.unit.api.common import base
class TestFlavorProfile(object):
class TestFlavorProfile:
_type = None

View File

@ -20,7 +20,7 @@ from octavia.api.v2.types import flavors as flavor_type
from octavia.tests.unit.api.common import base
class TestFlavor(object):
class TestFlavor:
_type = None

View File

@ -22,7 +22,7 @@ from octavia.common import constants
from octavia.tests.unit.api.v2.types import base
class TestHealthMonitor(object):
class TestHealthMonitor:
_type = None

View File

@ -22,7 +22,7 @@ from octavia.common import constants
from octavia.tests.unit.api.common import base
class TestListener(object):
class TestListener:
_type = None

View File

@ -21,7 +21,7 @@ from octavia.api.v2.types import load_balancer as lb_type
from octavia.tests.unit.api.common import base
class TestLoadBalancer(object):
class TestLoadBalancer:
_type = None

View File

@ -26,7 +26,7 @@ from octavia.common import data_models
from octavia.tests.unit.api.common import base
class TestSessionPersistence(object):
class TestSessionPersistence:
_type = None

View File

@ -34,7 +34,7 @@ class BaseLocalCSRTestCase(base.TestCase):
)
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, u"test"),
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "test"),
])).sign(csr_key, hashes.SHA256(), backends.default_backend())
self.certificate_signing_request = csr.public_bytes(
serialization.Encoding.PEM)

View File

@ -39,13 +39,13 @@ class TestLocalGenerator(local_csr.BaseLocalCSRTestCase):
ca_cert = ca_cert.not_valid_after(valid_until_datetime)
ca_cert = ca_cert.serial_number(1)
subject_name = x509.Name([
x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME,
u"Oregon"),
x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, u"Springfield"),
"Oregon"),
x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, "Springfield"),
x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME,
u"Springfield Nuclear Power Plant"),
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, u"maggie1"),
"Springfield Nuclear Power Plant"),
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "maggie1"),
])
ca_cert = ca_cert.subject_name(subject_name)
ca_cert = ca_cert.issuer_name(subject_name)

View File

@ -34,7 +34,7 @@ class TestBarbicanManager(base.TestCase):
self.barbican_endpoint = 'http://localhost:9311/v1'
self.secret_uuid = uuid.uuid4()
self.secret_ref = '{0}/secrets/{1}'.format(
self.secret_ref = '{}/secrets/{}'.format(
self.barbican_endpoint, self.secret_uuid
)

View File

@ -39,7 +39,7 @@ class TestBarbicanManager(base.TestCase):
self.private_key_uuid = uuidutils.generate_uuid()
self.private_key_passphrase_uuid = uuidutils.generate_uuid()
self.container_ref = '{0}/containers/{1}'.format(
self.container_ref = '{}/containers/{}'.format(
self.barbican_endpoint, self.container_uuid
)

View File

@ -62,13 +62,13 @@ class TestLocalManager(base.TestCase):
mode = stat.S_IRUSR | stat.S_IWUSR # mode 0600
open_mock.assert_has_calls([
mock.call(
os.path.join('/tmp/{0}.crt'.format(cert_id)), flags, mode),
os.path.join(f'/tmp/{cert_id}.crt'), flags, mode),
mock.call(
os.path.join('/tmp/{0}.key'.format(cert_id)), flags, mode),
os.path.join(f'/tmp/{cert_id}.key'), flags, mode),
mock.call(
os.path.join('/tmp/{0}.int'.format(cert_id)), flags, mode),
os.path.join(f'/tmp/{cert_id}.int'), flags, mode),
mock.call(
os.path.join('/tmp/{0}.pass'.format(cert_id)), flags, mode)
os.path.join(f'/tmp/{cert_id}.pass'), flags, mode)
], any_order=True)
# Verify the writes were made
@ -98,10 +98,10 @@ class TestLocalManager(base.TestCase):
# Verify the correct files were opened
flags = os.O_RDONLY
open_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), flags),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), flags),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), flags),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), flags)
mock.call(os.path.join(f'/tmp/{cert_id}.crt'), flags),
mock.call(os.path.join(f'/tmp/{cert_id}.key'), flags),
mock.call(os.path.join(f'/tmp/{cert_id}.int'), flags),
mock.call(os.path.join(f'/tmp/{cert_id}.pass'), flags)
], any_order=True)
# The returned data should be a Cert object
@ -117,10 +117,10 @@ class TestLocalManager(base.TestCase):
# Verify the correct files were removed
remove_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)))
mock.call(os.path.join(f'/tmp/{cert_id}.crt')),
mock.call(os.path.join(f'/tmp/{cert_id}.key')),
mock.call(os.path.join(f'/tmp/{cert_id}.int')),
mock.call(os.path.join(f'/tmp/{cert_id}.pass'))
], any_order=True)
def test_store_cert(self):
@ -151,8 +151,7 @@ class TestLocalManager(base.TestCase):
# Verify the correct files were opened
flags = os.O_RDONLY
open_mock.assert_called_once_with('/tmp/{0}.crt'.format(secret_id),
flags)
open_mock.assert_called_once_with(f'/tmp/{secret_id}.crt', flags)
# Test failure path
with mock.patch('os.open', open_mock), mock.patch.object(

View File

@ -28,7 +28,7 @@ CONF = cfg.CONF
class TestHealthCheckerCMD(base.TestCase):
def setUp(self):
super(TestHealthCheckerCMD, self).setUp()
super().setUp()
self.CONF = self.useFixture(oslo_fixture.Config(cfg.CONF))
def test_crc32c(self):

View File

@ -1555,11 +1555,9 @@ class TestHaproxyCfg(base.TestCase):
"check check-alpn {alpn} inter 30s fall 3 rise 2 cookie "
"sample_member_id_2 {opts} alpn {alpn}\n\n").format(
maxconn=constants.HAPROXY_DEFAULT_MAXCONN,
opts="%s %s %s %s %s %s" % (
"ssl", "crt", pool_client_cert,
"ca-file %s" % pool_ca_cert,
"crl-file %s" % pool_crl,
"verify required sni ssl_fc_sni no-sslv3 no-tlsv10 no-tlsv11"),
opts=(f"ssl crt {pool_client_cert} ca-file {pool_ca_cert} "
f"crl-file {pool_crl} verify required sni ssl_fc_sni "
f"no-sslv3 no-tlsv10 no-tlsv11"),
alpn=",".join(constants.AMPHORA_SUPPORTED_ALPN_PROTOCOLS))
rendered_obj = self.jinja_cfg.render_loadbalancer_obj(
sample_configs_combined.sample_amphora_tuple(),

View File

@ -89,7 +89,7 @@ class LoggingJinjaTestCase(base.TestCase):
lj = logging_jinja_cfg.LoggingJinjaTemplater()
expected_config = (
u'*.* stop')
'*.* stop')
logging_cfg = lj.build_logging_config()

View File

@ -23,13 +23,13 @@ import octavia.tests.unit.base as base
class TestDecorator(base.TestCase):
@dec.rename_kwargs(a='z')
class TestClass(object):
class TestClass:
def __init__(self, x, z=None):
self.x = x
self.z = z
@dec.rename_kwargs(a='z')
class TestClassDupe(object):
class TestClassDupe:
def __init__(self, x, z=None):
self.x = x
self.z = z

View File

@ -22,7 +22,7 @@ import octavia.tests.unit.base as base
class TestRetryTasks(base.TestCase):
def setUp(self):
super(TestRetryTasks, self).setUp()
super().setUp()
@mock.patch('time.sleep')
def test_sleeping_retry_times_controller(self, mock_sleep):

View File

@ -156,11 +156,11 @@ class HackingTestCase(base.BaseTestCase):
def test_no_log_translations(self):
for log in checks._all_log_levels:
for hint in checks._all_hints:
bad = 'LOG.%s(%s("Bad"))' % (log, hint)
bad = f'LOG.{log}({hint}("Bad"))'
self.assertEqual(
1, len(list(checks.no_translate_logs(bad, 'f'))))
# Catch abuses when used with a variable and not a literal
bad = 'LOG.%s(%s(msg))' % (log, hint)
bad = f'LOG.{log}({hint}(msg))'
self.assertEqual(
1, len(list(checks.no_translate_logs(bad, 'f'))))
# Do not do validations in tests

View File

@ -25,7 +25,7 @@ CONF = cfg.CONF
class TestNoopImageDriver(base.TestCase):
def setUp(self):
super(TestNoopImageDriver, self).setUp()
super().setUp()
self.driver = driver.NoopImageDriver()
def test_get_image_id_by_tag(self):

View File

@ -27,7 +27,7 @@ class TestGlanceClient(base.TestCase):
self.manager = glance_common.ImageManager()
self.manager.manager = mock.MagicMock()
super(TestGlanceClient, self).setUp()
super().setUp()
def test_no_images(self):
self.manager.manager.list.return_value = []

View File

@ -25,7 +25,7 @@ from octavia.tests.unit import base
class TestStatsUpdateDb(base.TestCase):
def setUp(self):
super(TestStatsUpdateDb, self).setUp()
super().setUp()
self.amphora_id = uuidutils.generate_uuid()
self.listener_id = uuidutils.generate_uuid()

View File

@ -29,7 +29,7 @@ STATS_DRIVERS = ['stats_db', 'stats_logger']
class TestStatsBase(base.TestCase):
def setUp(self):
super(TestStatsBase, self).setUp()
super().setUp()
self.conf = oslo_fixture.Config(cfg.CONF)
self.conf.config(group="controller_worker",