Merge "Add support for default security group rules in SDK"

This commit is contained in:
Zuul 2023-09-04 14:10:42 +00:00 committed by Gerrit Code Review
commit bc541377a1
5 changed files with 380 additions and 0 deletions

View File

@ -71,6 +71,15 @@ Auto Allocated Topology Operations
:members: delete_auto_allocated_topology, get_auto_allocated_topology,
validate_auto_allocated_topology
Default Security Group Rules Operations
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. autoclass:: openstack.network.v2._proxy.Proxy
:noindex:
:members: create_default_security_group_rule,
find_default_security_group_rule, get_default_security_group_rule,
delete_default_security_group_rule, default_security_group_rules
Security Group Operations
^^^^^^^^^^^^^^^^^^^^^^^^^

View File

@ -32,6 +32,9 @@ from openstack.network.v2 import (
from openstack.network.v2 import (
bgpvpn_router_association as _bgpvpn_router_association,
)
from openstack.network.v2 import (
default_security_group_rule as _default_security_group_rule,
)
from openstack.network.v2 import extension
from openstack.network.v2 import firewall_group as _firewall_group
from openstack.network.v2 import firewall_policy as _firewall_policy
@ -120,6 +123,9 @@ class Proxy(proxy.Proxy):
"bgpvpn_router_association": (
_bgpvpn_router_association.BgpVpnRouterAssociation
),
"default_security_group_rule": (
_default_security_group_rule.DefaultSecurityGroupRule
),
"extension": extension.Extension,
"firewall_group": _firewall_group.FirewallGroup,
"firewall_policy": _firewall_policy.FirewallPolicy,
@ -4841,6 +4847,114 @@ class Proxy(proxy.Proxy):
"""
return self._list(_security_group_rule.SecurityGroupRule, **query)
def create_default_security_group_rule(self, **attrs):
"""Create a new default security group rule from attributes
:param attrs: Keyword arguments which will be used to create a
:class:`~openstack.network.v2.default_security_group_rule.
DefaultSecurityGroupRule`,
comprised of the properties on the DefaultSecurityGroupRule class.
:returns: The results of default security group rule creation
:rtype:
:class:`~openstack.network.v2.default_security_group_rule.
DefaultSecurityGroupRule`
"""
return self._create(
_default_security_group_rule.DefaultSecurityGroupRule, **attrs
)
def delete_default_security_group_rule(
self,
default_security_group_rule,
ignore_missing=True,
):
"""Delete a default security group rule
:param default_security_group_rule:
The value can be either the ID of a default security group rule
or a
:class:`~openstack.network.v2.default_security_group_rule.
DefaultSecurityGroupRule` instance.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the defaul security group rule does not exist.
When set to ``True``, no exception will be set when
attempting to delete a nonexistent default security group rule.
:returns: ``None``
"""
self._delete(
_default_security_group_rule.DefaultSecurityGroupRule,
default_security_group_rule,
ignore_missing=ignore_missing,
)
def find_default_security_group_rule(
self, name_or_id, ignore_missing=True, **query
):
"""Find a single default security group rule
:param str name_or_id: The ID of a default security group rule.
:param bool ignore_missing: When set to ``False``
:class:`~openstack.exceptions.ResourceNotFound` will be
raised when the resource does not exist.
When set to ``True``, None will be returned when
attempting to find a nonexistent resource.
:param dict query: Any additional parameters to be passed into
underlying methods. such as query filters.
:returns: One
:class:`~openstack.network.v2.default_security_group_rule.
DefaultSecurityGroupRule` or None
"""
return self._find(
_default_security_group_rule.DefaultSecurityGroupRule,
name_or_id,
ignore_missing=ignore_missing,
**query,
)
def get_default_security_group_rule(self, default_security_group_rule):
"""Get a single default security group rule
:param default_security_group_rule:
The value can be the ID of a default security group rule or a
:class:`~openstack.network.v2.default_security_group_rule.
DefaultSecurityGroupRule` instance.
:returns:
:class:`~openstack.network.v2.default_security_group_rule.
DefaultSecurityGroupRule`
:raises: :class:`~openstack.exceptions.ResourceNotFound`
when no resource can be found.
"""
return self._get(
_default_security_group_rule.DefaultSecurityGroupRule,
default_security_group_rule,
)
def default_security_group_rules(self, **query):
"""Return a generator of default security group rules
:param kwargs query: Optional query parameters to be sent to limit
the resources being returned. Available parameters include:
* ``description``: The default security group rule description
* ``direction``: Default security group rule direction
* ``ether_type``: Must be IPv4 or IPv6, and addresses represented
in CIDR must match the ingress or egress rule.
* ``protocol``: Default security group rule protocol
* ``remote_group_id``: ID of a remote security group
:returns: A generator of default security group rule objects
:rtype:
:class:`~openstack.network.v2.default_security_group_rule.
DefaultSecurityGroupRule`
"""
return self._list(
_default_security_group_rule.DefaultSecurityGroupRule, **query
)
def create_segment(self, **attrs):
"""Create a new segment from attributes

View File

@ -0,0 +1,89 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import _base
from openstack import resource
class DefaultSecurityGroupRule(_base.NetworkResource):
resource_key = 'default_security_group_rule'
resources_key = 'default_security_group_rules'
base_path = '/default-security-group-rules'
# capabilities
allow_create = True
allow_fetch = True
allow_commit = False
allow_delete = True
allow_list = True
_query_mapping = resource.QueryParameters(
'id',
'description',
'remote_group_id',
'remote_address_group_id',
'direction',
'protocol',
'port_range_min',
'port_range_max',
'remote_ip_prefix',
'used_in_default_sg',
'used_in_non_default_sg',
'sort_dir',
'sort_key',
ether_type='ethertype',
)
# Properties
#: The default security group rule description.
description = resource.Body('description')
#: The remote security group ID to be associated with this security
#: group rule created from this template.
#: You can specify either ``remote_group_id`` or #:
#: ``remote_address_group_id`` or ``remote_ip_prefix``.
remote_group_id = resource.Body('remote_group_id')
#: The remote address group ID to be associated with this security
#: group rule created from that template.
#: You can specify either ``remote_group_id`` or
#: ``remote_address_group_id`` or ``remote_ip_prefix``.
remote_address_group_id = resource.Body('remote_address_group_id')
#: ``ingress`` or ``egress``: The direction in which the security group #:
#: rule will be applied. See 'direction' field in the security group rule
#: API.
direction = resource.Body('direction')
#: The protocol that is matched by the security group rule.
#: Valid values are ``null``, ``tcp``, ``udp``, and ``icmp``.
protocol = resource.Body('protocol')
#: The minimum port number in the range that is matched by the
#: security group rule. If the protocol is TCP or UDP, this value
#: must be less than or equal to the value of the port_range_max
#: attribute. If the protocol is ICMP, this value must be an ICMP type.
port_range_min = resource.Body('port_range_min', type=int)
#: The maximum port number in the range that is matched by the
#: security group rule. The port_range_min attribute constrains
#: the port_range_max attribute. If the protocol is ICMP, this
#: value must be an ICMP type.
port_range_max = resource.Body('port_range_max', type=int)
#: The remote IP prefix to be associated with this security group rule.
#: You can specify either ``remote_group_id`` or
#: ``remote_address_group_id`` or ``remote_ip_prefix``.
#: This attribute matches the specified IP prefix as the source or
#: destination IP address of the IP packet depending on direction.
remote_ip_prefix = resource.Body('remote_ip_prefix')
#: Must be IPv4 or IPv6, and addresses represented in CIDR must match
#: the ingress or egress rules.
ether_type = resource.Body('ethertype')
#: Indicate if this template be used to create security group rules in the
#: default security group created automatically for each project.
used_in_default_sg = resource.Body('used_in_default_sg', type=bool)
#: Indicate if this template be used to create security group rules in the
#: custom security groups created in the project by users.
used_in_non_default_sg = resource.Body('used_in_non_default_sg', type=bool)

View File

@ -0,0 +1,83 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
from openstack.network.v2 import default_security_group_rule
from openstack.tests.functional import base
class TestDefaultSecurityGroupRule(base.BaseFunctionalTest):
def setUp(self):
super().setUp()
if not self.user_cloud._has_neutron_extension(
"security-groups-default-rules"
):
self.skipTest(
"Neutron security-groups-default-rules extension "
"is required for this test"
)
self.IPV4 = random.choice(["IPv4", "IPv6"])
self.PROTO = random.choice(["tcp", "udp"])
self.PORT = random.randint(1, 65535)
self.DIR = random.choice(["ingress", "egress"])
self.USED_IN_DEFAULT_SG = random.choice([True, False])
self.USED_IN_NON_DEFAULT_SG = random.choice([True, False])
rul = self.operator_cloud.network.create_default_security_group_rule(
direction=self.DIR,
ethertype=self.IPV4,
port_range_max=self.PORT,
port_range_min=self.PORT,
protocol=self.PROTO,
used_in_default_sg=self.USED_IN_DEFAULT_SG,
used_in_non_default_sg=self.USED_IN_NON_DEFAULT_SG,
)
assert isinstance(
rul, default_security_group_rule.DefaultSecurityGroupRule
)
self.RULE_ID = rul.id
def tearDown(self):
sot = self.operator_cloud.network.delete_default_security_group_rule(
self.RULE_ID, ignore_missing=False
)
self.assertIsNone(sot)
super(TestDefaultSecurityGroupRule, self).tearDown()
def test_find(self):
sot = self.operator_cloud.network.find_default_security_group_rule(
self.RULE_ID
)
self.assertEqual(self.RULE_ID, sot.id)
def test_get(self):
sot = self.operator_cloud.network.get_default_security_group_rule(
self.RULE_ID
)
self.assertEqual(self.RULE_ID, sot.id)
self.assertEqual(self.DIR, sot.direction)
self.assertEqual(self.PROTO, sot.protocol)
self.assertEqual(self.PORT, sot.port_range_min)
self.assertEqual(self.PORT, sot.port_range_max)
self.assertEqual(self.USED_IN_DEFAULT_SG, sot.used_in_default_sg)
self.assertEqual(
self.USED_IN_NON_DEFAULT_SG, sot.used_in_non_default_sg
)
def test_list(self):
ids = [
o.id
for o in self.operator_cloud.network.default_security_group_rules()
]
self.assertIn(self.RULE_ID, ids)

View File

@ -0,0 +1,85 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import default_security_group_rule
from openstack.tests.unit import base
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
'description': '1',
'direction': '2',
'ethertype': '3',
'id': IDENTIFIER,
'port_range_max': 4,
'port_range_min': 5,
'protocol': '6',
'remote_group_id': '7',
'remote_ip_prefix': '8',
'remote_address_group_id': '13',
'used_in_default_sg': True,
'used_in_non_default_sg': True,
}
class TestDefaultSecurityGroupRule(base.TestCase):
def test_basic(self):
sot = default_security_group_rule.DefaultSecurityGroupRule()
self.assertEqual('default_security_group_rule', sot.resource_key)
self.assertEqual('default_security_group_rules', sot.resources_key)
self.assertEqual('/default-security-group-rules', sot.base_path)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertFalse(sot.allow_commit)
self.assertTrue(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual(
{
'description': 'description',
'direction': 'direction',
'id': 'id',
'ether_type': 'ethertype',
'limit': 'limit',
'marker': 'marker',
'port_range_max': 'port_range_max',
'port_range_min': 'port_range_min',
'protocol': 'protocol',
'remote_group_id': 'remote_group_id',
'remote_address_group_id': 'remote_address_group_id',
'remote_ip_prefix': 'remote_ip_prefix',
'sort_dir': 'sort_dir',
'sort_key': 'sort_key',
'used_in_default_sg': 'used_in_default_sg',
'used_in_non_default_sg': 'used_in_non_default_sg',
},
sot._query_mapping._mapping,
)
def test_make_it(self):
sot = default_security_group_rule.DefaultSecurityGroupRule(**EXAMPLE)
self.assertEqual(EXAMPLE['description'], sot.description)
self.assertEqual(EXAMPLE['direction'], sot.direction)
self.assertEqual(EXAMPLE['ethertype'], sot.ether_type)
self.assertEqual(EXAMPLE['id'], sot.id)
self.assertEqual(EXAMPLE['port_range_max'], sot.port_range_max)
self.assertEqual(EXAMPLE['port_range_min'], sot.port_range_min)
self.assertEqual(EXAMPLE['protocol'], sot.protocol)
self.assertEqual(EXAMPLE['remote_group_id'], sot.remote_group_id)
self.assertEqual(
EXAMPLE['remote_address_group_id'], sot.remote_address_group_id
)
self.assertEqual(EXAMPLE['remote_ip_prefix'], sot.remote_ip_prefix)
self.assertEqual(EXAMPLE['used_in_default_sg'], sot.used_in_default_sg)
self.assertEqual(
EXAMPLE['used_in_non_default_sg'], sot.used_in_non_default_sg
)