Conntrack Helper - Plugin

Implements Conntrack Helper service plugin for conntrack
helper resources. Supports create, update and delete
conntrack helper for l3 routers.

A new configuration option:
  [l3-conntrack-helpers]/allowed_conntrack_helpers
introduced to allow the operator to configure CT
helpers, and the helper protocol constraints.

Related-Bug: #1823633
Depends-On: https://review.opendev.org/663446
Change-Id: I58193955261f50b18b1946261fe662da6b20f0f5
This commit is contained in:
Harald Jensås 2019-03-27 19:44:43 +01:00
parent e16b789257
commit 16679e9700
18 changed files with 904 additions and 21 deletions

View File

@ -0,0 +1,51 @@
# Copyright (c) 2019 Red Hat, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants as n_const
from oslo_config import cfg
from neutron._i18n import _
conntrack_helper_opts = [
cfg.ListOpt('allowed_conntrack_helpers',
default=[
{'amanda': n_const.PROTO_NAME_TCP},
{'ftp': n_const.PROTO_NAME_TCP},
{'h323': n_const.PROTO_NAME_UDP},
{'h323': n_const.PROTO_NAME_TCP},
{'irc': n_const.PROTO_NAME_TCP},
{'netbios-ns': n_const.PROTO_NAME_UDP},
{'pptp': n_const.PROTO_NAME_TCP},
{'sane': n_const.PROTO_NAME_TCP},
{'sip': n_const.PROTO_NAME_UDP},
{'sip': n_const.PROTO_NAME_TCP},
{'snmp': n_const.PROTO_NAME_UDP},
{'tftp': n_const.PROTO_NAME_UDP}
],
item_type=cfg.types.Dict(),
sample_default=[
{'tftp': 'udp'},
{'ftp': 'tcp'},
{'sip': 'tcp'},
{'sip': 'udp'}
],
help=_('Defines the allowed conntrack helpers, and '
'conntack helper module protocol constraints.')
)
]
def register_conntrack_helper_opts(cfg=cfg.CONF):
cfg.register_opts(conntrack_helper_opts)

View File

@ -24,6 +24,7 @@ from neutron.conf.policies import flavor
from neutron.conf.policies import floatingip
from neutron.conf.policies import floatingip_pools
from neutron.conf.policies import floatingip_port_forwarding
from neutron.conf.policies import l3_conntrack_helper
from neutron.conf.policies import logging
from neutron.conf.policies import metering
from neutron.conf.policies import network
@ -52,6 +53,7 @@ def list_rules():
floatingip.list_rules(),
floatingip_pools.list_rules(),
floatingip_port_forwarding.list_rules(),
l3_conntrack_helper.list_rules(),
logging.list_rules(),
metering.list_rules(),
network.list_rules(),

View File

@ -0,0 +1,77 @@
# Copyright (c) 2019 Red Hat, Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from neutron.conf.policies import base
COLLECTION_PATH = '/routers/{router_id}/conntrack_helpers'
RESOURCE_PATH = ('/routers/{router_id}'
'/conntrack_helpers/{conntrack_helper_id}')
rules = [
policy.DocumentedRuleDefault(
'create_router_conntrack_helper',
base.RULE_ADMIN_OR_PARENT_OWNER,
'Create a router conntrack helper',
[
{
'method': 'POST',
'path': COLLECTION_PATH,
},
]
),
policy.DocumentedRuleDefault(
'get_router_conntrack_helper',
base.RULE_ADMIN_OR_PARENT_OWNER,
'Get a router conntrack helper',
[
{
'method': 'GET',
'path': COLLECTION_PATH,
},
{
'method': 'GET',
'path': RESOURCE_PATH,
},
]
),
policy.DocumentedRuleDefault(
'update_router_conntrack_helper',
base.RULE_ADMIN_OR_PARENT_OWNER,
'Update a router conntrack helper',
[
{
'method': 'PUT',
'path': RESOURCE_PATH,
},
]
),
policy.DocumentedRuleDefault(
'delete_router_conntrack_helper',
base.RULE_ADMIN_OR_PARENT_OWNER,
'Delete a router conntrack helper',
[
{
'method': 'DELETE',
'path': RESOURCE_PATH,
},
]
),
]
def list_rules():
return rules

