Version 0 Patch

- All classification types expressed in a single
   resource.
 - Classifications can be grouped inside classification
   groups resource which can be consumed by other
   neutron resources.
 - A list of supported classifications exists through
   classification type list resource.
 - Switch to stestr

Change-Id: I05acaf6419c294692ff270ad4a488a5e68749d67
Co-Authored-By: David Shaughnessy <david.shaughnessy@intel.com>
Co-Authored-By: Nakul Dahiwade <nakul.dahiwade@intel.com>
This commit is contained in:
Thaynara Silva 2017-07-25 16:28:04 +00:00 committed by Nakul Dahiwade
parent 28f020f592
commit 133d5be638
33 changed files with 1570 additions and 107 deletions

1
.gitignore vendored
View File

@ -28,6 +28,7 @@ pip-log.txt
nosetests.xml
.testrepository
.venv
.stestr/
# Translations
*.mo

View File

@ -1,3 +1,3 @@
[DEFAULT]
test_path=${OS_TEST_PATH:-./neutron-classifier/tests/functional}
test_path=${OS_TEST_PATH:-./neutron_classifier/tests/unit}
top_dir=./

View File

@ -1,7 +0,0 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ ${OS_TEST_PATH:-./neutron_classifier/tests/unit} $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -0,0 +1,18 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
validators_dict = {'ethertype': [],
'src_addr': [],
'dst_addr': [],
}

View File

@ -19,21 +19,29 @@ neutron-classifier exception handling.
from neutron_lib import exceptions as nexceptions
class InvalidEthernetClassifier(nexceptions.NeutronException):
message = ('Invalid ethernet classifier value for %(eth_type)s.')
class ConsumedClassification(nexceptions.NeutronException):
message = ("""One or more classification is already being consumed
and can't be used or deleted.""")
class EthertypeConflictWithProtocol(nexceptions.NeutronException):
message = ("Invalid ethertype %(ethertype)s for protocol %(protocol)s.")
class InvalidClassificationGroupId(nexceptions.NeutronException):
message = ("One or more id is not a valid classification group id.")
class IpAddressConflict(nexceptions.NeutronException):
message = ("IP address do not agree with the given IP Version.")
class InvalidClassificationId(nexceptions.NeutronException):
message = ("One or more id is not a valid classification id")
class InvalidICMPParameter(nexceptions.NeutronException):
message = ("%(param)s are not allowed when protocol is set to ICMP.")
class ConsumedClassificationGroup(nexceptions.NeutronException):
message = ("""One or more classification group is being consumed
and can't be deleted.""")
class InvalidPortRange(nexceptions.NeutronException):
message = ("Invalid port range %(port_range).")
class InvalidUpdateRequest(nexceptions.NeutronException):
message = ("""The update request is invalid. Only the name and description
can be updated.""")
class InvalidClassificationDefintion(nexceptions.NeutronException):
message = ("""The classification definition(fields) is incorrect.
Please check the valid fields for the classification.""")

View File

@ -0,0 +1,27 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
validators_dict = {'dscp': [],
'dscp_mask': [],
'ecn': [],
'length_min': [],
'length_max': [],
'flags': [],
'flags_mask': [],
'ttl_min': [],
'ttl_max': [],
'protocol': [],
'src_addr': [],
'dst_addr': [],
}

View File

@ -0,0 +1,25 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
validators_dict = {'dscp': [],
'dscp_mask': [],
'ecn': [],
'length_min': [],
'length_max': [],
'next_header': [],
'hops_min': [],
'hops_max': [],
'src_addr': [],
'dst_addr': [],
}

View File

@ -0,0 +1,110 @@
# Copyright 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api import converters
from neutron_lib.db import constants as const
def validate_string(String):
if String is None:
String = ''
return String
CLASSIFICATION_GROUP_RESOURCE_MAP = {
'id': {
'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True, 'primary_key': True},
'name': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': '',
'validate': {'type:string': const.NAME_FIELD_SIZE}},
'description': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': '',
'validate': {'type:string': const.DESCRIPTION_FIELD_SIZE},
'convert_to': validate_string},
'project_id': {
'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'validate': {'type:string': const.PROJECT_ID_FIELD_SIZE},
'is_visible': True},
'shared': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': False,
'convert_to': converters.convert_to_boolean},
'operator': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': 'and',
'validate': {'type:string': const.NAME_FIELD_SIZE},
'convert_to': validate_string},
'classifications': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': [],
'convert_to': converters.convert_to_list},
'cg_ids': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': []},
}
CLASSIFICATION_RESOURCE_MAP = {
'id': {
'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
'is_visible': True, 'primary_key': True},
'name': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': None,
'validate': {'type:string': const.NAME_FIELD_SIZE},
'convert_to': validate_string},
'description': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': None,
'validate': {'type:string': const.DESCRIPTION_FIELD_SIZE},
'convert_to': validate_string},
'project_id': {
'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'validate': {'type:string': const.PROJECT_ID_FIELD_SIZE},
'is_visible': True},
'shared': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': False,
'convert_to': converters.convert_to_boolean},
'c_type': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': None,
'validate': {'type:string': const.NAME_FIELD_SIZE},
'convert_to': validate_string},
'negated': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': False,
'convert_to': converters.convert_to_boolean},
'definition': {
'allow_post': True, 'allow_put': True,
'is_visible': True,
'convert_to': converters.convert_none_to_empty_dict},
}
CLASSIFICATION_TYPE_RESOURCE_MAP = {
'type': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': None,
'validate': {'type:string': const.NAME_FIELD_SIZE},
'convert_to': validate_string},
'supported_parameters': {
'allow_post': True, 'allow_put': True,
'is_visible': True, 'default': [],
'convert_to': converters.convert_to_list},
}

View File

@ -0,0 +1,23 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
validators_dict = {'src_port_min': [],
'src_port_max': [],
'dst_port_min': [],
'dst_port_max': [],
'flags': [],
'flags_mask': [],
'window_min': [],
'window_max': []
}

View File

@ -0,0 +1,21 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
validators_dict = {'src_port_min': [],
'src_port_max': [],
'dst_port_min': [],
'dst_port_max': [],
'length_min': [],
'length_max': [],
}

View File

