[server side] Floating IP port forwarding plugin

This patch implements the plugin.
This patch introduces an new service plugin for port forwarding resources,
named 'pf_plugin', and supports create/update/delete port forwarding
operation towards a free Floating IP.

This patch including some works below:
* Introduces portforwarding extension and the base class of plugin
* Introduces portforwarding plugin, support CRUD port forwarding
resources
* Add the policy of portforwarding

The race issue fix in:
https://review.openstack.org/#/c/574673/

Fip extend port forwarding field addition in:
https://review.openstack.org/#/c/575326/

Partially-Implements: blueprint port-forwarding
Change-Id: Ibc446f8234bff80d5b16c988f900d3940245ba89
Partial-Bug: #1491317
This commit is contained in:
ZhaoBo 2018-07-03 17:05:36 +08:00
parent ca0c2746a8
commit 21ae99d5b3
13 changed files with 1086 additions and 2 deletions

View File

@ -248,5 +248,11 @@
"get_log": "rule:admin_only",
"get_logs": "rule:admin_only",
"update_log": "rule:admin_only",
"delete_log": "rule:admin_only"
"delete_log": "rule:admin_only",
"create_floatingip_port_forwarding": "rule:regular_user",
"get_floatingip_port_forwarding": "rule:regular_user",
"get_floatingip_port_forwardings": "rule:regular_user",
"update_floatingip_port_forwarding": "rule:regular_user",
"delete_floatingip_port_forwarding": "rule:regular_user"
}

View File

@ -122,6 +122,9 @@ class L3RpcCallback(object):
self._ensure_host_set_on_ports(context, host, routers)
# refresh the data structure after ports are bound
routers = self._routers_to_sync(context, router_ids, host)
pf_plugin = directory.get_plugin(plugin_constants.PORTFORWARDING)
if pf_plugin:
pf_plugin.sync_port_forwarding_fip(context, routers)
return routers
def _routers_to_sync(self, context, router_ids, host=None):

View File

@ -0,0 +1,116 @@
# Copyright (c) 2018 OpenStack Foundation
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib.api import extensions as api_extensions
from neutron_lib.plugins import constants as plugin_consts
from neutron_lib.plugins import directory
from neutron_lib.services import base as service_base
import six
from neutron.api import extensions
from neutron.api.v2 import base
from neutron.api.v2 import resource_helper
class Floating_ip_port_forwarding(api_extensions.APIExtensionDescriptor):
"""Floating IP Port Forwarding API extension."""
api_definition = apidef
@classmethod
def get_plugin_interface(cls):
return PortForwardingPluginBase
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
special_mappings = {'floatingips': 'floatingip'}
plural_mappings = resource_helper.build_plural_mappings(
special_mappings, itertools.chain(
apidef.RESOURCE_ATTRIBUTE_MAP,
apidef.SUB_RESOURCE_ATTRIBUTE_MAP))
resources = resource_helper.build_resource_info(
plural_mappings,
apidef.RESOURCE_ATTRIBUTE_MAP,
plugin_consts.PORTFORWARDING,
translate_name=True,
allow_bulk=True)
plugin = directory.get_plugin(plugin_consts.PORTFORWARDING)
parent = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[
apidef.COLLECTION_NAME].get('parent')
params = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[apidef.COLLECTION_NAME].get(
'parameters')
controller = base.create_resource(apidef.COLLECTION_NAME,
apidef.RESOURCE_NAME,
plugin, params,
allow_bulk=True,
parent=parent,
allow_pagination=True,
allow_sorting=True)
resource = extensions.ResourceExtension(
apidef.COLLECTION_NAME,
controller, parent,
attr_map=params)
resources.append(resource)
return resources
@six.add_metaclass(abc.ABCMeta)
class PortForwardingPluginBase(service_base.ServicePluginBase):
path_prefix = apidef.API_PREFIX
@classmethod
def get_plugin_type(cls):
return plugin_consts.PORTFORWARDING
def get_plugin_description(self):
return "Port Forwarding Service Plugin"
@abc.abstractmethod
def create_floatingip_port_forwarding(self, context, floatingip_id,
port_forwarding):
pass
@abc.abstractmethod
def update_floatingip_port_forwarding(self, context, id, floatingip_id,
port_forwarding):
pass
@abc.abstractmethod
def get_floatingip_port_forwarding(self, context, id, floatingip_id,
fields=None):
pass
@abc.abstractmethod
def get_floatingip_port_forwardings(self, context, floatingip_id=None,
filters=None, fields=None, sorts=None,
limit=None, marker=None,
page_reverse=False):
pass
@abc.abstractmethod
def delete_floatingip_port_forwarding(self, context, id, floatingip_id):
pass

View File

@ -0,0 +1,25 @@
# Copyright 2018 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron._i18n import _
from neutron_lib import exceptions as n_exc
class PortForwardingNotFound(n_exc.NotFound):
message = _("Port Forwarding %(id)s could not be found.")
class PortForwardingNotSupportFilterField(n_exc.BadRequest):
message = _("Port Forwarding filter %(filter)s is not supported.")

View File

@ -0,0 +1,339 @@
# Copyright (c) 2018 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
import netaddr
from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib.callbacks import registry
from neutron_lib import constants as lib_consts
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.objects import exceptions as obj_exc
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from neutron._i18n import _
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import utils
from neutron.db import _resource_extend as resource_extend
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
from neutron.extensions import floating_ip_port_forwarding as fip_pf
from neutron.objects import base as base_obj
from neutron.objects import port_forwarding as pf
from neutron.objects import router
from neutron.services.portforwarding.common import exceptions as pf_exc
def make_result_with_fields(f):
@functools.wraps(f)
def inner(*args, **kwargs):
fields = kwargs.get('fields')
result = f(*args, **kwargs)
if fields is None:
return result
elif isinstance(result, list):
return [db_utils.resource_fields(r, fields) for r in result]
else:
return db_utils.resource_fields(result, fields)
return inner
@resource_extend.has_resource_extenders
@registry.has_registry_receivers
class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
"""Implementation of the Neutron Port Forwarding Service Plugin.
This class implements a Port Forwarding plugin.
"""
supported_extension_aliases = ['floating-ip-port-forwarding']
__native_pagination_support = True
__native_sorting_support = True
def __init__(self):
super(PortForwardingPlugin, self).__init__()
self.push_api = resources_rpc.ResourcesPushRpcApi()
self.l3_plugin = directory.get_plugin(constants.L3)
self.core_plugin = directory.get_plugin()
def _get_internal_ip_subnet(self, request_ip, fixed_ips):
request_ip = netaddr.IPNetwork(request_ip)
for fixed_ip in fixed_ips:
if netaddr.IPNetwork(fixed_ip['ip_address']) == request_ip:
return fixed_ip['subnet_id']
def _find_a_router_for_fip_port_forwarding(
self, context, pf_dict, fip_obj):
internal_port_id = pf_dict['internal_port_id']
internal_port = self.core_plugin.get_port(context, internal_port_id)
v4_fixed_ips = [fixed_ip for fixed_ip in internal_port['fixed_ips']
if (netaddr.IPNetwork(fixed_ip['ip_address']
).version ==
lib_consts.IP_VERSION_4)]
if not v4_fixed_ips:
# As port forwarding works with ipv4 addresses,
# if there is no ipv4 address, we need to raise.
message = _("Requested internal port %s must allocate "
"an IPv4 address at least.") % internal_port_id
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
# Get the internal ip address, if not specified, choose the first ipv4
# address.
internal_ip_address = pf_dict.get('internal_ip_address')
if not internal_ip_address:
internal_ip_address = v4_fixed_ips[0]['ip_address']
pf_dict['internal_ip_address'] = internal_ip_address
internal_subnet_id = v4_fixed_ips[0]['subnet_id']
else:
# check the matched fixed ip
internal_subnet_id = self._get_internal_ip_subnet(
internal_ip_address, v4_fixed_ips)
if not internal_subnet_id:
message = _(
"Requested internal IP address %(internal_ip_address)s is "
"not suitable for internal neutron port "
"%(internal_port_id)s, as its fixed_ips are "
"%(fixed_ips)s") % {
'internal_ip_address': internal_ip_address,
'internal_port_id': internal_port['id'],
'fixed_ips': v4_fixed_ips}
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
internal_subnet = self.core_plugin.get_subnet(
context, internal_subnet_id)
external_network_id = fip_obj.floating_network_id
try:
return self.l3_plugin.get_router_for_floatingip(
context, internal_port, internal_subnet, external_network_id)
except lib_l3_exc.ExternalGatewayForFloatingIPNotFound:
message = _(
"External network %(external_net_id)s is not reachable from "
"subnet %(internal_subnet_id)s. Cannot set "
"Port forwarding for port %(internal_port_id)s with "
"Floating IP %(port_forwarding_id)s") % {
'external_net_id': external_network_id,
'internal_subnet_id': internal_subnet_id,
'internal_port_id': internal_port_id,
'port_forwarding_id': fip_obj.id}
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
@db_base_plugin_common.convert_result_to_dict
def create_floatingip_port_forwarding(self, context, floatingip_id,
port_forwarding):
port_forwarding = port_forwarding.get(apidef.RESOURCE_NAME)
port_forwarding['floatingip_id'] = floatingip_id
pf_obj = pf.PortForwarding(context, **port_forwarding)
try:
with db_api.context_manager.writer.using(context):
fip_obj = self._get_fip_obj(context, floatingip_id)
router_id = self._find_a_router_for_fip_port_forwarding(
context, port_forwarding, fip_obj)
# If this func does not raise an exception, means the
# router_id matched.
# case1: fip_obj.router_id = None
# case2: fip_obj.router_id is the same with we selected.
self._check_router_match(context, fip_obj,
router_id, port_forwarding)
if not fip_obj.router_id:
fip_obj.router_id = router_id
fip_obj.update()
pf_obj.create()
except obj_exc.NeutronDbObjectDuplicateEntry:
(__, conflict_params) = self._find_existing_port_forwarding(
context, floatingip_id, port_forwarding)
message = _("A duplicate port forwarding entry with same "
"attributes already exists, conflicting values "
"are %s") % conflict_params
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
return pf_obj
@db_base_plugin_common.convert_result_to_dict
def update_floatingip_port_forwarding(self, context, id, floatingip_id,
port_forwarding):
port_forwarding = port_forwarding.get(apidef.RESOURCE_NAME)
new_internal_port_id = None
if port_forwarding and port_forwarding.get('internal_port_id'):
new_internal_port_id = port_forwarding.get('internal_port_id')
try:
with db_api.context_manager.writer.using(context):
fip_obj = self._get_fip_obj(context, floatingip_id)
pf_obj = pf.PortForwarding.get_object(context, id=id)
if not pf_obj:
raise pf_exc.PortForwardingNotFound(id=id)
ori_internal_port_id = pf_obj.internal_port_id
if new_internal_port_id and (new_internal_port_id !=
ori_internal_port_id):
router_id = self._find_a_router_for_fip_port_forwarding(
context, port_forwarding, fip_obj)
self._check_router_match(context, fip_obj,
router_id, port_forwarding)
# As the socket will update when dict contains
# internal_ip_address and internal_port.
internal_ip_address = port_forwarding.get(
'internal_ip_address')
internal_port = port_forwarding.get('internal_port')
if any([internal_ip_address, internal_port]):
port_forwarding.update({
'internal_ip_address': internal_ip_address
if internal_ip_address else
str(pf_obj.internal_ip_address),
'internal_port': internal_port if internal_port else
pf_obj.internal_port
})
pf_obj.update_fields(port_forwarding, reset_changes=True)
pf_obj.update()
except obj_exc.NeutronDbObjectDuplicateEntry:
(__, conflict_params) = self._find_existing_port_forwarding(
context, floatingip_id, pf_obj.to_dict())
message = _("A duplicate port forwarding entry with same "
"attributes already exists, conflicting values "
"are %s") % conflict_params
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
self.push_api.push(context, [pf_obj], rpc_events.UPDATED)
return pf_obj
def _check_router_match(self, context, fip_obj, router_id, pf_dict):
internal_port_id = pf_dict['internal_port_id']
if fip_obj.router_id and fip_obj.router_id != router_id:
objs = pf.PortForwarding.get_objects(
context, floatingip_id=fip_obj.id,
internal_ip_address=pf_dict['internal_ip_address'],
internal_port=pf_dict['internal_port'])
if objs:
message = _("Floating IP %(floatingip_id)s with params: "
"internal_ip_address: %(internal_ip_address)s, "
"internal_port: %(internal_port)s "
"already exists") % {
'floatingip_id': fip_obj.id,
'internal_ip_address': pf_dict['internal_ip_address'],
'internal_port': pf_dict['internal_port']}
else:
message = _("The Floating IP %(floatingip_id)s had been set "
"on router %(router_id)s, the internal Neutron "
"port %(internal_port_id)s can not reach it") % {
'floatingip_id': fip_obj.id,
'router_id': fip_obj.router_id,
'internal_port_id': internal_port_id}
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
def _find_existing_port_forwarding(self, context, floatingip_id,
port_forwarding, specify_params=None):
# Because the session had been flushed by NeutronDbObjectDuplicateEntry
# so if we want to use the context to get another db queries, we need
# to rollback first.
context.session.rollback()
if not specify_params:
specify_params = [
{'floatingip_id': floatingip_id,
'external_port': port_forwarding['external_port']},
{'internal_port_id': port_forwarding['internal_port_id'],
'internal_ip_address': port_forwarding['internal_ip_address'],
'internal_port': port_forwarding['internal_port']}]
for param in specify_params:
objs = pf.PortForwarding.get_objects(context, **param)
if objs:
return (objs[0], param)
def _get_fip_obj(self, context, fip_id):
fip_obj = router.FloatingIP.get_object(context, id=fip_id)
if not fip_obj:
raise lib_l3_exc.FloatingIPNotFound(floatingip_id=fip_id)
return fip_obj
@make_result_with_fields
@db_base_plugin_common.convert_result_to_dict
def get_floatingip_port_forwarding(self, context, id, floatingip_id,
fields=None):
self._get_fip_obj(context, floatingip_id)
obj = pf.PortForwarding.get_object(context, id=id)
if not obj:
raise pf_exc.PortForwardingNotFound(id=id)
return obj
def _validate_filter_for_port_forwarding(self, request_filter):
if not request_filter:
return
for filter_member_key in request_filter.keys():
if filter_member_key in pf.FIELDS_NOT_SUPPORT_FILTER:
raise pf_exc.PortForwardingNotSupportFilterField(
filter=filter_member_key)
@make_result_with_fields
@db_base_plugin_common.convert_result_to_dict
def get_floatingip_port_forwardings(self, context, floatingip_id=None,
filters=None, fields=None, sorts=None,
limit=None, marker=None,
page_reverse=False):
self._get_fip_obj(context, floatingip_id)
filters = filters or {}
self._validate_filter_for_port_forwarding(filters)
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
return pf.PortForwarding.get_objects(
context, _pager=pager, floatingip_id=floatingip_id, **filters)
def delete_floatingip_port_forwarding(self, context, id, floatingip_id):
pf_obj = pf.PortForwarding.get_object(context, id=id)
if not pf_obj or pf_obj.floatingip_id != floatingip_id:
raise pf_exc.PortForwardingNotFound(id=id)
with db_api.context_manager.writer.using(context):
fip_obj = self._get_fip_obj(context, pf_obj.floatingip_id)
pf_objs = pf.PortForwarding.get_objects(
context, floatingip_id=pf_obj.floatingip_id)
if len(pf_objs) == 1 and pf_objs[0].id == pf_obj.id:
fip_obj.update_fields({'router_id': None})
fip_obj.update()
pf_obj.delete()
self.push_api.push(context, [pf_obj], rpc_events.DELETED)
def sync_port_forwarding_fip(self, context, routers):
if not routers:
return
router_ids = [router.get('id') for router in routers]
router_pf_fip_set = collections.defaultdict(set)
fip_pfs = collections.defaultdict(set)
router_fip = collections.defaultdict(set)
item_pf_fields = pf.PortForwarding.get_port_forwarding_obj_by_routers(
context, router_ids)
for router_id, fip_addr, pf_id, fip_id in item_pf_fields:
router_pf_fip_set[router_id].add(utils.ip_to_cidr(fip_addr, 32))
fip_pfs[fip_id].add(pf_id)
router_fip[router_id].add(fip_id)
for router in routers:
if router['id'] in router_fip:
router['port_forwardings_fip_set'] = router_pf_fip_set[
router['id']]
router['fip_managed_by_port_forwardings'] = router_fip[
router['id']]

