From 3046c4ae22b10f9e4fa83a47bfe089554d4a4681 Mon Sep 17 00:00:00 2001 From: Salvatore Orlando Date: Thu, 19 Dec 2013 00:26:38 -0800 Subject: [PATCH] Improve handling of security group updates Currently updates to security group rules or membership are handled by immediately triggering a call to refresh_firewall. This call is quite expensive, and it is often executed with a very high frequency. With this patch, the notification handler simply adds devices for which the firewall should be refreshed to a set, which will then be processed in another routine. The latter is supposed to be called in the main agent loop. This patch for 'provider updates' simply sets a flag for refreshing the firewall for all devices. In order to avoid breaking other agents leveraging the security group RPC mixin, the reactive behaviour is still available, and is still the default way of handling security group updates. Partial-Bug: #1253993 Partially implements blueprint: neutron-tempest-parallel Change-Id: I1574544734865506ff5383404516cc9349c16ec4 --- neutron/agent/securitygroups_rpc.py | 81 +++++-- .../openvswitch/agent/ovs_neutron_agent.py | 25 +- .../openvswitch/test_ovs_neutron_agent.py | 11 +- .../tests/unit/test_security_groups_rpc.py | 215 +++++++++++++++++- 4 files changed, 296 insertions(+), 36 deletions(-) diff --git a/neutron/agent/securitygroups_rpc.py b/neutron/agent/securitygroups_rpc.py index a38e4355bff..666438b7bcf 100644 --- a/neutron/agent/securitygroups_rpc.py +++ b/neutron/agent/securitygroups_rpc.py @@ -107,10 +107,18 @@ class SecurityGroupAgentRpcMixin(object): support in agent implementations. """ - def init_firewall(self): + def init_firewall(self, defer_refresh_firewall=False): firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver LOG.debug(_("Init firewall settings (driver=%s)"), firewall_driver) self.firewall = importutils.import_object(firewall_driver) + # The following flag will be set to true if port filter must not be + # applied as soon as a rule or membership notification is received + self.defer_refresh_firewall = defer_refresh_firewall + # Stores devices for which firewall should be refreshed when + # deferred refresh is enabled. + self.devices_to_refilter = set() + # Flag raised when a global refresh is needed + self.global_refresh_firewall = False def prepare_devices_filter(self, device_ids): if not device_ids: @@ -141,14 +149,24 @@ class SecurityGroupAgentRpcMixin(object): sec_grp_set = set(security_groups) for device in self.firewall.ports.values(): if sec_grp_set & set(device.get(attribute, [])): - devices.append(device) - - if devices: + devices.append(device['device']) + if self.defer_refresh_firewall: + LOG.debug(_("Adding %s devices to the list of devices " + "for which firewall needs to be refreshed"), + devices) + self.devices_to_refilter |= set(devices) + elif devices: self.refresh_firewall(devices) def security_groups_provider_updated(self): LOG.info(_("Provider rule updated")) - self.refresh_firewall() + if self.defer_refresh_firewall: + # NOTE(salv-orlando): A 'global refresh' might not be + # necessary if the subnet for which the provider rules + # were updated is known + self.global_refresh_firewall = True + else: + self.refresh_firewall() def remove_devices_filter(self, device_ids): if not device_ids: @@ -161,16 +179,13 @@ class SecurityGroupAgentRpcMixin(object): continue self.firewall.remove_port_filter(device) - def refresh_firewall(self, devices=None): + def refresh_firewall(self, device_ids=None): LOG.info(_("Refresh firewall rules")) - - if devices: - device_ids = [d['device'] for d in devices] - else: - device_ids = self.firewall.ports.keys() if not device_ids: - LOG.info(_("No ports here to refresh firewall")) - return + device_ids = self.firewall.ports.keys() + if not device_ids: + LOG.info(_("No ports here to refresh firewall")) + return devices = self.plugin_rpc.security_group_rules_for_devices( self.context, device_ids) with self.firewall.defer_apply(): @@ -178,6 +193,46 @@ class SecurityGroupAgentRpcMixin(object): LOG.debug(_("Update port filter for %s"), device['device']) self.firewall.update_port_filter(device) + def firewall_refresh_needed(self): + return self.global_refresh_firewall or self.devices_to_refilter + + def setup_port_filters(self, new_devices, updated_devices): + """Configure port filters for devices. + + This routine applies filters for new devices and refreshes firewall + rules when devices have been updated, or when there are changes in + security group membership or rules. + + :param new_devices: set containing identifiers for new devices + :param updated_devices: set containining identifiers for + updated devices + """ + if new_devices: + LOG.debug(_("Preparing device filters for %d new devices"), + len(new_devices)) + self.prepare_devices_filter(new_devices) + # These data structures are cleared here in order to avoid + # losing updates occurring during firewall refresh + devices_to_refilter = self.devices_to_refilter + global_refresh_firewall = self.global_refresh_firewall + self.devices_to_refilter = set() + self.global_refresh_firewall = False + # TODO(salv-orlando): Avoid if possible ever performing the global + # refresh providing a precise list of devices for which firewall + # should be refreshed + if global_refresh_firewall: + LOG.debug(_("Refreshing firewall for all filtered devices")) + self.refresh_firewall() + else: + # If a device is both in new and updated devices + # avoid reprocessing it + updated_devices = ((updated_devices | devices_to_refilter) - + new_devices) + if updated_devices: + LOG.debug(_("Refreshing firewall for %d devices"), + len(updated_devices)) + self.refresh_firewall(updated_devices) + class SecurityGroupAgentRpcApiMixin(object): diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index 22467a71a92..9ab6cfc243a 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -117,7 +117,7 @@ class OVSSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin): self.context = context self.plugin_rpc = plugin_rpc self.root_helper = root_helper - self.init_firewall() + self.init_firewall(defer_refresh_firewall=True) class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, @@ -1019,14 +1019,16 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def process_network_ports(self, port_info): resync_a = False resync_b = False + # TODO(salv-orlando): consider a solution for ensuring notifications + # are processed exactly in the same order in which they were + # received. This is tricky because there are two notification + # sources: the neutron server, and the ovs db monitor process # If there is an exception while processing security groups ports # will not be wired anyway, and a resync will be triggered - self.sg_agent.prepare_devices_filter(port_info.get('added', set())) - # TODO(salv-orlando): Optimize by avoiding unnecessary applying - # filters twice to the same ports, and unnecessary calls to the - # plugin (eg: when there are no IP address changes) - if port_info.get('updated'): - self.sg_agent.refresh_firewall() + # TODO(salv-orlando): Optimize avoiding applying filters unnecessarily + # (eg: when there are no IP address changes) + self.sg_agent.setup_port_filters(port_info.get('added', set()), + port_info.get('updated', set())) # VIF wiring needs to be performed always for 'new' devices. # For updated ports, re-wiring is not needed in most cases, but needs # to be performed anyway when the admin state of a device is changed. @@ -1101,7 +1103,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, def _agent_has_updates(self, polling_manager): return (polling_manager.is_polling_required or - self.updated_ports) + self.updated_ports or + self.sg_agent.firewall_refresh_needed()) def _port_info_has_changes(self, port_info): return (port_info.get('added') or @@ -1159,8 +1162,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, "Elapsed:%(elapsed).3f"), {'iter_num': self.iter_num, 'elapsed': time.time() - start}) - # notify plugin about port deltas - if self._port_info_has_changes(port_info): + # Secure and wire/unwire VIFs and update their status + # on Neutron server + if (self._port_info_has_changes(port_info) or + self.sg_agent.firewall_refresh_needed()): LOG.debug(_("Starting to process devices in:%s"), port_info) # If treat devices fails - must resync with plugin diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py index 18f1c60015a..a2f94aee2dd 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py @@ -333,18 +333,15 @@ class TestOvsNeutronAgent(base.BaseTestCase): def _test_process_network_ports(self, port_info): with contextlib.nested( - mock.patch.object(self.agent.sg_agent, "prepare_devices_filter"), - mock.patch.object(self.agent.sg_agent, "refresh_firewall"), + mock.patch.object(self.agent.sg_agent, "setup_port_filters"), mock.patch.object(self.agent, "treat_devices_added_or_updated", return_value=False), mock.patch.object(self.agent, "treat_devices_removed", return_value=False) - ) as (prep_dev_filter, refresh_fw, - device_added_updated, device_removed): + ) as (setup_port_filters, device_added_updated, device_removed): self.assertFalse(self.agent.process_network_ports(port_info)) - prep_dev_filter.assert_called_once_with(port_info['added']) - if port_info.get('updated'): - self.assertEqual(1, refresh_fw.call_count) + setup_port_filters.assert_called_once_with( + port_info['added'], port_info.get('updated', set())) device_added_updated.assert_called_once_with( port_info['added'] | port_info.get('updated', set())) device_removed.assert_called_once_with(port_info['removed']) diff --git a/neutron/tests/unit/test_security_groups_rpc.py b/neutron/tests/unit/test_security_groups_rpc.py index a0a386c5554..eeb7602f85c 100644 --- a/neutron/tests/unit/test_security_groups_rpc.py +++ b/neutron/tests/unit/test_security_groups_rpc.py @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. +from contextlib import contextmanager from contextlib import nested import mock @@ -501,14 +502,14 @@ class SGAgentRpcCallBackMixinTestCase(base.BaseTestCase): class SecurityGroupAgentRpcTestCase(base.BaseTestCase): - def setUp(self): + def setUp(self, defer_refresh_firewall=False): super(SecurityGroupAgentRpcTestCase, self).setUp() self.agent = sg_rpc.SecurityGroupAgentRpcMixin() self.agent.context = None self.addCleanup(mock.patch.stopall) mock.patch('neutron.agent.linux.iptables_manager').start() self.agent.root_helper = 'sudo' - self.agent.init_firewall() + self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall) self.firewall = mock.Mock() firewall_object = firewall_base.FirewallDriver() self.firewall.defer_apply.side_effect = firewall_object.defer_apply @@ -543,7 +544,7 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase): self.agent.prepare_devices_filter(['fake_port_id']) self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3']) self.agent.refresh_firewall.assert_has_calls( - [call.refresh_firewall([self.fake_device])]) + [call.refresh_firewall([self.fake_device['device']])]) def test_security_groups_rule_not_updated(self): self.agent.refresh_firewall = mock.Mock() @@ -556,7 +557,7 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase): self.agent.prepare_devices_filter(['fake_port_id']) self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3']) self.agent.refresh_firewall.assert_has_calls( - [call.refresh_firewall([self.fake_device])]) + [call.refresh_firewall([self.fake_device['device']])]) def test_security_groups_member_not_updated(self): self.agent.refresh_firewall = mock.Mock() @@ -593,6 +594,208 @@ class SecurityGroupAgentRpcTestCase(base.BaseTestCase): self.firewall.assert_has_calls([]) +class SecurityGroupAgentRpcWithDeferredRefreshTestCase( + SecurityGroupAgentRpcTestCase): + + def setUp(self): + super(SecurityGroupAgentRpcWithDeferredRefreshTestCase, self).setUp( + defer_refresh_firewall=True) + + @contextmanager + def add_fake_device(self, device, sec_groups, source_sec_groups=None): + fake_device = {'device': device, + 'security_groups': sec_groups, + 'security_group_source_groups': source_sec_groups or [], + 'security_group_rules': [{'security_group_id': + 'fake_sgid1', + 'remote_group_id': + 'fake_sgid2'}]} + self.firewall.ports[device] = fake_device + yield + del self.firewall.ports[device] + + def test_security_groups_rule_updated(self): + self.agent.security_groups_rule_updated(['fake_sgid1', 'fake_sgid3']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + + def test_multiple_security_groups_rule_updated_same_port(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgidX']): + self.agent.refresh_firewall = mock.Mock() + self.agent.security_groups_rule_updated(['fake_sgid1']) + self.agent.security_groups_rule_updated(['fake_sgid2']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertNotIn('fake_device_2', self.agent.devices_to_refilter) + + def test_security_groups_rule_updated_multiple_ports(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgid2']): + self.agent.refresh_firewall = mock.Mock() + self.agent.security_groups_rule_updated(['fake_sgid1', + 'fake_sgid2']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertIn('fake_device_2', self.agent.devices_to_refilter) + + def test_multiple_security_groups_rule_updated_multiple_ports(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgid2']): + self.agent.refresh_firewall = mock.Mock() + self.agent.security_groups_rule_updated(['fake_sgid1']) + self.agent.security_groups_rule_updated(['fake_sgid2']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertIn('fake_device_2', self.agent.devices_to_refilter) + + def test_security_groups_member_updated(self): + self.agent.security_groups_member_updated(['fake_sgid2', 'fake_sgid3']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + + def test_multiple_security_groups_member_updated_same_port(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgid1', 'fake_sgid1B'], + source_sec_groups=['fake_sgidX']): + self.agent.refresh_firewall = mock.Mock() + self.agent.security_groups_member_updated(['fake_sgid1', + 'fake_sgid3']) + self.agent.security_groups_member_updated(['fake_sgid2', + 'fake_sgid3']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertNotIn('fake_device_2', self.agent.devices_to_refilter) + + def test_security_groups_member_updated_multiple_ports(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgid1', 'fake_sgid1B'], + source_sec_groups=['fake_sgid2']): + self.agent.security_groups_member_updated(['fake_sgid2']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertIn('fake_device_2', self.agent.devices_to_refilter) + + def test_multiple_security_groups_member_updated_multiple_ports(self): + with self.add_fake_device(device='fake_device_2', + sec_groups=['fake_sgid1', 'fake_sgid1B'], + source_sec_groups=['fake_sgid1B']): + self.agent.security_groups_member_updated(['fake_sgid1B']) + self.agent.security_groups_member_updated(['fake_sgid2']) + self.assertIn('fake_device', self.agent.devices_to_refilter) + self.assertIn('fake_device_2', self.agent.devices_to_refilter) + + def test_security_groups_provider_updated(self): + self.agent.security_groups_provider_updated() + self.assertTrue(self.agent.global_refresh_firewall) + + def test_setup_port_filters_new_ports_only(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set() + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(['fake_new_device']), set()) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.prepare_devices_filter.assert_called_once_with( + set(['fake_new_device'])) + self.assertFalse(self.agent.refresh_firewall.called) + + def test_setup_port_filters_updated_ports_only(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set() + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(), set(['fake_updated_device'])) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_updated_device'])) + self.assertFalse(self.agent.prepare_devices_filter.called) + + def test_setup_port_filter_new_and_updated_ports(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set() + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(['fake_new_device']), + set(['fake_updated_device'])) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.prepare_devices_filter.assert_called_once_with( + set(['fake_new_device'])) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_updated_device'])) + + def test_setup_port_filters_sg_updates_only(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set(['fake_device']) + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(), set()) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_device'])) + self.assertFalse(self.agent.prepare_devices_filter.called) + + def test_setup_port_filters_sg_updates_and_new_ports(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set(['fake_device']) + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(['fake_new_device']), set()) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.prepare_devices_filter.assert_called_once_with( + set(['fake_new_device'])) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_device'])) + + def test_setup_port_filters_sg_updates_and_updated_ports(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set(['fake_device', 'fake_device_2']) + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters( + set(), set(['fake_device', 'fake_updated_device'])) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_device', 'fake_device_2', 'fake_updated_device'])) + self.assertFalse(self.agent.prepare_devices_filter.called) + + def test_setup_port_filters_all_updates(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set(['fake_device', 'fake_device_2']) + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters( + set(['fake_new_device']), + set(['fake_device', 'fake_updated_device'])) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.prepare_devices_filter.assert_called_once_with( + set(['fake_new_device'])) + self.agent.refresh_firewall.assert_called_once_with( + set(['fake_device', 'fake_device_2', 'fake_updated_device'])) + + def test_setup_port_filters_no_update(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set() + self.agent.global_refresh_firewall = False + self.agent.setup_port_filters(set(), set()) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.assertFalse(self.agent.refresh_firewall.called) + self.assertFalse(self.agent.prepare_devices_filter.called) + + def test_setup_port_filters_with_global_refresh(self): + self.agent.prepare_devices_filter = mock.Mock() + self.agent.refresh_firewall = mock.Mock() + self.agent.devices_to_refilter = set() + self.agent.global_refresh_firewall = True + self.agent.setup_port_filters(set(), set()) + self.assertFalse(self.agent.devices_to_refilter) + self.assertFalse(self.agent.global_refresh_firewall) + self.agent.refresh_firewall.assert_called_once_with() + self.assertFalse(self.agent.prepare_devices_filter.called) + + class FakeSGRpcApi(agent_rpc.PluginApi, sg_rpc.SecurityGroupServerRpcApiMixin): pass @@ -1228,7 +1431,7 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase): PHYSDEV_INGRESS = 'physdev-out' PHYSDEV_EGRESS = 'physdev-in' - def setUp(self): + def setUp(self, defer_refresh_firewall=False): super(TestSecurityGroupAgentWithIptables, self).setUp() cfg.CONF.set_override( 'firewall_driver', @@ -1241,7 +1444,7 @@ class TestSecurityGroupAgentWithIptables(base.BaseTestCase): self.root_helper = 'sudo' self.agent.root_helper = 'sudo' - self.agent.init_firewall() + self.agent.init_firewall(defer_refresh_firewall=defer_refresh_firewall) self.iptables = self.agent.firewall.iptables self.iptables_execute = mock.patch.object(self.iptables,