@ -0,0 +1,68 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_classifier.common import eth_validators
from neutron_classifier.common import exceptions
from neutron_classifier.common import ipv4_validators
from neutron_classifier.common import ipv6_validators
from neutron_classifier.common import tcp_validators
from neutron_classifier.common import udp_validators
from neutron_classifier.db import models
from neutron_classifier.objects import classifications
from neutron.db import api as db_api
type_validators = {}
type_validators['ethernet'] = eth_validators.validators_dict
type_validators['ipv4'] = ipv4_validators.validators_dict
type_validators['ipv6'] = ipv6_validators.validators_dict
type_validators['tcp'] = tcp_validators.validators_dict
type_validators['udp'] = udp_validators.validators_dict
def check_valid_classifications(context, cs):
for c_id in cs:
c_model = classifications.ClassificationBase
c = c_model.get_object(context, id=c_id)
c_type_clas = classifications.CLASS_MAP[c.c_type]
classification = c_type_clas.get_object(context, id=c_id)
if not classification or (classification.id != c_id):
raise exceptions.InvalidClassificationId()
def check_valid_classification_groups(context, cgs):
for cg_id in cgs:
cg = models._read_classification_group(context, cg_id)
if not cg or (cg.id != cg_id):
raise exceptions.InvalidClassificationGroupId()
def check_can_delete_classification_group(context, cg_id):
"""Checks whether a classification group can be deleted.
Here we are checking whether a classification group is a child of another
classification group, meaning is already mapped to a parent classification
group. In that case we cannot delete it and will raise an exception.
"""
cgs = classifications.ClassificationGroup.get_objects(context)
for cg in cgs:
with db_api.context_manager.writer.using(context):
cg_obj = classifications.ClassificationGroup.get_object(context,
id=cg.id)
mapped_cgs = classifications._get_mapped_classification_groups(
context, cg_obj)
if cg_id in [mcg.id for mcg in mapped_cgs]:
raise exceptions.ConsumedClassificationGroup()
return True

View File

@ -0,0 +1,115 @@
# Copyright 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_utils import uuidutils
from neutron.db import api as db_api
from neutron.db import common_db_mixin
from neutron.objects import base as base_obj
from neutron_classifier.common import exceptions
from neutron_classifier.common import validators
from neutron_classifier.objects import classifications
LOG = logging.getLogger(__name__)
class TrafficClassificationGroupPlugin(common_db_mixin.CommonDbMixin):
def __init__(self):
super(TrafficClassificationGroupPlugin, self).__init__()
def create_classification_group(self, context, classification_group):
details = classification_group['classification_group']
if details['classifications']:
validators.check_valid_classifications(context,
details['classifications'])
if details['classification_groups']:
validators.check_valid_classification_groups(
context, details['classification_groups'])
details['id'] = uuidutils.generate_uuid()
mappings = {'c_ids': details['classifications'],
'cg_ids': details['classification_groups']}
db_dict = details
cg = classifications.ClassificationGroup(context, **details)
with db_api.context_manager.writer.using(context):
cg.create()
db_dict['id'] = cg.id
with db_api.context_manager.writer.using(context):
for cl in mappings['c_ids']:
cg_c_mapping = classifications.CGToClassificationMapping(
context,
container_cg_id=cg.id,
stored_classification_id=cl)
cg_c_mapping.create()
for cg_id in mappings['cg_ids']:
cg_cg_mapping = classifications.CGToClassificationGroupMapping(
context,
container_cg_id=cg.id,
stored_cg_id=cg_id
)
cg_cg_mapping.create()
db_dict['classifications'] = details['classifications']
db_dict['classification_group'] = details['classification_groups']
return db_dict
def delete_classification_group(self, context, classification_group_id):
if validators.check_can_delete_classification_group(
context, classification_group_id):
cg = classifications.ClassificationGroup.get_object(
context, id=classification_group_id)
with db_api.context_manager.writer.using(context):
cg.delete()
def update_classification_group(self, context, classification_group_id,
fields_to_update):
field_keys = list(fields_to_update.keys())
valid_keys = ['name', 'description']
for key in field_keys:
if key not in valid_keys:
raise exceptions.InvalidUpdateRequest()
with db_api.context_manager.writer.using(context):
cg = classifications.ClassificationGroup.update_object(
context, fields_to_update, id=classification_group_id)
return cg
def _make_db_dict(self, obj):
db_dict = {'classification_group': {}}
for key in obj.fields.keys():
db_dict['classification_group'][key] = obj[key]
return db_dict
def get_classification_group(self, context, classification_group_id):
with db_api.context_manager.writer.using(context):
cg = classifications.ClassificationGroup.get_object(
context, id=classification_group_id)
db_dict = self._make_db_dict(cg)
db_dict['classification_group']['classifications'] =\
classifications._get_mapped_classifications(context, cg)
db_dict['classification_group']['classification_groups'] = \
classifications._get_mapped_classification_groups(context, cg)
return db_dict
def get_classification_groups(self, context, sorts=None, limit=None,
marker=None, page_reverse=False):
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
cgs = classifications.ClassificationGroup.get_objects(context,
_pager=pager)
return cgs

View File