View File

@ -248,5 +248,11 @@
"get_log": "rule:admin_only",
"get_logs": "rule:admin_only",
"update_log": "rule:admin_only",
"delete_log": "rule:admin_only"
"delete_log": "rule:admin_only",
"create_floatingip_port_forwarding": "rule:regular_user",
"get_floatingip_port_forwarding": "rule:regular_user",
"get_floatingip_port_forwardings": "rule:regular_user",
"update_floatingip_port_forwarding": "rule:regular_user",
"delete_floatingip_port_forwarding": "rule:regular_user"
}

View File

@ -0,0 +1,262 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib import exceptions as lib_exc
from oslo_utils import uuidutils
from neutron.services.portforwarding.common import exceptions as pf_exc
from neutron.services.portforwarding import pf_plugin
from neutron.tests.unit.plugins.ml2 import base as ml2_test_base
class PortForwardingTestCaseBase(ml2_test_base.ML2TestFramework):
def setUp(self):
super(PortForwardingTestCaseBase, self).setUp()
self.pf_plugin = pf_plugin.PortForwardingPlugin()
def _create_floatingip(self, network_id, port_id=None,
fixed_ip_address=None):
body = {"floating_network_id": network_id,
"port_id": port_id,
"fixed_ip_address": fixed_ip_address,
"tenant_id": self._tenant_id,
"project_id": self._tenant_id}
return self.l3_plugin.create_floatingip(
self.context,
{"floatingip": body})
def _get_floatingip(self, floatingip_id):
return self.l3_plugin.get_floatingip(self.context, floatingip_id)
def _add_router_interface(self, router_id, subnet_id):
interface_info = {"subnet_id": subnet_id}
self.l3_plugin.add_router_interface(
self.context, router_id, interface_info=interface_info)
def _set_router_gw(self, router_id, ext_net_id):
body = {
'router':
{'external_gateway_info': {'network_id': ext_net_id}}}
self.l3_plugin.update_router(self.context, router_id, body)
class PortForwardingTestCase(PortForwardingTestCaseBase):
def setUp(self):
super(PortForwardingTestCase, self).setUp()
self._prepare_env()
def _prepare_env(self):
self.router = self._create_router()
self.ext_net = self._create_network(
self.fmt, 'ext-net', True, arg_list=("router:external",),
**{"router:external": True}).json['network']
self.ext_subnet = self._create_subnet(
self.fmt, self.ext_net['id'], '172.24.2.0/24').json['subnet']
self.net = self._create_network(self.fmt, 'private', True).json[
'network']
self.subnet = self._create_subnet(self.fmt, self.net['id'],
'10.0.0.0/24').json['subnet']
self._set_router_gw(self.router['id'], self.ext_net['id'])
self._add_router_interface(self.router['id'], self.subnet['id'])
self.fip = self._create_floatingip(self.ext_net['id'])
self.port = self._create_port(self.fmt, self.net['id']).json['port']
self.port_forwarding = {
apidef.RESOURCE_NAME:
{apidef.EXTERNAL_PORT: 2225,
apidef.INTERNAL_PORT: 25,
apidef.INTERNAL_PORT_ID: self.port['id'],
apidef.PROTOCOL: "tcp",
apidef.INTERNAL_IP_ADDRESS:
self.port['fixed_ips'][0]['ip_address']}}
def test_create_floatingip_port_forwarding(self):
res = self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)
expect = {
"external_port": 2225,
"internal_port": 25,
"internal_port_id": self.port['id'],
"protocol": "tcp",
"internal_ip_address": self.port['fixed_ips'][0]['ip_address'],
'id': mock.ANY,
'router_id': self.router['id'],
'floating_ip_address': self.fip['floating_ip_address'],
'floatingip_id': self.fip['id']}
self.assertEqual(expect, res)
def test_negative_create_floatingip_port_forwarding(self):
self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)
# This will be fail with the same params
self.assertRaises(lib_exc.BadRequest,
self.pf_plugin.create_floatingip_port_forwarding,
self.context, self.fip['id'], self.port_forwarding)
def test_update_floatingip_port_forwarding(self):
# create a test port forwarding
res = self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)
# update the socket port only
update_body = {
apidef.RESOURCE_NAME: {
"external_port": 2226,
"internal_port": 26,
"protocol": "udp"
}
}
update_res = self.pf_plugin.update_floatingip_port_forwarding(
self.context, res['id'], self.fip['id'], update_body)
expect = {
"external_port": 2226,
"internal_port": 26,
"internal_port_id": self.port['id'],
"protocol": "udp",
"internal_ip_address": self.port['fixed_ips'][0]['ip_address'],
'id': res['id'],
'router_id': self.router['id'],
'floating_ip_address': self.fip['floating_ip_address'],
'floatingip_id': self.fip['id']}
self.assertEqual(expect, update_res)
# update the neutron port and success
new_port = self._create_port(self.fmt, self.net['id']).json['port']
update_body = {
apidef.RESOURCE_NAME: {
"external_port": 2227,
"internal_port": 27,
"protocol": "tcp",
"internal_port_id": new_port['id'],
"internal_ip_address": new_port['fixed_ips'][0]['ip_address']
}
}
update_res = self.pf_plugin.update_floatingip_port_forwarding(
self.context, res['id'], self.fip['id'], update_body)
expect = {
"external_port": 2227,
"internal_port": 27,
"internal_port_id": new_port['id'],
"protocol": "tcp",
"internal_ip_address": new_port['fixed_ips'][0]['ip_address'],
'id': res['id'],
'router_id': self.router['id'],
'floating_ip_address': self.fip['floating_ip_address'],
'floatingip_id': self.fip['id']}
self.assertEqual(expect, update_res)
def test_negative_update_floatingip_port_forwarding(self):
# prepare a port forwarding
res = self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)
# prepare another port and make its gateway set on other router
new_router = self._create_router()
new_subnet = self._create_subnet(self.fmt, self.net['id'],
'11.0.0.0/24').json['subnet']
self._set_router_gw(new_router['id'], self.ext_net['id'])
self._add_router_interface(new_router['id'], new_subnet['id'])
# create a port based on the new subnet
new_port = self._create_port(
self.fmt, self.net['id'],
fixed_ips=[{'subnet_id': new_subnet['id']}]).json['port']
update_body = {
apidef.RESOURCE_NAME: {
"external_port": 2227,
"internal_port": 27,
"protocol": "tcp",
"internal_port_id": new_port['id'],
"internal_ip_address": new_port['fixed_ips'][0]['ip_address']
}
}
# This will be fail, as the new found router_id not match.
self.assertRaises(lib_exc.BadRequest,
self.pf_plugin.update_floatingip_port_forwarding,
self.context, res['id'], self.fip['id'], update_body)
# There is already a port forwarding. We create another port forwarding
# with the new_port, and update the new one with the same params of the
# existing one.
new_port = self._create_port(self.fmt, self.net['id']).json['port']
self.port_forwarding[apidef.RESOURCE_NAME].update({
'internal_port_id': new_port['id'],
'internal_ip_address': new_port['fixed_ips'][0]['ip_address'],
'external_port': self.port_forwarding[
apidef.RESOURCE_NAME]['external_port'] + 1
})
new_res = self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)
self.port_forwarding[apidef.RESOURCE_NAME].update({
'internal_port_id': self.port['id'],
'internal_ip_address': self.port['fixed_ips'][0]['ip_address'],
'external_port': self.port_forwarding[
apidef.RESOURCE_NAME]['external_port'] - 1
})
# This will be fail, as the duplicate record.
self.assertRaises(lib_exc.BadRequest,
self.pf_plugin.update_floatingip_port_forwarding,
self.context, new_res['id'], self.fip['id'],
update_body)
def test_delete_floatingip_port_forwarding(self):
# create two port forwardings for a floatingip
pf_1 = self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)
new_port = self._create_port(self.fmt, self.net['id']).json['port']
self.port_forwarding[apidef.RESOURCE_NAME].update({
'external_port': 2226,
'internal_port_id': new_port['id'],
'internal_ip_address': new_port['fixed_ips'][0]['ip_address']
})
pf_2 = self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)
floatingip = self._get_floatingip(self.fip['id'])
self.assertEqual(self.router['id'], floatingip['router_id'])
# delete pf_1, check the router_id of floatingip is not change.
self.pf_plugin.delete_floatingip_port_forwarding(
self.context, pf_1['id'], self.fip['id'])
exist_pfs = self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id'])
self.assertEqual(1, len(exist_pfs))
self.assertEqual(pf_2['id'], exist_pfs[0]['id'])
# delete pf_2, it's the last port forwarding of floatingip.
self.pf_plugin.delete_floatingip_port_forwarding(
self.context, pf_2['id'], self.fip['id'])
exist_pfs = self.pf_plugin.get_floatingip_port_forwardings(
self.context, floatingip_id=self.fip['id'])
self.assertEqual(0, len(exist_pfs))
floatingip = self._get_floatingip(self.fip['id'])
self.assertIsNone(floatingip['router_id'])
def test_negative_delete_floatingip_port_forwarding(self):
# prepare a good port forwarding
res = self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)
# pass non-existing port forwarding id
self.assertRaises(pf_exc.PortForwardingNotFound,
self.pf_plugin.delete_floatingip_port_forwarding,
self.context, uuidutils.generate_uuid(),
self.fip['id'])
# pass existing port forwarding but non-existing floatingip_id
self.assertRaises(pf_exc.PortForwardingNotFound,
self.pf_plugin.delete_floatingip_port_forwarding,
self.context, res['id'], uuidutils.generate_uuid())

