Add Database Models and OVOs for classifications

- Introducing database models and oslo versioned objects for
   classification resources used by the common classification
   framework service plugin.
 - Add playbooks, post_test_hook, gate_hook,
   modify requirements, .zuul.yaml for gate.
   Add migration test, fix models and migration scripts.
 - Change ClassificationType decorator to NeutronObjectRegistry,
   add get_objects method.
   Remove ClassificationType from object version test,
   Add get_bound_tenant_ids to ClassificationGroups
   Add openstack-python35-jobs-neutron to zuul.
Change-Id: I41d5b399352b47d74000596e6518e199d36709a7
Co-Authored-By: David Shaughnessy <david.shaughnessy@intel.com>
Co-Authored-By: Nakul Dahiwade <nakul.dahiwade@intel.com>
Co-Authored-By: John Hinman <john.hinman@intel.com>
This commit is contained in:
Thaynara Silva 2017-08-31 12:07:55 +00:00 committed by Nakul Dahiwade
parent c9eb004689
commit 28f020f592
32 changed files with 1619 additions and 812 deletions

3
.stestr.conf Normal file
View File

@ -0,0 +1,3 @@
[DEFAULT]
test_path=${OS_TEST_PATH:-./neutron-classifier/tests/functional}
top_dir=./

View File

@ -1,17 +1,20 @@
- project:
check:
jobs:
- openstack-tox-functional:
required-projects:
- openstack/neutron
- openstack-tox-functional-py35:
- neutron-classifier-functional-dsvm:
required-projects:
- openstack/neutron
gate:
jobs:
- openstack-tox-functional:
required-projects:
- openstack/neutron
- openstack-tox-functional-py35:
- neutron-classifier-functional-dsvm:
required-projects:
- openstack/neutron
- job:
name: neutron-classifier-functional-dsvm
parent: legacy-dsvm-base
run: playbooks/legacy/neutron-classifier-functional-dsvm/run.yaml
post-run: playbooks/legacy/neutron-classifier-functional-dsvm/post.yaml
timeout: 7800
required-projects:
- openstack-infra/devstack-gate

View File

@ -14,31 +14,17 @@
# under the License.
CLASSIFIER_TYPES = ['ip_classifier', 'ipv4_classifier', 'ipv6_classifier',
'transport_classifier', 'ethernet_classifier',
'encapsulation_classifier', 'neutron_port_classifier']
from neutron_classifier.objects import classifications as cs
# Protocol names and numbers
PROTO_NAME_ICMP = 'icmp'
PROTO_NAME_ICMP_V6 = 'icmpv6'
PROTO_NAME_TCP = 'tcp'
PROTO_NAME_UDP = 'udp'
FIELDS_IPV4 = cs.IPV4Classification.fields.keys()
FIELDS_IPV6 = cs.IPV6Classification.fields.keys()
FIELDS_TCP = cs.TCPClassification.fields.keys()
FIELDS_UDP = cs.UDPClassification.fields.keys()
FIELDS_ETHERNET = cs.EthernetClassification.fields.keys()
# TODO(sc68cal) add more protocols`
PROTOCOLS = [PROTO_NAME_ICMP, PROTO_NAME_ICMP_V6,
PROTO_NAME_TCP, PROTO_NAME_UDP]
ENCAPSULATION_TYPES = ['vxlan', 'gre']
NEUTRON_SERVICES = ['neutron-fwaas', 'networking-sfc', 'security-group']
DIRECTIONS = ['INGRESS', 'EGRESS', 'BIDIRECTIONAL']
ETHERTYPE_IPV4 = 0x0800
ETHERTYPE_IPV6 = 0x86DD
IP_VERSION_4 = 4
IP_VERSION_6 = 6
SECURITYGROUP_ETHERTYPE_IPV4 = 'IPv4'
SECURITYGROUP_ETHERTYPE_IPV6 = 'IPv6'
SUPPORTED_FIELDS = {'ipv4': FIELDS_IPV4,
'ipv6': FIELDS_IPV6,
'tcp': FIELDS_TCP,
'udp': FIELDS_UDP,
'ethernet': FIELDS_ETHERNET}

View File

@ -0,0 +1,55 @@
# Copyright 2011, VMware, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Borrowed from the Neutron code base, more utilities will be added/borrowed as
# and when needed.
import importlib
import os
import re
import sys
import neutron_classifier
_SEPARATOR_REGEX = re.compile(r'[/\\]+')
def import_modules_recursively(topdir):
'''Import and return all modules below the topdir directory.'''
topdir = _SEPARATOR_REGEX.sub('/', topdir)
modules = []
for root, dirs, files in os.walk(topdir):
for file_ in files:
if file_[-3:] != '.py':
continue
module = file_[:-3]
if module == '__init__':
continue
import_base = _SEPARATOR_REGEX.sub('.', root)
# NOTE(ihrachys): in Python3, or when we are not located in the
# directory containing neutron code, __file__ is absolute, so we
# should truncate it to exclude PYTHONPATH prefix
prefixlen = len(os.path.dirname(neutron_classifier.__file__))
import_base = 'neutron_classifier' + import_base[prefixlen:]
module = '.'.join([import_base, module])
if module not in sys.modules:
importlib.import_module(module)
modules.append(module)
return modules

View File

@ -0,0 +1,17 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron.db.migration.models import head
def get_metadata():
return head.model_base.BASEV2.metadata

View File

@ -1,202 +0,0 @@
# Copyright (c) 2015 Mirantis, Inc.
# Copyright (c) 2015 Huawei Technologies India Pvt Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_classifier.common import constants
from neutron_classifier.db import models
from neutron_classifier.db import validators
def security_group_ethertype_to_ethertype_value(ethertype):
if ethertype == constants.SECURITYGROUP_ETHERTYPE_IPV6:
return constants.ETHERTYPE_IPV6
else:
return constants.ETHERTYPE_IPV4
def ethertype_value_to_security_group_ethertype(ethertype):
if ethertype == constants.ETHERTYPE_IPV6:
return constants.SECURITYGROUP_ETHERTYPE_IPV6
else:
return constants.SECURITYGROUP_ETHERTYPE_IPV4
def get_classifier_group(context, classifier_group_id):
return context.session.query(models.ClassifierGroup).get(
classifier_group_id)
def create_classifier_chain(classifier_group, classifiers,
incremeting_sequence=False):
if incremeting_sequence:
seq = 0
for classifier in classifiers:
ce = models.ClassifierChainEntry(classifier_group=classifier_group,
classifier=classifier)
if incremeting_sequence:
ce.sequence = seq
classifier_group.classifier_chain.append(ce)
def convert_security_group_to_classifier(context, security_group):
cgroup = models.ClassifierGroup()
cgroup.service = 'security-group'
for rule in security_group['security_group_rules']:
convert_security_group_rule_to_classifier(context, rule, cgroup)
context.session.add(cgroup)
context.session.commit()
return cgroup
def convert_security_group_rule_to_classifier(context, sgr, group):
cl1 = cl2 = cl3 = cl4 = cl5 = None
# Ethertype
if validators.is_ethernetclassifier_valid(sgr, validators.SG_RULE_TYPE):
cl1 = models.EthernetClassifier()
cl1.ethertype = security_group_ethertype_to_ethertype_value(
sgr['ethertype'])
# protocol
if validators.is_protocolclassifier_valid(sgr, validators.SG_RULE_TYPE):
if cl1 and cl1.ethertype == constants.ETHERTYPE_IPV6:
cl2 = models.Ipv6Classifier()
cl2.next_header = sgr['protocol']
else:
cl2 = models.Ipv4Classifier()
cl2.protocol = sgr['protocol']
# remote ip
if validators.is_ipclassifier_valid(sgr, validators.SG_RULE_TYPE):
cl3 = models.IpClassifier()
cl3.source_ip_prefix = sgr['remote_ip_prefix']
# Ports
if validators.is_transportclassifier_valid(sgr, validators.SG_RULE_TYPE):
cl4 = models.TransportClassifier(
destination_port_range_min=sgr['port_range_min'],
destination_port_range_max=sgr['port_range_max'])
# Direction
if validators.is_directionclassifier_valid(sgr, validators.SG_RULE_TYPE):
cl5 = models.DirectionClassifier(direction=sgr['direction'])
classifiers = [cl1, cl2, cl3, cl4, cl5]
create_classifier_chain(group, classifiers)
def convert_classifier_group_to_security_group(context, classifier_group_id):
sg_dict = {}
cg = get_classifier_group(context, classifier_group_id)
for classifier in [link.classifier for link in cg.classifier_chain]:
classifier_type = type(classifier)
if classifier_type is models.TransportClassifier:
sg_dict['port_range_min'] = classifier.destination_port_range_min
sg_dict['port_range_max'] = classifier.destination_port_range_max
continue
if classifier_type is models.IpClassifier:
sg_dict['remote_ip_prefix'] = classifier.source_ip_prefix
continue
if classifier_type is models.DirectionClassifier:
sg_dict['direction'] = classifier.direction
continue
if classifier_type is models.EthernetClassifier:
sg_dict['ethertype'] = ethertype_value_to_security_group_ethertype(
classifier.ethertype)
continue
if classifier_type is models.Ipv4Classifier:
sg_dict['protocol'] = classifier.protocol
continue
if classifier_type is models.Ipv6Classifier:
sg_dict['protocol'] = classifier.next_header
continue
return sg_dict
def convert_firewall_policy_to_classifier(context, firewall):
cgroup = models.ClassifierGroup()
cgroup.service = 'neutron-fwaas'
for rule in firewall['firewall_rules']:
convert_firewall_rule_to_classifier(context, rule, cgroup)
context.session.add(cgroup)
context.session.commit()
return cgroup
def convert_firewall_rule_to_classifier(context, fwr, group):
cl1 = cl2 = cl3 = cl4 = None
# ip_version
if validators.is_ethernetclassifier_valid(fwr, validators.FW_RULE_TYPE):
cl1 = models.EthernetClassifier()
cl1.ethertype = fwr['ip_version']
# protocol
if validators.is_protocolclassifier_valid(fwr, validators.FW_RULE_TYPE):
if cl1.ethertype == constants.IP_VERSION_6:
cl2 = models.Ipv6Classifier()
cl2.next_header = fwr['protocol']
else:
cl2 = models.Ipv4Classifier()
cl2.protocol = fwr['protocol']
# Source and destination ip
if validators.is_ipclassifier_valid(fwr, validators.FW_RULE_TYPE):
cl3 = models.IpClassifier()
cl3.source_ip_prefix = fwr['source_ip_address']
cl3.destination_ip_prefix = fwr['destination_ip_address']
# Ports
if validators.is_transportclassifier_valid(fwr, validators.FW_RULE_TYPE):
cl4 = models.TransportClassifier(
source_port_range_min=fwr['source_port_range_min'],
source_port_range_max=fwr['source_port_range_max'],
destination_port_range_min=fwr['destination_port_range_min'],
destination_port_range_max=fwr['destination_port_range_max'])
classifiers = [cl1, cl2, cl3, cl4]
create_classifier_chain(group, classifiers)
def convert_classifier_to_firewall(context, classifier_group_id):
fw_rule = {}
cg = get_classifier_group(context, classifier_group_id)
for classifier in [link.classifier for link in cg.classifier_chain]:
classifier_type = type(classifier)
if classifier_type is models.EthernetClassifier:
fw_rule['ip_version'] = classifier.ethertype
continue
if classifier_type is models.Ipv4Classifier:
fw_rule['protocol'] = classifier.protocol
continue
if classifier_type is models.Ipv6Classifier:
fw_rule['protocol'] = classifier.next_header
continue
if classifier_type is models.TransportClassifier:
fw_rule['source_port_range_min'] = classifier.source_port_range_min
fw_rule['source_port_range_max'] = classifier.source_port_range_max
fw_rule['destination_port_range_min'] = \
classifier.destination_port_range_min
fw_rule['destination_port_range_max'] = \
classifier.destination_port_range_max
continue
if classifier_type is models.IpClassifier:
fw_rule['source_ip_address'] = classifier.source_ip_prefix
fw_rule['destination_ip_address'] = \
classifier.destination_ip_prefix
continue
return fw_rule

View File

@ -37,8 +37,9 @@ def upgrade():
sa.Column('description', sa.String(length=255)),
sa.Column('project_id', sa.String(length=255),
index=True),
sa.Column('shared', sa.Boolean(), nullable=False),
sa.Column('operator', sa.Enum("AND", "OR"), nullable=False))
sa.Column('shared', sa.Boolean()),
sa.Column('operator', sa.Enum("AND", "OR", name="operator_types"),
nullable=False))
op.create_table(
'classifications',
@ -46,32 +47,35 @@ def upgrade():
sa.Column('c_type', sa.String(length=36)),
sa.Column('name', sa.String(length=255)),
sa.Column('description', sa.String(length=255)),
sa.Column('negated', sa.Boolean(), nullable=True),
sa.Column('shared', sa.Boolean(), nullable=True),
sa.Column('negated', sa.Boolean()),
sa.Column('shared', sa.Boolean()),
sa.Column('project_id', sa.String(length=255),
index=True))
op.create_table(
'classification_group_to_classification_mappings',
sa.Column('container_cg_id', sa.String(length=36), sa.ForeignKey(
"classification_groups.id"), primary_key=True),
"classification_groups.id", ondelete="CASCADE"),
primary_key=True),
sa.Column('stored_classification_id', sa.String(length=36),
sa.ForeignKey("classifications.id"), primary_key=True))
op.create_table(
'classification_group_to_cg_mappings',
sa.Column('container_cg_id', sa.String(length=36), sa.ForeignKey(
"classification_groups.id"), primary_key=True),
"classification_groups.id", ondelete="CASCADE"),
primary_key=True),
sa.Column('stored_cg_id', sa.String(length=36), sa.ForeignKey(
"classification_groups.id"), primary_key=True))
op.create_table(
'ipv4_classifications',
sa.Column('id', sa.String(length=36), sa.ForeignKey(
"classifications.id"), primary_key=True),
"classifications.id", ondelete="CASCADE"),
primary_key=True),
sa.Column('dscp', sa.Integer()),
sa.Column('dscp_mask', sa.Integer()),
sa.Column('ecn', sa.Enum("0", "1", "2", "3")),
sa.Column('ecn', sa.Enum("0", "1", "2", "3", name="ecn_types")),
sa.Column('length_min', sa.Integer()),
sa.Column('length_max', sa.Integer()),
sa.Column('flags', sa.Integer()),
@ -85,10 +89,11 @@ def upgrade():
op.create_table(
'ipv6_classifications',
sa.Column('id', sa.String(length=36), sa.ForeignKey(
"classifications.id"), primary_key=True),
"classifications.id", ondelete="CASCADE"),
primary_key=True),
sa.Column('dscp', sa.Integer()),
sa.Column('dscp_mask', sa.Integer()),
sa.Column('ecn', sa.Enum("0", "1", "2", "3")),
sa.Column('ecn', sa.Enum("0", "1", "2", "3", name="ecn_types")),
sa.Column('length_min', sa.Integer()),
sa.Column('length_max', sa.Integer()),
sa.Column('next_header', sa.Integer()),
@ -100,7 +105,8 @@ def upgrade():
op.create_table(
'ethernet_classifications',
sa.Column('id', sa.String(length=36), sa.ForeignKey(
"classifications.id"), primary_key=True),
"classifications.id", ondelete="CASCADE"),
primary_key=True),
sa.Column('ethertype', sa.Integer()),
sa.Column('src_addr', sa.String(length=17)),
sa.Column('dst_addr', sa.String(length=17)))
@ -108,7 +114,8 @@ def upgrade():
op.create_table(
'udp_classifications',
sa.Column('id', sa.String(length=36), sa.ForeignKey(
"classifications.id"), primary_key=True),
"classifications.id", ondelete="CASCADE"),
primary_key=True),
sa.Column('src_port_min', sa.Integer()),
sa.Column('src_port_max', sa.Integer()),
sa.Column('dst_port_min', sa.Integer()),
@ -119,7 +126,8 @@ def upgrade():
op.create_table(
'tcp_classifications',
sa.Column('id', sa.String(length=36), sa.ForeignKey(
"classifications.id"), primary_key=True),
"classifications.id", ondelete="CASCADE"),
primary_key=True),
sa.Column('src_port_min', sa.Integer()),
sa.Column('src_port_max', sa.Integer()),
sa.Column('dst_port_min', sa.Integer()),

View File

@ -1,4 +1,5 @@
# Copyright (c) 2015 Mirantis, Inc.
# Copyright 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -12,161 +13,189 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_classifier.common import constants
from oslo_utils import uuidutils
from neutron.db import _model_query as mq
from neutron_lib.db import model_base
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.orderinglist import ordering_list
from sqlalchemy import orm
Base = declarative_base()
# Stolen from neutron/db/model_base.py
class HasTenant(object):
"""Tenant mixin, add to subclasses that have a tenant."""
tenant_id = sa.Column(sa.String(255), index=True)
# Stolen from neutron/db/model_base.py
class HasId(object):
"""id mixin, add to subclasses that have an id."""
id = sa.Column(sa.String(36),
primary_key=True,
default=uuidutils.generate_uuid)
class Classifier(Base, HasId):
__tablename__ = 'classifiers'
classifier_type = sa.Column(sa.String)
__mapper_args__ = {'polymorphic_on': classifier_type}
class ClassifierGroup(Base, HasTenant, HasId):
__tablename__ = 'classifier_groups'
# Service plugin models
class ClassificationGroup(model_base.BASEV2, model_base.HasId,
model_base.HasProject):
__tablename__ = 'classification_groups'
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(255))
classifier_chain = orm.relationship(
'ClassifierChainEntry',
backref=orm.backref('classifier_chains', cascade='all, delete'),
order_by='ClassifierChainEntry.sequence',
collection_class=ordering_list('sequence', count_from=1))
service = sa.Column(sa.Enum(*constants.NEUTRON_SERVICES), index=True)
shared = sa.Column(sa.Boolean, default=False)
operator = sa.Column(sa.Enum('AND', 'OR'), default='AND', nullable=False)
classifications = orm.relationship(
"ClassificationBase", lazy="subquery",
secondary='classification_group_to_classification_mappings')
classification_groups = orm.relationship(
"ClassificationGroup", lazy="subquery",
secondary='classification_group_to_cg_mappings',
primaryjoin="ClassificationGroup.id=="
"CGToClassificationGroupMapping.container_cg_id",
secondaryjoin="ClassificationGroup.id=="
"CGToClassificationGroupMapping.stored_cg_id")
class ClassifierChainEntry(Base, HasId):
__tablename__ = 'classifier_chains'
classifier_group_id = sa.Column(sa.String(36),
sa.ForeignKey('classifier_groups.id',
ondelete="CASCADE"))
classifier_id = sa.Column(sa.String(36),
sa.ForeignKey('classifiers.id',
ondelete="CASCADE"))
classifier = orm.relationship(Classifier)
sequence = sa.Column(sa.Integer)
classifier_group = orm.relationship(ClassifierGroup)
def __init__(self, classifier_group=None, classifier=None, sequence=None):
super(ClassifierChainEntry, self).__init__()
self.classifier = classifier
self.classifier_group = classifier_group
self.sequence = sequence
class CGToClassificationMapping(model_base.BASEV2):
__tablename__ = 'classification_group_to_classification_mappings'
container_cg_id = sa.Column(sa.String(36),
sa.ForeignKey('classification_groups.id',
ondelete='CASCADE'), primary_key=True)
classification = orm.relationship("ClassificationBase", lazy="subquery")
stored_classification_id = sa.Column(sa.String(36),
sa.ForeignKey('classifications.id'),
primary_key=True)
class DirectionClassifier(Classifier):
__tablename__ = 'direction_classifiers'
__mapper_args__ = {'polymorphic_identity': 'directionclassifier'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifiers.id'),
primary_key=True)
direction = sa.Column(sa.Enum(*constants.DIRECTIONS))
def __init__(self, direction=None):
super(DirectionClassifier, self).__init__()
self.direction = direction
class CGToClassificationGroupMapping(model_base.BASEV2):
__tablename__ = 'classification_group_to_cg_mappings'
container_cg_id = sa.Column(sa.String(36),
sa.ForeignKey('classification_groups.id',
ondelete='CASCADE'), primary_key=True)
stored_cg_id = sa.Column(sa.String(36),
sa.ForeignKey('classification_groups.id'),
primary_key=True)
class EncapsulationClassifier(Classifier):
__tablename__ = 'encapsulation_classifiers'
__mapper_args__ = {'polymorphic_identity': 'encapsulationclassifier'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifiers.id'),
primary_key=True)
encapsulation_type = sa.Column(sa.Enum(*constants.ENCAPSULATION_TYPES))
encapsulation_id = sa.Column(sa.String(255))
class ClassificationBase(model_base.HasId, model_base.HasProject,
model_base.BASEV2):
__tablename__ = 'classifications'
c_type = sa.Column(sa.String(36))
__mapper_args__ = {'polymorphic_on': c_type}
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(255))
shared = sa.Column(sa.Boolean())
negated = sa.Column(sa.Boolean())
class EthernetClassifier(Classifier):
__tablename__ = 'ethernet_classifiers'
__mapper_args__ = {'polymorphic_identity': 'ethernetclassifier'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifiers.id'),
primary_key=True)
ethertype = sa.Column(sa.Integer)
source_mac = sa.Column(sa.String(255))
destination_mac = sa.Column(sa.String(255))
class IPV4Classification(ClassificationBase):
__tablename__ = 'ipv4_classifications'
__mapper_args__ = {'polymorphic_identity': 'ipv4'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifications.id',
ondelete='CASCADE'), primary_key=True)
dscp = sa.Column(sa.Integer())
dscp_mask = sa.Column(sa.Integer())
ecn = sa.Column(sa.Enum("0", "1", "2", "3", name='ecn_types'))
length_min = sa.Column(sa.Integer())
length_max = sa.Column(sa.Integer())
flags = sa.Column(sa.Integer())
flags_mask = sa.Column(sa.Integer())
ttl_min = sa.Column(sa.SmallInteger())
ttl_max = sa.Column(sa.SmallInteger())
protocol = sa.Column(sa.Integer())
src_addr = sa.Column(sa.String(19))
dst_addr = sa.Column(sa.String(19))
class IpClassifier(Classifier):
__tablename__ = 'ip_classifiers'
__mapper_args__ = {'polymorphic_identity': 'ipclassifier'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifiers.id'),
primary_key=True)
source_ip_prefix = sa.Column(sa.String(255))
destination_ip_prefix = sa.Column(sa.String(255))
class IPV6Classification(ClassificationBase):
__tablename__ = 'ipv6_classifications'
__mapper_args__ = {'polymorphic_identity': 'ipv6'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifications.id',
ondelete='CASCADE'), primary_key=True)
dscp = sa.Column(sa.Integer())
dscp_mask = sa.Column(sa.Integer())
ecn = sa.Column(sa.Enum("0", "1", "2", "3", name='ecn_types'))
length_min = sa.Column(sa.Integer())
length_max = sa.Column(sa.Integer())
next_header = sa.Column(sa.Integer())
hops_min = sa.Column(sa.SmallInteger())
hops_max = sa.Column(sa.SmallInteger())
src_addr = sa.Column(sa.String(49))
dst_addr = sa.Column(sa.String(49))
class Ipv4Classifier(Classifier):
__tablename__ = 'ipv4_classifiers'
__mapper_args__ = {'polymorphic_identity': 'ipv4classifier'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifiers.id'),
primary_key=True)
dscp_tag = sa.Column(sa.String(255))
protocol = sa.column(sa.Enum(*constants.PROTOCOLS))
dscp_mask = sa.Column(sa.String(255))
class EthernetClassification(ClassificationBase):
__tablename__ = 'ethernet_classifications'
__mapper_args__ = {'polymorphic_identity': 'ethernet'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifications.id',
ondelete='CASCADE'), primary_key=True)
ethertype = sa.Column(sa.Integer())
src_addr = sa.Column(sa.String(17))
dst_addr = sa.Column(sa.String(17))
class Ipv6Classifier(Classifier):
__tablename__ = 'ipv6_classifiers'
__mapper_args__ = {'polymorphic_identity': 'ipv6classifier'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifiers.id'),
primary_key=True)
next_header = sa.Column(sa.Enum(*constants.PROTOCOLS))
traffic_class = sa.Column(sa.String(255))
flow_label = sa.Column(sa.String(255))
class UDPClassification(ClassificationBase):
__tablename__ = 'udp_classifications'
__mapper_args__ = {'polymorphic_identity': 'udp'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifications.id',
ondelete='CASCADE'), primary_key=True)
src_port_min = sa.Column(sa.Integer)
src_port_max = sa.Column(sa.Integer)
dst_port_min = sa.Column(sa.Integer)
dst_port_max = sa.Column(sa.Integer)
length_min = sa.Column(sa.Integer())
length_max = sa.Column(sa.Integer())
class NeutronPortClassifier(Classifier):
__tablename__ = 'neutron_port_classifiers'
__mapper_args__ = {'polymorphic_identity': 'neutronportclassifier'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifiers.id'),
primary_key=True)
logical_source_port = sa.Column(sa.String(255))
logical_destination_port = sa.Column(sa.String(255))
class TCPClassification(ClassificationBase):
__tablename__ = 'tcp_classifications'
__mapper_args__ = {'polymorphic_identity': 'tcp'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifications.id',
ondelete='CASCADE'), primary_key=True)
src_port_min = sa.Column(sa.Integer)
src_port_max = sa.Column(sa.Integer)
dst_port_min = sa.Column(sa.Integer)
dst_port_max = sa.Column(sa.Integer)
flags = sa.Column(sa.Integer())
flags_mask = sa.Column(sa.Integer())
window_min = sa.Column(sa.Integer())
window_max = sa.Column(sa.Integer())
class TransportClassifier(Classifier):
__tablename__ = 'transport_classifiers'
__mapper_args__ = {'polymorphic_identity': 'transportclassifier'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifiers.id'),
primary_key=True)
source_port_range_max = sa.Column(sa.Integer)
source_port_range_min = sa.Column(sa.Integer)
destination_port_range_max = sa.Column(sa.Integer)
destination_port_range_min = sa.Column(sa.Integer)
def _read_classification_group(context, id):
"""Returns a classification group."""
def __init__(self, source_port_range_min=None,
source_port_range_max=None,
destination_port_range_min=None,
destination_port_range_max=None):
super(TransportClassifier, self).__init__()
self.destination_port_range_min = destination_port_range_min
self.destination_port_range_max = destination_port_range_max
self.source_port_range_min = source_port_range_min
self.source_port_range_max = source_port_range_max
cg = mq.get_by_id(context, ClassificationGroup, id)
return cg
class VlanClassifier(Classifier):
__tablename__ = 'vlan_classifiers'
__mapper_args__ = {'polymorphic_identity': 'vlanclassifier'}
id = sa.Column(sa.String(36), sa.ForeignKey('classifiers.id'),
primary_key=True)
vlan_priority = sa.Column(sa.Integer)
def _read_classifications(context, id):
"""Returns all the classifications mapped/related to a
classification group.
"""
class_group = _read_classification_group(context, id)
return class_group.classifications
def _read_classification_groups(context, id):
"""Returns all the classification groups mapped/related to a
classification group.
"""
class_group = _read_classification_group(context, id)
return class_group.classification_groups
def _generate_dict_from_cg_db(model, fields=None):
resp = {}
resp['id'] = model.id
resp['name'] = model.name
resp['description'] = model.description
resp['project_id'] = model.project_id
resp['classifications'] = model.classifications
resp['classification_groups'] = model.classification_groups
resp['shared'] = model.shared
resp['operator'] = model.operator
return resp
def _read_all_classification_groups(plugin, context):
"""Returns all classification groups."""
class_group = plugin._get_collection(context, ClassificationGroup,
_generate_dict_from_cg_db)
return class_group
RESOURCE_MODELS = {'ethernet_classification': EthernetClassification,
'ipv4_classification': IPV4Classification,
'ipv6_classification': IPV6Classification,
'tcp_classification': TCPClassification,
'udp_classification': UDPClassification}

View File

@ -0,0 +1,26 @@
# Copyright 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.db import rbac_db_models
from neutron_lib.db import model_base
class ClassificationGroupRBAC(rbac_db_models.RBACColumns, model_base.BASEV2):
"""RBAC table for classification groups."""
object_id = rbac_db_models._object_id_column('classification_groups.id')
object_type = 'classification_group'
def get_valid_actions(self):
return (rbac_db_models.ACCESS_SHARED)

View File

@ -1,140 +0,0 @@
# Copyright (c) 2016 Huawei Technologies India Pvt Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_classifier.common import constants as const
from neutron_classifier.common import exceptions as exc
import netaddr
SG_RULE_TYPE = 1
FW_RULE_TYPE = 2
def get_attr_value(dict, key):
return dict.get(key, None)
def _validate_fwr_protocol_parameters(fwr, protocol):
"""Check if given port values and protocol is valid."""
if protocol in (const.PROTO_NAME_ICMP, const.PROTO_NAME_ICMP_V6):
source_port_range_min = get_attr_value(fwr, 'source_port_range_min')
source_port_range_max = get_attr_value(fwr, 'source_port_range_max')
destination_port_range_min = get_attr_value(
fwr, 'destination_port_range_min')
destination_port_range_max = get_attr_value(
fwr, 'destination_port_range_max')
if (source_port_range_min or source_port_range_max or
destination_port_range_min or destination_port_range_max):
raise exc.InvalidICMPParameter(param="Source, destination port")
def _validate_sg_ethertype_and_protocol(rule, protocol):
"""Check if given ethertype and protocol is valid."""
eth_value = get_attr_value(rule, 'ethertype')
if protocol == const.PROTO_NAME_ICMP_V6:
if eth_value == const.SECURITYGROUP_ETHERTYPE_IPV4:
raise exc.EthertypeConflictWithProtocol(ethertype=eth_value,
protocol=protocol)
def validate_port_range(min_port, max_port):
"""Check that port_range is valid."""
port_range = '%s:%s' % (min_port, max_port)
if(min_port is None and max_port is None):
return
if (int(min_port) <= 0 or int(max_port) <= 0):
raise exc.InvalidPortRange(port_range=port_range)
if int(min_port) > int(max_port):
raise exc.InvalidPortRange(port_range=port_range)
def is_ethernetclassifier_valid(rule, type):
"""Check ethertype or ip_version in rule dict."""
if type == SG_RULE_TYPE:
attr_type = 'ethertype'
attr_list = [const.SECURITYGROUP_ETHERTYPE_IPV4,
const.SECURITYGROUP_ETHERTYPE_IPV6]
else:
attr_type = 'ip_version'
attr_list = [const.IP_VERSION_4, const.IP_VERSION_6]
eth_value = get_attr_value(rule, attr_type)
if not eth_value:
return False
elif eth_value not in attr_list:
raise exc.InvalidEthernetClassifier(eth_type=attr_type)
return True
def is_protocolclassifier_valid(rule, type):
"""Check protocol in rule dict and validate dependent params"""
protocol = get_attr_value(rule, 'protocol')
if not protocol:
return False
if type == SG_RULE_TYPE:
_validate_sg_ethertype_and_protocol(rule, protocol)
else:
_validate_fwr_protocol_parameters(rule, protocol)
return True
def is_ipclassifier_valid(rule, type):
"""validate the ip address received in rule dict"""
src_ip_version = dst_ip_version = None
src_ip_address = dst_ip_address = None
if type == SG_RULE_TYPE:
dst_ip_address = get_attr_value(rule, 'remote_ip_prefix')
attr_type = 'ethertype'
else:
src_ip_address = get_attr_value(rule, 'source_ip_address')
dst_ip_address = get_attr_value(rule, 'destination_ip_address')
attr_type = 'ip_version'
if src_ip_address:
src_ip_version = netaddr.IPNetwork(src_ip_address).version
if dst_ip_address:
dst_ip_version = netaddr.IPNetwork(dst_ip_address).version
rule_ip_version = get_attr_value(rule, attr_type)
if type == SG_RULE_TYPE:
if rule_ip_version != "IPv%d" % dst_ip_version:
raise exc.IpAddressConflict()
elif ((src_ip_version and src_ip_version != rule_ip_version) or
(dst_ip_version and dst_ip_version != rule_ip_version)):
raise exc.IpAddressConflict()
return True
def is_directionclassifier_valid(rule, type):
"""Check direction param in rule dict"""
direction = get_attr_value(rule, 'direction')
if not direction:
return False
return True
def is_transportclassifier_valid(rule, type):
"""Verify port range values"""
if type == SG_RULE_TYPE:
port_range_min = get_attr_value(rule, 'port_range_min')
port_range_max = get_attr_value(rule, 'port_range_max')
validate_port_range(port_range_min, port_range_max)
else:
source_port_range_min = get_attr_value(rule, 'source_port_range_min')
source_port_range_max = get_attr_value(rule, 'source_port_range_max')
destination_port_range_min = get_attr_value(
rule, 'destination_port_range_min')
destination_port_range_max = get_attr_value(
rule, 'destination_port_range_max')
validate_port_range(source_port_range_min, source_port_range_max)
validate_port_range(destination_port_range_min,
destination_port_range_max)
return True

View File

@ -0,0 +1,17 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def register_objects():
# local import to avoid circular import failure
__import__('neutron_classifier.objects.classifications')
__import__('neutron_classifier.objects.classification_type')

View File

@ -0,0 +1,40 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.objects import base
from neutron_classifier.common import constants
from oslo_versionedobjects import fields as obj_fields
@base.NeutronObjectRegistry.register
class ClassificationType(base.NeutronObject):
VERSION = '1.0'
fields = {
'type': obj_fields.StringField(),
'supported_parameters': obj_fields.ListOfStringsField(),
}
@classmethod
def get_object(cls, classification_type, **kwargs):
parameters = list(constants.SUPPORTED_FIELDS[classification_type])
return cls(type=classification_type, supported_parameters=parameters)
@classmethod
def get_objects(cls, classification_type, **kwargs):
# Only get one object
pass

View File

@ -0,0 +1,304 @@
# Copyright 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fields as obj_fields
from neutron.db import api as db_api
from neutron.objects import base
from neutron.objects import common_types
from neutron.objects import rbac_db
from neutron_classifier.db import models
from neutron_classifier.db.rbac_db_models import ClassificationGroupRBAC
@obj_base.VersionedObjectRegistry.register
class ClassificationGroup(rbac_db.NeutronRbacObject):
# Version 1.0: Initial version
VERSION = '1.0'
# required by RbacNeutronMetaclass
rbac_db_cls = ClassificationGroupRBAC
db_model = models.ClassificationGroup
fields = {
'id': common_types.UUIDField(),
'name': obj_fields.StringField(),
'description': obj_fields.StringField(),
'project_id': obj_fields.StringField(),
'shared': obj_fields.BooleanField(default=False),
'operator': obj_fields.EnumField(['AND', 'OR'], default='AND'),
}
fields_no_update = ['id', 'project_id']
@classmethod
def get_object(cls, context, **kwargs):
# We want to get the policy regardless of its tenant id. We'll make
# sure the tenant has permission to access the policy later on.
admin_context = context.elevated()
with db_api.autonested_transaction(admin_context.session):
obj = super(ClassificationGroup, cls).get_object(admin_context,
**kwargs)
if not obj or not cls.is_accessible(context, obj):
return
return obj
@classmethod
def get_bound_tenant_ids(cls, context, **kwargs):
# If we can return the policy regardless of tenant, we don't need
# to return the tenant id.
pass
@obj_base.VersionedObjectRegistry.register
class CGToClassificationMapping(base.NeutronDbObject):
VERSION = '1.0'
rbac_db_model = ClassificationGroupRBAC
db_model = models.CGToClassificationMapping
fields = {
'container_cg_id': common_types.UUIDField(),
'stored_classification_id': common_types.UUIDField()}
@obj_base.VersionedObjectRegistry.register
class CGToClassificationGroupMapping(base.NeutronDbObject):
VERSION = '1.0'
rbac_db_model = ClassificationGroupRBAC
db_model = models.CGToClassificationGroupMapping
fields = {
'container_cg_id': common_types.UUIDField(),
'stored_cg_id': common_types.UUIDField()
}
@six.add_metaclass(abc.ABCMeta)
class ClassificationBase(base.NeutronDbObject):
VERSION = '1.0'
db_model = models.ClassificationBase
fields = {
'id': common_types.UUIDField(),
'name': obj_fields.StringField(),
'description': obj_fields.StringField(),
'project_id': obj_fields.StringField(),
'shared': obj_fields.BooleanField(default=False),
'c_type': obj_fields.StringField(),
'negated': obj_fields.BooleanField(default=False),
}
fields_no_update = ['id', 'c_type']
@classmethod
def get_objects(cls, context, _pager=None, validate_filters=True,
**kwargs):
with db_api.autonested_transaction(context.session):
objects = super(ClassificationBase,
cls).get_objects(context, _pager,
validate_filters,
**kwargs)
return objects
@obj_base.VersionedObjectRegistry.register
class IPV4Classification(ClassificationBase):
VERSION = '1.0'
db_model = models.IPV4Classification
fields = {
'dscp': obj_fields.IntegerField(nullable=True),
'dscp_mask': obj_fields.IntegerField(nullable=True),
'ecn': obj_fields.EnumField(valid_values=["0", "1", "2", "3"],
nullable=True),
'length_min': obj_fields.IntegerField(nullable=True),
'length_max': obj_fields.IntegerField(nullable=True),
'flags': obj_fields.IntegerField(nullable=True),
'flags_mask': obj_fields.IntegerField(nullable=True),
'ttl_min': obj_fields.IntegerField(nullable=True),
'ttl_max': obj_fields.IntegerField(nullable=True),
'protocol': obj_fields.IntegerField(nullable=True),
'src_addr': obj_fields.StringField(nullable=True),
'dst_addr': obj_fields.StringField(nullable=True),
}
def create(self):
with db_api.autonested_transaction(self.obj_context.session):
super(ClassificationBase, self).create()
@classmethod
def get_object(cls, context, **kwargs):
with db_api.autonested_transaction(context.session):
obj = super(IPV4Classification,
cls).get_object(context, c_type='ipv4',
**kwargs)
return obj
@obj_base.VersionedObjectRegistry.register
class IPV6Classification(ClassificationBase):
VERSION = '1.0'
db_model = models.IPV6Classification
fields = {
'dscp': obj_fields.IntegerField(nullable=True),
'dscp_mask': obj_fields.IntegerField(nullable=True),
'ecn': obj_fields.EnumField(valid_values=["0", "1", "2", "3"],
nullable=True),
'length_min': obj_fields.IntegerField(nullable=True),
'length_max': obj_fields.IntegerField(nullable=True),
'next_header': obj_fields.IntegerField(nullable=True),
'hops_min': obj_fields.IntegerField(nullable=True),
'hops_max': obj_fields.IntegerField(nullable=True),
'src_addr': obj_fields.StringField(nullable=True),
'dst_addr': obj_fields.StringField(nullable=True),
}
def create(self):
with db_api.autonested_transaction(self.obj_context.session):
super(ClassificationBase, self).create()
@classmethod
def get_object(cls, context, **kwargs):
with db_api.autonested_transaction(context.session):
obj = super(IPV6Classification,
cls).get_object(context, c_type='ipv6',
**kwargs)
return obj
@obj_base.VersionedObjectRegistry.register
class EthernetClassification(ClassificationBase):
VERSION = '1.0'
db_model = models.EthernetClassification
fields = {
'ethertype': obj_fields.IntegerField(nullable=True),
'src_addr': obj_fields.StringField(nullable=True),
'dst_addr': obj_fields.StringField(nullable=True),
}
def create(self):
with db_api.autonested_transaction(self.obj_context.session):
super(ClassificationBase, self).create()
@classmethod
def get_object(cls, context, **kwargs):
with db_api.autonested_transaction(context.session):
obj = super(EthernetClassification,
cls).get_object(context, c_type='ethernet',
**kwargs)
return obj
@obj_base.VersionedObjectRegistry.register
class UDPClassification(ClassificationBase):
VERSION = '1.0'
db_model = models.UDPClassification
fields = {
'src_port_min': obj_fields.IntegerField(nullable=True),
'src_port_max': obj_fields.IntegerField(nullable=True),
'dst_port_min': obj_fields.IntegerField(nullable=True),
'dst_port_max': obj_fields.IntegerField(nullable=True),
'length_min': obj_fields.IntegerField(nullable=True),
'length_max': obj_fields.IntegerField(nullable=True),
}
def create(self):
with db_api.autonested_transaction(self.obj_context.session):
super(ClassificationBase, self).create()
@classmethod
def get_object(cls, context, **kwargs):
with db_api.autonested_transaction(context.session):
obj = super(UDPClassification,
cls).get_object(context, c_type='udp',
**kwargs)
return obj
@obj_base.VersionedObjectRegistry.register
class TCPClassification(ClassificationBase):
VERSION = '1.0'
db_model = models.TCPClassification
fields = {
'src_port_min': obj_fields.IntegerField(nullable=True),
'src_port_max': obj_fields.IntegerField(nullable=True),
'dst_port_min': obj_fields.IntegerField(nullable=True),
'dst_port_max': obj_fields.IntegerField(nullable=True),
'flags': obj_fields.IntegerField(nullable=True),
'flags_mask': obj_fields.IntegerField(nullable=True),
'window_min': obj_fields.IntegerField(nullable=True),
'window_max': obj_fields.IntegerField(nullable=True),
}
def create(self):
with db_api.autonested_transaction(self.obj_context.session):
super(ClassificationBase, self).create()
@classmethod
def get_object(cls, context, **kwargs):
with db_api.autonested_transaction(context.session):
obj = super(TCPClassification,
cls).get_object(context, c_type='tcp',
**kwargs)
return obj
# NOTE(ndahiwade): These methods were added to get the list of mapped
# classifications and classification groups to a ClassificationGroup as
# currently we don't have synthetic fields supporting subclasses and
# self-referential relationships.
def _get_mapped_classifications(context, obj):
"""Returns a list of classifications mapped to a classification group.
:param context:
:param obj: ClassificationGroup object
:return: list of Classification objects
"""
mapped_db_classifications = models._read_classifications(context, obj.id)
objs_cls = [CLASS_MAP[c.c_type] for c in mapped_db_classifications]
mapped_obj_classifications = []
for x in zip(objs_cls, mapped_db_classifications):
mapped_obj_classifications.append(x[0]._load_object(context, x[1]))
return mapped_obj_classifications
def _get_mapped_classification_groups(context, obj):
"""Returns a list of classification groups mapped to another group.
:param context:
:param obj: ClassificationGroup object
:return: list of ClassificationGroup objects
"""
mapped_cgs = [ClassificationGroup._load_object(context, cg) for cg in
models._read_classification_groups(context, obj.id)]
return mapped_cgs
CLASS_MAP = {'ethernet': EthernetClassification,
'ipv4': IPV4Classification,
'ipv6': IPV6Classification,
'udp': UDPClassification,
'tcp': TCPClassification}

View File

@ -1,20 +1,35 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base
import mock
from neutron.api.rpc.callbacks import resource_manager
from neutron.tests import base
class TestCase(base.BaseTestCase):
"""Test case base class for all tests."""
class BaseClassificationTestCase(base.BaseTestCase):
def setUp(self):
super(BaseClassificationTestCase, self).setUp()
with mock.patch.object(
resource_manager.ResourceCallbacksManager, '_singleton',
new_callable=mock.PropertyMock(return_value=False)):
self.consumer_manager = resource_manager.\
ConsumerResourceCallbacksManager()
self.producer_manager = resource_manager.\
ProducerResourceCallbacksManager()
for manager in (self.consumer_manager, self.producer_manager):
manager.clear()

View File

@ -0,0 +1,49 @@
#!/usr/bin/env bash
set -ex
VENV=${1:-"dsvm-functional"}
GATE_DEST=$BASE/new
NEUTRON_PATH=$GATE_DEST/neutron
NETWORKING_CCF_PATH=$GATE_DEST/neutron-classifier
GATE_HOOKS=$NETWORKING_CCF_PATH/neutron_classifier/tests/contrib/hooks
DEVSTACK_PATH=$GATE_DEST/devstack
LOCAL_CONF=$DEVSTACK_PATH/late-local.conf
DSCONF=/tmp/devstack-tools/bin/dsconf
# Install devstack-tools used to produce local.conf; we can't rely on
# test-requirements.txt because the gate hook is triggered before neutron is
# installed
sudo -H pip install virtualenv
virtualenv /tmp/devstack-tools
/tmp/devstack-tools/bin/pip install -U devstack-tools==0.4.0
case $VENV in
"dsvm-functional"|"dsvm-fullstack")
# The following need to be set before sourcing
# configure_for_func_testing.
GATE_STACK_USER=stack
PROJECT_NAME=neutron-classifier
IS_GATE=True
LOCAL_CONF=$DEVSTACK_PATH/local.conf
source $DEVSTACK_PATH/functions
source $NEUTRON_PATH/devstack/lib/ovs
source $NEUTRON_PATH/tools/configure_for_func_testing.sh
configure_host_for_func_testing
# Make the workspace owned by the stack user
sudo chown -R $STACK_USER:$STACK_USER $BASE
;;
"dsvm-neutron-classifier")
export DEVSTACK_LOCALCONF=$(cat $LOCAL_CONF)
$BASE/new/devstack-gate/devstack-vm-gate.sh
;;
*)
echo "Unrecognized environment $VENV".
exit 1
esac

View File

@ -0,0 +1,42 @@
#!/usr/bin/env bash
set -xe
CCF_DIR="$BASE/new/neutron-classifier"
SCRIPTS_DIR="/usr/os-testr-env/bin/"
venv=${1:-"dsvm-functional"}
function generate_testr_results {
# Give job user rights to access tox logs
sudo -H -u $owner chmod o+rw .
sudo -H -u $owner chmod o+rw -R .stestr
if [ -f ".stestr/0" ] ; then
.tox/$venv/bin/subunit-1to2 < .stestr/0 > ./stestr.subunit
$SCRIPTS_DIR/subunit2html ./stestr.subunit testr_results.html
gzip -9 ./stestr.subunit
gzip -9 ./testr_results.html
sudo mv ./*.gz /opt/stack/logs/
fi
}
if [[ "$venv" == dsvm-functional* ]] || [[ "$venv" == dsvm-fullstack* ]]
then
owner=stack
sudo_env=
# Set owner permissions according to job's requirements.
cd $CCF_DIR
sudo chown -R $owner:stack $CCF_DIR
# Run tests
echo "Running neutron-classifier $venv test suite"
set +e
sudo -H -u $owner $sudo_env tox -e $venv
testr_exit_code=$?
set -e
# Collect and parse results
generate_testr_results
exit $testr_exit_code
fi

View File

@ -0,0 +1,63 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from neutron.db.migration.alembic_migrations import external
from neutron.db.migration import cli as migration
from neutron.tests.functional.db import test_migrations
from neutron.tests.unit import testlib_api
from neutron_classifier.db.migration.models import head
# Tables from other repos that we depend on but do not manage.
# EXTERNAL_TABLES should contain all names of tables that are not related to
# current repo.
EXTERNAL_TABLES = set(external.TABLES)
VERSION_TABLE = 'alembic_version_classifier'
class _TestModelsMigrationsCCF(test_migrations._TestModelsMigrations):
def db_sync(self, engine):
cfg.CONF.set_override('connection', engine.url, group='database')
for conf in migration.get_alembic_configs():
self.alembic_config = conf
self.alembic_config.neutron_config = cfg.CONF
migration.do_alembic_command(conf, 'upgrade', 'heads')
def get_metadata(self):
return head.get_metadata()
def include_object(self, object_, name, type_, reflected, compare_to):
if type_ == 'table' and (name.startswith('alembic') or
name == VERSION_TABLE or
name in EXTERNAL_TABLES):
return False
else:
return True
class TestModelsMigrationsMysql(testlib_api.MySQLTestCaseMixin,
_TestModelsMigrationsCCF,
testlib_api.SqlTestCaseLight):
pass
class TestModelsMigrationsPostgresql(testlib_api.PostgreSQLTestCaseMixin,
_TestModelsMigrationsCCF,
testlib_api.SqlTestCaseLight):
pass

View File

@ -0,0 +1,157 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from neutron.db import _model_query as mq
from neutron.tests.unit import testlib_api
from neutron_classifier.db import models
from neutron_lib import context
from oslo_utils import uuidutils
class TestDatabaseModels(testlib_api.MySQLTestCaseMixin,
testlib_api.SqlTestCase):
class _MockServicePlugin(object):
def __init__(self):
self.cg_list = []
self.c_list = []
ctx = context.get_admin_context()
standard_group = {'description': "Description of test group",
'project_id': uuidutils.generate_uuid(),
'shared': True,
'operator': 'AND'}
standard_class = {'description': "Description of classification",
'project_id': uuidutils.generate_uuid()}
for n in range(4):
standard_group['name'] = "Test Group " + str(n)
standard_group['id'] = uuidutils.generate_uuid()
standard_class['name'] = "Test Class " + str(n)
standard_class['id'] = uuidutils.generate_uuid()
self._create_db_model(ctx, models.ClassificationGroup,
**standard_group)
self._create_db_model(ctx, models.ClassificationBase,
**standard_class)
self.cg_list.append(copy.copy(standard_group))
self.c_list.append(copy.copy(standard_class))
self.cg_to_c_list = [{'container_cg_id': self.cg_list[0]['id'],
'stored_classification_id':
self.c_list[0]['id']},
{'container_cg_id': self.cg_list[0]['id'],
'stored_classification_id':
self.c_list[1]['id']},
{'container_cg_id': self.cg_list[0]['id'],
'stored_classification_id':
self.c_list[2]['id']},
{'container_cg_id': self.cg_list[1]['id'],
'stored_classification_id':
self.c_list[3]['id']}]
self.cg_to_cg_list = [{'container_cg_id': self.cg_list[0]['id'],
'stored_cg_id': self.cg_list[1]['id']},
{'container_cg_id': self.cg_list[0]['id'],
'stored_cg_id': self.cg_list[2]['id']},
{'container_cg_id': self.cg_list[0]['id'],
'stored_cg_id': self.cg_list[3]['id']},
{'container_cg_id': self.cg_list[3]['id'],
'stored_cg_id': self.cg_list[1]['id']},
{'container_cg_id': self.cg_list[3]['id'],
'stored_cg_id': self.cg_list[2]['id']}]
for n in range(4):
self._create_db_model(ctx, models.CGToClassificationMapping,
**self.cg_to_c_list[n])
self._create_db_model(ctx,
models.CGToClassificationGroupMapping,
**self.cg_to_cg_list[n])
def _create_db_model(self, ctx, model, **kwargs):
model1 = model(**kwargs)
with ctx.session.begin(subtransactions=True):
ctx.session.add(model1)
def _get_collection(self, ctx, model, func):
return mq.get_collection(ctx, model,
models._generate_dict_from_cg_db)
def setUp(self):
super(TestDatabaseModels, self).setUp()
self.ctxt = context.get_admin_context()
self.mock_plugin = self._MockServicePlugin()
def test_read_classification_group(self):
ret = models._read_classification_group(self.ctxt,
self.mock_plugin.cg_list[0]
['id'])
cg = self.mock_plugin.cg_list[0]
self.assertEqual(ret.name, cg['name'])
self.assertEqual(ret.description, cg['description'])
self.assertEqual(ret.shared, cg['shared'])
self.assertEqual(ret.operator, cg['operator'])
def test_read_classifications(self):
ret = models._read_classifications(self.ctxt,
self.mock_plugin.cg_list[0]['id'])
cs = [x.name for x in ret]
self.assertEqual(len(ret), 3)
self.assertIn("Test Class 0", cs)
self.assertIn("Test Class 1", cs)
self.assertIn("Test Class 2", cs)
def test_read_classification_groups(self):
ret = models._read_classification_groups(self.ctxt,
self.mock_plugin.cg_list[0]
['id'])
cg_ids = [x.id for x in ret]
cg_names = [x.name for x in ret]
self.assertEqual(len(ret), 3)
self.assertIn(ret[0].id, cg_ids)
self.assertIn(ret[1].id, cg_ids)
self.assertIn(ret[2].id, cg_ids)
self.assertIn("Test Group 1", cg_names)
self.assertIn("Test Group 2", cg_names)
self.assertIn("Test Group 3", cg_names)
def test_read_all_classification_groups(self):
ret = models._read_all_classification_groups(self.mock_plugin,
self.ctxt)
cgs = [x['name'] for x in ret]
self.assertIn("Test Group 0", cgs)
self.assertIn("Test Group 1", cgs)
self.assertIn("Test Group 2", cgs)
self.assertIn("Test Group 3", cgs)
def test_generate_dict_from_cg_db(self):
model = models._read_classification_group(self.ctxt,
self.mock_plugin.cg_list[2]
['id'])
ret = models._generate_dict_from_cg_db(model)
self.assertEqual(ret['name'], model.name)
self.assertEqual(ret['id'], model.id)
self.assertEqual(ret['description'], model.description)
self.assertEqual(ret['project_id'], model.project_id)
self.assertEqual(ret['classifications'], model.classifications)
self.assertEqual(ret['classification_groups'],
model.classification_groups)
self.assertEqual(ret['shared'], model.shared)
self.assertEqual(ret['operator'], model.operator)

View File

@ -2,3 +2,6 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
psutil>=3.2.2 # BSD
psycopg2
PyMySQL>=0.7.6 # MIT License

View File

@ -1,7 +0,0 @@
from neutron_classifier.tests import base
class PlaceholderTest(base.TestCase):
def test_noop(self):
pass

View File

@ -0,0 +1,27 @@
# Copyright 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
VALID_OPERATORS = ('AND', 'OR')
VALID_ECN_TYPES = ('0', '1', '2', '3')
def get_random_operator():
return random.choice(VALID_OPERATORS)
def get_random_ecn():
return random.choice(VALID_ECN_TYPES)

View File

@ -0,0 +1,68 @@
# Copyright (c) 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_classifier.objects import classification_type
from neutron_classifier.tests import base
class TestClassificationType(base.BaseClassificationTestCase):
def setUp(self):
super(TestClassificationType, self).setUp()
common_fields = ['c_type', 'description', 'negated', 'shared',
'project_id', 'id', 'name']
common_ipv = ['src_addr', 'ecn', 'length_min', 'dscp', 'dscp_mask',
'length_max', 'dst_addr']
common_tcp_udp = ['src_port_min', 'src_port_max', 'dst_port_min',
'dst_port_max']
self.ipv4_fields = common_fields + common_ipv + ['ttl_max', 'flags',
'protocol', 'ttl_min',
'flags_mask']
self.ipv6_fields = common_fields + common_ipv + ['hops_min',
'hops_max',
'next_header']
self.tcp_fields = common_fields + common_tcp_udp + ['window_min',
'flags',
'window_max',
'flags_mask']
self.udp_fields = common_fields + common_tcp_udp + ['length_min',
'length_max']
self.ethernet_fields = common_fields + ['ethertype', 'src_addr',
'dst_addr']
def test_ipv4_cls_type(self):
ipv4_obj = classification_type.ClassificationType.get_object('ipv4')
self.assertEqual(sorted(ipv4_obj.supported_parameters),
sorted(self.ipv4_fields))
def test_ipv6_cls_type(self):
ipv6_obj = classification_type.ClassificationType.get_object('ipv6')
self.assertEqual(sorted(ipv6_obj.supported_parameters),
sorted(self.ipv6_fields))
def test_tcp_cls_type(self):
tcp_obj = classification_type.ClassificationType.get_object('tcp')
self.assertEqual(sorted(tcp_obj.supported_parameters),
sorted(self.tcp_fields))
def test_udp_cls_type(self):
udp_obj = classification_type.ClassificationType.get_object('udp')
self.assertEqual(sorted(udp_obj.supported_parameters),
sorted(self.udp_fields))
def test_ethernet_cls_type(self):
ethernet_obj = classification_type.ClassificationType.get_object(
'ethernet')
self.assertEqual(sorted(ethernet_obj.supported_parameters),
sorted(self.ethernet_fields))

View File

@ -0,0 +1,73 @@
# Copyright 2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# NOTE(davidsha) This file is largely a copy of the test_object.py file
# from Neutron
import os
import pprint
from oslo_versionedobjects import base as obj_base
from oslo_versionedobjects import fixture
from neutron import objects as n_obj
from neutron_classifier import objects
from neutron_classifier.tests import base as test_base
# NOTE: The hashes in this list should only be changed if they come with a
# corresponding version bump in the affected objects. Please keep the list in
# alphabetic order.
# This list also includes VersionedObjects from Neutron that are registered
# through dependencies.
object_data = {
'ClassificationGroup': '1.0-e621ff663f76bb494072872222f5fe72',
'CGToClassificationGroupMapping': '1.0-8ebed0ef1035bcc4b307da1bbdc6be64',
'CGToClassificationMapping': '1.0-fe5942adbe82301a38b67bdce484efb1',
'EthernetClassification': '1.0-267f03162a6e011197b663ee34e6cb0b',
'IPV4Classification': '1.0-d4f25a09ceaad9ec817dcebb3b5c4e78',
'IPV6Classification': '1.0-1051e98146a016522d516fe1bec49079',
'TCPClassification': '1.0-1c8a4bb3b2dcdebe8913adc00788c704',
'UDPClassification': '1.0-e55c7b58b9e2c7587cf9a0113225586b'}
class TestObjectVersions(test_base.BaseClassificationTestCase):
def setUp(self):
super(TestObjectVersions, self).setUp()
# NOTE(davidsha): Neutron Classifier OvO's need to be seeded,
# There also appears to be some versioned objects leaking in from
# Neutron from dependencies.
# Because of this I've included all Neutron OvO's and added them
# to the local object_data variable.
# This dependency will prevent upgrades to a neutron OvO from breaking
# this test if they were stored statically here.
objects.register_objects()
n_obj.register_objects()
def test_versions(self):
checker = fixture.ObjectVersionChecker(
obj_base.VersionedObjectRegistry.obj_classes())
fingerprints = checker.get_hashes()
if os.getenv('GENERATE_HASHES'):
with open('object_hashes.txt', 'w') as hashes_file:
hashes_file.write(pprint.pformat(fingerprints))
expected, actual = checker.test_hashes(object_data)
self.assertEqual(expected, actual,
'Some objects have changed; please make sure the '
'versions have been bumped, and then update their '
'hashes in the object_data map in this test module.')

View File

@ -0,0 +1,276 @@
# Copyright 2018 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
import oslo_versionedobjects
from neutron_classifier.objects import classifications
from neutron_classifier.tests import tools
from neutron_lib import context
from neutron.db import api as db_api
from neutron.tests.unit.objects import test_base
from neutron.tests.unit import testlib_api
class _CCFObjectsTestCommon(object):
# TODO(ndahiwade): this represents classifications containing Enum fields,
# will need to be reworked if more classifications are added here later.
_Enum_classifications = [classifications.IPV4Classification,
classifications.IPV6Classification]
_Enumfield = oslo_versionedobjects.fields.EnumField
ctx = context.get_admin_context()
def get_random_attrs(self, obj=None):
obj = obj
attrs = {}
for field, field_obj in obj.fields.items():
if field != 'c_type' and type(field_obj) != self._Enumfield:
random_generator = test_base.FIELD_TYPE_VALUE_GENERATOR_MAP[
type(field_obj)]
attrs[field] = random_generator()
return attrs
def _create_test_cg(self, name):
attrs = {'name': name,
'id': uuidutils.generate_uuid(),
'description': "Description of test group",
'project_id': uuidutils.generate_uuid(),
'operator': 'AND'}
cg = classifications.ClassificationGroup(self.ctx, **attrs)
cg.create()
return cg
def _create_test_classification(self, c_type, classification):
attrs = self.get_random_attrs(classification)
if classification in self._Enum_classifications:
attrs['ecn'] = tools.get_random_ecn()
attrs['c_type'] = c_type
c = classification(self.ctx, **attrs)
c.create()
return c
def _create_test_cg_cg_mapping(self, cg1, cg2):
attrs = {'container_cg_id': cg1,
'stored_cg_id': cg2}
cg_m_cg = classifications.CGToClassificationGroupMapping(self.ctx,
**attrs)
cg_m_cg.create()
return cg_m_cg
def _create_test_cg_c_mapping(self, cg, c):
attrs = {'container_cg_id': cg,
'stored_classification_id': c}
cg_m_c = classifications.CGToClassificationMapping(self.ctx,
**attrs)
cg_m_c.create()
return cg_m_c
class ClassificationGroupTest(test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
# NOTE(ndahiwade): As the FIELD_TYPE_VALUE_GENERATOR_MAP in neutron's
# test_base for objects doesn't have an entry for operator Enum fields,
# we are adding it here for our use rather than adding in neutron.
test_base.FIELD_TYPE_VALUE_GENERATOR_MAP[
oslo_versionedobjects.fields.EnumField] = tools.get_random_operator
_test_class = classifications.ClassificationGroup
def test_get_object(self):
cg = self._create_test_cg('Test Group 0')
fetch_cg = classifications.ClassificationGroup.get_object(
self.ctx, id=cg.id)
self.assertEqual(cg, fetch_cg)
def test_get_objects(self):
cg1 = self._create_test_cg('Test Group 1')
cg2 = self._create_test_cg('Test Group 2')
cgs = classifications.ClassificationGroup.get_objects(self.ctx)
self.assertIn(cg1, cgs)
self.assertIn(cg2, cgs)
# NOTE(ndahiwade): Currently BaseDbObjectTestCase doesn't have support for
# mapping class inheritence (polymorphic_identity), and as this is unique to
# CCF we have decided not to use it for tests for individual classifications.
class UDPClassificationTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
test_class = classifications.UDPClassification
def test_get_object(self):
udp = self._create_test_classification('udp', self.test_class)
fetch_udp = self.test_class.get_object(self.ctx, id=udp.id)
self.assertEqual(udp, fetch_udp)
def test_get_objects(self):
udp1 = self._create_test_classification('udp', self.test_class)
udp2 = self._create_test_classification('udp', self.test_class)
fetch_udps = self.test_class.get_objects(self.ctx)
self.assertIn(udp1, fetch_udps)
self.assertIn(udp2, fetch_udps)
class IPV4ClassificationTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
test_class = classifications.IPV4Classification
def test_get_object(self):
ipv4 = self._create_test_classification('ipv4', self.test_class)
fetch_ipv4 = self.test_class.get_object(self.ctx, id=ipv4.id)
self.assertEqual(ipv4, fetch_ipv4)
def test_get_objects(self):
ipv4_1 = self._create_test_classification('ipv4', self.test_class)
ipv4_2 = self._create_test_classification('ipv4', self.test_class)
fetch_ipv4s = self.test_class.get_objects(self.ctx)
self.assertIn(ipv4_1, fetch_ipv4s)
self.assertIn(ipv4_2, fetch_ipv4s)
class IPV6ClassificationTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
test_class = classifications.IPV6Classification
def test_get_object(self):
ipv6 = self._create_test_classification('ipv6', self.test_class)
fetch_ipv6 = self.test_class.get_object(self.ctx, id=ipv6.id)
self.assertEqual(ipv6, fetch_ipv6)
def test_get_objects(self):
ipv6_1 = self._create_test_classification('ipv6', self.test_class)
ipv6_2 = self._create_test_classification('ipv6', self.test_class)
fetch_ipv6s = self.test_class.get_objects(self.ctx)
self.assertIn(ipv6_1, fetch_ipv6s)
self.assertIn(ipv6_2, fetch_ipv6s)
class TCPClassificationTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
test_class = classifications.TCPClassification
def test_get_object(self):
tcp = self._create_test_classification('tcp', self.test_class)
fetch_tcp = self.test_class.get_object(self.ctx, id=tcp.id)
self.assertEqual(tcp, fetch_tcp)
def test_get_objects(self):
tcp_1 = self._create_test_classification('tcp', self.test_class)
tcp_2 = self._create_test_classification('tcp', self.test_class)
fetch_tcps = self.test_class.get_objects(self.ctx)
self.assertIn(tcp_1, fetch_tcps)
self.assertIn(tcp_2, fetch_tcps)
class EthernetClassificationTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
test_class = classifications.EthernetClassification
def test_get_object(self):
ethernet = self._create_test_classification('ethernet',
self.test_class)
fetch_ethernet = self.test_class.get_object(self.ctx, id=ethernet.id)
self.assertEqual(ethernet, fetch_ethernet)
def test_get_objects(self):
ethernet_1 = self._create_test_classification('ethernet',
self.test_class)
ethernet_2 = self._create_test_classification('ethernet',
self.test_class)
fetch_ethernets = self.test_class.get_objects(self.ctx)
self.assertIn(ethernet_1, fetch_ethernets)
self.assertIn(ethernet_2, fetch_ethernets)
class CGToClassificationGroupMappingTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
def test_get_object(self):
with db_api.context_manager.writer.using(self.ctx):
cg1 = self._create_test_cg('Test Group 0')
cg2 = self._create_test_cg('Test Group 1')
cg_m_cg = self._create_test_cg_cg_mapping(cg1.id, cg2.id)
fetch_cg = classifications.ClassificationGroup.get_object(
self.ctx, id=cg1.id)
mapped_cg = classifications._get_mapped_classification_groups(
self.ctx, fetch_cg)
fetch_cg_m_cg = classifications.CGToClassificationGroupMapping.\
get_object(self.ctx, id=cg_m_cg.container_cg_id)
self.assertEqual(mapped_cg[0], cg2)
self.assertEqual(cg_m_cg, fetch_cg_m_cg)
def test_multiple_cg_mappings(self):
with db_api.context_manager.writer.using(self.ctx):
cg1 = self._create_test_cg('Test Group 0')
cg2 = self._create_test_cg('Test Group 1')
cg3 = self._create_test_cg('Test Group 2')
cg4 = self._create_test_cg('Test Group 3')
cgs = [cg2, cg3, cg4]
for cg in cgs:
self._create_test_cg_cg_mapping(cg1.id, cg.id)
fetch_cg1 = classifications.ClassificationGroup.get_object(
self.ctx, id=cg1.id)
mapped_cgs = classifications._get_mapped_classification_groups(
self.ctx, fetch_cg1)
for cg in cgs:
self.assertIn(cg, mapped_cgs)
class CGToClassificationMappingTest(testlib_api.SqlTestCase,
_CCFObjectsTestCommon):
ctx = context.get_admin_context()
def test_get_object(self):
with db_api.context_manager.writer.using(self.ctx):
cg = self._create_test_cg('Test Group')
cl_ = self._create_test_classification(
'udp', classifications.UDPClassification)
cg_m_c = self._create_test_cg_c_mapping(cg.id, cl_.id)
fetch_c = classifications.UDPClassification.get_object(
self.ctx, id=cl_.id)
fetch_cg = classifications.ClassificationGroup.get_object(
self.ctx, id=cg.id)
mapped_cs = classifications._get_mapped_classifications(
self.ctx, fetch_cg)
fetch_cg_m_c = classifications.CGToClassificationMapping. \
get_object(self.ctx, id=cg_m_c.container_cg_id)
self.assertIn(fetch_c, mapped_cs)
self.assertEqual(cg_m_c, fetch_cg_m_c)
def test_multiple_c_mappings(self):
with db_api.context_manager.writer.using(self.ctx):
cg = self._create_test_cg('Test Group')
c1 = self._create_test_classification(
'tcp', classifications.TCPClassification)
c2 = self._create_test_classification(
'udp', classifications.UDPClassification)
c3 = self._create_test_classification(
'ethernet', classifications.EthernetClassification)
cs = [c1, c2, c3]
for c in cs:
self._create_test_cg_c_mapping(cg.id, c.id)
fetch_cg = classifications.ClassificationGroup.get_object(
self.ctx, id=cg.id)
mapped_cs = classifications._get_mapped_classifications(
self.ctx, fetch_cg)
for c in cs:
self.assertIn(c, mapped_cs)

View File

@ -1,265 +0,0 @@
# Copyright (c) 2015 Mirantis, Inc.
# Copyright (c) 2015 Huawei Technologies India Pvt Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy as cp
from neutron_classifier.db import api
from neutron_classifier.db import models
import sqlalchemy as sa
from sqlalchemy.orm import sessionmaker
from oslo_utils import uuidutils
from oslotest import base
FAKE_SG_RULE_V6 = {'direction': 'INGRESS', 'protocol': 'tcp', 'ethertype':
'IPv6', 'tenant_id': 'fake_tenant', 'port_range_min': 80,
'port_range_max': 80, 'remote_ip_prefix':
'fddf:cb3b:bc4::/48', }
FAKE_SG_RULE_V4 = {'direction': 'INGRESS', 'protocol': 'tcp', 'ethertype':
'IPv4', 'tenant_id': 'fake_tenant', 'port_range_min': 80,
'port_range_max': 80, 'remote_ip_prefix': '10.0.0.0/8', }
FAKE_SG_V6 = {'name': 'fake security group', 'tenant_id':
uuidutils.generate_uuid(), 'description': 'this is fake',
'security_group_rules': [FAKE_SG_RULE_V6]}
FAKE_FW_RULE_V4 = {'ip_version': 4, 'protocol': 'udp',
'source_port_range_min': 1, 'source_port_range_max': 80,
'destination_port_range_min': 1,
'destination_port_range_max': 80,
'source_ip_address': '20.1.1.1/24',
'destination_ip_address': '30.1.1.1/24',
'position': 1, 'action': 'ALLOW', 'enabled': True,
'tenant_id': 'fake_tenant', }
FAKE_FW_RULE_V6 = {'ip_version': 6, 'protocol': 'udp',
'source_port_range_min': 1, 'source_port_range_max': 80,
'destination_port_range_min': 1,
'destination_port_range_max': 80,
'source_ip_address': 'fddf:cb3b:bc4::/48',
'destination_ip_address': 'fddf:cb3b:b33f::/48',
'position': 1, 'action': 'ALLOW', 'enabled': True,
'tenant_id': 'fake_tenant', }
FAKE_FW_V4 = {'name': 'fake firewall policy',
'tenant_id': uuidutils.generate_uuid(),
'description': 'this is fake',
'firewall_rules': [FAKE_FW_RULE_V4]}
FAKE_FW_V6 = {'name': 'fake firewall policy',
'tenant_id': uuidutils.generate_uuid(),
'description': 'this is fake',
'firewall_rules': [FAKE_FW_RULE_V6]}
class ClassifierTestContext(object):
"Classifier Database Context."
engine = None
session = None
def __init__(self):
self.engine = sa.create_engine('sqlite:///:memory:', echo=True)
self.session = sessionmaker(bind=self.engine)()
class DbApiTestCase(base.BaseTestCase):
def setUp(self):
super(DbApiTestCase, self).setUp()
self.context = ClassifierTestContext()
models.Base.metadata.create_all(self.context.engine)
def _create_classifier_group(self, service):
cg = models.ClassifierGroup()
cg.tenant_id = uuidutils.generate_uuid()
cg.name = 'test classifier'
cg.description = 'ensure all data inserted correctly'
cg.service = service
return cg
def test_create_classifier_chain(self):
cg = self._create_classifier_group('neutron-fwaas')
ipc = models.IpClassifier()
ipc.destination_ip_prefix = 'fd70:fbb6:449e::/48'
ipc.source_ip_prefix = 'fddf:cb3b:bc4::/48'
api.create_classifier_chain(cg, [ipc])
self.assertGreater(len(cg.classifier_chain), 0)
def _test_convert_security_group_rule_to_classifier(self,
security_group_rule):
# TODO(sc68cal) make this not call session.commit directly
cg = self._create_classifier_group('security-group')
api.convert_security_group_rule_to_classifier(self.context,
security_group_rule, cg)
# Save to the database
self.context.session.add(cg)
self.context.session.commit()
# Refresh the classifier group from the DB
cg = api.get_classifier_group(self.context, cg.id)
self.assertGreater(len(cg.classifier_chain), 0)
def test_convert_security_group_rule_v4_to_classifier(self):
self._test_convert_security_group_rule_to_classifier(FAKE_SG_RULE_V4)
def test_convert_security_group_rule_v6_to_classifier(self):
self._test_convert_security_group_rule_to_classifier(FAKE_SG_RULE_V6)
def test_convert_security_group_to_classifier_chain(self):
result = api.convert_security_group_to_classifier(self.context,
FAKE_SG_V6)
self.assertIsNotNone(result)
def test_convert_classifier_chain_to_security_group(self):
classifier_id = api.convert_security_group_to_classifier(
self.context, FAKE_SG_V6).id
result = api.convert_classifier_group_to_security_group(self.context,
classifier_id)
result['tenant_id'] = FAKE_SG_RULE_V6['tenant_id']
self.assertEqual(FAKE_SG_RULE_V6, result)
def _test_convert_sg_rule_to_classifier_exception(self, sg_rule):
try:
self._test_convert_security_group_rule_to_classifier(sg_rule)
except Exception:
pass
def test_convert_sg_rule_to_classifier_with_no_ethertype(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
del FAKE_SG_RULE['ethertype']
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# test case for invalid ip-version
def test_convert_sg_rule_to_classifier_with_invalid_ethertype(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
FAKE_SG_RULE['ethertype'] = 'IPvx'
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# test case for protocol none
def test_convert_sg_rule_to_classifier_with_None_protocol(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
del FAKE_SG_RULE['protocol']
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# can not allow icmpv6 protocol with IPv4 version
def test_convert_sg_rule_to_classifier_with_icmpv6_protocol(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
FAKE_SG_RULE['protocol'] = 'icmpv6'
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# ip-version is 4 and remote ip as v6 address
def test_convert_sg_rule_to_classifier_with_invalid_remote_ipv6(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
FAKE_SG_RULE['remote_ip_prefix'] = 'fddf:cb3b:bc4::/48'
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# ip-version is 6 and remote ip as v4 address
def test_convert_sg_rule_to_classifier_with_invalid_dest_ipv4(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V6)
FAKE_SG_RULE['remote_ip_prefix'] = '1.2.3.4/24'
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# invalid port-range
def test_convert_sg_rule_to_classifier_with_invalid_port_range(self):
FAKE_SG_RULE = cp.copy(FAKE_SG_RULE_V4)
FAKE_SG_RULE['port_range_min'] = 200
FAKE_SG_RULE['port_range_max'] = 10
self._test_convert_sg_rule_to_classifier_exception(FAKE_SG_RULE)
# Firewall testcases
def _test_convert_firewall_rule_to_classifier(self, fw_rule):
cg = self._create_classifier_group('neutron-fwaas')
api.convert_firewall_rule_to_classifier(self.context, fw_rule, cg)
# Save to the database
self.context.session.add(cg)
self.context.session.commit()
# Refresh the classifier group from the DB
cg = api.get_classifier_group(self.context, cg.id)
self.assertGreater(len(cg.classifier_chain), 0)
def test_convert_firewall_rule_v4_to_classifier(self):
self._test_convert_firewall_rule_to_classifier(FAKE_FW_RULE_V4)
def test_convert_firewall_rule_v6_to_classifier(self):
self._test_convert_firewall_rule_to_classifier(FAKE_FW_RULE_V6)
def test_convert_firewall_policy_v4_to_classifier_chain(self):
result = api.convert_firewall_policy_to_classifier(self.context,
FAKE_FW_V4)
self.assertIsNotNone(result)
def test_convert_firewall_policy_v6_to_classifier_chain(self):
result = api.convert_firewall_policy_to_classifier(self.context,
FAKE_FW_V6)
self.assertIsNotNone(result)
def test_convert_classifier_chain_to_firewall(self):
classifier_id = api.convert_firewall_policy_to_classifier(
self.context, FAKE_FW_V6).id
result = api.convert_classifier_to_firewall(self.context,
classifier_id)
result['tenant_id'] = FAKE_FW_RULE_V6['tenant_id']
result['position'] = FAKE_FW_RULE_V6['position']
result['action'] = FAKE_FW_RULE_V6['action']
result['enabled'] = FAKE_FW_RULE_V6['enabled']
self.assertEqual(FAKE_FW_RULE_V6, result)
def _test_convert_firewall_rule_to_classifier_exception(self, fw_rule):
try:
self._test_convert_firewall_rule_to_classifier(fw_rule)
except Exception:
pass
# test case for invalid ip-version
def test_convert_firewall_rule_to_classifier_with_invalid_ip_version(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
FAKE_FW_RULE['ip_version'] = 5
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
# test case for protocol none
def test_convert_firewall_rule_to_classifier_with_None_protocol(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
del FAKE_FW_RULE['protocol']
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
# icmp protocol with valid port range
def test_convert_firewall_rule_to_classifier_with_icmp_protocol(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
FAKE_FW_RULE['protocol'] = 'icmp'
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
# ip-version is 4 and source ip as v6 address
def test_convert_firewall_rule_to_classifier_with_invalid_source_ip(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
FAKE_FW_RULE['source_ip_address'] = 'fddf:cb3b:bc4::/48'
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
# ip-version is 6 and dest ip as v4 address
def test_convert_firewall_rule_to_classifier_with_invalid_dest_ip(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V6)
FAKE_FW_RULE['destination_ip_address'] = '1.2.3.4/24'
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)
# invalid port-range
def test_convert_firewall_rule_to_classifier_with_invalid_port_range(self):
FAKE_FW_RULE = cp.copy(FAKE_FW_RULE_V4)
FAKE_FW_RULE['source_port_range_min'] = 200
FAKE_FW_RULE['source_port_range_max'] = 10
FAKE_FW_RULE['destination_port_range_min'] = 100
FAKE_FW_RULE['destination_port_range_max'] = 10
self._test_convert_firewall_rule_to_classifier_exception(FAKE_FW_RULE)

View File

@ -0,0 +1,80 @@
- hosts: primary
tasks:
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=**/*nose_results.html
- --include=*/
- --exclude=*
- --prune-empty-dirs
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=**/*testr_results.html.gz
- --include=*/
- --exclude=*
- --prune-empty-dirs
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=/.testrepository/tmp*
- --include=*/
- --exclude=*
- --prune-empty-dirs
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=**/*testrepository.subunit.gz
- --include=*/
- --exclude=*
- --prune-empty-dirs
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}/tox'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=/.tox/*/log/*
- --include=*/
- --exclude=*
- --prune-empty-dirs
- name: Copy files from {{ ansible_user_dir }}/workspace/ on node
synchronize:
src: '{{ ansible_user_dir }}/workspace/'
dest: '{{ zuul.executor.log_root }}'
mode: pull
copy_links: true
verify_host: true
rsync_opts:
- --include=/logs/**
- --include=*/
- --exclude=*
- --prune-empty-dirs

View File

@ -0,0 +1,70 @@
- hosts: all
name: neutron-classifier-functional-dsvm
tasks:
- name: Ensure legacy workspace directory
file:
path: '{{ ansible_user_dir }}/workspace'
state: directory
- shell:
cmd: |
set -e
set -x
cat > clonemap.yaml << EOF
clonemap:
- name: openstack-infra/devstack-gate
dest: devstack-gate
EOF
/usr/zuul-env/bin/zuul-cloner -m clonemap.yaml --cache-dir /opt/git \
git://git.openstack.org \
openstack-infra/devstack-gate
executable: /bin/bash
chdir: '{{ ansible_user_dir }}/workspace'
environment: '{{ zuul | zuul_legacy_vars }}'
- shell:
cmd: |
set -e
set -x
cat << 'EOF' >>"/tmp/dg-local.conf"
[[local|localrc]]
enable_plugin neutron-classifier git://git.openstack.org/openstack/neutron-classifier
EOF
executable: /bin/bash
chdir: '{{ ansible_user_dir }}/workspace'
environment: '{{ zuul | zuul_legacy_vars }}'
- shell:
cmd: |
set -e
set -x
export PYTHONUNBUFFERED=true
export DEVSTACK_GATE_UNSTACK=1
export DEVSTACK_GATE_TEMPEST=0
export DEVSTACK_GATE_EXERCISES=0
export DEVSTACK_GATE_NEUTRON=1
export DEVSTACK_GATE_INSTALL_TESTONLY=1
export BRANCH_OVERRIDE=default
if [ "$BRANCH_OVERRIDE" != "default" ] ; then
export OVERRIDE_ZUUL_BRANCH=$BRANCH_OVERRIDE
fi
# Because we are testing a non standard project, add
# our project repository. This makes zuul do the right
# reference magic for testing changes.
export PROJECTS="openstack/neutron-classifier $PROJECTS"
# Keep localrc to be able to set some vars in pre_test_hook
export KEEP_LOCALRC=1
function gate_hook {
bash -xe $BASE/new/neutron-classifier/neutron_classifier/tests/contrib/gate_hook.sh dsvm-functional
}
export -f gate_hook
function post_test_hook {
bash -xe $BASE/new/neutron-classifier/neutron_classifier/tests/contrib/post_test_hook.sh dsvm-functional
}
export -f post_test_hook
cp devstack-gate/devstack-vm-gate-wrap.sh ./safe-devstack-vm-gate-wrap.sh
./safe-devstack-vm-gate-wrap.sh
executable: /bin/bash
chdir: '{{ ansible_user_dir }}/workspace'
environment: '{{ zuul | zuul_legacy_vars }}'

View File

@ -2,4 +2,5 @@ pbr>=2.0.0,!=2.1.0 # Apache-2.0
Babel>=2.3.4,!=2.4.0 # BSD
SQLAlchemy>=1.0.10,!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8 # MIT
neutron-lib>=1.7.0 # Apache-2.0
neutron>=12.0.0 # Apache-2.0
oslo.utils>=3.20.0 # Apache-2.0

View File

@ -1,11 +1,20 @@
#The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
hacking!=0.13.0,<0.14,>=0.12.0 # Apache-2.0
coverage!=4.4,>=4.0 # Apache-2.0
python-subunit>=0.0.18 # Apache-2.0/BSD
sphinx>=1.6.2 # BSD
oslosphinx>=4.7.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
coverage!=4.4,>=4.0 # Apache-2.0
python-subunit>=1.0.0 # Apache-2.0/BSD
sphinx!=1.6.6,!=1.6.7,>=1.6.2 # BSD
openstackdocstheme>=1.18.1 # Apache-2.0
oslosphinx>=4.7.0 # Apache-2.0
WebOb>=1.7.1 # MIT
oslotest>=3.2.0 # Apache-2.0
os-testr>=1.0.0 # Apache-2.0
testrepository>=0.0.18 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD
testtools>=1.4.0 # MIT
pylint==1.4.5 # GPLv2
testresources>=2.0.0 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD
testtools>=2.2.0 # MIT
reno>=2.5.0 # Apache-2.0
pylint==1.4.5 # GPLv2
tempest>=17.1.0 # Apache-2.0