@ -41,6 +41,26 @@ def upgrade():
sa.Column('operator', sa.Enum("AND", "OR", name="operator_types"),
nullable=False))
op.create_table(
'classificationgrouprbacs',
sa.Column('id', sa.String(length=36), primary_key=True,
nullable=False),
sa.Column('project_id', sa.String(length=255)),
sa.Column('target_tenant', sa.String(length=255),
nullable=False),
sa.Column('action', sa.String(length=255), nullable=False),
sa.Column('object_id', sa.String(length=36),
nullable=False),
sa.ForeignKeyConstraint(['object_id'],
['classification_groups.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('target_tenant',
'object_id', 'action'))
op.create_index(op.f('ix_classificationgrouprbacs_project_id'),
'classificationgrouprbacs',
['project_id'], unique=False)
op.create_table(
'classifications',
sa.Column('id', sa.String(length=36), primary_key=True),

View File

@ -1,5 +1,4 @@
# Copyright (c) 2015 Mirantis, Inc.
# Copyright 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -20,7 +19,6 @@ import sqlalchemy as sa
from sqlalchemy import orm
# Service plugin models
class ClassificationGroup(model_base.BASEV2, model_base.HasId,
model_base.HasProject):
__tablename__ = 'classification_groups'

View File

@ -0,0 +1,139 @@
# Copyright 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from abc import ABCMeta
from abc import abstractmethod
import six
from oslo_config import cfg
from neutron_lib.api import extensions as api_ext
from neutron_lib.services import base as service_base
from neutron.api import extensions as ext
from neutron.api.v2 import resource_helper
from neutron_classifier.common import resources as classifier_resources
from neutron_classifier import extensions
cfg.CONF.import_opt('api_extensions_path', 'neutron.common.config')
ext.append_api_extensions_path(extensions.__path__)
EXT_NAME = "neutron_classifier"
def validate_string(String):
if String is None:
String = ''
return String
RESOURCE_ATTRIBUTE_MAP = {
'classification_type':
classifier_resources.CLASSIFICATION_TYPE_RESOURCE_MAP,
'classification_groups':
classifier_resources.CLASSIFICATION_GROUP_RESOURCE_MAP,
'classifications':
classifier_resources.CLASSIFICATION_RESOURCE_MAP,
}
class Classification(api_ext.ExtensionDescriptor):
"""Classification API extension."""
@classmethod
def get_name(cls):
return "Neutron Classifier"
@classmethod
def get_alias(cls):
return "neutron_classifier"
@classmethod
def get_description(cls):
return "Extension that provides a common classification framework."
@classmethod
def get_updated(cls):
return "2015-07-12T10:00:00-00:00"
@classmethod
def get_plugin_interface(cls):
return extensions.classification.NeutronClassificationPluginBase
@classmethod
def get_resources(cls):
"""Returns Ext Resources."""
special_mappings = {}
plural_mappings = resource_helper.build_plural_mappings(
special_mappings, RESOURCE_ATTRIBUTE_MAP)
resources = resource_helper.build_resource_info(plural_mappings,
RESOURCE_ATTRIBUTE_MAP,
EXT_NAME,
translate_name=False,
allow_bulk=True)
for resource in resources:
resource.path_prefix = '/classifications'
return resources
def update_attributes_map(self, attributes, extension_attrs_map=None):
super(Classification, self).update_attributes_map(
attributes, extension_attrs_map=RESOURCE_ATTRIBUTE_MAP)
def get_extended_resources(self, version):
return RESOURCE_ATTRIBUTE_MAP
@six.add_metaclass(ABCMeta)
class NeutronClassificationPluginBase(service_base.ServicePluginBase):
path_prefix = '/classifications'
def get_plugin_name(self):
return EXT_NAME
def get_plugin_type(self):
return EXT_NAME
def get_plugin_description(self):
return 'Neutron Classifier service plugin.'
@abstractmethod
def create_classification(self, context, classification):
pass
@abstractmethod
def update_classification(self, context, classification_id,
classification):
pass
@abstractmethod
def delete_classification(self, context, classification_id):
pass
@abstractmethod
def get_classification(self, context, classification_id):
pass
@abstractmethod
def get_classifications(self, context, **kwargs):
pass
@abstractmethod
def get_classification_type(self, context, **kwargs):
pass

View File

@ -18,10 +18,10 @@ import six
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.db import api as db_api
from neutron.objects import base
from neutron.objects import common_types
from neutron.objects import rbac_db
from neutron_lib.db import api as db_api
from neutron_classifier.db import models
from neutron_classifier.db.rbac_db_models import ClassificationGroupRBAC

View File

View File

@ -0,0 +1,176 @@
# Copyright 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from neutron.db import api as db_api
from neutron.objects import base as base_obj
from neutron_classifier.common import exceptions
from neutron_classifier.common import validators
from neutron_classifier.db import classification as c_db
from neutron_classifier.extensions import classification
from neutron_classifier.objects import classification_type as type_obj
from neutron_classifier.objects import classifications as class_group
LOG = logging.getLogger(__name__)
class ClassificationPlugin(classification.NeutronClassificationPluginBase,
c_db.TrafficClassificationGroupPlugin):
supported_extension_aliases = ['neutron_classifier']
def __init__(self):
super(ClassificationPlugin, self).__init__()
self.driver_manager = None
def create_classification(self, context, classification):
details = self.break_out_headers(classification)
c_type = details['c_type']
headers = classification['classification']['definition']
for key in headers:
if key not in validators.type_validators[c_type].keys():
raise exceptions.InvalidClassificationDefintion()
cl = class_group.CLASS_MAP[c_type](context, **details)
with db_api.context_manager.writer.using(context):
cl.create()
db_dict = self.merge_header(cl)
db_dict['id'] = cl['id']
return db_dict
def delete_classification(self, context, classification_id):
cl = class_group.ClassificationBase.get_object(context,
id=classification_id)
cl_class = class_group.CLASS_MAP[cl.c_type]
classification = cl_class.get_object(context, id=classification_id)
validators.check_valid_classifications(context,
[classification_id])
with db_api.context_manager.writer.using(context):
classification.delete()
def update_classification(self, context, classification_id,
fields_to_update):
field_keys = list(fields_to_update.keys())
valid_keys = ['name', 'description']
for key in field_keys:
if key not in valid_keys:
raise exceptions.InvalidUpdateRequest()
cl = class_group.ClassificationBase.get_object(context,
id=classification_id)
cl_class = class_group.CLASS_MAP[cl.c_type]
with db_api.context_manager.writer.using(context):
classification = cl_class.update_object(
context, fields_to_update, id=classification_id)
return classification
def get_classification(self, context, classification_id, fields=None):
cl = class_group.ClassificationBase.get_object(context,
id=classification_id)
cl_class = class_group.CLASS_MAP[cl.c_type]
classification = cl_class.get_object(context, id=classification_id)
clas = self.merge_header(classification)
return clas
def get_classifications(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
# NOTE(ndahiwad): If the filters are not passed by the end-user
# then will fetch all the classifications. Otherwise, only the
# classification_types that the user wants will be returned.
if not filters['c_type']:
filters['c_type'] = ['tcp', 'udp', 'ipv4', 'ipv6', 'ethernet']
c_dict = {'classifications': []}
for c_type in filters['c_type']:
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
cl = class_group.CLASS_MAP[c_type].get_objects(context,
_pager=pager)
db_dict = self.merge_headers(cl)
c_dict['classifications'].append(db_dict)
return c_dict
def get_classification_type(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
ret_list = []
if not filters:
filters = {}
for key in class_group.CLASS_MAP.keys():
types = {}
obj = type_obj.ClassificationType.get_object(key)
types['type'] = obj.type
types['supported_parameters'] = obj.supported_parameters
ret_list.append(types)
return ret_list
def __getattr__(self, resource):
return super(ClassificationPlugin, self).__getattr__(resource)
def break_out_headers(self, classification):
details = classification['classification']
cl_dict = {'name': details['name'],
'description': details['description'],
'project_id': details['project_id'],
'shared': details['shared'],
'c_type': details['c_type'],
'negated': details['negated']}
definition = details['definition']
for key, value in definition.items():
cl_dict[key] = value
return cl_dict
def merge_headers(self, classifications):
c_type = classifications[0]['c_type']
ret_list = {CLASSIFICATION_MAP[c_type]: []}
for clas in classifications:
db_dict = self.merge_header(clas)
db_dict['id'] = clas.get('id', None)
ret_list[CLASSIFICATION_MAP[c_type]].append(db_dict)
return ret_list
def merge_header(self, classification):
db_dict = {'id': classification['id'],
'name': classification['name'],
'project_id': classification['project_id'],
'description': classification['description'],
'c_type': classification['c_type'],
'negated': classification['negated'],
'shared': classification['shared']}
c_type = classification['c_type']
headers = validators.type_validators[c_type].keys()
definition = {}
for header in headers:
definition[header] = classification.get(header, None)
db_dict['definition'] = definition
return db_dict
CLASSIFICATION_MAP = {'ethernet': 'EthernetClassifications',
'ipv4': 'IPV4Classifications',
'ipv6': 'IPV6Classifications',
'udp': 'UDPClassifications',
'tcp': 'TCPClassifications'}

View File

@ -0,0 +1,187 @@
# Copyright (c) 2018 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from neutron.db import api as db_api
from neutron.tests.unit import testlib_api
from neutron_classifier.common import exceptions
from neutron_classifier.common import validators
from neutron_classifier.db.classification import\
TrafficClassificationGroupPlugin as cg_plugin
from neutron_classifier.objects import classifications
from neutron_classifier.services.classification.plugin import\
ClassificationPlugin as c_plugin
from neutron_classifier.tests import objects_base as obj_base
class ClassificationGroupApiTest(testlib_api.MySQLTestCaseMixin,
testlib_api.SqlTestCase,
obj_base._CCFObjectsTestCommon):
def setUp(self):
super(ClassificationGroupApiTest, self).setUp()
self.test_plugin = cg_plugin()
def test_get_classification_group(self):
with db_api.context_manager.writer.using(self.ctx):
cg = self._create_test_cg('Test Group 0')
cg_dict = self.test_plugin._make_db_dict(cg)
fetch_cg = self.test_plugin.get_classification_group(self.ctx,
cg.id)
cg_dict['classification_group']['classifications'] =\
fetch_cg['classification_group']['classifications']
cg_dict['classification_group']['classification_groups'] = \
fetch_cg['classification_group']['classification_groups']
self.assertEqual(cg_dict, fetch_cg)
def test_get_classification_groups(self):
with db_api.context_manager.writer.using(self.ctx):
cg1 = self._create_test_cg('Test Group 1')
cg2 = self._create_test_cg('Test Group 2')
cgs = self.test_plugin.get_classification_groups(self.ctx)
self.assertIn(cg1, cgs)
self.assertIn(cg2, cgs)
def test_create_classification_group(self):
with db_api.context_manager.writer.using(self.ctx):
tcp_class = classifications.TCPClassification
ipv4_class = classifications.IPV4Classification
cg2 = self._create_test_cg('Test Group 1')
tcp = self._create_test_classification('tcp', tcp_class)
ipv4 = self._create_test_classification('ipv4', ipv4_class)
cg_dict = {'classification_group':
{'name': 'Test Group 0',
'description': "Description of test group",
'project_id': uuidutils.generate_uuid(),
'operator': 'AND',
'classifications': [tcp.id, ipv4.id],
'classification_groups': [cg2.id]
}}
cg1 = self.test_plugin.create_classification_group(self.ctx,
cg_dict)
fetch_cg1 = classifications.ClassificationGroup.get_object(
self.ctx, id=cg1['id'])
mapped_cgs = classifications._get_mapped_classification_groups(
self.ctx, fetch_cg1)
mapped_cs = classifications._get_mapped_classifications(
self.ctx, fetch_cg1)
mapped_classification_groups = [cg.id for cg in mapped_cgs]
mapped_classifications = [c.id for c in mapped_cs]
self.assertEqual(cg1, cg_dict['classification_group'])
for cg in mapped_classification_groups:
self.assertIn(
cg,
cg_dict['classification_group']['classification_groups'])
for c in mapped_classifications:
self.assertIn(
c, cg_dict['classification_group']['classifications'])
def test_update_classification_group(self):
with db_api.context_manager.writer.using(self.ctx):
cg1 = self._create_test_cg('Test Group 0')
cg2 = self._create_test_cg('Test Group 1')
self.test_plugin.update_classification_group(
self.ctx, cg1.id, {'name': 'Test Group updated'})
fetch_cg1 = classifications.ClassificationGroup.get_object(
self.ctx, id=cg1['id'])
self.assertRaises(
exceptions.InvalidUpdateRequest,
self.test_plugin.update_classification_group,
self.ctx, cg2.id, {'name': 'Test Group updated',
'operator': 'OR'})
self.assertEqual(fetch_cg1.name, 'Test Group updated')
def test_delete_classification_group(self):
with db_api.context_manager.writer.using(self.ctx):
cg1 = self._create_test_cg('Test Group 0')
self.test_plugin.delete_classification_group(self.ctx, cg1.id)
fetch_cg1 = classifications.ClassificationGroup.get_object(
self.ctx, id=cg1['id'])
self.assertIsNone(fetch_cg1)
class ClassificationApiTest(testlib_api.MySQLTestCaseMixin,
testlib_api.SqlTestCase,
obj_base._CCFObjectsTestCommon):
def setUp(self):
super(ClassificationApiTest, self).setUp()
self.test_clas_plugin = c_plugin()
def test_create_classification(self):
attrs = self.get_random_attrs(classifications.EthernetClassification)
c_type = 'ethernet'
attrs['c_type'] = c_type
attrs['definition'] = {}
for key in validators.type_validators[c_type].keys():
attrs['definition'][key] = attrs.pop(key, None)
c_attrs = {'classification': attrs}
with db_api.context_manager.writer.using(self.ctx):
c1 = self.test_clas_plugin.create_classification(self.ctx,
c_attrs)
fetch_c1 = classifications.EthernetClassification.get_object(
self.ctx, id=c1['id']
)
c_attrs['classification']['definition']['src_port'] = 'xyz'
self.assertRaises(exceptions.InvalidClassificationDefintion,
self.test_clas_plugin.create_classification,
self.ctx, c_attrs)
eth = c1.pop('definition', None)
for k, v in c1.items():
self.assertEqual(v, fetch_c1[k])
for x, y in eth.items():
self.assertEqual(y, fetch_c1[x])
def test_delete_classification(self):
tcp_class = classifications.TCPClassification
with db_api.context_manager.writer.using(self.ctx):
tcp = self._create_test_classification('tcp', tcp_class)
self.test_clas_plugin.delete_classification(self.ctx, tcp.id)
fetch_tcp = classifications.TCPClassification.get_object(
self.ctx, id=tcp.id)
self.assertIsNone(fetch_tcp)
def test_get_classification(self):
ipv4_class = classifications.IPV4Classification
with db_api.context_manager.writer.using(self.ctx):
ipv4 = self._create_test_classification('ipv4', ipv4_class)
fetch_ipv4 = self.test_clas_plugin.get_classification(self.ctx,
ipv4.id)
self.assertEqual(fetch_ipv4, self.test_clas_plugin.merge_header(ipv4))
def test_get_classifications(self):
with db_api.context_manager.writer.using(self.ctx):
c1 = self._create_test_classification(
'ipv6', classifications.IPV6Classification)
c2 = self._create_test_classification(
'udp', classifications.UDPClassification)
fetch_cs = self.test_clas_plugin.get_classifications(
self.ctx, filters={'c_type': ['udp', 'ipv6']})
c1_dict = self.test_clas_plugin.merge_header(c1)
c2_dict = self.test_clas_plugin.merge_header(c2)
self.assertIn({'UDPClassifications': [c2_dict]},
fetch_cs['classifications'])
self.assertIn({'IPV6Classifications': [c1_dict]},
fetch_cs['classifications'])
def test_update_classification(self):
c1 = self._create_test_classification(
'ethernet', classifications.EthernetClassification)
updated_name = 'Test Updated Classification'
with db_api.context_manager.writer.using(self.ctx):
self.test_clas_plugin.update_classification(self.ctx, c1.id,
{'name': updated_name})
fetch_c1 = classifications.EthernetClassification.get_object(
self.ctx, id=c1.id)
self.assertEqual(fetch_c1.name, updated_name)

View File

@ -0,0 +1,79 @@
# Copyright 2018 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
import oslo_versionedobjects
from neutron_lib import context
from neutron.tests.unit.objects import test_base
from neutron_classifier.objects import classifications
from neutron_classifier.tests import tools
class _CCFObjectsTestCommon(object):
# TODO(ndahiwade): this represents classifications containing Enum fields,
# will need to be reworked if more classifications are added here later.
_Enum_classifications = [classifications.IPV4Classification,
classifications.IPV6Classification]
_Enumfield = oslo_versionedobjects.fields.EnumField
ctx = context.get_admin_context()
def get_random_attrs(self, obj=None):
obj = obj
attrs = {}
for field, field_obj in obj.fields.items():
if field != 'c_type' and type(field_obj) != self._Enumfield:
random_generator = test_base.FIELD_TYPE_VALUE_GENERATOR_MAP[
type(field_obj)]
attrs[field] = random_generator()
return attrs
def _create_test_cg(self, name):
attrs = {'name': name,
'id': uuidutils.generate_uuid(),
'description': "Description of test group",
'project_id': uuidutils.generate_uuid(),
'shared': False,
'operator': 'AND'}
cg = classifications.ClassificationGroup(self.ctx, **attrs)
cg.create()
return cg
def _create_test_classification(self, c_type, classification):
attrs = self.get_random_attrs(classification)
if classification in self._Enum_classifications:
attrs['ecn'] = tools.get_random_ecn()
attrs['c_type'] = c_type
c = classification(self.ctx, **attrs)
c.create()
return c
def _create_test_cg_cg_mapping(self, cg1, cg2):
attrs = {'container_cg_id': cg1,
'stored_cg_id': cg2}
cg_m_cg = classifications.CGToClassificationGroupMapping(self.ctx,
**attrs)
cg_m_cg.create()
return cg_m_cg
def _create_test_cg_c_mapping(self, cg, c):
attrs = {'container_cg_id': cg,
'stored_classification_id': c}
cg_m_c = classifications.CGToClassificationMapping(self.ctx,
**attrs)
cg_m_c.create()
return cg_m_c

View File

@ -0,0 +1,238 @@
# Copyright (c) 2018 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.objects import base as base_obj
from neutron_classifier.db import classification as cg_api
from neutron_classifier.objects import classifications
from neutron_classifier.tests import base
from neutron_lib import context
from oslo_utils import uuidutils
class TestClassificationGroupPlugin(base.BaseClassificationTestCase):
def setUp(self):
super(TestClassificationGroupPlugin, self).setUp()
self.setup_coreplugin(load_plugins=False)
mock.patch('neutron.objects.db.api.create_object').start()
mock.patch('neutron.objects.db.api.update_object').start()
mock.patch('neutron.objects.db.api.delete_object').start()
mock.patch('neutron.objects.db.api.get_object').start()
self.cg_plugin = cg_api.TrafficClassificationGroupPlugin()
self.ctxt = context.Context('fake_user', 'fake_tenant')
mock.patch.object(self.ctxt.session, 'refresh').start()
mock.patch.object(self.ctxt.session, 'expunge').start()
self.validator_classifications = mock.patch(
'neutron_classifier.common.validators.check_valid_classifications')
self.validator_cg = mock.patch(
'neutron_classifier.common.validators.'
'check_valid_classification_groups')
self.validator_cg.start()
self.validator_classifications.start()
self.test_classification_attrs = {
'description': 'Test Classification',
'project_id': uuidutils.generate_uuid(),
'shared': True,
'negated': True,
}
def _generate_test_classification_group(self, name):
self.cg_id = uuidutils.generate_uuid()
self.c_id1 = uuidutils.generate_uuid()
self.c_id2 = uuidutils.generate_uuid()
self.test_cg = {
'classification_group': {
'name': name,
'id': uuidutils.generate_uuid(),
'description': "Description of test group",
'project_id': uuidutils.generate_uuid(),
'operator': 'AND',
'shared': False,
'classifications': [self.c_id1, self.c_id2],
'classification_groups': [self.cg_id]}
}
return self.test_cg
@mock.patch.object(classifications.CGToClassificationGroupMapping,
'create')
@mock.patch.object(classifications.CGToClassificationMapping, 'create')
@mock.patch.object(classifications.ClassificationGroup, 'create')
def test_create_classification_group(self, mock_cg_create,
mock_cg_c_mapping_create,
mock_cg_cg_mapping_create):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_cg_create, 'create_cg')
mock_manager.attach_mock(mock_cg_c_mapping_create, 'create_cg_c')
mock_manager.attach_mock(mock_cg_cg_mapping_create, 'create_cg_cg')
mock_manager.reset_mock()
test_cg = self._generate_test_classification_group('Test Group')
test_cg['classification_group'].pop('id', None)
val = self.cg_plugin.create_classification_group(self.ctxt,
test_cg)
expected_val = test_cg['classification_group']
self.assertEqual(val, expected_val)
c_len = len(val['classifications'])
cg_len = len(val['classification_groups'])
mock_call_len = len(mock_manager.mock_calls)
self.assertEqual(mock_call_len, c_len + cg_len + 1)
mock_manager.create_cg.assert_called_once()
mock_manager.create_cg_cg.assert_called_once()
self.assertEqual(mock_manager.create_cg_c.call_count, c_len)
@mock.patch.object(classifications.ClassificationGroup, 'get_object')
@mock.patch('neutron_classifier.common.validators.'
'check_can_delete_classification_group')
def test_delete_classification_group(self, mock_valid_delete,
mock_cg_get):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_valid_delete, 'valid_del')
mock_manager.attach_mock(mock_cg_get, 'get_cg')
mock_manager.reset_mock()
mock_manager.valid_del.return_value = True
mock_cg_id = uuidutils.generate_uuid()
self.cg_plugin.delete_classification_group(self.ctxt, mock_cg_id)
mock_cg_delete_call = mock.call.get_cg().delete()
mock_cg_check_validity = mock.call.valid_del(self.ctxt, mock_cg_id)
mock_cg_get_call = mock.call.get_cg(self.ctxt, id=mock_cg_id)
mock_cg_delete_call.assert_called_once()
mock_cg_get_call.assert_called_once_with(mock_cg_id)
mock_cg_check_validity.assert_called()
self.assertEqual(3, len(mock_manager.mock_calls))
self.assertTrue(
mock_manager.mock_calls.index(mock_cg_check_validity) <
mock_manager.mock_calls.index(mock_cg_get_call) <
mock_manager.mock_calls.index(mock_cg_delete_call))
def _mock_mapped_classifications(self):
self.mock_c1 = mock.Mock(id=uuidutils.generate_uuid(),
name='Ethernet', c_type='ethernet',
**self.test_classification_attrs)
self.mock_c2 = mock.Mock(id=uuidutils.generate_uuid(), name='TCP',
c_type='tcp',
**self.test_classification_attrs)
return [self.mock_c1, self.mock_c2]
@mock.patch('neutron_classifier.objects.classifications.'
'_get_mapped_classification_groups')
@mock.patch('neutron_classifier.objects.classifications.'
'_get_mapped_classifications')
@mock.patch.object(classifications.ClassificationGroup, 'get_object')
def test_get_classification_group(self, mock_cg_get,
mock_mapped_classifications,
mock_mapped_cgs):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_cg_get, 'get_cg')
mock_manager.attach_mock(mock_mapped_classifications, 'get_mapped_cs')
mock_manager.attach_mock(mock_mapped_cgs, 'get_mapped_cgs')
mock_manager.reset_mock()
mock_manager.get_mapped_cs.side_effect =\
self._mock_mapped_classifications()
mock_manager.get_mapped_cgs.side_effect = ['cg2']
test_cg = self._generate_test_classification_group('Test Group')
test_cg['classification_group'].pop('classifications', None)
test_cg['classification_group'].pop('classification_groups', None)
mock_manager.get_cg.return_value = test_cg
with mock.patch('neutron_classifier.db.classification.'
'TrafficClassificationGroupPlugin._make_db_dict',
return_value=test_cg):
val1 = self.cg_plugin.get_classification_group(
self.ctxt, test_cg['classification_group']['id'])
self.assertEqual(val1, test_cg)
mock_manager.get_cg.assert_called_with(
self.ctxt, id=test_cg['classification_group']['id']
)
self.assertEqual(val1['classification_group']['classifications'],
self.mock_c1)
val1['classification_group']['classifications'] =\
classifications._get_mapped_classifications(self.ctxt,
test_cg)
self.assertEqual(val1['classification_group']['classifications'],
self.mock_c2)
self.assertEqual(val1['classification_group']
['classification_groups'], 'cg2')
mapped_cs_call_count = mock_manager.get_mapped_cs.call_count
self.assertEqual(2, mapped_cs_call_count)
@mock.patch.object(base_obj, 'Pager')
@mock.patch.object(classifications.ClassificationGroup, 'get_objects')
def test_get_classification_groups(self, mock_cgs_get, mock_pager):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_cgs_get, 'get_cgs')
mock_manager.attach_mock(mock_pager, 'pager')
mock_manager.reset_mock()
test_cg1 = self._generate_test_classification_group('Test Group1')
test_cg2 = self._generate_test_classification_group('Test Group2')
test_cg1 = test_cg1['classification_group']
test_cg2 = test_cg2['classification_group']
cg1 = classifications.ClassificationGroup(self.ctxt, **test_cg1)
cg2 = classifications.ClassificationGroup(self.ctxt, **test_cg2)
cg_list = [cg1, cg2]
mock_manager.get_cgs.return_value = cg_list
val = self.cg_plugin.get_classification_groups(self.ctxt)
self.assertEqual(val, cg_list)
mock_manager.get_cgs.assert_called_once()
mock_manager.pager.assert_called_once()
self.assertEqual(len(mock_manager.mock_calls), 2)
@mock.patch.object(classifications.ClassificationGroup, 'update_object')
def test_update_classification_group(self, mock_cg_update):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_cg_update, 'cg_update')
mock_manager.reset_mock()
test_cg = self._generate_test_classification_group('Test Group')
test_cg = test_cg['classification_group']
cg = classifications.ClassificationGroup(self.ctxt, **test_cg)
updated_fields = {'name': 'Test Group Updated',
'description': 'Updated Description'}
self.cg_plugin.update_classification_group(self.ctxt, cg.id,
updated_fields)
mock_manager.cg_update.assert_called_once()
mock_manager.cg_update.assert_called_once_with(self.ctxt,
updated_fields,
id=cg.id)

View File

@ -12,10 +12,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
import oslo_versionedobjects
from neutron_classifier.objects import classifications
from neutron_classifier.tests import objects_base as obj_base
from neutron_classifier.tests import tools
from neutron_lib import context
@ -25,64 +25,9 @@ from neutron.tests.unit.objects import test_base
from neutron.tests.unit import testlib_api
class _CCFObjectsTestCommon(object):
# TODO(ndahiwade): this represents classifications containing Enum fields,
# will need to be reworked if more classifications are added here later.
_Enum_classifications = [classifications.IPV4Classification,
classifications.IPV6Classification]
_Enumfield = oslo_versionedobjects.fields.EnumField
ctx = context.get_admin_context()
def get_random_attrs(self, obj=None):
obj = obj
attrs = {}
for field, field_obj in obj.fields.items():
if field != 'c_type' and type(field_obj) != self._Enumfield:
random_generator = test_base.FIELD_TYPE_VALUE_GENERATOR_MAP[
type(field_obj)]
attrs[field] = random_generator()
return attrs
def _create_test_cg(self, name):
attrs = {'name': name,
'id': uuidutils.generate_uuid(),
'description': "Description of test group",
'project_id': uuidutils.generate_uuid(),
'operator': 'AND'}
cg = classifications.ClassificationGroup(self.ctx, **attrs)
cg.create()
return cg
def _create_test_classification(self, c_type, classification):
attrs = self.get_random_attrs(classification)
if classification in self._Enum_classifications:
attrs['ecn'] = tools.get_random_ecn()
attrs['c_type'] = c_type
c = classification(self.ctx, **attrs)
c.create()
return c
def _create_test_cg_cg_mapping(self, cg1, cg2):
attrs = {'container_cg_id': cg1,
'stored_cg_id': cg2}
cg_m_cg = classifications.CGToClassificationGroupMapping(self.ctx,
**attrs)
cg_m_cg.create()
return cg_m_cg
def _create_test_cg_c_mapping(self, cg, c):
attrs = {'container_cg_id': cg,
'stored_classification_id': c}
cg_m_c = classifications.CGToClassificationMapping(self.ctx,
**attrs)
cg_m_c.create()
return cg_m_c
class ClassificationGroupTest(test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
obj_base._CCFObjectsTestCommon):
# NOTE(ndahiwade): As the FIELD_TYPE_VALUE_GENERATOR_MAP in neutron's
# test_base for objects doesn't have an entry for operator Enum fields,
# we are adding it here for our use rather than adding in neutron.
@ -108,7 +53,7 @@ class ClassificationGroupTest(test_base.BaseDbObjectTestCase,
# mapping class inheritence (polymorphic_identity), and as this is unique to
# CCF we have decided not to use it for tests for individual classifications.
class UDPClassificationTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
obj_base._CCFObjectsTestCommon):
test_class = classifications.UDPClassification
@ -126,7 +71,7 @@ class UDPClassificationTest(testlib_api.SqlTestCase,
class IPV4ClassificationTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
obj_base._CCFObjectsTestCommon):
test_class = classifications.IPV4Classification
@ -144,7 +89,7 @@ class IPV4ClassificationTest(testlib_api.SqlTestCase,
class IPV6ClassificationTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
obj_base._CCFObjectsTestCommon):
test_class = classifications.IPV6Classification
@ -162,7 +107,7 @@ class IPV6ClassificationTest(testlib_api.SqlTestCase,
class TCPClassificationTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
obj_base._CCFObjectsTestCommon):
test_class = classifications.TCPClassification
@ -180,7 +125,7 @@ class TCPClassificationTest(testlib_api.SqlTestCase,
class EthernetClassificationTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
obj_base._CCFObjectsTestCommon):
test_class = classifications.EthernetClassification
@ -201,7 +146,7 @@ class EthernetClassificationTest(testlib_api.SqlTestCase,
class CGToClassificationGroupMappingTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
obj_base._CCFObjectsTestCommon):
def test_get_object(self):
with db_api.context_manager.writer.using(self.ctx):
@ -235,7 +180,7 @@ class CGToClassificationGroupMappingTest(testlib_api.SqlTestCase,
class CGToClassificationMappingTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
obj_base._CCFObjectsTestCommon):
ctx = context.get_admin_context()