View File

@ -0,0 +1,326 @@
# Copyright (C) 2018 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import context
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.objects import exceptions as obj_exc
from neutron_lib.plugins import constants as lib_plugin_conts
from neutron_lib.plugins import directory
from oslo_config import cfg
from neutron.api.rpc.callbacks.consumer import registry as cons_registry
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.callbacks.producer import registry as prod_registry
from neutron.api.rpc.callbacks import resource_manager
from neutron.api.rpc.handlers import resources_rpc
from neutron.db import db_base_plugin_v2
from neutron.db import l3_db
from neutron import manager
from neutron.objects import port_forwarding
from neutron.objects import router
from neutron.services.portforwarding.common import exceptions as pf_exc
from neutron.services.portforwarding import pf_plugin
from neutron.tests.unit import testlib_api
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
class TestPortForwardingPlugin(testlib_api.SqlTestCase):
def setUp(self):
super(TestPortForwardingPlugin, self).setUp()
with mock.patch.object(
resource_manager.ResourceCallbacksManager, '_singleton',
new_callable=mock.PropertyMock(return_value=False)):
self.cons_mgr = resource_manager.ConsumerResourceCallbacksManager()
self.prod_mgr = resource_manager.ProducerResourceCallbacksManager()
for mgr in (self.cons_mgr, self.prod_mgr):
mgr.clear()
mock.patch.object(
cons_registry, '_get_manager', return_value=self.cons_mgr).start()
mock.patch.object(
prod_registry, '_get_manager', return_value=self.prod_mgr).start()
self.setup_coreplugin(load_plugins=False)
mock.patch('neutron.objects.db.api.create_object').start()
mock.patch('neutron.objects.db.api.update_object').start()
mock.patch('neutron.objects.db.api.delete_object').start()
mock.patch('neutron.objects.db.api.get_object').start()
# We don't use real models as per mocks above. We also need to mock-out
# methods that work with real data types
mock.patch(
'neutron.objects.base.NeutronDbObject.modify_fields_from_db'
).start()
cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
cfg.CONF.set_override("service_plugins", ["router", "port_forwarding"])
manager.init()
self.pf_plugin = directory.get_plugin(lib_plugin_conts.PORTFORWARDING)
self.ctxt = context.Context('admin', 'fake_tenant')
mock.patch.object(self.ctxt.session, 'refresh').start()
mock.patch.object(self.ctxt.session, 'expunge').start()
@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
def test_get_floatingip_port_forwarding(self, get_object_mock):
self.pf_plugin.get_floatingip_port_forwarding(
self.ctxt, 'pf_id', 'test-fip-id', fields=None)
get_object_mock.assert_called_once_with(self.ctxt, id='pf_id')
@mock.patch.object(port_forwarding.PortForwarding, 'get_object',
return_value=None)
def test_negative_get_floatingip_port_forwarding(self, get_object_mock):
self.assertRaises(
pf_exc.PortForwardingNotFound,
self.pf_plugin.get_floatingip_port_forwarding,
self.ctxt, 'pf_id', 'test-fip-id', fields=None)
@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
def test_get_floatingip_port_forwardings(self, get_objects_mock):
self.pf_plugin.get_floatingip_port_forwardings(self.ctxt)
get_objects_mock.assert_called_once_with(
self.ctxt, _pager=mock.ANY, floatingip_id=None)
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(router.FloatingIP, 'get_object')
def test_delete_floatingip_port_forwarding(
self, fip_get_object_mock, pf_get_objects_mock,
pf_get_object_mock, push_api_mock):
# After delete, not empty resource list
pf_get_objects_mock.return_value = [mock.Mock(id='pf_id'),
mock.Mock(id='pf_id2')]
pf_obj = mock.Mock(id='pf_id', floatingip_id='fip_id')
pf_get_object_mock.return_value = pf_obj
self.pf_plugin.delete_floatingip_port_forwarding(
self.ctxt, 'pf_id', 'fip_id')
pf_get_objects_mock.assert_called_once_with(
self.ctxt, floatingip_id='fip_id')
pf_obj.delete.assert_called()
push_api_mock.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.DELETED)
# After delete, empty resource list
pf_get_objects_mock.reset_mock()
pf_get_object_mock.reset_mock()
push_api_mock.reset_mock()
pf_obj = mock.Mock(id='need_to_delete_pf_id', floatingip_id='fip_id')
fip_obj = mock.Mock(id='fip_id')
fip_get_object_mock.return_value = fip_obj
pf_get_object_mock.return_value = pf_obj
pf_get_objects_mock.return_value = [
mock.Mock(id='need_to_delete_pf_id')]
self.pf_plugin.delete_floatingip_port_forwarding(
self.ctxt, 'need_to_delete_pf_id', 'fip_id')
pf_get_objects_mock.assert_called_once_with(
self.ctxt, floatingip_id='fip_id')
pf_get_object_mock.assert_called_once_with(
self.ctxt, id='need_to_delete_pf_id')
fip_obj.update_fields.assert_called_once_with({'router_id': None})
fip_obj.update.assert_called()
push_api_mock.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.DELETED)
@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
def test_negative_delete_floatingip_port_forwarding(
self, pf_get_object_mock):
pf_get_object_mock.return_value = None
self.assertRaises(
pf_exc.PortForwardingNotFound,
self.pf_plugin.delete_floatingip_port_forwarding,
self.ctxt, 'pf_id', floatingip_id='fip_id')
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
def test_update_floatingip_port_forwarding(
self, mock_pf_get_object, mock_rpc_push):
pf_input = {
'port_forwarding':
{'port_forwarding': {
'internal_ip_address': '1.1.1.1',
'floatingip_id': 'fip_id'}},
'floatingip_id': 'fip_id'}
pf_obj = mock.Mock()
mock_pf_get_object.return_value = pf_obj
self.pf_plugin.update_floatingip_port_forwarding(
self.ctxt, 'pf_id', **pf_input)
mock_pf_get_object.assert_called_once_with(self.ctxt, id='pf_id')
self.assertTrue(pf_obj.update_fields)
self.assertTrue(pf_obj.update)
mock_rpc_push.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.UPDATED)
@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
def test_negative_update_floatingip_port_forwarding(
self, mock_pf_get_object):
pf_input = {
'port_forwarding':
{'port_forwarding': {
'internal_ip_address': '1.1.1.1',
'floatingip_id': 'fip_id'}},
'floatingip_id': 'fip_id'}
mock_pf_get_object.return_value = None
self.assertRaises(
pf_exc.PortForwardingNotFound,
self.pf_plugin.update_floatingip_port_forwarding,
self.ctxt, 'pf_id', **pf_input)
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(pf_plugin.PortForwardingPlugin, '_check_router_match')
@mock.patch.object(pf_plugin.PortForwardingPlugin,
'_find_a_router_for_fip_port_forwarding',
return_value='target_router_id')
@mock.patch.object(router.FloatingIP, 'get_object')
@mock.patch('neutron.objects.port_forwarding.PortForwarding')
def test_create_floatingip_port_forwarding(
self, mock_port_forwarding, mock_fip_get_object, mock_find_router,
mock_check_router_match, mock_push_api):
# Update fip
pf_input = {
'port_forwarding':
{'port_forwarding': {
'internal_ip_address': '1.1.1.1',
'floatingip_id': 'fip_id'}},
'floatingip_id': 'fip_id'}
pf_obj = mock.Mock()
fip_obj = mock.Mock()
mock_port_forwarding.return_value = pf_obj
mock_fip_get_object.return_value = fip_obj
fip_obj.router_id = ''
self.pf_plugin.create_floatingip_port_forwarding(
self.ctxt, **pf_input)
mock_port_forwarding.assert_called_once_with(
self.ctxt, **pf_input['port_forwarding']['port_forwarding'])
self.assertTrue(fip_obj.update.called)
self.assertTrue(pf_obj.create.called)
mock_push_api.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.CREATED)
# Not update fip
pf_obj.reset_mock()
fip_obj.reset_mock()
mock_port_forwarding.reset_mock()
mock_push_api.reset_mock()
mock_port_forwarding.return_value = pf_obj
fip_obj.router_id = 'router_id'
self.pf_plugin.create_floatingip_port_forwarding(
self.ctxt, **pf_input)
mock_port_forwarding.assert_called_once_with(
self.ctxt, **pf_input['port_forwarding']['port_forwarding'])
self.assertTrue(pf_obj.create.called)
self.assertFalse(fip_obj.update.called)
mock_push_api.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.CREATED)
@mock.patch.object(pf_plugin.PortForwardingPlugin,
'_find_existing_port_forwarding')
@mock.patch.object(pf_plugin.PortForwardingPlugin,
'_check_router_match')
@mock.patch.object(pf_plugin.PortForwardingPlugin,
'_find_a_router_for_fip_port_forwarding',
return_value='target_router_id')
@mock.patch.object(router.FloatingIP, 'get_object')
@mock.patch('neutron.objects.port_forwarding.PortForwarding')
def test_negative_create_floatingip_port_forwarding(
self, mock_port_forwarding, mock_fip_get_object,
mock_find_router,
mock_check_router_match, mock_try_find_exist):
pf_input = {
'port_forwarding': {
'internal_ip_address': '1.1.1.1',
'floatingip_id': 'fip_id'}}
pf_obj = mock.Mock()
fip_obj = mock.Mock()
mock_port_forwarding.return_value = pf_obj
mock_fip_get_object.return_value = fip_obj
pf_obj.create.side_effect = obj_exc.NeutronDbObjectDuplicateEntry(
mock.Mock(), mock.Mock())
mock_try_find_exist.return_value = ('pf_obj', 'conflict_param')
self.assertRaises(
lib_exc.BadRequest,
self.pf_plugin.create_floatingip_port_forwarding,
self.ctxt, 'fip_id', pf_input)
@mock.patch.object(pf_plugin.PortForwardingPlugin,
'_get_internal_ip_subnet')
@mock.patch.object(l3_db.L3_NAT_dbonly_mixin, 'get_router_for_floatingip')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_subnet')
def test_negative_find_a_router_for_fip_port_forwarding(
self, mock_get_subnet, mock_get_port, mock_get_router,
mock_get_ip_subnet):
fip_obj = mock.Mock()
pf_dict = {'internal_port_id': 'internal_neutron_port',
'internal_ip_address': '10.0.0.1'}
port_dict = {'id': 'ID', 'fixed_ips': [{"subnet_id": "test-subnet-id",
"ip_address": "10.0.0.1"}]}
mock_get_port.return_value = port_dict
mock_get_ip_subnet.return_value = None
self.assertRaises(
lib_exc.BadRequest,
self.pf_plugin._find_a_router_for_fip_port_forwarding,
self.ctxt, pf_dict, fip_obj)
self.assertTrue(not mock_get_subnet.called)
mock_get_ip_subnet.return_value = 'internal_subnet_id'
mock_get_router.side_effect = (
lib_l3_exc.ExternalGatewayForFloatingIPNotFound(
external_network_id=mock.Mock(),
subnet_id=mock.Mock(), port_id=mock.Mock()))
self.assertRaises(
lib_exc.BadRequest,
self.pf_plugin._find_a_router_for_fip_port_forwarding,
self.ctxt, pf_dict, fip_obj)
self.assertTrue(mock_get_subnet.called)
ipv6_port_dict = {'id': 'ID',
'fixed_ips': [{"subnet_id": "test-subnet-id",
"ip_address": "1::1"}]}
mock_get_port.return_value = ipv6_port_dict
self.assertRaises(
lib_exc.BadRequest,
self.pf_plugin._find_a_router_for_fip_port_forwarding,
self.ctxt, pf_dict, fip_obj)
@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
def test_negative_check_router_match(self, mock_pf_get_objects):
pf_dict = {
'internal_port_id': 'internal_neutron_port',
'internal_ip_address': 'internal_fixed_ip',
'internal_port': 'internal protocol port num'}
fip_obj = mock.Mock()
mock_pf_get_objects.return_value = ['Exist port forwardings']
router_id = 'selected router id'
self.assertRaises(lib_exc.BadRequest,
self.pf_plugin._check_router_match,
self.ctxt, fip_obj, router_id, pf_dict)
mock_pf_get_objects.return_value = []
self.assertRaises(lib_exc.BadRequest,
self.pf_plugin._check_router_match,
self.ctxt, fip_obj, router_id, pf_dict)

View File

@ -73,6 +73,7 @@ neutron.service_plugins =
trunk = neutron.services.trunk.plugin:TrunkPlugin
loki = neutron.services.loki.loki_plugin:LokiPlugin
log = neutron.services.logapi.logging_plugin:LoggingPlugin
port_forwarding = neutron.services.portforwarding.pf_plugin:PortForwardingPlugin
neutron.ml2.type_drivers =
flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver
local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver