[agent side] L3 agent side Floating IP port forwarding

This patch contains the l3 agent extension and agent part code.
This patch introduce a new l3 agent extension named "port_forwarding",
to process the binding of the port forwarding resources, manage its own
floatingip configuration on router interface and floatingip status.
Currrently, we support all Neutron Router reference implementations.

This extension uses the period router sync task and PortForwarding OVO
rpc.

* The main idea about this new extension is using the generic router sync
  rpc to maintain the host port forwarding resources,
* For a single port forwarding create/update/delete, process it one by one
  in smaller scope for forbidding refresh the iptables with a larger
  scope frequently.

Partially-Implements: blueprint port-forwarding
Partial-Bug: #1491317
Change-Id: Ic56e67d428f6177099c285a9d1bccabc1e710f2b
This commit is contained in:
ZhaoBo 2018-05-31 16:49:18 +08:00
parent d00a1558a5
commit de9b39ed2c
8 changed files with 1156 additions and 3 deletions

View File

@ -0,0 +1,460 @@
# Copyright 2018 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import netaddr
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron.agent.linux import ip_lib
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import rpc as n_rpc
from neutron_lib.agent import l3_extension
from neutron_lib import constants as lib_consts
LOG = logging.getLogger(__name__)
DEFAULT_PORT_FORWARDING_CHAIN = 'fip-pf'
PORT_FORWARDING_PREFIX = 'fip_portforwarding-'
PORT_FORWARDING_CHAIN_PREFIX = 'pf-'
# TODO(bzhao) If there are other files use this constant, and move it into
# constants file. This line will be removed and get the value from constants
# file.
MAX_CHAIN_LEN_WRAP = 11
class RouterFipPortForwardingMapping(object):
def __init__(self):
self.managed_port_forwardings = {}
"""
fip_port_forwarding = {
fip_id_1: set(pf_id1, pf_id2),
fip_id_2: set(pf_id3, pf_id4)
}
"""
self.fip_port_forwarding = collections.defaultdict(set)
"""
router_fip_mapping = {
router_id_1: set(fip_id_1, fip_id_2),
router_id_2: set(fip_id_3, fip_id_4)
}
"""
self.router_fip_mapping = collections.defaultdict(set)
def set_port_forwardings(self, port_forwardings):
for port_forwarding in port_forwardings:
self.set_fip_port_forwarding(port_forwarding.floatingip_id,
port_forwarding,
port_forwarding.router_id)
def update_port_forwardings(self, port_forwardings):
for port_forwarding in port_forwardings:
self.managed_port_forwardings[port_forwarding.id] = port_forwarding
def get_port_forwarding(self, port_forwarding_id):
return self.managed_port_forwardings.get(port_forwarding_id)
def del_port_forwardings(self, port_forwardings):
for port_forwarding in port_forwardings:
if not self.get_port_forwarding(port_forwarding.id):
continue
self.managed_port_forwardings.pop(port_forwarding.id)
self.fip_port_forwarding[port_forwarding.floatingip_id].remove(
port_forwarding.id)
if not self.fip_port_forwarding[port_forwarding.floatingip_id]:
self.fip_port_forwarding.pop(port_forwarding.floatingip_id)
self.router_fip_mapping[port_forwarding.router_id].remove(
port_forwarding.floatingip_id)
if not self.router_fip_mapping[port_forwarding.router_id]:
del self.router_fip_mapping[port_forwarding.router_id]
def set_fip_port_forwarding(self, fip_id, pf, router_id):
self.router_fip_mapping[router_id].add(fip_id)
self.fip_port_forwarding[fip_id].add(pf.id)
self.managed_port_forwardings[pf.id] = pf
def clear_by_fip(self, fip_id, router_id):
self.router_fip_mapping[router_id].remove(fip_id)
if len(self.router_fip_mapping[router_id]) == 0:
del self.router_fip_mapping[router_id]
for pf_id in self.fip_port_forwarding[fip_id]:
del self.managed_port_forwardings[pf_id]
del self.fip_port_forwarding[fip_id]
def check_port_forwarding_changes(self, new_pf):
old_pf = self.get_port_forwarding(new_pf.id)
return old_pf != new_pf
class PortForwardingAgentExtension(l3_extension.L3AgentExtension):
SUPPORTED_RESOURCE_TYPES = [resources.PORTFORWARDING]
def initialize(self, connection, driver_type):
self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
self._register_rpc_consumers()
self.mapping = RouterFipPortForwardingMapping()
def _register_rpc_consumers(self):
registry.register(self._handle_notification,
resources.PORTFORWARDING)
self._connection = n_rpc.Connection()
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
topic = resources_rpc.resource_type_versioned_topic(
resources.PORTFORWARDING)
self._connection.create_consumer(topic, endpoints, fanout=True)
self._connection.consume_in_threads()
def consume_api(self, agent_api):
self.agent_api = agent_api
@lockutils.synchronized('port-forwarding')
def _handle_notification(self, context, resource_type,
forwardings, event_type):
for forwarding in forwardings:
self._process_port_forwarding_event(
context, forwarding, event_type)
def _store_local(self, pf_objs, event_type):
if event_type == events.CREATED:
self.mapping.set_port_forwardings(pf_objs)
elif event_type == events.UPDATED:
self.mapping.update_port_forwardings(pf_objs)
elif event_type == events.DELETED:
self.mapping.del_port_forwardings(pf_objs)
def _get_fip_rules(self, port_forward, wrap_name):
chain_rule_list = []
pf_chain_name = self._get_port_forwarding_chain_name(port_forward.id)
chain_rule_list.append((DEFAULT_PORT_FORWARDING_CHAIN,
'-j %s-%s' %
(wrap_name, pf_chain_name)))
floating_ip_address = str(port_forward.floating_ip_address)
protocol = port_forward.protocol
internal_ip_address = str(port_forward.internal_ip_address)
internal_port = port_forward.internal_port
external_port = port_forward.external_port
chain_rule = (pf_chain_name,
'-d %s/32 -p %s -m %s --dport %s '
'-j DNAT --to-destination %s:%s' % (
floating_ip_address, protocol, protocol,
external_port, internal_ip_address,
internal_port))
chain_rule_list.append(chain_rule)
return chain_rule_list
def _rule_apply(self, iptables_manager, port_forwarding, rule_tag):
iptables_manager.ipv4['nat'].clear_rules_by_tag(rule_tag)
if DEFAULT_PORT_FORWARDING_CHAIN not in iptables_manager.ipv4[
'nat'].chains:
self._install_default_rules(iptables_manager)
for chain, rule in self._get_fip_rules(
port_forwarding, iptables_manager.wrap_name):
if chain not in iptables_manager.ipv4['nat'].chains:
iptables_manager.ipv4['nat'].add_chain(chain)
iptables_manager.ipv4['nat'].add_rule(chain, rule, tag=rule_tag)
def _process_create(self, port_forwardings, ri, interface_name, namespace,
iptables_manager):
if not port_forwardings:
return
device = ip_lib.IPDevice(interface_name, namespace=namespace)
is_distributed = ri.router.get('distributed')
ha_port = ri.router.get(lib_consts.HA_INTERFACE_KEY, None)
fip_statuses = {}
for port_forwarding in port_forwardings:
# check if the port forwarding is managed in this agent from
# OVO and router rpc.
if port_forwarding.id in self.mapping.managed_port_forwardings:
LOG.debug("Skip port forwarding %s for create, as it had been "
"managed by agent", port_forwarding.id)
continue
existing_cidrs = ri.get_router_cidrs(device)
fip_ip = str(port_forwarding.floating_ip_address)
fip_cidr = str(netaddr.IPNetwork(fip_ip))
status = ''
if fip_cidr not in existing_cidrs:
try:
if not is_distributed:
fip_statuses[port_forwarding.floatingip_id] = (
ri.add_floating_ip(
{'floating_ip_address': fip_ip},
interface_name, device))
else:
if not ha_port:
device.addr.add(fip_cidr)
ip_lib.send_ip_addr_adv_notif(namespace,
interface_name,
fip_ip)
else:
ri._add_vip(fip_cidr, interface_name)
status = lib_consts.FLOATINGIP_STATUS_ACTIVE
except Exception:
# Any error will causes the fip status to be set 'ERROR'
status = lib_consts.FLOATINGIP_STATUS_ERROR
LOG.warning("Unable to configure floating IP %(fip_id)s "
"for port forwarding %(pf_id)s",
{'fip_id': port_forwarding.floatingip_id,
'pf_id': port_forwarding.id})
else:
if not ha_port:
ip_lib.send_ip_addr_adv_notif(namespace,
interface_name,
fip_ip)
if status:
fip_statuses[port_forwarding.floatingip_id] = status
if ha_port and ha_port['status'] == lib_consts.PORT_STATUS_ACTIVE:
ri.enable_keepalived()
for port_forwarding in port_forwardings:
rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id
self._rule_apply(iptables_manager, port_forwarding, rule_tag)
iptables_manager.apply()
self._sending_port_forwarding_fip_status(ri, fip_statuses)
self._store_local(port_forwardings, events.CREATED)
def _sending_port_forwarding_fip_status(self, ri, statuses):
if not statuses:
return
LOG.debug('Sending Port Forwarding floating ip '
'statuses: %s', statuses)
# Update floating IP status on the neutron server
ri.agent.plugin_rpc.update_floatingip_statuses(
ri.agent.context, ri.router_id, statuses)
def _get_resource_by_router(self, ri):
is_distributed = ri.router.get('distributed')
ex_gw_port = ri.get_ex_gw_port()
if not is_distributed:
interface_name = ri.get_external_device_interface_name(ex_gw_port)
namespace = ri.ns_name
iptables_manager = ri.iptables_manager
else:
interface_name = ri.get_snat_external_device_interface_name(
ex_gw_port)
namespace = ri.snat_namespace.name
iptables_manager = ri.snat_iptables_manager
return interface_name, namespace, iptables_manager
def _check_if_need_process(self, ri, force=False):
# force means the request comes from, if True means it comes from OVO,
# as we get a actually port forwarding object, then we need to check in
# the following steps. But False, means it comes from router rpc.
if not ri or not ri.get_ex_gw_port() or (
not force and not ri.fip_managed_by_port_forwardings):
# agent doesn't hold the router. pass
# This router doesn't own a gw port. pass
# This router doesn't hold a port forwarding mapping. pass
return False
is_distributed = ri.router.get('distributed')
agent_mode = ri.agent_conf.agent_mode
if (is_distributed and
agent_mode in [lib_consts.L3_AGENT_MODE_DVR_NO_EXTERNAL,
lib_consts.L3_AGENT_MODE_DVR]):
# just support centralized cases
return False
return True
def _process_port_forwarding_event(self, context, port_forwarding,
event_type):
router_id = port_forwarding.router_id
ri = self._get_router_info(router_id)
if not self._check_if_need_process(ri, force=True):
return
(interface_name, namespace,
iptables_manager) = self._get_resource_by_router(ri)
if event_type == events.CREATED:
self._process_create(
[port_forwarding], ri, interface_name, namespace,
iptables_manager)
elif event_type == events.UPDATED:
self._process_update([port_forwarding], iptables_manager,
interface_name, namespace)
elif event_type == events.DELETED:
self._process_delete(
context, [port_forwarding], ri, interface_name, namespace,
iptables_manager)
def _process_update(self, port_forwardings, iptables_manager,
interface_name, namespace):
if not port_forwardings:
return
device = ip_lib.IPDevice(interface_name, namespace=namespace)
for port_forwarding in port_forwardings:
# check if port forwarding change from OVO and router rpc
if not self.mapping.check_port_forwarding_changes(port_forwarding):
LOG.debug("Skip port forwarding %s for update, as there is no "
"difference between the memory managed by agent",
port_forwarding.id)
continue
current_chain = self._get_port_forwarding_chain_name(
port_forwarding.id)
iptables_manager.ipv4['nat'].remove_chain(current_chain)
ori_pf = self.mapping.managed_port_forwardings[port_forwarding.id]
device.delete_socket_conntrack_state(
str(ori_pf.floating_ip_address), ori_pf.external_port,
protocol=ori_pf.protocol)
rule_tag = PORT_FORWARDING_PREFIX + port_forwarding.id
self._rule_apply(iptables_manager, port_forwarding, rule_tag)
iptables_manager.apply()
self._store_local(port_forwardings, events.UPDATED)
def _process_delete(self, context, port_forwardings, ri, interface_name,
namespace, iptables_manager):
if not port_forwardings:
return
device = ip_lib.IPDevice(interface_name, namespace=namespace)
for port_forwarding in port_forwardings:
current_chain = self._get_port_forwarding_chain_name(
port_forwarding.id)
iptables_manager.ipv4['nat'].remove_chain(current_chain)
fip_address = str(port_forwarding.floating_ip_address)
device.delete_socket_conntrack_state(
fip_address, port_forwarding.external_port,
protocol=port_forwarding.protocol)
iptables_manager.apply()
fip_id_cidrs = set([(pf.floatingip_id,
str(pf.floating_ip_address)) for pf in
port_forwardings])
self._sync_and_remove_fip(context, fip_id_cidrs, device, ri)
self._store_local(port_forwardings, events.DELETED)
def _sync_and_remove_fip(self, context, fip_id_cidrs, device, ri):
if not fip_id_cidrs:
return
ha_port = ri.router.get(lib_consts.HA_INTERFACE_KEY)
fip_ids = [item[0] for item in fip_id_cidrs]
pfs = self.resource_rpc.bulk_pull(context, resources.PORTFORWARDING,
filter_kwargs={
'floatingip_id': fip_ids})
exist_fips = set()
fip_status = {}
for pf in pfs:
exist_fips.add(pf.floatingip_id)
for fip_id_cidr in fip_id_cidrs:
if fip_id_cidr[0] not in exist_fips:
if ha_port:
ri._remove_vip(fip_id_cidr[1])
else:
device.delete_addr_and_conntrack_state(fip_id_cidr[1])
fip_status[fip_id_cidr[0]] = 'DOWN'
if ha_port:
ri.enable_keepalived()
self._sending_port_forwarding_fip_status(ri, fip_status)
for fip_id in fip_status.keys():
self.mapping.clear_by_fip(fip_id, ri.router_id)
def _get_router_info(self, router_id):
router_info = self.agent_api.get_router_info(router_id)
if router_info:
return router_info
LOG.debug("Router %s is not managed by this agent. "
"It was possibly deleted concurrently.", router_id)
def _get_port_forwarding_chain_name(self, pf_id):
chain_name = PORT_FORWARDING_CHAIN_PREFIX + pf_id
return chain_name[:MAX_CHAIN_LEN_WRAP]
def _install_default_rules(self, iptables_manager):
default_rule = '-j %s-%s' % (iptables_manager.wrap_name,
DEFAULT_PORT_FORWARDING_CHAIN)
iptables_manager.ipv4['nat'].add_chain(DEFAULT_PORT_FORWARDING_CHAIN)
iptables_manager.ipv4['nat'].add_rule('PREROUTING', default_rule)
iptables_manager.apply()
def check_local_port_forwardings(self, context, ri, fip_ids):
pfs = self.resource_rpc.bulk_pull(context, resources.PORTFORWARDING,
filter_kwargs={
'floatingip_id': fip_ids})
(interface_name, namespace,
iptable_manager) = self._get_resource_by_router(ri)
local_pfs = set(self.mapping.managed_port_forwardings.keys())
new_pfs = []
updated_pfs = []
current_pfs = set()
for pf in pfs:
# check the request port forwardings, and split them into
# update, new, current part from router rpc
if pf.id in self.mapping.managed_port_forwardings:
if self.mapping.check_port_forwarding_changes(pf):
updated_pfs.append(pf)
else:
new_pfs.append(pf)
current_pfs.add(pf.id)
remove_pf_ids_set = local_pfs - current_pfs
remove_pfs = [self.mapping.managed_port_forwardings[pf_id]
for pf_id in remove_pf_ids_set]
self._process_update(updated_pfs, iptable_manager,
interface_name, namespace)
self._process_create(new_pfs, ri, interface_name,
namespace, iptable_manager)
self._process_delete(context, remove_pfs, ri, interface_name,
namespace, iptable_manager)
def process_port_forwarding(self, context, data):
ri = self._get_router_info(data['id'])
if not self._check_if_need_process(ri):
return
self.check_local_port_forwardings(
context, ri, ri.fip_managed_by_port_forwardings)
@lockutils.synchronized('port-forwarding')
def add_router(self, context, data):
"""Handle a router add event.
Called on router create.
:param context: RPC context.
:param data: Router data.
"""
self.process_port_forwarding(context, data)
@lockutils.synchronized('port-forwarding')
def update_router(self, context, data):
"""Handle a router update event.
Called on router update.
:param context: RPC context.
:param data: Router data.
"""
self.process_port_forwarding(context, data)
def delete_router(self, context, data):
"""Handle a router delete event.
:param context: RPC context.
:param data: Router data.
"""
pass
def ha_state_change(self, context, data):
pass

View File

@ -80,6 +80,8 @@ class RouterInfo(object):
self.process_monitor = None
# radvd is a neutron.agent.linux.ra.DaemonMonitor
self.radvd = None
self.centralized_port_forwarding_fip_set = set()
self.fip_managed_by_port_forwardings = None
def initialize(self, process_monitor):
"""Initialize the router on the system.
@ -378,7 +380,9 @@ class RouterInfo(object):
# that's how the caller determines that it was removed
fip_statuses[fip['id']] = FLOATINGIP_STATUS_NOCHANGE
fips_to_remove = (
ip_cidr for ip_cidr in existing_cidrs - new_cidrs - gw_cidrs
ip_cidr
for ip_cidr in (existing_cidrs - new_cidrs - gw_cidrs -
self.centralized_port_forwarding_fip_set)
if common_utils.is_cidr_host(ip_cidr))
for ip_cidr in fips_to_remove:
LOG.debug("Removing floating ip %s from interface %s in "
@ -748,13 +752,15 @@ class RouterInfo(object):
for gw_ip in gateway_ips)
def external_gateway_added(self, ex_gw_port, interface_name):
preserve_ips = self._list_floating_ip_cidrs()
preserve_ips = self._list_floating_ip_cidrs() + list(
self.centralized_port_forwarding_fip_set)
preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id))
self._external_gateway_added(
ex_gw_port, interface_name, self.ns_name, preserve_ips)
def external_gateway_updated(self, ex_gw_port, interface_name):
preserve_ips = self._list_floating_ip_cidrs()
preserve_ips = self._list_floating_ip_cidrs() + list(
self.centralized_port_forwarding_fip_set)
preserve_ips.extend(self.agent.pd.get_preserve_ips(self.router_id))
self._external_gateway_added(
ex_gw_port, interface_name, self.ns_name, preserve_ips)
@ -1156,6 +1162,8 @@ class RouterInfo(object):
:param agent: Passes the agent in order to send RPC messages.
"""
LOG.debug("Process updates, router %s", self.router['id'])
self.centralized_port_forwarding_fip_set = set(self.router.get(
'port_forwardings_fip_set', set()))
self._process_internal_ports()
self.agent.pd.sync_router(self.router['id'])
self.process_external()
@ -1169,3 +1177,5 @@ class RouterInfo(object):
self.fip_map = dict([(fip['floating_ip_address'],
fip['fixed_ip_address'])
for fip in self.get_floating_ips()])
self.fip_managed_by_port_forwardings = self.router.get(
'fip_managed_by_port_forwardings')

View File

@ -331,6 +331,20 @@ class IPDevice(SubProcessBase):
LOG.exception("Failed deleting egress connection state of"
" floatingip %s", ip_str)
def delete_socket_conntrack_state(self, cidr, dport, protocol):
ip_str = str(netaddr.IPNetwork(cidr).ip)
ip_wrapper = IPWrapper(namespace=self.namespace)
cmd = ["conntrack", "-D", "-d", ip_str, '-p', protocol,
'--dport', dport]
try:
ip_wrapper.netns.execute(cmd, check_exit_code=True,
extra_ok_codes=[1])
except RuntimeError:
LOG.exception("Failed deleting ingress connection state of "
"socket %(ip)s:%(port)s", {'ip': ip_str,
'port': dport})
def disable_ipv6(self):
if not ipv6_utils.is_enabled_and_bind_by_default():
return

View File

@ -0,0 +1,187 @@
# Copyright 2018 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import re
from neutron_lib import constants
from oslo_utils import uuidutils
import mock
from neutron.agent.l3 import agent as neutron_l3_agent
from neutron.agent.l3.extensions import port_forwarding as pf
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager as iptable_mng
from neutron.agent.linux import utils
from neutron.common import utils as common_utils
from neutron.objects import port_forwarding as pf_obj
from neutron.tests.functional.agent.l3 import framework
from neutron.tests.functional.agent.l3 import test_dvr_router
_uuid = uuidutils.generate_uuid
class L3AgentFipPortForwardingExtensionTestFramework(
framework.L3AgentTestFramework):
def setUp(self):
super(L3AgentFipPortForwardingExtensionTestFramework, self).setUp()
self.conf.set_override('extensions', ['port_forwarding'], 'agent')
self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1',
self.conf)
self.fip_pf_ext = pf.PortForwardingAgentExtension()
self.fip_id1 = _uuid()
self.fip_id2 = _uuid()
self.fip_id3 = _uuid()
self.portforwarding1 = pf_obj.PortForwarding(
context=None, id=_uuid(), floatingip_id=self.fip_id1,
external_port=1111, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='1.1.1.1', internal_port=11111,
floating_ip_address='111.111.111.111', router_id=_uuid())
self.portforwarding2 = pf_obj.PortForwarding(
context=None, id=_uuid(), floatingip_id=self.fip_id1,
external_port=1112, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='1.1.1.2', internal_port=11112,
floating_ip_address='111.111.111.111', router_id=_uuid())
self.portforwarding3 = pf_obj.PortForwarding(
context=None, id=_uuid(), floatingip_id=self.fip_id2,
external_port=1113, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='1.1.1.3', internal_port=11113,
floating_ip_address='111.222.111.222', router_id=_uuid())
self.portforwarding4 = pf_obj.PortForwarding(
context=None, id=_uuid(), floatingip_id=self.fip_id3,
external_port=2222, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='2.2.2.2', internal_port=22222,
floating_ip_address='222.222.222.222', router_id=_uuid())
self.port_forwardings = [self.portforwarding1, self.portforwarding2,
self.portforwarding3, self.portforwarding4]
self._set_bulk_pull_mock()
self.managed_fips = [self.fip_id1, self.fip_id2, self.fip_id3]
self.fip_list_for_pf = ['111.111.111.111/32', '111.222.111.222/32',
'222.222.222.222/32']
def _set_bulk_pull_mock(self):
def _bulk_pull_mock(context, resource_type, filter_kwargs=None):
if 'floatingip_id' in filter_kwargs:
result = []
for pfobj in self.port_forwardings:
if pfobj.floatingip_id in filter_kwargs['floatingip_id']:
result.append(pfobj)
return result
return self.port_forwardings
self.bulk_pull = mock.patch(
'neutron.api.rpc.handlers.resources_rpc.'
'ResourcesPullRpcApi.bulk_pull').start()
self.bulk_pull.side_effect = _bulk_pull_mock
def _assert_port_forwarding_fip_is_set(self, router_info, pf_fip):
(interface_name, namespace,
iptables_manager) = self.fip_pf_ext._get_resource_by_router(
router_info)
device = ip_lib.IPDevice(interface_name, namespace=namespace)
pf_fip_cidr = str(pf_fip) + '/32'
def check_existing_cidrs():
existing_cidrs = router_info.get_router_cidrs(device)
return pf_fip_cidr in existing_cidrs
common_utils.wait_until_true(check_existing_cidrs)
def _assert_port_forwarding_iptables_is_set(self, router_info, pf):
(interface_name, namespace,
iptables_manager) = self.fip_pf_ext._get_resource_by_router(
router_info)
chain_rule = self.fip_pf_ext._get_fip_rules(
pf, iptables_manager.wrap_name)[1]
chain_name = chain_rule[0]
rule = chain_rule[1]
rule_tag = 'fip_portforwarding-' + pf.id
rule_obj = iptable_mng.IptablesRule(
chain_name, rule, True, False, iptables_manager.wrap_name,
rule_tag, None)
def check_chain_rules_set():
existing_chains = iptables_manager.ipv4['nat'].chains
if chain_name not in existing_chains:
return False
existing_rules = iptables_manager.ipv4['nat'].rules
return rule_obj in existing_rules
common_utils.wait_until_true(check_chain_rules_set)
def _assert_harouter_fip_is_set(self, router_info, fip_pf):
(interface_name, namespace,
iptables_manager) = self.fip_pf_ext._get_resource_by_router(
router_info)
keepalived_pm = router_info.keepalived_manager.get_process()
utils.get_conf_file_name(keepalived_pm.pids_path,
keepalived_pm.uuid,
keepalived_pm.service_pid_fname)
conf_path = os.path.join(keepalived_pm.pids_path, keepalived_pm.uuid,
'keepalived.conf')
regex = "%s dev %s" % (fip_pf, interface_name)
pattern = re.compile(regex)
def check_harouter_fip_is_set():
if re.findall(pattern, utils.get_value_from_file(conf_path)):
return True
return False
common_utils.wait_until_true(check_harouter_fip_is_set)
def _test_centralized_routers(self, router_info, enable_ha=False):
router_id = router_info['id']
for pfobj in self.port_forwardings:
pfobj.router_id = router_id
router_info['fip_managed_by_port_forwardings'] = self.managed_fips
router_info['port_forwardings_fip_set'] = set(self.fip_list_for_pf)
ri = self.manage_router(self.agent, router_info)
for pfobj in self.port_forwardings:
self._assert_port_forwarding_fip_is_set(ri,
pfobj.floating_ip_address)
self._assert_port_forwarding_iptables_is_set(ri, pfobj)
if enable_ha:
for fip_pf in self.fip_list_for_pf:
self._assert_harouter_fip_is_set(ri, fip_pf)
class TestL3AgentFipPortForwardingExtension(
L3AgentFipPortForwardingExtensionTestFramework):
def test_legacy_router_fip_portforwarding(self):
router_info = self.generate_router_info(enable_ha=False)
self._test_centralized_routers(router_info, enable_ha=False)
def test_ha_router_fip_portforwarding(self):
router_info = self.generate_router_info(enable_ha=True)
self._test_centralized_routers(router_info, enable_ha=True)
class TestL3AgentFipPortForwardingExtensionDVR(
test_dvr_router.TestDvrRouter,
L3AgentFipPortForwardingExtensionTestFramework):
def test_dvr_edge_router(self):
self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT
router_info = self.generate_dvr_router_info(enable_ha=False)
self._test_centralized_routers(router_info, enable_ha=False)
def test_dvr_ha_router(self):
self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT
router_info = self.generate_dvr_router_info(enable_ha=True)
self._test_centralized_routers(router_info, enable_ha=True)

View File

@ -0,0 +1,417 @@
# Copyright 2018 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import constants as lib_const
from neutron_lib import context
from oslo_utils import uuidutils
from neutron.agent.l3 import agent as l3_agent
from neutron.agent.l3.extensions import port_forwarding as pf
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
from neutron.agent.l3 import router_info as l3router
from neutron.agent.linux import iptables_manager
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.objects import port_forwarding as pf_obj
from neutron.objects import router
from neutron.tests import base
from neutron.tests.unit.agent.l3 import test_agent
_uuid = uuidutils.generate_uuid
TEST_FIP = '10.100.2.45'
BINARY_NAME = iptables_manager.get_binary_name()
DEFAULT_RULE = ('PREROUTING', '-j %s-fip-pf' % BINARY_NAME)
DEFAULT_CHAIN = 'fip-pf'
HOSTNAME = 'testhost'
class PortForwardingExtensionBaseTestCase(
test_agent.BasicRouterOperationsFramework):
def setUp(self):
super(PortForwardingExtensionBaseTestCase, self).setUp()
self.fip_pf_ext = pf.PortForwardingAgentExtension()
self.context = context.get_admin_context()
self.connection = mock.Mock()
self.floatingip2 = router.FloatingIP(context=None, id=_uuid(),
floating_ip_address='172.24.6.12',
floating_network_id=_uuid(),
router_id=_uuid(),
status='ACTIVE')
self.portforwarding1 = pf_obj.PortForwarding(
context=None, id=_uuid(), floatingip_id=self.floatingip2.id,
external_port=1111, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='1.1.1.1', internal_port=11111,
floating_ip_address=self.floatingip2.floating_ip_address,
router_id=self.floatingip2.router_id)
self.agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self.ex_gw_port = {'id': _uuid()}
self.fip = {'id': _uuid(),
'floating_ip_address': TEST_FIP,
'fixed_ip_address': '192.168.0.1',
'floating_network_id': _uuid(),
'port_id': _uuid(),
'host': HOSTNAME}
self.router = {'id': self.floatingip2.router_id,
'gw_port': self.ex_gw_port,
'ha': False,
'distributed': False,
lib_const.FLOATINGIP_KEY: [self.fip]}
self.router_info = l3router.RouterInfo(
self.agent, self.floatingip2.router_id, self.router,
**self.ri_kwargs)
self.centralized_port_forwarding_fip_set = set(
[str(self.floatingip2.floating_ip_address) + '/32'])
self.pf_managed_fips = [self.floatingip2.id]
self.router_info.ex_gw_port = self.ex_gw_port
self.router_info.fip_managed_by_port_forwardings = self.pf_managed_fips
self.agent.router_info[self.router['id']] = self.router_info
self.get_router_info = mock.patch(
'neutron.agent.l3.l3_agent_extension_api.'
'L3AgentExtensionAPI.get_router_info').start()
self.get_router_info.return_value = self.router_info
self.agent_api = l3_ext_api.L3AgentExtensionAPI(None)
self.fip_pf_ext.consume_api(self.agent_api)
self.port_forwardings = [self.portforwarding1]
class FipPortForwardingExtensionInitializeTestCase(
PortForwardingExtensionBaseTestCase):
@mock.patch.object(registry, 'register')
@mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback')
def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
call_to_patch = 'neutron.common.rpc.Connection'
with mock.patch(call_to_patch,
return_value=self.connection) as create_connection:
self.fip_pf_ext.initialize(
self.connection, lib_const.L3_AGENT_MODE)
create_connection.assert_has_calls([mock.call()])
self.connection.create_consumer.assert_has_calls(
[mock.call(
resources_rpc.resource_type_versioned_topic(
resources.PORTFORWARDING),
[rpc_mock()],
fanout=True)]
)
subscribe_mock.assert_called_with(
mock.ANY, resources.PORTFORWARDING)
class FipPortForwardingExtensionTestCase(PortForwardingExtensionBaseTestCase):
def setUp(self):
super(FipPortForwardingExtensionTestCase, self).setUp()
self.fip_pf_ext.initialize(
self.connection, lib_const.L3_AGENT_MODE)
self._set_bulk_pull_mock()
def _set_bulk_pull_mock(self):
def _bulk_pull_mock(context, resource_type, filter_kwargs=None):
if 'floatingip_id' in filter_kwargs:
result = []
for pfobj in self.port_forwardings:
if pfobj.floatingip_id in filter_kwargs['floatingip_id']:
result.append(pfobj)
return result
return self.port_forwardings
self.bulk_pull = mock.patch(
'neutron.api.rpc.handlers.resources_rpc.'
'ResourcesPullRpcApi.bulk_pull').start()
self.bulk_pull.side_effect = _bulk_pull_mock
def _get_chainrule_tag_from_pf_obj(self, target_obj):
rule_tag = 'fip_portforwarding-' + target_obj.id
chain_name = ('pf-' + target_obj.id)[:pf.MAX_CHAIN_LEN_WRAP]
chain_rule = (chain_name,
'-d %s/32 -p %s -m %s --dport %s '
'-j DNAT --to-destination %s:%s' % (
target_obj.floating_ip_address,
target_obj.protocol,
target_obj.protocol,
target_obj.external_port,
target_obj.internal_ip_address,
target_obj.internal_port))
return chain_name, chain_rule, rule_tag
def _assert_called_iptables_process(self, mock_add_chain,
mock_add_rule, mock_add_fip,
mock_send_fip_status, target_obj=None):
if target_obj:
obj = target_obj
else:
obj = self.portforwarding1
(chain_name,
chain_rule, rule_tag) = self._get_chainrule_tag_from_pf_obj(obj)
mock_add_chain.assert_has_calls([mock.call('fip-pf'),
mock.call(chain_name)])
mock_add_rule.assert_has_calls(
[mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]),
mock.call(DEFAULT_CHAIN, ('-j %s-' % BINARY_NAME) + chain_name,
tag=rule_tag),
mock.call(chain_name, chain_rule[1], tag=rule_tag)])
mock_add_fip.assert_called_once_with(
{'floating_ip_address': str(obj.floating_ip_address)},
mock.ANY, mock.ANY)
fip_status = {
obj.floatingip_id:
lib_const.FLOATINGIP_STATUS_ACTIVE}
mock_send_fip_status.assert_called_once_with(mock.ANY, fip_status)
@mock.patch.object(pf.PortForwardingAgentExtension,
'_sending_port_forwarding_fip_status')
@mock.patch.object(iptables_manager.IptablesTable, 'add_rule')
@mock.patch.object(iptables_manager.IptablesTable, 'add_chain')
@mock.patch.object(l3router.RouterInfo, 'add_floating_ip')
def test_add_update_router(self, mock_add_fip,
mock_add_chain, mock_add_rule,
mock_send_fip_status):
# simulate the router add and already there is a port forwarding
# resource association.
mock_add_fip.return_value = lib_const.FLOATINGIP_STATUS_ACTIVE
self.fip_pf_ext.add_router(self.context, self.router)
self._assert_called_iptables_process(
mock_add_chain, mock_add_rule, mock_add_fip, mock_send_fip_status,
target_obj=self.portforwarding1)
# Then we create another port forwarding with the same fip
mock_add_fip.reset_mock()
mock_send_fip_status.reset_mock()
mock_add_chain.reset_mock()
mock_add_rule.reset_mock()
test_portforwarding = pf_obj.PortForwarding(
context=None, id=_uuid(), floatingip_id=self.floatingip2.id,
external_port=2222, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='2.2.2.2', internal_port=22222,
floating_ip_address=self.floatingip2.floating_ip_address,
router_id=self.floatingip2.router_id)
self.pf_managed_fips.append(self.floatingip2.id)
self.port_forwardings.append(test_portforwarding)
self.fip_pf_ext.update_router(self.context, self.router)
self._assert_called_iptables_process(
mock_add_chain, mock_add_rule, mock_add_fip, mock_send_fip_status,
target_obj=test_portforwarding)
@mock.patch.object(iptables_manager.IptablesTable, 'add_rule')
@mock.patch.object(iptables_manager.IptablesTable, 'add_chain')
@mock.patch('neutron.agent.linux.ip_lib.IPDevice')
@mock.patch.object(iptables_manager.IptablesTable, 'remove_chain')
def test_add_update_router_port_forwarding_change(
self, mock_remove_chain, mock_ip_device, mock_add_chain,
mock_add_rule):
self.fip_pf_ext.add_router(self.context, self.router)
update_portforwarding = pf_obj.PortForwarding(
context=None, id=self.portforwarding1.id,
floatingip_id=self.portforwarding1.floatingip_id,
external_port=2222, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='2.2.2.2', internal_port=22222,
floating_ip_address=self.portforwarding1.floating_ip_address,
router_id=self.portforwarding1.router_id)
self.port_forwardings = [update_portforwarding]
mock_delete = mock.Mock()
mock_ip_device.return_value = mock_delete
self.fip_pf_ext.update_router(self.context, self.router)
current_chain = ('pf-' + self.portforwarding1.id)[
:pf.MAX_CHAIN_LEN_WRAP]
mock_remove_chain.assert_called_once_with(current_chain)
mock_delete.delete_socket_conntrack_state.assert_called_once_with(
str(self.portforwarding1.floating_ip_address),
self.portforwarding1.external_port,
protocol=self.portforwarding1.protocol)
(chain_name,
chain_rule, rule_tag) = self._get_chainrule_tag_from_pf_obj(
update_portforwarding)
mock_add_chain.assert_has_calls([mock.call('fip-pf'),
mock.call(chain_name)])
mock_add_rule.assert_has_calls(
[mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]),
mock.call(DEFAULT_CHAIN, ('-j %s-' % BINARY_NAME) + chain_name,
tag=rule_tag),
mock.call(chain_name, chain_rule[1], tag=rule_tag)])
@mock.patch.object(pf.PortForwardingAgentExtension,
'_sending_port_forwarding_fip_status')
@mock.patch('neutron.agent.linux.ip_lib.IPDevice')
@mock.patch.object(iptables_manager.IptablesTable, 'remove_chain')
def test_add_update_router_port_forwarding_remove(
self, mock_remove_chain, mock_ip_device,
mock_send_fip_status):
self.fip_pf_ext.add_router(self.context, self.router)
mock_send_fip_status.reset_mock()
self.port_forwardings = []
mock_device = mock.Mock()
mock_ip_device.return_value = mock_device
self.fip_pf_ext.update_router(self.context, self.router)
current_chain = ('pf-' + self.portforwarding1.id)[
:pf.MAX_CHAIN_LEN_WRAP]
mock_remove_chain.assert_called_once_with(current_chain)
mock_device.delete_socket_conntrack_state.assert_called_once_with(
str(self.portforwarding1.floating_ip_address),
self.portforwarding1.external_port,
protocol=self.portforwarding1.protocol)
mock_device.delete_addr_and_conntrack_state.assert_called_once_with(
str(self.portforwarding1.floating_ip_address))
fip_status = {
self.portforwarding1.floatingip_id:
lib_const.FLOATINGIP_STATUS_DOWN}
mock_send_fip_status.assert_called_once_with(mock.ANY, fip_status)
class RouterFipPortForwardingMappingTestCase(base.BaseTestCase):
def setUp(self):
super(RouterFipPortForwardingMappingTestCase, self).setUp()
self.mapping = pf.RouterFipPortForwardingMapping()
self.router1 = _uuid()
self.router2 = _uuid()
self.floatingip1 = _uuid()
self.floatingip2 = _uuid()
self.floatingip3 = _uuid()
self.portforwarding1 = pf_obj.PortForwarding(
context=None, id=_uuid(), floatingip_id=self.floatingip1,
external_port=1111, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='1.1.1.1', internal_port=11111,
floating_ip_address='111.111.111.111',
router_id=self.router1)
self.portforwarding2 = pf_obj.PortForwarding(
context=None, id=_uuid(), floatingip_id=self.floatingip1,
external_port=1112, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='1.1.1.2', internal_port=11112,
floating_ip_address='111.111.111.111',
router_id=self.router1)
self.portforwarding3 = pf_obj.PortForwarding(
context=None, id=_uuid(), floatingip_id=self.floatingip2,
external_port=1113, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='1.1.1.3', internal_port=11113,
floating_ip_address='111.222.111.222',
router_id=self.router1)
self.portforwarding4 = pf_obj.PortForwarding(
context=None, id=_uuid(), floatingip_id=self.floatingip3,
external_port=2222, protocol='tcp', internal_port_id=_uuid(),
internal_ip_address='2.2.2.2', internal_port=22222,
floating_ip_address='222.222.222.222',
router_id=self.router2)
self.portforwardings_dict = {
self.portforwarding1.id: self.portforwarding1,
self.portforwarding2.id: self.portforwarding2,
self.portforwarding3.id: self.portforwarding3,
self.portforwarding4.id: self.portforwarding4}
def _set_pf(self):
self.mapping.set_port_forwardings(self.portforwardings_dict.values())
def test_set_port_forwardings(self):
self._set_pf()
pf_ids = self.portforwardings_dict.keys()
for pf_id, obj in self.mapping.managed_port_forwardings.items():
self.assertIn(pf_id, pf_ids)
self.assertEqual(obj, self.portforwardings_dict[pf_id])
self.assertEqual(
len(pf_ids), len(self.mapping.managed_port_forwardings.keys()))
fip_pf_set = {
self.floatingip1: set(
[self.portforwarding1.id, self.portforwarding2.id]),
self.floatingip2: set([self.portforwarding3.id]),
self.floatingip3: set([self.portforwarding4.id])
}
for fip_id, pf_set in self.mapping.fip_port_forwarding.items():
self.assertIn(
fip_id, [self.floatingip1, self.floatingip2, self.floatingip3])
self.assertEqual(0, len(pf_set - fip_pf_set[fip_id]))
self.assertEqual(
len([self.floatingip1, self.floatingip2, self.floatingip3]),
len(self.mapping.fip_port_forwarding))
router_fip = {
self.router1: set([self.floatingip1, self.floatingip2]),
self.router2: set([self.floatingip3])
}
for router_id, fip_set in self.mapping.router_fip_mapping.items():
self.assertIn(router_id, [self.router1, self.router2])
self.assertEqual(0, len(fip_set - router_fip[router_id]))
self.assertEqual(
len([self.router1, self.router2]),
len(self.mapping.router_fip_mapping.keys()))
def test_update_port_forwarding(self):
self._set_pf()
new_pf1 = pf_obj.PortForwarding(
context=None, id=self.portforwarding2.id,
floatingip_id=self.floatingip1,
external_port=11122, protocol='tcp',
internal_port_id=self.portforwarding2.internal_port_id,
internal_ip_address='1.1.1.22', internal_port=11122,
floating_ip_address='111.111.111.111',
router_id=self.router1)
self.mapping.update_port_forwardings([new_pf1])
self.assertEqual(
new_pf1,
self.mapping.managed_port_forwardings[self.portforwarding2.id])
def test_del_port_forwardings(self):
self._set_pf()
del_pfs = [self.portforwarding3, self.portforwarding2,
self.portforwarding4]
self.mapping.del_port_forwardings(del_pfs)
self.assertEqual(
[self.portforwarding1.id],
list(self.mapping.managed_port_forwardings.keys()))
self.assertEqual({self.floatingip1: set([self.portforwarding1.id])},
self.mapping.fip_port_forwarding)
self.assertEqual({self.router1: set([self.floatingip1])},
self.mapping.router_fip_mapping)
def test_clear_by_fip(self):
self._set_pf()
self.mapping.clear_by_fip(self.floatingip1, self.router1)
router_fip = {
self.router1: set([self.floatingip2]),
self.router2: set([self.floatingip3])
}
for router_id, fip_set in self.mapping.router_fip_mapping.items():
self.assertIn(router_id, [self.router1, self.router2])
self.assertEqual(0, len(fip_set - router_fip[router_id]))
fip_pf_set = {
self.floatingip2: set([self.portforwarding3.id]),
self.floatingip3: set([self.portforwarding4.id])
}
for fip_id, pf_set in self.mapping.fip_port_forwarding.items():
self.assertIn(
fip_id, [self.floatingip2, self.floatingip3])
self.assertEqual(0, len(pf_set - fip_pf_set[fip_id]))
self.assertEqual(
len([self.floatingip2, self.floatingip3]),
len(self.mapping.fip_port_forwarding))
pfs_dict = {self.portforwarding3.id: self.portforwarding3,
self.portforwarding4.id: self.portforwarding4}
for pf_id, obj in self.mapping.managed_port_forwardings.items():
self.assertIn(pf_id,
[self.portforwarding3.id, self.portforwarding4.id])
self.assertEqual(obj, pfs_dict[pf_id])
self.assertEqual(
len([self.portforwarding3.id, self.portforwarding4.id]),
len(self.mapping.managed_port_forwardings.keys()))

View File

@ -3594,3 +3594,49 @@ class TestBasicRouterOperations(BasicRouterOperationsFramework):
pass
self.assertTrue(mock_delete.called)
self.assertFalse(mock_dscm.called)
@mock.patch.object(lla.LinkLocalAllocator, '_write')
@mock.patch.object(l3router.RouterInfo, '_get_gw_ips_cidr')
def test_process_floating_ip_addresses_not_care_port_forwarding(
self, mock_get_gw_cidr, mock_lla_write):
pf_used_fip = [{'cidr': '15.1.2.4/32'}, {'cidr': '15.1.2.5/32'}]
gw_cidr = {'cidr': '15.1.2.79/24'}
need_to_remove_fip = [{'cidr': '15.1.2.99/32'}]
fake_floatingips = {'floatingips': [
{'id': _uuid(),
'floating_ip_address': '15.1.2.3',
'fixed_ip_address': '192.168.0.1',
'status': 'DOWN',
'floating_network_id': _uuid(),
'port_id': _uuid(),
'host': HOSTNAME}]}
router = l3_test_common.prepare_router_data(enable_snat=True)
router[lib_constants.FLOATINGIP_KEY] = fake_floatingips['floatingips']
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
ri = l3router.RouterInfo(agent, router['id'],
router, **self.ri_kwargs)
ri.centralized_port_forwarding_fip_set = set(
[i['cidr'] for i in pf_used_fip])
ri.iptables_manager.ipv4['nat'] = mock.MagicMock()
ri.get_external_device_name = mock.Mock(return_value='exgw')
floating_ips = ri.get_floating_ips()
fip_id = floating_ips[0]['id']
device = self.mock_ip_dev
device.addr.list.return_value = (
pf_used_fip + need_to_remove_fip + [gw_cidr])
ri.iptables_manager.ipv4['nat'] = mock.MagicMock()
mock_get_gw_cidr.return_value = set([gw_cidr['cidr']])
ri.get_centralized_router_cidrs = mock.Mock(
return_value=set())
ri.add_floating_ip = mock.Mock(
return_value=lib_constants.FLOATINGIP_STATUS_ACTIVE)
ri.remove_floating_ip = mock.Mock()
fip_statuses = ri.process_floating_ip_addresses(
mock.sentinel.interface_name)
self.assertEqual({fip_id: lib_constants.FLOATINGIP_STATUS_ACTIVE},
fip_statuses)
ri.add_floating_ip.assert_called_once_with(
floating_ips[0], mock.sentinel.interface_name, device)
ri.remove_floating_ip.assert_called_once_with(
device, need_to_remove_fip[0]['cidr'])

View File

@ -1791,3 +1791,21 @@ class TestSysctl(base.BaseTestCase):
return_value=False):
dev.disable_ipv6()
self.assertFalse(self.execute.called)
class TestConntrack(base.BaseTestCase):
def setUp(self):
super(TestConntrack, self).setUp()
self.execute_p = mock.patch.object(ip_lib.IpNetnsCommand, 'execute')
self.execute = self.execute_p.start()
def test_delete_socket_conntrack_state(self):
device = ip_lib.IPDevice('tap0', 'ns1')
ip_str = '1.1.1.1'
dport = '3378'
protocol = 'tcp'
expect_cmd = ["conntrack", "-D", "-d", ip_str, '-p', protocol,
'--dport', dport]
device.delete_socket_conntrack_state(ip_str, dport, protocol)
self.execute.assert_called_once_with(expect_cmd, check_exit_code=True,
extra_ok_codes=[1])

View File

@ -108,6 +108,7 @@ neutron.agent.l2.extensions =
log = neutron.services.logapi.agent.log_extension:LoggingExtension
neutron.agent.l3.extensions =
fip_qos = neutron.agent.l3.extensions.qos.fip:FipQosAgentExtension
port_forwarding = neutron.agent.l3.extensions.port_forwarding:PortForwardingAgentExtension
neutron.services.logapi.drivers =
ovs = neutron.services.logapi.drivers.openvswitch.ovs_firewall_log:OVSFirewallLoggingDriver
neutron.qos.agent_drivers =