From de9b39ed2c4423cfb1ea8b3b92f2b0b40e9c9d62 Mon Sep 17 00:00:00 2001 From: ZhaoBo Date: Thu, 31 May 2018 16:49:18 +0800 Subject: [PATCH] [agent side] L3 agent side Floating IP port forwarding This patch contains the l3 agent extension and agent part code. This patch introduce a new l3 agent extension named "port_forwarding", to process the binding of the port forwarding resources, manage its own floatingip configuration on router interface and floatingip status. Currrently, we support all Neutron Router reference implementations. This extension uses the period router sync task and PortForwarding OVO rpc. * The main idea about this new extension is using the generic router sync rpc to maintain the host port forwarding resources, * For a single port forwarding create/update/delete, process it one by one in smaller scope for forbidding refresh the iptables with a larger scope frequently. Partially-Implements: blueprint port-forwarding Partial-Bug: #1491317 Change-Id: Ic56e67d428f6177099c285a9d1bccabc1e710f2b --- .../agent/l3/extensions/port_forwarding.py | 460 ++++++++++++++++++ neutron/agent/l3/router_info.py | 16 +- neutron/agent/linux/ip_lib.py | 14 + .../test_port_forwarding_extension.py | 187 +++++++ .../l3/extensions/test_port_forwarding.py | 417 ++++++++++++++++ neutron/tests/unit/agent/l3/test_agent.py | 46 ++ neutron/tests/unit/agent/linux/test_ip_lib.py | 18 + setup.cfg | 1 + 8 files changed, 1156 insertions(+), 3 deletions(-) create mode 100644 neutron/agent/l3/extensions/port_forwarding.py create mode 100644 neutron/tests/functional/agent/l3/extensions/test_port_forwarding_extension.py create mode 100644 neutron/tests/unit/agent/l3/extensions/test_port_forwarding.py diff --git a/neutron/agent/l3/extensions/port_forwarding.py b/neutron/agent/l3/extensions/port_forwarding.py new file mode 100644 index 00000000000..b8001306458 --- /dev/null +++ b/neutron/agent/l3/extensions/port_forwarding.py @@ -0,0 +1,460 @@ +# Copyright 2018 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections + +import netaddr +from oslo_concurrency import lockutils +from oslo_log import log as logging + +from neutron.agent.linux import ip_lib +from neutron.api.rpc.callbacks.consumer import registry +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc +from neutron.common import rpc as n_rpc +from neutron_lib.agent import l3_extension +from neutron_lib import constants as lib_consts + +LOG = logging.getLogger(__name__) +DEFAULT_PORT_FORWARDING_CHAIN = 'fip-pf' +PORT_FORWARDING_PREFIX = 'fip_portforwarding-' +PORT_FORWARDING_CHAIN_PREFIX = 'pf-' +# TODO(bzhao) If there are other files use this constant, and move it into +# constants file. This line will be removed and get the value from constants +# file. +MAX_CHAIN_LEN_WRAP = 11 + + +class RouterFipPortForwardingMapping(object): + def __init__(self): + self.managed_port_forwardings = {} + """ + fip_port_forwarding = { + fip_id_1: set(pf_id1, pf_id2), + fip_id_2: set(pf_id3, pf_id4) + } + """ + self.fip_port_forwarding = collections.defaultdict(set) + """ + router_fip_mapping = { + router_id_1: set(fip_id_1, fip_id_2), + router_id_2: set(fip_id_3, fip_id_4) + } + """ + self.router_fip_mapping = collections.defaultdict(set) + + def set_port_forwardings(self, port_forwardings): + for port_forwarding in port_forwardings: + self.set_fip_port_forwarding(port_forwarding.floatingip_id, + port_forwarding, + port_forwarding.router_id) + + def update_port_forwardings(self, port_forwardings): + for port_forwarding in port_forwardings: + self.managed_port_forwardings[port_forwarding.id] = port_forwarding + + def get_port_forwarding(self, port_forwarding_id): + return self.managed_port_forwardings.get(port_forwarding_id) + + def del_port_forwardings(self, port_forwardings): + for port_forwarding in port_forwardings: + if not self.get_port_forwarding(port_forwarding.id): + continue + self.managed_port_forwardings.pop(port_forwarding.id) + self.fip_port_forwarding[port_forwarding.floatingip_id].remove( + port_forwarding.id) + if not self.fip_port_forwarding[port_forwarding.floatingip_id]: + self.fip_port_forwarding.pop(port_forwarding.floatingip_id) + self.router_fip_mapping[port_forwarding.router_id].remove( + port_forwarding.floatingip_id) + if not self.router_fip_mapping[port_forwarding.router_id]: + del self.router_fip_mapping[port_forwarding.router_id] + + def set_fip_port_forwarding(self, fip_id, pf, router_id): + self.router_fip_mapping[router_id].add(fip_id) + self.fip_port_forwarding[fip_id].add(pf.id) + self.managed_port_forwardings[pf.id] = pf + + def clear_by_fip(self, fip_id, router_id): + self.router_fip_mapping[router_id].remove(fip_id) + if len(self.router_fip_mapping[router_id]) == 0: + del self.router_fip_mapping[router_id] + for pf_id in self.fip_port_forwarding[fip_id]: + del self.managed_port_forwardings[pf_id] + del self.fip_port_forwarding[fip_id] + + def check_port_forwarding_changes(self, new_pf): + old_pf = self.get_port_forwarding(new_pf.id) + return old_pf != new_pf + + +class PortForwardingAgentExtension(l3_extension.L3AgentExtension): + SUPPORTED_RESOURCE_TYPES = [resources.PORTFORWARDING] + + def initialize(self, connection, driver_type): + self.resource_rpc = resources_rpc.ResourcesPullRpcApi() + self._register_rpc_consumers() + self.mapping = RouterFipPortForwardingMapping() + + def _register_rpc_consumers(self): + registry.register(self._handle_notification, + resources.PORTFORWARDING) + + self._connection = n_rpc.Connection() + endpoints = [resources_rpc.ResourcesPushRpcCallback()] + topic = resources_rpc.resource_type_versioned_topic( + resources.PORTFORWARDING) + self._connection.create_consumer(topic, endpoints, fanout=True) + self._connection.consume_in_threads() + + def consume_api(self, agent_api): + self.agent_api = agent_api + + @lockutils.synchronized('port-forwarding') + def _handle_notification(self, context, resource_type, + forwardings, event_type): + for forwarding in forwardings: + self._process_port_forwarding_event( + context, forwarding, event_type) + + def _store_local(self, pf_objs, event_type): + if event_type == events.CREATED: + self.mapping.set_port_forwardings(pf_objs) + elif event_type == events.UPDATED: + self.mapping.update_port_forwardings(pf_objs) + elif event_type == events.DELETED: + self.mapping.del_port_forwardings(pf_objs) + + def _get_fip_rules(self, port_forward, wrap_name): + chain_rule_list = [] + pf_chain_name = self._get_port_forwarding_chain_name(port_forward.id) + chain_rule_list.append((DEFAULT_PORT_FORWARDING_CHAIN, + '-j %s-%s' % + (wrap_name, pf_chain_name))) + floating_ip_address = str(port_forward.floating_ip_address) + protocol = port_forward.protocol + internal_ip_address = str(port_forward.internal_ip_address) + internal_port = port_forward.internal_port + external_port = port_forward.external_port + chain_rule = (pf_chain_name, + '-d %s/32 -p %s -m %s --dport %s ' + '-j DNAT --to-destination %s:%s' % ( + floating_ip_address, protocol, protocol, + external_port, internal_ip_address, + internal_port)) + chain_rule_list.append(chain_rule) + return chain_rule_list + + def _rule_apply(self, iptables_manager, port_forwarding, rule_tag): + iptables_manager.ipv4['nat'].clear_rules_by_tag(rule_tag) + if DEFAULT_PORT_FORWARDING_CHAIN not in iptables_manager.ipv4[ + 'nat'].chains: + self._install_default_rules(iptables_manager) + + for chain, rule in self._get_fip_rules( + port_forwarding, iptables_manager.wrap_name): + if chain not in iptables_manager.ipv4['nat'].chains: + iptables_manager.ipv4['nat'].add_chain(chain) + iptables_manager.ipv4['nat'].add_rule(chain, rule, tag=rule_tag) + + def _process_create(self, port_forwardings, ri, interface_name, namespace, + iptables_manager): + if not port_forwardings: + return + device = ip_lib.IPDevice(interface_name, namespace=namespace) + + is_distributed = ri.router.get('distributed') + ha_port = ri.router.get(lib_consts.HA_INTERFACE_KEY, None) + fip_statuses = {} + for port_forwarding in port_forwardings: + # check if the port forwarding is managed in this agent from + # OVO and router rpc. + if port_forwarding.id in self.mapping.managed_port_forwardings: + LOG.debug("Skip port forwarding %s for create, as it had been " + "managed by agent", port_forwarding.id) + continue + existing_cidrs = ri.get_router_cidrs(device) + fip_ip = str(port_forwarding.floating_ip_address) + fip_cidr = str(netaddr.IPNetwork(fip_ip)) + status = '' + if fip_cidr not in existing_cidrs: + try: + if not is_distributed: + fip_statuses[port_forwarding.floatingip_id] = ( + ri.add_floating_ip( + {'floating_ip_address': fip_ip}, + interface_name, device)) + else: + if not ha_port: + device.addr.add(fip_cidr) + ip_lib.send_ip_addr_adv_notif(namespace, + interface_name, + fip_ip) + else: + ri._add_vip(fip_cidr, interface_name) + status = lib_consts.FLOATINGIP_STATUS_ACTIVE + except Exception: + # Any error will causes the fip status to be set 'ERROR' + status = lib_consts.FLOATINGIP_STATUS_ERROR + LOG.warning("Unable to configure floating IP %(fip_id)s " + "for port forwarding %(pf_id)s", + {'fip_id': port_forwarding.floatingip_id, + 'pf_id': port_forwarding.id}) + else: + if not ha_port: + ip_lib.send_ip_addr_adv_notif(namespace, + interface_name, + fip_ip) + if status: + fip_statuses[port_forwarding.floatingip_id] = status + + if ha_port and ha_port['status'] == lib_consts.PORT_STATUS_ACTIVE: + ri.enable_keepalived() + + for port_forwarding in port_forwardings: + rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id + self._rule_apply(iptables_manager, port_forwarding, rule_tag) + + iptables_manager.apply() + self._sending_port_forwarding_fip_status(ri, fip_statuses) + self._store_local(port_forwardings, events.CREATED) + + def _sending_port_forwarding_fip_status(self, ri, statuses): + if not statuses: + return + LOG.debug('Sending Port Forwarding floating ip ' + 'statuses: %s', statuses) + # Update floating IP status on the neutron server + ri.agent.plugin_rpc.update_floatingip_statuses( + ri.agent.context, ri.router_id, statuses) + + def _get_resource_by_router(self, ri): + is_distributed = ri.router.get('distributed') + ex_gw_port = ri.get_ex_gw_port() + if not is_distributed: + interface_name = ri.get_external_device_interface_name(ex_gw_port) + namespace = ri.ns_name + iptables_manager = ri.iptables_manager + else: + interface_name = ri.get_snat_external_device_interface_name( + ex_gw_port) + namespace = ri.snat_namespace.name + iptables_manager = ri.snat_iptables_manager + return interface_name, namespace, iptables_manager + + def _check_if_need_process(self, ri, force=False): + # force means the request comes from, if True means it comes from OVO, + # as we get a actually port forwarding object, then we need to check in + # the following steps. But False, means it comes from router rpc. + if not ri or not ri.get_ex_gw_port() or ( + not force and not ri.fip_managed_by_port_forwardings): + # agent doesn't hold the router. pass + # This router doesn't own a gw port. pass + # This router doesn't hold a port forwarding mapping. pass + return False + + is_distributed = ri.router.get('distributed') + agent_mode = ri.agent_conf.agent_mode + if (is_distributed and + agent_mode in [lib_consts.L3_AGENT_MODE_DVR_NO_EXTERNAL, + lib_consts.L3_AGENT_MODE_DVR]): + # just support centralized cases + return False + return True + + def _process_port_forwarding_event(self, context, port_forwarding, + event_type): + router_id = port_forwarding.router_id + ri = self._get_router_info(router_id) + if not self._check_if_need_process(ri, force=True): + return + + (interface_name, namespace, + iptables_manager) = self._get_resource_by_router(ri) + + if event_type == events.CREATED: + self._process_create( + [port_forwarding], ri, interface_name, namespace, + iptables_manager) + elif event_type == events.UPDATED: + self._process_update([port_forwarding], iptables_manager, + interface_name, namespace) + elif event_type == events.DELETED: + self._process_delete( + context, [port_forwarding], ri, interface_name, namespace, + iptables_manager) + + def _process_update(self, port_forwardings, iptables_manager, + interface_name, namespace): + if not port_forwardings: + return + device = ip_lib.IPDevice(interface_name, namespace=namespace) + for port_forwarding in port_forwardings: + # check if port forwarding change from OVO and router rpc + if not self.mapping.check_port_forwarding_changes(port_forwarding): + LOG.debug("Skip port forwarding %s for update, as there is no " + "difference between the memory managed by agent", + port_forwarding.id) + continue + current_chain = self._get_port_forwarding_chain_name( + port_forwarding.id) + iptables_manager.ipv4['nat'].remove_chain(current_chain) + ori_pf = self.mapping.managed_port_forwardings[port_forwarding.id] + device.delete_socket_conntrack_state( + str(ori_pf.floating_ip_address), ori_pf.external_port, + protocol=ori_pf.protocol) + rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id + self._rule_apply(iptables_manager, port_forwarding, rule_tag) + iptables_manager.apply() + self._store_local(port_forwardings, events.UPDATED) + + def _process_delete(self, context, port_forwardings, ri, interface_name, + namespace, iptables_manager): + if not port_forwardings: + return + device = ip_lib.IPDevice(interface_name, namespace=namespace) + for port_forwarding in port_forwardings: + current_chain = self._get_port_forwarding_chain_name( + port_forwarding.id) + iptables_manager.ipv4['nat'].remove_chain(current_chain) + fip_address = str(port_forwarding.floating_ip_address) + device.delete_socket_conntrack_state( + fip_address, port_forwarding.external_port, + protocol=port_forwarding.protocol) + + iptables_manager.apply() + + fip_id_cidrs = set([(pf.floatingip_id, + str(pf.floating_ip_address)) for pf in + port_forwardings]) + self._sync_and_remove_fip(context, fip_id_cidrs, device, ri) + self._store_local(port_forwardings, events.DELETED) + + def _sync_and_remove_fip(self, context, fip_id_cidrs, device, ri): + if not fip_id_cidrs: + return + ha_port = ri.router.get(lib_consts.HA_INTERFACE_KEY) + fip_ids = [item[0] for item in fip_id_cidrs] + pfs = self.resource_rpc.bulk_pull(context, resources.PORTFORWARDING, + filter_kwargs={ + 'floatingip_id': fip_ids}) + exist_fips = set() + fip_status = {} + for pf in pfs: + exist_fips.add(pf.floatingip_id) + + for fip_id_cidr in fip_id_cidrs: + if fip_id_cidr[0] not in exist_fips: + if ha_port: + ri._remove_vip(fip_id_cidr[1]) + else: + device.delete_addr_and_conntrack_state(fip_id_cidr[1]) + fip_status[fip_id_cidr[0]] = 'DOWN' + + if ha_port: + ri.enable_keepalived() + self._sending_port_forwarding_fip_status(ri, fip_status) + for fip_id in fip_status.keys(): + self.mapping.clear_by_fip(fip_id, ri.router_id) + + def _get_router_info(self, router_id): + router_info = self.agent_api.get_router_info(router_id) + if router_info: + return router_info + LOG.debug("Router %s is not managed by this agent. " + "It was possibly deleted concurrently.", router_id) + + def _get_port_forwarding_chain_name(self, pf_id): + chain_name = PORT_FORWARDING_CHAIN_PREFIX + pf_id + return chain_name[:MAX_CHAIN_LEN_WRAP] + + def _install_default_rules(self, iptables_manager): + default_rule = '-j %s-%s' % (iptables_manager.wrap_name, + DEFAULT_PORT_FORWARDING_CHAIN) + iptables_manager.ipv4['nat'].add_chain(DEFAULT_PORT_FORWARDING_CHAIN) + iptables_manager.ipv4['nat'].add_rule('PREROUTING', default_rule) + iptables_manager.apply() + + def check_local_port_forwardings(self, context, ri, fip_ids): + pfs = self.resource_rpc.bulk_pull(context, resources.PORTFORWARDING, + filter_kwargs={ + 'floatingip_id': fip_ids}) + + (interface_name, namespace, + iptable_manager) = self._get_resource_by_router(ri) + local_pfs = set(self.mapping.managed_port_forwardings.keys()) + new_pfs = [] + updated_pfs = [] + current_pfs = set() + for pf in pfs: + # check the request port forwardings, and split them into + # update, new, current part from router rpc + if pf.id in self.mapping.managed_port_forwardings: + if self.mapping.check_port_forwarding_changes(pf): + updated_pfs.append(pf) + else: + new_pfs.append(pf) + current_pfs.add(pf.id) + remove_pf_ids_set = local_pfs - current_pfs + remove_pfs = [self.mapping.managed_port_forwardings[pf_id] + for pf_id in remove_pf_ids_set] + self._process_update(updated_pfs, iptable_manager, + interface_name, namespace) + self._process_create(new_pfs, ri, interface_name, + namespace, iptable_manager) + self._process_delete(context, remove_pfs, ri, interface_name, + namespace, iptable_manager) + + def process_port_forwarding(self, context, data): + ri = self._get_router_info(data['id']) + if not self._check_if_need_process(ri): + return + + self.check_local_port_forwardings( + context, ri, ri.fip_managed_by_port_forwardings) + + @lockutils.synchronized('port-forwarding') + def add_router(self, context, data): + """Handle a router add event. + + Called on router create. + + :param context: RPC context. + :param data: Router data. + """ + self.process_port_forwarding(context, data) + + @lockutils.synchronized('port-forwarding') + def update_router(self, context, data): + """Handle a router update event. + + Called on router update. + + :param context: RPC context. + :param data: Router data. + """ + self.process_port_forwarding(context, data) + + def delete_router(self, context, data): + """Handle a router delete event. + + :param context: RPC context. + :param data: Router data. + """ + pass + + def ha_state_change(self, context, data): + pass diff --git a/neutron/agent/l3/router_info.py b/neutron/agent/l3/router_info.py index d279efe919d..4568d704e7b 100644 --- a/neutron/agent/l3/router_info.py +++ b/neutron/agent/l3/router_info.py @@ -80,6 +80,8 @@ class RouterInfo(object): self.process_monitor = None # radvd is a neutron.agent.linux.ra.DaemonMonitor self.radvd = None + self.centralized_port_forwarding_fip_set = set() + self.fip_managed_by_port_forwardings = None def initialize(self, process_monitor): """Initialize the router on the system. @@ -378,7 +380,9 @@ class RouterInfo(object): # that's how the caller determines that it was removed fip_statuses[fip['id']] = FLOATINGIP_STATUS_NOCHANGE fips_to_remove = ( - ip_cidr for ip_cidr in existing_cidrs - new_cidrs - gw_cidrs + ip_cidr + for ip_cidr in (existing_cidrs - new_cidrs - gw_cidrs - + self.centralized_port_forwarding_fip_set) if common_utils.is_cidr_host(ip_cidr)) for ip_cidr in fips_to_remove: LOG.debug("Removing floating ip %s from interface %s in " @@ -748,13 +752,15 @@ class RouterInfo(object): for gw_ip in gateway_ips) def external_gateway_added(self, ex_gw_port, interface_name): - preserve_ips = self._list_floating_ip_cidrs() + preserve_ips = self._list_floating_ip_cidrs() + list( + self.centralized_port_forwarding_fip_set) preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id)) self._external_gateway_added( ex_gw_port, interface_name, self.ns_name, preserve_ips) def external_gateway_updated(self, ex_gw_port, interface_name): - preserve_ips = self._list_floating_ip_cidrs() + preserve_ips = self._list_floating_ip_cidrs() + list( + self.centralized_port_forwarding_fip_set) preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id)) self._external_gateway_added( ex_gw_port, interface_name, self.ns_name, preserve_ips) @@ -1156,6 +1162,8 @@ class RouterInfo(object): :param agent: Passes the agent in order to send RPC messages. """ LOG.debug("Process updates, router %s", self.router['id']) + self.centralized_port_forwarding_fip_set = set(self.router.get( + 'port_forwardings_fip_set', set())) self._process_internal_ports() self.agent.pd.sync_router(self.router['id']) self.process_external() @@ -1169,3 +1177,5 @@ class RouterInfo(object): self.fip_map = dict([(fip['floating_ip_address'], fip['fixed_ip_address']) for fip in self.get_floating_ips()]) + self.fip_managed_by_port_forwardings = self.router.get( + 'fip_managed_by_port_forwardings') diff --git a/neutron/agent/linux/ip_lib.py b/neutron/agent/linux/ip_lib.py index 67e0572463f..a9f80feba5e 100644 --- a/neutron/agent/linux/ip_lib.py +++ b/neutron/agent/linux/ip_lib.py @@ -331,6 +331,20 @@ class IPDevice(SubProcessBase): LOG.exception("Failed deleting egress connection state of" " floatingip %s", ip_str) + def delete_socket_conntrack_state(self, cidr, dport, protocol): + ip_str = str(netaddr.IPNetwork(cidr).ip) + ip_wrapper = IPWrapper(namespace=self.namespace) + cmd = ["conntrack", "-D", "-d", ip_str, '-p', protocol, + '--dport', dport] + try: + ip_wrapper.netns.execute(cmd, check_exit_code=True, + extra_ok_codes=[1]) + + except RuntimeError: + LOG.exception("Failed deleting ingress connection state of " + "socket %(ip)s:%(port)s", {'ip': ip_str, + 'port': dport}) + def disable_ipv6(self): if not ipv6_utils.is_enabled_and_bind_by_default(): return diff --git a/neutron/tests/functional/agent/l3/extensions/test_port_forwarding_extension.py b/neutron/tests/functional/agent/l3/extensions/test_port_forwarding_extension.py new file mode 100644 index 00000000000..78d916f2488 --- /dev/null +++ b/neutron/tests/functional/agent/l3/extensions/test_port_forwarding_extension.py @@ -0,0 +1,187 @@ +# Copyright 2018 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import re + +from neutron_lib import constants +from oslo_utils import uuidutils + +import mock +from neutron.agent.l3 import agent as neutron_l3_agent +from neutron.agent.l3.extensions import port_forwarding as pf +from neutron.agent.linux import ip_lib +from neutron.agent.linux import iptables_manager as iptable_mng +from neutron.agent.linux import utils +from neutron.common import utils as common_utils +from neutron.objects import port_forwarding as pf_obj +from neutron.tests.functional.agent.l3 import framework +from neutron.tests.functional.agent.l3 import test_dvr_router + +_uuid = uuidutils.generate_uuid + + +class L3AgentFipPortForwardingExtensionTestFramework( + framework.L3AgentTestFramework): + + def setUp(self): + super(L3AgentFipPortForwardingExtensionTestFramework, self).setUp() + self.conf.set_override('extensions', ['port_forwarding'], 'agent') + self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1', + self.conf) + self.fip_pf_ext = pf.PortForwardingAgentExtension() + self.fip_id1 = _uuid() + self.fip_id2 = _uuid() + self.fip_id3 = _uuid() + self.portforwarding1 = pf_obj.PortForwarding( + context=None, id=_uuid(), floatingip_id=self.fip_id1, + external_port=1111, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='1.1.1.1', internal_port=11111, + floating_ip_address='111.111.111.111', router_id=_uuid()) + self.portforwarding2 = pf_obj.PortForwarding( + context=None, id=_uuid(), floatingip_id=self.fip_id1, + external_port=1112, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='1.1.1.2', internal_port=11112, + floating_ip_address='111.111.111.111', router_id=_uuid()) + self.portforwarding3 = pf_obj.PortForwarding( + context=None, id=_uuid(), floatingip_id=self.fip_id2, + external_port=1113, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='1.1.1.3', internal_port=11113, + floating_ip_address='111.222.111.222', router_id=_uuid()) + self.portforwarding4 = pf_obj.PortForwarding( + context=None, id=_uuid(), floatingip_id=self.fip_id3, + external_port=2222, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='2.2.2.2', internal_port=22222, + floating_ip_address='222.222.222.222', router_id=_uuid()) + self.port_forwardings = [self.portforwarding1, self.portforwarding2, + self.portforwarding3, self.portforwarding4] + self._set_bulk_pull_mock() + self.managed_fips = [self.fip_id1, self.fip_id2, self.fip_id3] + self.fip_list_for_pf = ['111.111.111.111/32', '111.222.111.222/32', + '222.222.222.222/32'] + + def _set_bulk_pull_mock(self): + + def _bulk_pull_mock(context, resource_type, filter_kwargs=None): + if 'floatingip_id' in filter_kwargs: + result = [] + for pfobj in self.port_forwardings: + if pfobj.floatingip_id in filter_kwargs['floatingip_id']: + result.append(pfobj) + return result + return self.port_forwardings + self.bulk_pull = mock.patch( + 'neutron.api.rpc.handlers.resources_rpc.' + 'ResourcesPullRpcApi.bulk_pull').start() + self.bulk_pull.side_effect = _bulk_pull_mock + + def _assert_port_forwarding_fip_is_set(self, router_info, pf_fip): + (interface_name, namespace, + iptables_manager) = self.fip_pf_ext._get_resource_by_router( + router_info) + device = ip_lib.IPDevice(interface_name, namespace=namespace) + pf_fip_cidr = str(pf_fip) + '/32' + + def check_existing_cidrs(): + existing_cidrs = router_info.get_router_cidrs(device) + return pf_fip_cidr in existing_cidrs + + common_utils.wait_until_true(check_existing_cidrs) + + def _assert_port_forwarding_iptables_is_set(self, router_info, pf): + (interface_name, namespace, + iptables_manager) = self.fip_pf_ext._get_resource_by_router( + router_info) + chain_rule = self.fip_pf_ext._get_fip_rules( + pf, iptables_manager.wrap_name)[1] + chain_name = chain_rule[0] + rule = chain_rule[1] + rule_tag = 'fip_portforwarding-' + pf.id + rule_obj = iptable_mng.IptablesRule( + chain_name, rule, True, False, iptables_manager.wrap_name, + rule_tag, None) + + def check_chain_rules_set(): + existing_chains = iptables_manager.ipv4['nat'].chains + if chain_name not in existing_chains: + return False + existing_rules = iptables_manager.ipv4['nat'].rules + return rule_obj in existing_rules + + common_utils.wait_until_true(check_chain_rules_set) + + def _assert_harouter_fip_is_set(self, router_info, fip_pf): + (interface_name, namespace, + iptables_manager) = self.fip_pf_ext._get_resource_by_router( + router_info) + keepalived_pm = router_info.keepalived_manager.get_process() + utils.get_conf_file_name(keepalived_pm.pids_path, + keepalived_pm.uuid, + keepalived_pm.service_pid_fname) + + conf_path = os.path.join(keepalived_pm.pids_path, keepalived_pm.uuid, + 'keepalived.conf') + + regex = "%s dev %s" % (fip_pf, interface_name) + pattern = re.compile(regex) + + def check_harouter_fip_is_set(): + if re.findall(pattern, utils.get_value_from_file(conf_path)): + return True + return False + + common_utils.wait_until_true(check_harouter_fip_is_set) + + def _test_centralized_routers(self, router_info, enable_ha=False): + router_id = router_info['id'] + for pfobj in self.port_forwardings: + pfobj.router_id = router_id + router_info['fip_managed_by_port_forwardings'] = self.managed_fips + router_info['port_forwardings_fip_set'] = set(self.fip_list_for_pf) + ri = self.manage_router(self.agent, router_info) + for pfobj in self.port_forwardings: + self._assert_port_forwarding_fip_is_set(ri, + pfobj.floating_ip_address) + self._assert_port_forwarding_iptables_is_set(ri, pfobj) + if enable_ha: + for fip_pf in self.fip_list_for_pf: + self._assert_harouter_fip_is_set(ri, fip_pf) + + +class TestL3AgentFipPortForwardingExtension( + L3AgentFipPortForwardingExtensionTestFramework): + + def test_legacy_router_fip_portforwarding(self): + router_info = self.generate_router_info(enable_ha=False) + self._test_centralized_routers(router_info, enable_ha=False) + + def test_ha_router_fip_portforwarding(self): + router_info = self.generate_router_info(enable_ha=True) + self._test_centralized_routers(router_info, enable_ha=True) + + +class TestL3AgentFipPortForwardingExtensionDVR( + test_dvr_router.TestDvrRouter, + L3AgentFipPortForwardingExtensionTestFramework): + + def test_dvr_edge_router(self): + self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT + router_info = self.generate_dvr_router_info(enable_ha=False) + self._test_centralized_routers(router_info, enable_ha=False) + + def test_dvr_ha_router(self): + self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT + router_info = self.generate_dvr_router_info(enable_ha=True) + self._test_centralized_routers(router_info, enable_ha=True) diff --git a/neutron/tests/unit/agent/l3/extensions/test_port_forwarding.py b/neutron/tests/unit/agent/l3/extensions/test_port_forwarding.py new file mode 100644 index 00000000000..c2316359cd3 --- /dev/null +++ b/neutron/tests/unit/agent/l3/extensions/test_port_forwarding.py @@ -0,0 +1,417 @@ +# Copyright 2018 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from neutron_lib import constants as lib_const +from neutron_lib import context +from oslo_utils import uuidutils + +from neutron.agent.l3 import agent as l3_agent +from neutron.agent.l3.extensions import port_forwarding as pf +from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api +from neutron.agent.l3 import router_info as l3router +from neutron.agent.linux import iptables_manager +from neutron.api.rpc.callbacks.consumer import registry +from neutron.api.rpc.callbacks import resources +from neutron.api.rpc.handlers import resources_rpc +from neutron.objects import port_forwarding as pf_obj +from neutron.objects import router +from neutron.tests import base +from neutron.tests.unit.agent.l3 import test_agent + +_uuid = uuidutils.generate_uuid + +TEST_FIP = '10.100.2.45' +BINARY_NAME = iptables_manager.get_binary_name() +DEFAULT_RULE = ('PREROUTING', '-j %s-fip-pf' % BINARY_NAME) +DEFAULT_CHAIN = 'fip-pf' +HOSTNAME = 'testhost' + + +class PortForwardingExtensionBaseTestCase( + test_agent.BasicRouterOperationsFramework): + + def setUp(self): + super(PortForwardingExtensionBaseTestCase, self).setUp() + + self.fip_pf_ext = pf.PortForwardingAgentExtension() + + self.context = context.get_admin_context() + self.connection = mock.Mock() + self.floatingip2 = router.FloatingIP(context=None, id=_uuid(), + floating_ip_address='172.24.6.12', + floating_network_id=_uuid(), + router_id=_uuid(), + status='ACTIVE') + self.portforwarding1 = pf_obj.PortForwarding( + context=None, id=_uuid(), floatingip_id=self.floatingip2.id, + external_port=1111, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='1.1.1.1', internal_port=11111, + floating_ip_address=self.floatingip2.floating_ip_address, + router_id=self.floatingip2.router_id) + + self.agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + self.ex_gw_port = {'id': _uuid()} + self.fip = {'id': _uuid(), + 'floating_ip_address': TEST_FIP, + 'fixed_ip_address': '192.168.0.1', + 'floating_network_id': _uuid(), + 'port_id': _uuid(), + 'host': HOSTNAME} + self.router = {'id': self.floatingip2.router_id, + 'gw_port': self.ex_gw_port, + 'ha': False, + 'distributed': False, + lib_const.FLOATINGIP_KEY: [self.fip]} + self.router_info = l3router.RouterInfo( + self.agent, self.floatingip2.router_id, self.router, + **self.ri_kwargs) + self.centralized_port_forwarding_fip_set = set( + [str(self.floatingip2.floating_ip_address) + '/32']) + self.pf_managed_fips = [self.floatingip2.id] + self.router_info.ex_gw_port = self.ex_gw_port + self.router_info.fip_managed_by_port_forwardings = self.pf_managed_fips + self.agent.router_info[self.router['id']] = self.router_info + + self.get_router_info = mock.patch( + 'neutron.agent.l3.l3_agent_extension_api.' + 'L3AgentExtensionAPI.get_router_info').start() + self.get_router_info.return_value = self.router_info + + self.agent_api = l3_ext_api.L3AgentExtensionAPI(None) + self.fip_pf_ext.consume_api(self.agent_api) + + self.port_forwardings = [self.portforwarding1] + + +class FipPortForwardingExtensionInitializeTestCase( + PortForwardingExtensionBaseTestCase): + + @mock.patch.object(registry, 'register') + @mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback') + def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock): + call_to_patch = 'neutron.common.rpc.Connection' + with mock.patch(call_to_patch, + return_value=self.connection) as create_connection: + self.fip_pf_ext.initialize( + self.connection, lib_const.L3_AGENT_MODE) + create_connection.assert_has_calls([mock.call()]) + self.connection.create_consumer.assert_has_calls( + [mock.call( + resources_rpc.resource_type_versioned_topic( + resources.PORTFORWARDING), + [rpc_mock()], + fanout=True)] + ) + subscribe_mock.assert_called_with( + mock.ANY, resources.PORTFORWARDING) + + +class FipPortForwardingExtensionTestCase(PortForwardingExtensionBaseTestCase): + + def setUp(self): + super(FipPortForwardingExtensionTestCase, self).setUp() + self.fip_pf_ext.initialize( + self.connection, lib_const.L3_AGENT_MODE) + self._set_bulk_pull_mock() + + def _set_bulk_pull_mock(self): + + def _bulk_pull_mock(context, resource_type, filter_kwargs=None): + if 'floatingip_id' in filter_kwargs: + result = [] + for pfobj in self.port_forwardings: + if pfobj.floatingip_id in filter_kwargs['floatingip_id']: + result.append(pfobj) + return result + return self.port_forwardings + self.bulk_pull = mock.patch( + 'neutron.api.rpc.handlers.resources_rpc.' + 'ResourcesPullRpcApi.bulk_pull').start() + self.bulk_pull.side_effect = _bulk_pull_mock + + def _get_chainrule_tag_from_pf_obj(self, target_obj): + rule_tag = 'fip_portforwarding-' + target_obj.id + chain_name = ('pf-' + target_obj.id)[:pf.MAX_CHAIN_LEN_WRAP] + chain_rule = (chain_name, + '-d %s/32 -p %s -m %s --dport %s ' + '-j DNAT --to-destination %s:%s' % ( + target_obj.floating_ip_address, + target_obj.protocol, + target_obj.protocol, + target_obj.external_port, + target_obj.internal_ip_address, + target_obj.internal_port)) + return chain_name, chain_rule, rule_tag + + def _assert_called_iptables_process(self, mock_add_chain, + mock_add_rule, mock_add_fip, + mock_send_fip_status, target_obj=None): + if target_obj: + obj = target_obj + else: + obj = self.portforwarding1 + (chain_name, + chain_rule, rule_tag) = self._get_chainrule_tag_from_pf_obj(obj) + mock_add_chain.assert_has_calls([mock.call('fip-pf'), + mock.call(chain_name)]) + mock_add_rule.assert_has_calls( + [mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]), + mock.call(DEFAULT_CHAIN, ('-j %s-' % BINARY_NAME) + chain_name, + tag=rule_tag), + mock.call(chain_name, chain_rule[1], tag=rule_tag)]) + mock_add_fip.assert_called_once_with( + {'floating_ip_address': str(obj.floating_ip_address)}, + mock.ANY, mock.ANY) + fip_status = { + obj.floatingip_id: + lib_const.FLOATINGIP_STATUS_ACTIVE} + mock_send_fip_status.assert_called_once_with(mock.ANY, fip_status) + + @mock.patch.object(pf.PortForwardingAgentExtension, + '_sending_port_forwarding_fip_status') + @mock.patch.object(iptables_manager.IptablesTable, 'add_rule') + @mock.patch.object(iptables_manager.IptablesTable, 'add_chain') + @mock.patch.object(l3router.RouterInfo, 'add_floating_ip') + def test_add_update_router(self, mock_add_fip, + mock_add_chain, mock_add_rule, + mock_send_fip_status): + # simulate the router add and already there is a port forwarding + # resource association. + mock_add_fip.return_value = lib_const.FLOATINGIP_STATUS_ACTIVE + self.fip_pf_ext.add_router(self.context, self.router) + self._assert_called_iptables_process( + mock_add_chain, mock_add_rule, mock_add_fip, mock_send_fip_status, + target_obj=self.portforwarding1) + + # Then we create another port forwarding with the same fip + mock_add_fip.reset_mock() + mock_send_fip_status.reset_mock() + mock_add_chain.reset_mock() + mock_add_rule.reset_mock() + + test_portforwarding = pf_obj.PortForwarding( + context=None, id=_uuid(), floatingip_id=self.floatingip2.id, + external_port=2222, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='2.2.2.2', internal_port=22222, + floating_ip_address=self.floatingip2.floating_ip_address, + router_id=self.floatingip2.router_id) + self.pf_managed_fips.append(self.floatingip2.id) + self.port_forwardings.append(test_portforwarding) + self.fip_pf_ext.update_router(self.context, self.router) + self._assert_called_iptables_process( + mock_add_chain, mock_add_rule, mock_add_fip, mock_send_fip_status, + target_obj=test_portforwarding) + + @mock.patch.object(iptables_manager.IptablesTable, 'add_rule') + @mock.patch.object(iptables_manager.IptablesTable, 'add_chain') + @mock.patch('neutron.agent.linux.ip_lib.IPDevice') + @mock.patch.object(iptables_manager.IptablesTable, 'remove_chain') + def test_add_update_router_port_forwarding_change( + self, mock_remove_chain, mock_ip_device, mock_add_chain, + mock_add_rule): + self.fip_pf_ext.add_router(self.context, self.router) + update_portforwarding = pf_obj.PortForwarding( + context=None, id=self.portforwarding1.id, + floatingip_id=self.portforwarding1.floatingip_id, + external_port=2222, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='2.2.2.2', internal_port=22222, + floating_ip_address=self.portforwarding1.floating_ip_address, + router_id=self.portforwarding1.router_id) + self.port_forwardings = [update_portforwarding] + mock_delete = mock.Mock() + mock_ip_device.return_value = mock_delete + self.fip_pf_ext.update_router(self.context, self.router) + current_chain = ('pf-' + self.portforwarding1.id)[ + :pf.MAX_CHAIN_LEN_WRAP] + mock_remove_chain.assert_called_once_with(current_chain) + mock_delete.delete_socket_conntrack_state.assert_called_once_with( + str(self.portforwarding1.floating_ip_address), + self.portforwarding1.external_port, + protocol=self.portforwarding1.protocol) + (chain_name, + chain_rule, rule_tag) = self._get_chainrule_tag_from_pf_obj( + update_portforwarding) + mock_add_chain.assert_has_calls([mock.call('fip-pf'), + mock.call(chain_name)]) + mock_add_rule.assert_has_calls( + [mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]), + mock.call(DEFAULT_CHAIN, ('-j %s-' % BINARY_NAME) + chain_name, + tag=rule_tag), + mock.call(chain_name, chain_rule[1], tag=rule_tag)]) + + @mock.patch.object(pf.PortForwardingAgentExtension, + '_sending_port_forwarding_fip_status') + @mock.patch('neutron.agent.linux.ip_lib.IPDevice') + @mock.patch.object(iptables_manager.IptablesTable, 'remove_chain') + def test_add_update_router_port_forwarding_remove( + self, mock_remove_chain, mock_ip_device, + mock_send_fip_status): + self.fip_pf_ext.add_router(self.context, self.router) + mock_send_fip_status.reset_mock() + self.port_forwardings = [] + mock_device = mock.Mock() + mock_ip_device.return_value = mock_device + self.fip_pf_ext.update_router(self.context, self.router) + current_chain = ('pf-' + self.portforwarding1.id)[ + :pf.MAX_CHAIN_LEN_WRAP] + mock_remove_chain.assert_called_once_with(current_chain) + mock_device.delete_socket_conntrack_state.assert_called_once_with( + str(self.portforwarding1.floating_ip_address), + self.portforwarding1.external_port, + protocol=self.portforwarding1.protocol) + mock_device.delete_addr_and_conntrack_state.assert_called_once_with( + str(self.portforwarding1.floating_ip_address)) + fip_status = { + self.portforwarding1.floatingip_id: + lib_const.FLOATINGIP_STATUS_DOWN} + mock_send_fip_status.assert_called_once_with(mock.ANY, fip_status) + + +class RouterFipPortForwardingMappingTestCase(base.BaseTestCase): + + def setUp(self): + super(RouterFipPortForwardingMappingTestCase, self).setUp() + self.mapping = pf.RouterFipPortForwardingMapping() + self.router1 = _uuid() + self.router2 = _uuid() + self.floatingip1 = _uuid() + self.floatingip2 = _uuid() + self.floatingip3 = _uuid() + self.portforwarding1 = pf_obj.PortForwarding( + context=None, id=_uuid(), floatingip_id=self.floatingip1, + external_port=1111, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='1.1.1.1', internal_port=11111, + floating_ip_address='111.111.111.111', + router_id=self.router1) + self.portforwarding2 = pf_obj.PortForwarding( + context=None, id=_uuid(), floatingip_id=self.floatingip1, + external_port=1112, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='1.1.1.2', internal_port=11112, + floating_ip_address='111.111.111.111', + router_id=self.router1) + self.portforwarding3 = pf_obj.PortForwarding( + context=None, id=_uuid(), floatingip_id=self.floatingip2, + external_port=1113, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='1.1.1.3', internal_port=11113, + floating_ip_address='111.222.111.222', + router_id=self.router1) + self.portforwarding4 = pf_obj.PortForwarding( + context=None, id=_uuid(), floatingip_id=self.floatingip3, + external_port=2222, protocol='tcp', internal_port_id=_uuid(), + internal_ip_address='2.2.2.2', internal_port=22222, + floating_ip_address='222.222.222.222', + router_id=self.router2) + self.portforwardings_dict = { + self.portforwarding1.id: self.portforwarding1, + self.portforwarding2.id: self.portforwarding2, + self.portforwarding3.id: self.portforwarding3, + self.portforwarding4.id: self.portforwarding4} + + def _set_pf(self): + self.mapping.set_port_forwardings(self.portforwardings_dict.values()) + + def test_set_port_forwardings(self): + self._set_pf() + pf_ids = self.portforwardings_dict.keys() + for pf_id, obj in self.mapping.managed_port_forwardings.items(): + self.assertIn(pf_id, pf_ids) + self.assertEqual(obj, self.portforwardings_dict[pf_id]) + self.assertEqual( + len(pf_ids), len(self.mapping.managed_port_forwardings.keys())) + + fip_pf_set = { + self.floatingip1: set( + [self.portforwarding1.id, self.portforwarding2.id]), + self.floatingip2: set([self.portforwarding3.id]), + self.floatingip3: set([self.portforwarding4.id]) + } + for fip_id, pf_set in self.mapping.fip_port_forwarding.items(): + self.assertIn( + fip_id, [self.floatingip1, self.floatingip2, self.floatingip3]) + self.assertEqual(0, len(pf_set - fip_pf_set[fip_id])) + self.assertEqual( + len([self.floatingip1, self.floatingip2, self.floatingip3]), + len(self.mapping.fip_port_forwarding)) + + router_fip = { + self.router1: set([self.floatingip1, self.floatingip2]), + self.router2: set([self.floatingip3]) + } + for router_id, fip_set in self.mapping.router_fip_mapping.items(): + self.assertIn(router_id, [self.router1, self.router2]) + self.assertEqual(0, len(fip_set - router_fip[router_id])) + self.assertEqual( + len([self.router1, self.router2]), + len(self.mapping.router_fip_mapping.keys())) + + def test_update_port_forwarding(self): + self._set_pf() + new_pf1 = pf_obj.PortForwarding( + context=None, id=self.portforwarding2.id, + floatingip_id=self.floatingip1, + external_port=11122, protocol='tcp', + internal_port_id=self.portforwarding2.internal_port_id, + internal_ip_address='1.1.1.22', internal_port=11122, + floating_ip_address='111.111.111.111', + router_id=self.router1) + self.mapping.update_port_forwardings([new_pf1]) + self.assertEqual( + new_pf1, + self.mapping.managed_port_forwardings[self.portforwarding2.id]) + + def test_del_port_forwardings(self): + self._set_pf() + del_pfs = [self.portforwarding3, self.portforwarding2, + self.portforwarding4] + self.mapping.del_port_forwardings(del_pfs) + self.assertEqual( + [self.portforwarding1.id], + list(self.mapping.managed_port_forwardings.keys())) + self.assertEqual({self.floatingip1: set([self.portforwarding1.id])}, + self.mapping.fip_port_forwarding) + self.assertEqual({self.router1: set([self.floatingip1])}, + self.mapping.router_fip_mapping) + + def test_clear_by_fip(self): + self._set_pf() + self.mapping.clear_by_fip(self.floatingip1, self.router1) + router_fip = { + self.router1: set([self.floatingip2]), + self.router2: set([self.floatingip3]) + } + for router_id, fip_set in self.mapping.router_fip_mapping.items(): + self.assertIn(router_id, [self.router1, self.router2]) + self.assertEqual(0, len(fip_set - router_fip[router_id])) + fip_pf_set = { + self.floatingip2: set([self.portforwarding3.id]), + self.floatingip3: set([self.portforwarding4.id]) + } + for fip_id, pf_set in self.mapping.fip_port_forwarding.items(): + self.assertIn( + fip_id, [self.floatingip2, self.floatingip3]) + self.assertEqual(0, len(pf_set - fip_pf_set[fip_id])) + self.assertEqual( + len([self.floatingip2, self.floatingip3]), + len(self.mapping.fip_port_forwarding)) + pfs_dict = {self.portforwarding3.id: self.portforwarding3, + self.portforwarding4.id: self.portforwarding4} + for pf_id, obj in self.mapping.managed_port_forwardings.items(): + self.assertIn(pf_id, + [self.portforwarding3.id, self.portforwarding4.id]) + self.assertEqual(obj, pfs_dict[pf_id]) + self.assertEqual( + len([self.portforwarding3.id, self.portforwarding4.id]), + len(self.mapping.managed_port_forwardings.keys())) diff --git a/neutron/tests/unit/agent/l3/test_agent.py b/neutron/tests/unit/agent/l3/test_agent.py index cc3123fa37c..33f24b97150 100644 --- a/neutron/tests/unit/agent/l3/test_agent.py +++ b/neutron/tests/unit/agent/l3/test_agent.py @@ -3594,3 +3594,49 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework): pass self.assertTrue(mock_delete.called) self.assertFalse(mock_dscm.called) + + @mock.patch.object(lla.LinkLocalAllocator, '_write') + @mock.patch.object(l3router.RouterInfo, '_get_gw_ips_cidr') + def test_process_floating_ip_addresses_not_care_port_forwarding( + self, mock_get_gw_cidr, mock_lla_write): + pf_used_fip = [{'cidr': '15.1.2.4/32'}, {'cidr': '15.1.2.5/32'}] + gw_cidr = {'cidr': '15.1.2.79/24'} + need_to_remove_fip = [{'cidr': '15.1.2.99/32'}] + fake_floatingips = {'floatingips': [ + {'id': _uuid(), + 'floating_ip_address': '15.1.2.3', + 'fixed_ip_address': '192.168.0.1', + 'status': 'DOWN', + 'floating_network_id': _uuid(), + 'port_id': _uuid(), + 'host': HOSTNAME}]} + + router = l3_test_common.prepare_router_data(enable_snat=True) + router[lib_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips'] + agent = l3_agent.L3NATAgent(HOSTNAME, self.conf) + ri = l3router.RouterInfo(agent, router['id'], + router, **self.ri_kwargs) + ri.centralized_port_forwarding_fip_set = set( + [i['cidr'] for i in pf_used_fip]) + ri.iptables_manager.ipv4['nat'] = mock.MagicMock() + ri.get_external_device_name = mock.Mock(return_value='exgw') + floating_ips = ri.get_floating_ips() + fip_id = floating_ips[0]['id'] + device = self.mock_ip_dev + device.addr.list.return_value = ( + pf_used_fip + need_to_remove_fip + [gw_cidr]) + ri.iptables_manager.ipv4['nat'] = mock.MagicMock() + mock_get_gw_cidr.return_value = set([gw_cidr['cidr']]) + ri.get_centralized_router_cidrs = mock.Mock( + return_value=set()) + ri.add_floating_ip = mock.Mock( + return_value=lib_constants.FLOATINGIP_STATUS_ACTIVE) + ri.remove_floating_ip = mock.Mock() + fip_statuses = ri.process_floating_ip_addresses( + mock.sentinel.interface_name) + self.assertEqual({fip_id: lib_constants.FLOATINGIP_STATUS_ACTIVE}, + fip_statuses) + ri.add_floating_ip.assert_called_once_with( + floating_ips[0], mock.sentinel.interface_name, device) + ri.remove_floating_ip.assert_called_once_with( + device, need_to_remove_fip[0]['cidr']) diff --git a/neutron/tests/unit/agent/linux/test_ip_lib.py b/neutron/tests/unit/agent/linux/test_ip_lib.py index 58263e5fb25..e4c9df6e377 100644 --- a/neutron/tests/unit/agent/linux/test_ip_lib.py +++ b/neutron/tests/unit/agent/linux/test_ip_lib.py @@ -1791,3 +1791,21 @@ class TestSysctl(base.BaseTestCase): return_value=False): dev.disable_ipv6() self.assertFalse(self.execute.called) + + +class TestConntrack(base.BaseTestCase): + def setUp(self): + super(TestConntrack, self).setUp() + self.execute_p = mock.patch.object(ip_lib.IpNetnsCommand, 'execute') + self.execute = self.execute_p.start() + + def test_delete_socket_conntrack_state(self): + device = ip_lib.IPDevice('tap0', 'ns1') + ip_str = '1.1.1.1' + dport = '3378' + protocol = 'tcp' + expect_cmd = ["conntrack", "-D", "-d", ip_str, '-p', protocol, + '--dport', dport] + device.delete_socket_conntrack_state(ip_str, dport, protocol) + self.execute.assert_called_once_with(expect_cmd, check_exit_code=True, + extra_ok_codes=[1]) diff --git a/setup.cfg b/setup.cfg index 57595a0cb92..c0af76104cd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -108,6 +108,7 @@ neutron.agent.l2.extensions = log = neutron.services.logapi.agent.log_extension:LoggingExtension neutron.agent.l3.extensions = fip_qos = neutron.agent.l3.extensions.qos.fip:FipQosAgentExtension + port_forwarding = neutron.agent.l3.extensions.port_forwarding:PortForwardingAgentExtension neutron.services.logapi.drivers = ovs = neutron.services.logapi.drivers.openvswitch.ovs_firewall_log:OVSFirewallLoggingDriver neutron.qos.agent_drivers =