View File

@ -77,6 +77,21 @@ def filter_fields(f):
return inner_filter
def make_result_with_fields(f):
@functools.wraps(f)
def inner(*args, **kwargs):
fields = kwargs.get('fields')
result = f(*args, **kwargs)
if fields is None:
return result
elif isinstance(result, list):
return [db_utils.resource_fields(r, fields) for r in result]
else:
return db_utils.resource_fields(result, fields)
return inner
class DbBasePluginCommon(object):
"""Stores getters and helper methods for db_base_plugin_v2

View File

@ -0,0 +1,21 @@
# Copyright (c) 2019 Red Hat, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import expose_l3_conntrack_helper as apidef
from neutron_lib.api import extensions
class Expose_l3_conntrack_helper(extensions.APIExtensionDescriptor):
api_definition = apidef

View File

@ -0,0 +1,119 @@
# Copyright (c) 2019 Red Hat, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import itertools
from neutron_lib.api.definitions import l3_conntrack_helper as apidef
from neutron_lib.api import extensions as api_extensions
from neutron_lib.plugins import constants as plugin_consts
from neutron_lib.plugins import directory
from neutron_lib.services import base as service_base
import six
from neutron.api import extensions
from neutron.api.v2 import base
from neutron.api.v2 import resource_helper
from neutron.conf.extensions import conntrack_helper as cth_conf
cth_conf.register_conntrack_helper_opts()
class L3_conntrack_helper(api_extensions.APIExtensionDescriptor):
"""Router conntrack helpers API extension."""
api_definition = apidef
@classmethod
def get_plugin_interface(cls):
return ConntrackHelperPluginBase
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
special_mappings = {'routers': 'router'}
plural_mappings = resource_helper.build_plural_mappings(
special_mappings, itertools.chain(
apidef.RESOURCE_ATTRIBUTE_MAP,
apidef.SUB_RESOURCE_ATTRIBUTE_MAP))
resources = resource_helper.build_resource_info(
plural_mappings,
apidef.RESOURCE_ATTRIBUTE_MAP,
plugin_consts.CONNTRACKHELPER,
translate_name=True,
allow_bulk=True)
plugin = directory.get_plugin(plugin_consts.CONNTRACKHELPER)
parent = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[
apidef.COLLECTION_NAME].get('parent')
params = apidef.SUB_RESOURCE_ATTRIBUTE_MAP[apidef.COLLECTION_NAME].get(
'parameters')
controller = base.create_resource(apidef.COLLECTION_NAME,
apidef.RESOURCE_NAME,
plugin, params,
allow_bulk=True,
parent=parent,
allow_pagination=True,
allow_sorting=True)
resource = extensions.ResourceExtension(
apidef.COLLECTION_NAME,
controller, parent,
attr_map=params)
resources.append(resource)
return resources
@six.add_metaclass(abc.ABCMeta)
class ConntrackHelperPluginBase(service_base.ServicePluginBase):
path_prefix = apidef.API_PREFIX
@classmethod
def get_plugin_type(cls):
return plugin_consts.CONNTRACKHELPER
def get_plugin_description(self):
return "Conntrack Helper Service Plugin"
@abc.abstractmethod
def create_router_conntrack_helper(self, context, router_id,
conntrack_helper):
pass
@abc.abstractmethod
def update_router_conntrack_helper(self, context, id, router_id,
conntrack_helper):
pass
@abc.abstractmethod
def get_router_conntrack_helper(self, context, id, router_id, fields=None):
pass
@abc.abstractmethod
def get_router_conntrack_helpers(self, context, router_id=None,
filters=None, fields=None, sorts=None,
limit=None, marker=None,
page_reverse=False):
pass
@abc.abstractmethod
def delete_router_conntrack_helper(self, context, id, router_id):
pass

View File

@ -40,6 +40,7 @@ import neutron.conf.db.l3_dvr_db
import neutron.conf.db.l3_gwmode_db
import neutron.conf.db.l3_hamode_db
import neutron.conf.extensions.allowedaddresspairs
import neutron.conf.extensions.conntrack_helper
import neutron.conf.plugins.ml2.config
import neutron.conf.plugins.ml2.drivers.agent
import neutron.conf.plugins.ml2.drivers.driver_type
@ -100,8 +101,11 @@ def list_agent_opts():
def list_extension_opts():
return [
('DEFAULT',
neutron.conf.extensions.allowedaddresspairs
.allowed_address_pair_opts),
itertools.chain(
neutron.conf.extensions.allowedaddresspairs
.allowed_address_pair_opts,
neutron.conf.extensions.conntrack_helper.conntrack_helper_opts)
),
('quotas',
itertools.chain(
neutron.conf.quota.l3_quota_opts,

View File

@ -0,0 +1,30 @@
# Copyright (c) 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron._i18n import _
from neutron_lib import exceptions as n_exc
class ConntrackHelperNotFound(n_exc.NotFound):
message = _("Conntrack Helper %(id)s could not be found.")
class ConntrackHelperNotAllowed(n_exc.BadRequest):
message = _("Conntrack Helper %(helper)s is not allowed.")
class InvalidProtocolForHelper(n_exc.BadRequest):
message = _("Conntrack Helper %(helper)s does not support: %(protocol)s. "
"Supported protocols are: %(supported_protocols)s")

View File

@ -0,0 +1,191 @@
# Copyright (c) 2019 Red Hat, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron_lib.api.definitions import expose_l3_conntrack_helper as exposedef
from neutron_lib.api.definitions import l3
from neutron_lib.api.definitions import l3_conntrack_helper as apidef
from neutron_lib.callbacks import registry
from neutron_lib.db import api as db_api
from neutron_lib.db import resource_extend
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.objects import exceptions as obj_exc
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from oslo_config import cfg
from oslo_db import exception as oslo_db_exc
from neutron._i18n import _
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.handlers import resources_rpc
from neutron.db import db_base_plugin_common
from neutron.extensions import l3_conntrack_helper
from neutron.objects import base as base_obj
from neutron.objects import conntrack_helper as cth
from neutron.objects import router
from neutron.services.conntrack_helper.common import exceptions as cth_exc
@resource_extend.has_resource_extenders
@registry.has_registry_receivers
class Plugin(l3_conntrack_helper.ConntrackHelperPluginBase):
"""Implementation of the Neutron Conntrack Helper Service Plugin.
This class implements a Conntrack Helper plugin.
"""
required_service_plugins = [l3.ROUTER]
supported_extension_aliases = [apidef.ALIAS, exposedef.ALIAS]
__native_pagination_support = True
__native_sorting_support = True
__filter_validation_support = True
def __init__(self):
super(Plugin, self).__init__()
self.push_api = resources_rpc.ResourcesPushRpcApi()
self.l3_plugin = directory.get_plugin(constants.L3)
self.core_plugin = directory.get_plugin()
# Option allowed_conntrack_helpers is a list of key, value pairs.
# The list can contain same key (conntrack helper module) multiple
# times with a different value (protocol). Merge to a dictonary
# with key (conntrack helper) and values (protocols) as a list.
self.constraints = collections.defaultdict(list)
for x in cfg.CONF.allowed_conntrack_helpers:
self.constraints[next(iter(x.keys()))].append(
next(iter(x.values())))
@staticmethod
@resource_extend.extends([l3.ROUTERS])
def _extend_router_dict(result_dict, db):
fields = [apidef.PROTOCOL, apidef.PORT, apidef.HELPER]
result_dict[apidef.COLLECTION_NAME] = []
if db.conntrack_helpers:
conntack_helper_result = []
for conntack_helper in db.conntrack_helpers:
cth_dict = cth.ConntrackHelper.modify_fields_from_db(
conntack_helper)
for key in list(cth_dict.keys()):
if key not in fields:
cth_dict.pop(key)
conntack_helper_result.append(cth_dict)
result_dict[apidef.COLLECTION_NAME] = conntack_helper_result
return result_dict
def get_router(self, context, router_id, fields=None):
router_obj = router.Router.get_object(context, id=router_id)
if not router_obj:
raise lib_l3_exc.RouterNotFound(router_id=router_id)
return router_obj
def _find_existing_conntrack_helper(self, context, router_id,
conntrack_helper):
# Because the session had been flushed by NeutronDbObjectDuplicateEntry
# so if we want to use the context to get another db queries, we need
# to rollback first.
context.session.rollback()
param = {'router_id': router_id,
'protocol': conntrack_helper['protocol'],
'port': conntrack_helper['port'],
'helper': conntrack_helper['helper']}
objs = cth.ConntrackHelper.get_objects(context, **param)
if objs:
return (objs[0], param)
def _get_conntrack_helper(self, context, id):
cth_obj = cth.ConntrackHelper.get_object(context, id=id)
if not cth_obj:
raise cth_exc.ConntrackHelperNotFound(id=id)
return cth_obj
def _check_conntrack_helper_constraints(self, cth_obj):
if cth_obj.helper not in self.constraints:
raise cth_exc.ConntrackHelperNotAllowed(helper=cth_obj.helper)
elif cth_obj.protocol not in self.constraints[cth_obj.helper]:
raise cth_exc.InvalidProtocolForHelper(
helper=cth_obj.helper, protocol=cth_obj.protocol,
supported_protocols=', '.join(
self.constraints[cth_obj.helper]))
@db_base_plugin_common.convert_result_to_dict
def create_router_conntrack_helper(self, context, router_id,
conntrack_helper):
conntrack_helper = conntrack_helper.get(apidef.RESOURCE_NAME)
conntrack_helper['router_id'] = router_id
cth_obj = cth.ConntrackHelper(context, **conntrack_helper)
self._check_conntrack_helper_constraints(cth_obj)
try:
with db_api.CONTEXT_WRITER.using(context):
# If this get_router does not raise an exception, a router
# with router_id exists.
self.get_router(context, router_id)
cth_obj.create()
except obj_exc.NeutronDbObjectDuplicateEntry:
(__, conflict_params) = self._find_existing_conntrack_helper(
context, router_id, cth_obj.to_dict())
message = _("A duplicate conntrack helper entry with same "
"attributes already exists, conflicting values "
"are %s") % conflict_params
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
self.push_api.push(context, [cth_obj], rpc_events.CREATED)
return cth_obj
@db_base_plugin_common.convert_result_to_dict
def update_router_conntrack_helper(self, context, id, router_id,
conntrack_helper):
conntrack_helper = conntrack_helper.get(apidef.RESOURCE_NAME)
try:
with db_api.CONTEXT_WRITER.using(context):
cth_obj = self._get_conntrack_helper(context, id)
cth_obj.update_fields(conntrack_helper, reset_changes=True)
self._check_conntrack_helper_constraints(cth_obj)
cth_obj.update()
except oslo_db_exc.DBDuplicateEntry:
(__, conflict_params) = self._find_existing_conntrack_helper(
context, cth_obj.router_id, cth_obj.to_dict())
message = _("A duplicate conntrack helper entry with same "
"attributes already exists, conflicting values "
"are %s") % conflict_params
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
self.push_api.push(context, [cth_obj], rpc_events.UPDATED)
return cth_obj
@db_base_plugin_common.make_result_with_fields
@db_base_plugin_common.convert_result_to_dict
def get_router_conntrack_helper(self, context, id, router_id, fields=None):
return self._get_conntrack_helper(context, id)
@db_base_plugin_common.make_result_with_fields
@db_base_plugin_common.convert_result_to_dict
def get_router_conntrack_helpers(self, context, router_id=None,
filters=None, fields=None, sorts=None,
limit=None, marker=None,
page_reverse=False):
filters = filters or {}
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
return cth.ConntrackHelper.get_objects(context, _pager=pager,
router_id=router_id, **filters)
def delete_router_conntrack_helper(self, context, id, router_id):
cth_obj = self._get_conntrack_helper(context, id)
with db_api.CONTEXT_WRITER.using(context):
cth_obj.delete()
self.push_api.push(context, [cth_obj], rpc_events.DELETED)

View File

@ -14,7 +14,6 @@
# under the License.
import collections
import functools
import netaddr
from neutron_lib.api.definitions import expose_port_forwarding_in_fip
@ -26,7 +25,6 @@ from neutron_lib.callbacks import resources
from neutron_lib import constants as lib_consts
from neutron_lib.db import api as db_api
from neutron_lib.db import resource_extend
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.objects import exceptions as obj_exc
@ -52,21 +50,6 @@ LOG = logging.getLogger(__name__)
PORT_FORWARDING_FLOATINGIP_KEY = '_pf_floatingips'
def make_result_with_fields(f):
@functools.wraps(f)
def inner(*args, **kwargs):
fields = kwargs.get('fields')
result = f(*args, **kwargs)
if fields is None:
return result
elif isinstance(result, list):
return [db_utils.resource_fields(r, fields) for r in result]
else:
return db_utils.resource_fields(result, fields)
return inner
@resource_extend.has_resource_extenders
@registry.has_registry_receivers
class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
@ -464,7 +447,7 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
raise lib_l3_exc.FloatingIPNotFound(floatingip_id=fip_id)
return fip_obj
@make_result_with_fields
@db_base_plugin_common.make_result_with_fields
@db_base_plugin_common.convert_result_to_dict
def get_floatingip_port_forwarding(self, context, id, floatingip_id,
fields=None):
@ -482,7 +465,7 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
raise pf_exc.PortForwardingNotSupportFilterField(
filter=filter_member_key)
@make_result_with_fields
@db_base_plugin_common.make_result_with_fields
@db_base_plugin_common.convert_result_to_dict
def get_floatingip_port_forwardings(self, context, floatingip_id=None,
filters=None, fields=None, sorts=None,

View File

@ -22,6 +22,7 @@ NETWORK_API_EXTENSIONS+=",fip-port-details"
NETWORK_API_EXTENSIONS+=",flavors"
NETWORK_API_EXTENSIONS+=",floatingip-pools"
NETWORK_API_EXTENSIONS+=",ip-substring-filtering"
NETWORK_API_EXTENSIONS+=",l3-conntrack-helper"
NETWORK_API_EXTENSIONS+=",l3-flavors"
NETWORK_API_EXTENSIONS+=",l3-ha"
NETWORK_API_EXTENSIONS+=",l3_agent_scheduler"

View File

@ -0,0 +1,126 @@
# Copyright (c) 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib.api.definitions import l3_conntrack_helper as apidef
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.plugins import directory
from oslo_utils import uuidutils
from neutron.services.conntrack_helper.common import exceptions as cth_exc
from neutron.services.conntrack_helper import plugin as cth_plugin
from neutron.tests.functional import base as functional_base
from neutron.tests.unit.plugins.ml2 import base as ml2_test_base
INVALID_ID = uuidutils.generate_uuid()
class ConntrackHelperTestCase(ml2_test_base.ML2TestFramework,
functional_base.BaseLoggingTestCase):
def setUp(self):
super(ConntrackHelperTestCase, self).setUp()
self.cth_plugin = cth_plugin.Plugin()
directory.add_plugin("CONNTRACKHELPER", self.cth_plugin)
self.router = self._create_router(distributed=True)
self.conntack_helper = {
apidef.RESOURCE_NAME:
{apidef.PROTOCOL: 'udp',
apidef.PORT: 69,
apidef.HELPER: 'tftp'}
}
def test_create_conntrack_helper(self):
res = self.cth_plugin.create_router_conntrack_helper(
self.context, self.router['id'], self.conntack_helper)
expected = {
'id': mock.ANY,
'protocol': 'udp',
'port': 69,
'helper': 'tftp',
'router_id': self.router['id']
}
self.assertEqual(expected, res)
def test_negative_duplicate_create_conntrack_helper(self):
self.cth_plugin.create_router_conntrack_helper(
self.context, self.router['id'], self.conntack_helper)
self.assertRaises(lib_exc.BadRequest,
self.cth_plugin.create_router_conntrack_helper,
self.context, self.router['id'],
self.conntack_helper)
def test_negative_create_conntrack_helper(self):
self.assertRaises(lib_l3_exc.RouterNotFound,
self.cth_plugin.create_router_conntrack_helper,
self.context, INVALID_ID,
self.conntack_helper)
def test_update_conntrack_helper(self):
res = self.cth_plugin.create_router_conntrack_helper(
self.context, self.router['id'], self.conntack_helper)
new_conntack_helper = {
apidef.RESOURCE_NAME:
{apidef.PROTOCOL: 'udp',
apidef.PORT: 6969,
apidef.HELPER: 'tftp'}
}
update = self.cth_plugin.update_router_conntrack_helper(
self.context, res['id'], self.router['id'], new_conntack_helper)
expected = {
'id': res['id'],
'protocol': 'udp',
'port': 6969,
'helper': 'tftp',
'router_id': self.router['id']
}
self.assertEqual(expected, update)
def test_negative_update_conntrack_helper(self):
self.assertRaises(cth_exc.ConntrackHelperNotFound,
self.cth_plugin.update_router_conntrack_helper,
self.context, INVALID_ID, self.router['id'], {})
def test_negative_duplicate_update_conntrack_helper(self):
self.cth_plugin.create_router_conntrack_helper(
self.context, self.router['id'], self.conntack_helper)
new_conntack_helper = {
apidef.RESOURCE_NAME:
{apidef.PROTOCOL: 'udp',
apidef.PORT: 6969,
apidef.HELPER: 'tftp'}
}
res = self.cth_plugin.create_router_conntrack_helper(
self.context, self.router['id'], new_conntack_helper)
new_conntack_helper[apidef.RESOURCE_NAME][apidef.PORT] = 69
self.assertRaises(lib_exc.BadRequest,
self.cth_plugin.update_router_conntrack_helper,
self.context, res['id'], self.router['id'],
new_conntack_helper)
def test_delete_conntrack_helper(self):
res = self.cth_plugin.create_router_conntrack_helper(
self.context, self.router['id'], self.conntack_helper)
delete = self.cth_plugin.delete_router_conntrack_helper(
self.context, res['id'], self.router['id'])
self.assertIsNone(delete)
def test_negative_delete_conntrack_helper(self):
self.assertRaises(cth_exc.ConntrackHelperNotFound,
self.cth_plugin.delete_router_conntrack_helper,
self.context, INVALID_ID, self.router['id'])

View File

@ -0,0 +1,262 @@
# Copyright (c) 2019 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import context
from neutron_lib import exceptions as lib_exc
from neutron_lib.objects import exceptions as obj_exc
from neutron_lib.plugins import directory
from oslo_config import cfg
from neutron.api.rpc.callbacks.consumer import registry as cons_registry
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.callbacks.producer import registry as prod_registry
from neutron.api.rpc.callbacks import resource_manager
from neutron.api.rpc.handlers import resources_rpc
from neutron import manager
from neutron.objects import conntrack_helper
from neutron.services.conntrack_helper.common import exceptions as cth_exc
from neutron.services.conntrack_helper import plugin as cth_plugin
from neutron.tests.unit import testlib_api
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
class TestConntrackHelperPlugin(testlib_api.SqlTestCase):
def setUp(self):
super(TestConntrackHelperPlugin, self).setUp()
with mock.patch.object(
resource_manager.ResourceCallbacksManager, '_singleton',
new_callable=mock.PropertyMock(return_value=False)):
self.cons_mgr = resource_manager.ConsumerResourceCallbacksManager()
self.prod_mgr = resource_manager.ProducerResourceCallbacksManager()
for mgr in (self.cons_mgr, self.prod_mgr):
mgr.clear()
mock.patch.object(
cons_registry, '_get_manager', return_value=self.cons_mgr).start()
mock.patch.object(
prod_registry, '_get_manager', return_value=self.prod_mgr).start()
self.setup_coreplugin(load_plugins=False)
mock.patch('neutron.objects.db.api.create_object').start()
mock.patch('neutron.objects.db.api.update_object').start()
mock.patch('neutron.objects.db.api.delete_object').start()
mock.patch('neutron.objects.db.api.get_object').start()
# We don't use real models as per mocks above. We also need to mock-out
# methods that work with real data types
mock.patch(
'neutron.objects.base.NeutronDbObject.modify_fields_from_db'
).start()
cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
cfg.CONF.set_override("service_plugins", ["router",
"conntrack_helper"])
manager.init()
# TODO(hjensas): Add CONNTRACKHELPER to neutron-lib Well-known
# service type constants.
self.cth_plugin = directory.get_plugin("CONNTRACKHELPER")
self.ctxt = context.Context('admin', 'fake_tenant')
mock.patch.object(self.ctxt.session, 'refresh').start()
mock.patch.object(self.ctxt.session, 'expunge').start()
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(cth_plugin.Plugin, 'get_router')
@mock.patch('neutron.objects.conntrack_helper.ConntrackHelper')
def test_create_conntrack_helper(self, mock_conntrack_helper,
mock_get_router, mock_push_api):
cth_input = {
'conntrack_helper': {
'conntrack_helper': {
'protocol': 'udp',
'port': 69,
'helper': 'tftp'}
}
}
cth_obj = mock.Mock()
cth_obj.helper = 'tftp'
cth_obj.protocol = 'udp'
cth_obj.port = 69
router_obj = mock.Mock()
router_obj.id = 'faker-router-id'
mock_get_router.return_value = router_obj
mock_conntrack_helper.return_value = cth_obj
self.cth_plugin.create_router_conntrack_helper(
self.ctxt, router_obj.id, **cth_input)
mock_conntrack_helper.assert_called_once_with(
self.ctxt, **cth_input['conntrack_helper']['conntrack_helper'])
self.assertTrue(cth_obj.create.called)
mock_push_api.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.CREATED)
@mock.patch.object(cth_plugin.Plugin, '_find_existing_conntrack_helper')
@mock.patch.object(cth_plugin.Plugin, 'get_router')
@mock.patch('neutron.objects.conntrack_helper.ConntrackHelper')
def test_negative_create_conntrack_helper(self, mock_conntrack_helper,
mock_get_router,
mock_find_existing):
cth_input = {
'conntrack_helper': {
'protocol': 'udp',
'port': '69',
'helper': 'tftp'}
}
cth_obj = mock.Mock()
router_obj = mock.Mock()
router_obj.id = 'faker-router-id'
mock_get_router.return_value = router_obj
mock_conntrack_helper.return_value = cth_obj
cth_obj.create.side_effect = obj_exc.NeutronDbObjectDuplicateEntry(
mock.Mock(), mock.Mock())
mock_find_existing.return_value = ('cth_obj', 'conflict_param')
self.assertRaises(
lib_exc.BadRequest,
self.cth_plugin.create_router_conntrack_helper,
self.ctxt, router_obj.id, cth_input)
@mock.patch.object(cth_plugin.Plugin, '_find_existing_conntrack_helper')
@mock.patch.object(cth_plugin.Plugin, 'get_router')
@mock.patch('neutron.objects.conntrack_helper.ConntrackHelper')
def test_negative_create_helper_not_allowed(
self, mock_conntrack_helper, mock_get_router,
mock_find_existing):
cth_input = {
'conntrack_helper': {
'protocol': 'udp',
'port': 70,
'helper': 'foo'}
}
cth_obj = mock.Mock()
cth_obj.helper = cth_input['conntrack_helper']['helper']
cth_obj.protocol = cth_input['conntrack_helper']['protocol']
cth_obj.port = cth_input['conntrack_helper']['port']
router_obj = mock.Mock()
router_obj.id = 'faker-router-id'
mock_get_router.return_value = router_obj
mock_conntrack_helper.return_value = cth_obj
self.assertRaises(
cth_exc.ConntrackHelperNotAllowed,
self.cth_plugin.create_router_conntrack_helper,
self.ctxt, router_obj.id, cth_input)
@mock.patch.object(cth_plugin.Plugin, '_find_existing_conntrack_helper')
@mock.patch.object(cth_plugin.Plugin, 'get_router')
@mock.patch('neutron.objects.conntrack_helper.ConntrackHelper')
def test_negative_create_helper_invalid_proto_for_helper(
self, mock_conntrack_helper, mock_get_router,
mock_find_existing):
cth_input = {
'conntrack_helper': {
'protocol': 'tcp',
'port': 69,
'helper': 'tftp'}
}
cth_obj = mock.Mock()
cth_obj.helper = cth_input['conntrack_helper']['helper']
cth_obj.protocol = cth_input['conntrack_helper']['protocol']
cth_obj.port = cth_input['conntrack_helper']['port']
router_obj = mock.Mock()
router_obj.id = 'faker-router-id'
mock_get_router.return_value = router_obj
mock_conntrack_helper.return_value = cth_obj
self.assertRaises(
cth_exc.InvalidProtocolForHelper,
self.cth_plugin.create_router_conntrack_helper,
self.ctxt, router_obj.id, cth_input)
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(conntrack_helper.ConntrackHelper, 'get_object')
def test_update_conntrack_helper(self, mock_cth_get_object, mock_rpc_push):
cth_input = {
'conntrack_helper': {
'conntrack_helper': {
'protocol': 'udp',
'port': 69,
'helper': 'tftp'}
}
}
cth_obj = mock.Mock()
cth_obj.helper = 'tftp'
cth_obj.protocol = 'udp'
mock_cth_get_object.return_value = cth_obj
self.cth_plugin.update_router_conntrack_helper(
self.ctxt, 'cth_id', mock.ANY, **cth_input)
mock_cth_get_object.assert_called_once_with(self.ctxt, id='cth_id')
self.assertTrue(cth_obj.update_fields)
self.assertTrue(cth_obj.update)
mock_rpc_push.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.UPDATED)
@mock.patch.object(conntrack_helper.ConntrackHelper, 'get_object')
def test_negative_update_conntrack_helper(self, mock_cth_get_object):
cth_input = {
'conntrack_helper': {
'conntrack_helper': {
'protocol': 'udp',
'port': 69,
'helper': 'tftp'}
}
}
mock_cth_get_object.return_value = None
self.assertRaises(
cth_exc.ConntrackHelperNotFound,
self.cth_plugin.update_router_conntrack_helper,
self.ctxt, 'cth_id', mock.ANY, **cth_input)
@mock.patch.object(conntrack_helper.ConntrackHelper, 'get_object')
def test_get_conntrack_helper(self, get_object_mock):
self.cth_plugin.get_router_conntrack_helper(
self.ctxt, 'cth_id', mock.ANY, fields=None)
get_object_mock.assert_called_once_with(self.ctxt, id='cth_id')
@mock.patch.object(conntrack_helper.ConntrackHelper, 'get_object')
def test_negative_get_conntrack_helper(self, get_object_mock):
get_object_mock.return_value = None
self.assertRaises(
cth_exc.ConntrackHelperNotFound,
self.cth_plugin.get_router_conntrack_helper,
self.ctxt, 'cth_id', mock.ANY, fields=None)
@mock.patch.object(conntrack_helper.ConntrackHelper, 'get_objects')
def test_get_conntrack_helpers(self, get_objects_mock):
self.cth_plugin.get_router_conntrack_helpers(self.ctxt)
get_objects_mock.assert_called_once_with(self.ctxt, _pager=mock.ANY,
router_id=None)
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(conntrack_helper.ConntrackHelper, 'get_object')
def test_delete_conntrack_helper(self, get_object_mock, mock_rpc_push):
cth_obj = mock.Mock(id='cth_id',
router_id='fake-router',
protocol='udp',
port=69,
helper='tftp')
get_object_mock.return_value = cth_obj
self.cth_plugin.delete_router_conntrack_helper(self.ctxt, 'cth_id',
mock.ANY)
cth_obj.delete.assert_called()
mock_rpc_push.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.DELETED)
@mock.patch.object(conntrack_helper.ConntrackHelper, 'get_object')
def test_negative_delete_conntrack_helper(self, get_object_mock):
get_object_mock.return_value = None
self.assertRaises(cth_exc.ConntrackHelperNotFound,
self.cth_plugin.delete_router_conntrack_helper,
self.ctxt, 'cth_id', mock.ANY)

View File

@ -77,6 +77,7 @@ neutron.service_plugins =
log = neutron.services.logapi.logging_plugin:LoggingPlugin
port_forwarding = neutron.services.portforwarding.pf_plugin:PortForwardingPlugin
placement = neutron.services.placement_report.plugin:PlacementReportPlugin
conntrack_helper = neutron.services.conntrack_helper.plugin:Plugin
neutron.ml2.type_drivers =
flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver
local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver