From e4555733d40c6b84ec77aaaee25e89f81354ac60 Mon Sep 17 00:00:00 2001 From: zhangtongjian <125163227@qq.com> Date: Fri, 3 Jun 2022 11:18:58 +0800 Subject: [PATCH] Remove usage of six Remove six-library Replace the following items with Python 3 style code. - six.iteritems - six.string_types Change-Id: I102520d28c9189a38cd1760b040eecdd4046b16f --- .../service_drivers/agents/l2/fwaas_v2.py | 3 +- .../db/firewall/v2/test_firewall_db_v2.py | 45 +++++++++---------- .../service_drivers/agents/test_agents.py | 3 +- .../services/firewall/test_fwaas_plugin_v2.py | 3 +- 4 files changed, 25 insertions(+), 29 deletions(-) diff --git a/neutron_fwaas/services/firewall/service_drivers/agents/l2/fwaas_v2.py b/neutron_fwaas/services/firewall/service_drivers/agents/l2/fwaas_v2.py index f9b0394d2..e82b6a5b6 100644 --- a/neutron_fwaas/services/firewall/service_drivers/agents/l2/fwaas_v2.py +++ b/neutron_fwaas/services/firewall/service_drivers/agents/l2/fwaas_v2.py @@ -15,7 +15,6 @@ from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging -import six from neutron.agent import securitygroups_rpc from neutron import manager @@ -426,7 +425,7 @@ class PortFirewallGroupMap(object): # information. Need to consider map initialization in __init__() def port_id(self, port): - return (port if isinstance(port, six.string_types) + return (port if isinstance(port, str) else port.get('port_id', port.get('id'))) def get_fwg(self, fwg_id): diff --git a/neutron_fwaas/tests/unit/db/firewall/v2/test_firewall_db_v2.py b/neutron_fwaas/tests/unit/db/firewall/v2/test_firewall_db_v2.py index 41c21db5d..595e7ce4e 100644 --- a/neutron_fwaas/tests/unit/db/firewall/v2/test_firewall_db_v2.py +++ b/neutron_fwaas/tests/unit/db/firewall/v2/test_firewall_db_v2.py @@ -14,7 +14,6 @@ # under the License. import mock -import six import testtools import webob.exc @@ -54,7 +53,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): with self.firewall_policy(name=name, shared=self.SHARED, firewall_rules=None, audited=self.AUDITED ) as firewall_policy: - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, firewall_policy['firewall_policy'][k]) def test_create_firewall_policy_with_rules(self): @@ -70,7 +69,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): with self.firewall_policy(name=name, shared=self.SHARED, firewall_rules=fw_rule_ids, audited=self.AUDITED) as fwp: - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, fwp['firewall_policy'][k]) def test_create_firewall_policy_with_previously_associated_rule(self): @@ -92,7 +91,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): audited=self.AUDITED) as fwp: res = self._show_req('firewall_policies', fwp['firewall_policy']['id']) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_policy'][k]) def test_list_firewall_policies(self): @@ -114,7 +113,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): req = self.new_update_request('firewall_policies', data, fwp['firewall_policy']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_policy'][k]) def _test_update_firewall_policy(self, with_audited): @@ -131,7 +130,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): res = self.deserialize(self.fmt, req.get_response(self.ext_api)) attrs['description'] = 'fw_p1' - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_policy'][k]) def test_update_firewall_policy_set_audited_false(self): @@ -161,7 +160,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): # TODO(sridar): set it so that the ordering is maintained res['firewall_policy']['firewall_rules'] = sorted( res['firewall_policy']['firewall_rules']) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_policy'][k]) def test_update_firewall_policy_replace_rules(self): @@ -191,7 +190,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): res = self.deserialize(self.fmt, req.get_response(self.ext_api)) attrs['audited'] = False - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_policy'][k]) @testtools.skip('bug/1614673') @@ -256,7 +255,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): # check if none of the rules got added to the policy res = self._show_req('firewall_policies', fwp['firewall_policy']['id']) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_policy'][k]) def test_update_shared_firewall_policy_with_nonshared_rule(self): @@ -434,28 +433,28 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): attrs = self._get_test_firewall_rule_attrs() with self.firewall_rule() as firewall_rule: - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, firewall_rule['firewall_rule'][k]) attrs['source_port'] = None attrs['destination_port'] = None with self.firewall_rule(source_port=None, destination_port=None) as firewall_rule: - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, firewall_rule['firewall_rule'][k]) attrs['source_port'] = '10000' attrs['destination_port'] = '80' with self.firewall_rule(source_port=10000, destination_port=80) as firewall_rule: - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, firewall_rule['firewall_rule'][k]) attrs['source_port'] = '10000' attrs['destination_port'] = '80' with self.firewall_rule(source_port='10000', destination_port='80') as firewall_rule: - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, firewall_rule['firewall_rule'][k]) def test_create_firewall_src_port_illegal_range(self): @@ -485,7 +484,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): with self.firewall_rule(source_port=None, destination_port=None, protocol='icmp') as firewall_rule: - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, firewall_rule['firewall_rule'][k]) def test_create_firewall_without_source(self): @@ -519,7 +518,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): with self.firewall_rule() as fw_rule: res = self._show_req('firewall_rules', fw_rule['firewall_rule']['id']) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_rule'][k]) @testtools.skip('bug/1614673') @@ -537,7 +536,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): req.get_response(self.ext_api) res = self._show_req('firewall_rules', fw_rule['firewall_rule']['id']) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_rule'][k]) def test_create_firewall_rule_with_ipv6_addrs_and_wrong_ip_version(self): @@ -586,7 +585,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): fwr['firewall_rule']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_rule'][k]) attrs['source_port'] = '10000' @@ -600,7 +599,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): fwr['firewall_rule']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_rule'][k]) attrs['source_port'] = '10000' @@ -614,7 +613,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): fwr['firewall_rule']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_rule'][k]) attrs['source_port'] = None @@ -627,7 +626,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): fwr['firewall_rule']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_rule'][k]) def test_update_firewall_rule_with_port_and_no_proto(self): @@ -729,7 +728,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): fwr['firewall_rule']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_rule'][k]) res = self._show_req('firewall_policies', fwp['firewall_policy']['id']) @@ -1038,7 +1037,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): admin_state_up=self.ADMIN_STATE_UP) as firewall_group: res = self._show_req('firewall_groups', firewall_group['firewall_group']['id']) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_group'][k]) def test_show_firewall_group(self): @@ -1089,7 +1088,7 @@ class TestFirewallDBPluginV2(test_fwaas_plugin_v2.FirewallPluginV2TestCase): firewall['firewall_group']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_group'][k]) def test_existing_default_create_default_firewall_group(self): diff --git a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_agents.py b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_agents.py index ce92b3f83..204d36f8a 100644 --- a/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_agents.py +++ b/neutron_fwaas/tests/unit/services/firewall/service_drivers/agents/test_agents.py @@ -14,7 +14,6 @@ # under the License. import mock -import six from neutron import extensions as neutron_extensions from neutron.tests.unit.extensions import test_l3 @@ -526,7 +525,7 @@ class TestAgentDriver(test_fwaas_plugin_v2.FirewallPluginV2TestCase, fwr['firewall_rule']['id']) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, res['firewall_rule'][k]) def test_update_firewall_rule_on_pending_create_fwg(self): diff --git a/neutron_fwaas/tests/unit/services/firewall/test_fwaas_plugin_v2.py b/neutron_fwaas/tests/unit/services/firewall/test_fwaas_plugin_v2.py index 5d6582d51..da71dab60 100644 --- a/neutron_fwaas/tests/unit/services/firewall/test_fwaas_plugin_v2.py +++ b/neutron_fwaas/tests/unit/services/firewall/test_fwaas_plugin_v2.py @@ -16,7 +16,6 @@ import contextlib import mock -import six import webob.exc from neutron.api import extensions as api_ext @@ -424,7 +423,7 @@ class FirewallPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase): admin_state_up=self.ADMIN_STATE_UP, ports=attrs['ports'] if 'ports' in attrs else None, ) as firewall_group: - for k, v in six.iteritems(attrs): + for k, v in attrs.items(): self.assertEqual(v, firewall_group['firewall_group'][k])