View File

@ -0,0 +1,269 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.objects import base as base_obj
from neutron_classifier.objects import classifications as class_group
from neutron_classifier.services.classification import plugin
from neutron_classifier.tests import base
from neutron_lib import context
from oslo_utils import uuidutils
class TestPlugin(base.BaseClassificationTestCase):
def setUp(self):
super(TestPlugin, self).setUp()
self.setup_coreplugin(load_plugins=False)
mock.patch('neutron.objects.db.api.create_object').start()
mock.patch('neutron.objects.db.api.update_object').start()
mock.patch('neutron.objects.db.api.delete_object').start()
mock.patch('neutron.objects.db.api.get_object').start()
self.cl_plugin = plugin.ClassificationPlugin()
self.ctxt = context.Context('fake_user', 'fake_tenant')
mock.patch.object(self.ctxt.session, 'refresh').start()
mock.patch.object(self.ctxt.session, 'expunge').start()
mock.patch('neutron_classifier.objects.classifications').start()
self._generate_test_classifications()
def _generate_test_classifications(self):
self.test_classification = {
'classification': {'id': uuidutils.generate_uuid(),
'name': 'test_ethernet_classification',
'description': 'Test Ethernet Classification',
'project_id': uuidutils.generate_uuid(),
'shared': True,
'negated': True,
'c_type': 'ethernet',
'definition': {'src_addr': '00:07:E9:63:CE:53',
'dst_addr': '00:07:E9:42:AC:28',
'ethertype': 8100}}}
self.test_classification_broken_headers = {
'id': self.test_classification['classification']['id'],
'name': 'test_ethernet_classification',
'description': 'Test Ethernet Classification',
'project_id': self.test_classification[
'classification']['project_id'],
'shared': True,
'negated': True,
'c_type': 'ethernet',
'src_addr': '00:07:E9:63:CE:53',
'dst_addr': '00:07:E9:42:AC:28',
'ethertype': 8100}
self.test_classification_2 = {
'classification': {'id': uuidutils.generate_uuid(),
'name': 'test_second_ethernet_cl',
'description': 'Test Second '
'Ethernet Classification',
'project_id': uuidutils.generate_uuid(),
'shared': False,
'negated': False,
'c_type': 'ethernet',
'definition': {'src_addr': '00:54:TY:89:G6:67',
'dst_adrr': '00:54:TY:65:T7:44',
'ethertype': 8100}}}
self.test_classification_2_broken_headers = {
'id': self.test_classification_2['classification']['id'],
'name': 'test_second_ethernet_cl',
'description': 'Test Second Ethernet Classification',
'project_id': self.test_classification_2[
'classification']['project_id'],
'shared': False,
'negated': False,
'c_type': 'ethernet',
'src_addr': '00:54:TY:89:G6:67',
'dst_addr': '00:54:TY:65:T7:44',
'ethertype': 8100}
def test_classification_break_out_headers(self):
cl = self.cl_plugin.break_out_headers(
self.test_classification)
self.test_classification_broken_headers.pop('id', None)
self.assertEqual(self.test_classification_broken_headers, cl)
def test_merge_header(self):
cl = self.cl_plugin.merge_header(
self.test_classification_broken_headers)
self.assertEqual(self.test_classification['classification'],
cl)
@mock.patch.object(class_group.EthernetClassification, 'create')
@mock.patch.object(class_group.EthernetClassification, 'id',
return_value=uuidutils.generate_uuid())
def test_create_classification(self, mock_ethernet_id,
mock_ethernet_create):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_ethernet_create, 'create')
mock_manager.attach_mock(mock_ethernet_id, 'id')
mock_manager.reset_mock()
mock_manager.start()
self.test_classification['classification'].pop('id', None)
val = self.cl_plugin.create_classification(
self.ctxt, self.test_classification)
expected_val = self.test_classification['classification']
expected_val['id'] = class_group.EthernetClassification.id
self.assertEqual(expected_val, val)
mock_manager.create.assert_called_once()
@mock.patch.object(class_group.ClassificationBase, 'get_object')
@mock.patch.object(class_group.EthernetClassification, 'update_object')
def test_update_classification(self, mock_ethernet_update,
mock_class_get):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_ethernet_update, 'update')
mock_manager.attach_mock(mock_class_get, 'get_classification')
mock_manager.reset_mock()
mock_manager.start()
class_obj = class_group.EthernetClassification(
self.ctxt, **self.test_classification_broken_headers)
ethernet_classification_update = {
'name': 'test_ethernet_classification Version 2',
'description': 'Test Ethernet Classification Version 2'}
mock_manager.get_classification().c_type = 'ethernet'
self.cl_plugin.update_classification(
self.ctxt, class_obj.id,
ethernet_classification_update)
classification_update_mock_call = mock.call.update(
self.ctxt,
{'description': 'Test Ethernet Classification Version 2',
'name': 'test_ethernet_classification Version 2'},
id=class_obj.id)
self.assertIn(classification_update_mock_call, mock_manager.mock_calls)
self.assertEqual(mock_manager.get_classification.call_count, 2)
@mock.patch.object(class_group.ClassificationBase, 'get_object')
@mock.patch.object(class_group.EthernetClassification, 'get_object')
def test_delete_classification(self, mock_ethernet_get, mock_base_get):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_base_get, 'get_object')
mock_manager.attach_mock(mock_ethernet_get, 'get_object')
eth_class_obj = class_group.EthernetClassification(
self.ctxt, **self.test_classification_broken_headers)
eth_class_obj.delete = mock.Mock()
base_class_obj = class_group.ClassificationBase(
self.ctxt, **self.test_classification_broken_headers)
mock_base_get.return_value = base_class_obj
mock_ethernet_get.return_value = eth_class_obj
mock_manager.reset_mock()
self.cl_plugin.delete_classification(
self.ctxt, base_class_obj.id)
get_obj_mock_call = mock.call.get_object(
self.ctxt,
id=self.test_classification_broken_headers['id'])
self.assertIn(get_obj_mock_call, mock_manager.mock_calls)
self.assertEqual([get_obj_mock_call, get_obj_mock_call,
get_obj_mock_call, get_obj_mock_call],
mock_manager.mock_calls)
self.assertTrue(eth_class_obj.delete.assert_called_once)
@mock.patch.object(class_group.ClassificationBase, 'get_object')
@mock.patch.object(class_group.EthernetClassification, 'get_object')
def test_get_classification(self, mock_ethernet_get,
mock_base_get):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_base_get, 'get_object')
mock_manager.attach_mock(mock_ethernet_get, 'get_object')
eth_classification = self.test_classification[
'classification']
definition = eth_classification.pop('definition')
base_class_obj = class_group.ClassificationBase(
self.ctxt, **eth_classification)
eth_class_obj = class_group.EthernetClassification(
self.ctxt, **self.test_classification_broken_headers)
mock_base_get.return_value = base_class_obj
mock_ethernet_get.return_value = eth_class_obj
eth_classification['definition'] = definition
mock_manager.reset_mock()
value = self.cl_plugin.get_classification(
self.ctxt, eth_classification['id'])
get_obj_mock_call = mock.call.get_object(
self.ctxt, id=eth_classification['id'])
self.assertIn(get_obj_mock_call, mock_manager.mock_calls)
self.assertEqual([get_obj_mock_call, get_obj_mock_call],
mock_manager.mock_calls)
self.assertTrue(eth_classification, value)
@mock.patch.object(class_group.ClassificationBase, 'get_objects')
@mock.patch.object(class_group.EthernetClassification, 'get_objects')
@mock.patch.object(base_obj, 'Pager')
def test_get_classifications(self, mock_pager, mock_ethernet_get,
mock_base_get):
mock_manager = mock.Mock()
mock_manager.attach_mock(mock_base_get, 'get_objects')
mock_manager.attach_mock(mock_ethernet_get, 'get_objects')
eth_cl_1 = self.test_classification['classification']
eth_cl_2 = self.test_classification_2['classification']
definition = eth_cl_1.pop('definition')
definition_2 = eth_cl_2.pop('definition')
base_class_obj_1 = class_group.ClassificationBase(
self.ctxt, **eth_cl_1)
base_class_obj_2 = class_group.ClassificationBase(
self.ctxt, **eth_cl_2)
eth_class_obj_1 = class_group.EthernetClassification(
self.ctxt, **self.test_classification_broken_headers)
eth_class_obj_2 = class_group.EthernetClassification(
self.ctxt, **self.test_classification_2_broken_headers)
base_list = [base_class_obj_1, base_class_obj_2]
eth_list = [eth_class_obj_1, eth_class_obj_2]
mock_base_get.return_value = base_list
mock_ethernet_get.return_value = eth_list
mock_pager.return_value = None
eth_cl_1['definition'] = definition
eth_cl_2['defintion'] = definition_2
result_list = [eth_cl_1, eth_cl_2]
mock_manager.reset_mock()
value = self.cl_plugin.get_classifications(
self.ctxt, filters={'c_type': ['ethernet']})
get_objs_mock_call = mock.call.get_objects(
self.ctxt, _pager=None)
self.assertIn(get_objs_mock_call, mock_manager.mock_calls)
self.assertEqual([get_objs_mock_call], mock_manager.mock_calls)
self.assertTrue(result_list, value)

View File

@ -1,6 +1,7 @@
pbr>=2.0.0,!=2.1.0 # Apache-2.0
Babel>=2.3.4,!=2.4.0 # BSD
SQLAlchemy>=1.0.10,!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8 # MIT
neutron-lib>=1.7.0 # Apache-2.0
neutron>=12.0.0 # Apache-2.0
oslo.utils>=3.20.0 # Apache-2.0
neutron-lib>=1.18.0 # Apache-2.0
oslo.utils>=3.33.0 # Apache-2.0
-e git+https://git.openstack.org/openstack/neutron@master#egg=neutron

View File

@ -25,6 +25,9 @@ packages =
neutron_classifier
[entry_points]
neutron.service_plugins =
neutron_classifier = neutron_classifier.services.classification.plugin:ClassificationPlugin
neutron.db.alembic_migrations =
neutron-classifier = neutron_classifier.db.migration:alembic_migrations

View File

@ -11,7 +11,7 @@ openstackdocstheme>=1.18.1 # Apache-2.0
oslosphinx>=4.7.0 # Apache-2.0
WebOb>=1.7.1 # MIT
oslotest>=3.2.0 # Apache-2.0
os-testr>=1.0.0 # Apache-2.0
stestr>=2.0.0 # Apache-2.0
testresources>=2.0.0 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD
testtools>=2.2.0 # MIT

View File

@ -1,8 +0,0 @@
#!/bin/sh
# preserve old behavior of using an arg as a regex when '--' is not present
case $@ in
(*--*) ostestr $@;;
('') ostestr;;
(*) ostestr --regex "$@"
esac

27
tox.ini
View File

@ -7,15 +7,22 @@ skipsdist = True
usedevelop = True
install_command = pip install {opts} {packages}
setenv =
VIRTUAL_ENV={envdir}
OS_LOG_CAPTURE={env:OS_LOG_CAPTURE:true}
OS_STDOUT_CAPTURE={env:OS_STDOUT_CAPTURE:true}
OS_STDERR_CAPTURE={env:OS_STDERR_CAPTURE:true}
PYTHONWARNINGS=default::DeprecationWarning
deps =
-c{env:UPPER_CONSTRAINTS_FILE:https://git.openstack.org/cgit/openstack/requirements/plain/upper-constraints.txt}
-r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
whitelist_externals = rm
whitelist_externals =
sh
find
commands =
rm -f .testrepository/times.dbm
python setup.py test --slowest --testr-args='{posargs}'
find . -type f -name "*.py[c|o]" -delete
find . -type d -name "__pycache__" -delete
stestr run {posargs}
[testenv:pep8]
commands =
@ -35,8 +42,7 @@ setenv = {[testenv]setenv}
deps =
{[testenv]deps}
-r{toxinidir}/neutron_classifier/tests/functional/requirements.txt
commands =
{toxinidir}/tools/ostestr_compat_shim.sh {posargs}
commands = stestr run {posargs}
[testenv:functional-py35]
basepython = python3.5
@ -47,8 +53,7 @@ setenv = {[testenv]setenv}
deps =
{[testenv]deps}
-r{toxinidir}/neutron_classifier/tests/functional/requirements.txt
commands =
{toxinidir}/tools/ostestr_compat_shim.sh {posargs}
commands = stestr run {posargs}
[testenv:dsvm-functional]
basepython = python2.7
@ -57,14 +62,16 @@ setenv = {[testenv:functional]setenv}
sitepackages=True
deps =
{[testenv:functional]deps}
commands =
{toxinidir}/tools/ostestr_compat_shim.sh {posargs}
commands = stestr run {posargs}
[testenv:venv]
commands = {posargs}
[testenv:cover]
commands = python setup.py test --coverage --testr-args='{posargs}'
setenv =
PYTHON=coverage run --source neutron_classifier --parallel-mode
commands =
stestr run '{posargs}'
[testenv:docs]
commands = python setup.py build_sphinx