diff --git a/doc/source/cli/command-objects/default-security-group-rule.rst b/doc/source/cli/command-objects/default-security-group-rule.rst new file mode 100644 index 000000000..1905614bf --- /dev/null +++ b/doc/source/cli/command-objects/default-security-group-rule.rst @@ -0,0 +1,11 @@ +=========================== +default security group rule +=========================== + +A **default security group rule** specifies the template of the security group +rules which will be used by neutron to create rules in every new security group. + +Network v2 + +.. autoprogram-cliff:: openstack.network.v2 + :command: default security group rule * diff --git a/openstackclient/network/utils.py b/openstackclient/network/utils.py index 4d4d18e47..85a4f5ac1 100644 --- a/openstackclient/network/utils.py +++ b/openstackclient/network/utils.py @@ -81,3 +81,103 @@ def str2dict(strdict): key, sep, value = kv.partition(':') result[key] = value return result + + +def format_security_group_rule_show(obj): + data = transform_compute_security_group_rule(obj) + return zip(*sorted(data.items())) + + +def format_network_port_range(rule): + # Display port range or ICMP type and code. For example: + # - ICMP type: 'type=3' + # - ICMP type and code: 'type=3:code=0' + # - ICMP code: Not supported + # - Matching port range: '443:443' + # - Different port range: '22:24' + # - Single port: '80:80' + # - No port range: '' + port_range = '' + if is_icmp_protocol(rule['protocol']): + if rule['port_range_min']: + port_range += 'type=' + str(rule['port_range_min']) + if rule['port_range_max']: + port_range += ':code=' + str(rule['port_range_max']) + elif rule['port_range_min'] or rule['port_range_max']: + port_range_min = str(rule['port_range_min']) + port_range_max = str(rule['port_range_max']) + if rule['port_range_min'] is None: + port_range_min = port_range_max + if rule['port_range_max'] is None: + port_range_max = port_range_min + port_range = port_range_min + ':' + port_range_max + return port_range + + +def format_remote_ip_prefix(rule): + remote_ip_prefix = rule['remote_ip_prefix'] + if remote_ip_prefix is None: + ethertype = rule['ether_type'] + if ethertype == 'IPv4': + remote_ip_prefix = '0.0.0.0/0' + elif ethertype == 'IPv6': + remote_ip_prefix = '::/0' + return remote_ip_prefix + + +def convert_ipvx_case(string): + if string.lower() == 'ipv4': + return 'IPv4' + if string.lower() == 'ipv6': + return 'IPv6' + return string + + +def is_icmp_protocol(protocol): + # NOTE(rtheis): Neutron has deprecated protocol icmpv6. + # However, while the OSC CLI doesn't document the protocol, + # the code must still handle it. In addition, handle both + # protocol names and numbers. + if protocol in ['icmp', 'icmpv6', 'ipv6-icmp', '1', '58']: + return True + else: + return False + + +def convert_to_lowercase(string): + return string.lower() + + +def get_protocol(parsed_args, default_protocol='any'): + protocol = default_protocol + if parsed_args.protocol is not None: + protocol = parsed_args.protocol + if hasattr(parsed_args, "proto") and parsed_args.proto is not None: + protocol = parsed_args.proto + if protocol == 'any': + protocol = None + return protocol + + +def get_ethertype(parsed_args, protocol): + ethertype = 'IPv4' + if parsed_args.ethertype is not None: + ethertype = parsed_args.ethertype + elif is_ipv6_protocol(protocol): + ethertype = 'IPv6' + return ethertype + + +def is_ipv6_protocol(protocol): + # NOTE(rtheis): Neutron has deprecated protocol icmpv6. + # However, while the OSC CLI doesn't document the protocol, + # the code must still handle it. In addition, handle both + # protocol names and numbers. + if ( + protocol is not None + and protocol.startswith('ipv6-') + or protocol in ['icmpv6', '41', '43', '44', '58', '59', '60'] + ): + return True + else: + return False diff --git a/openstackclient/network/v2/default_security_group_rule.py b/openstackclient/network/v2/default_security_group_rule.py new file mode 100644 index 000000000..b8232a252 --- /dev/null +++ b/openstackclient/network/v2/default_security_group_rule.py @@ -0,0 +1,399 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Default Security Group Rule action implementations""" + +import logging + +from osc_lib.cli import parseractions +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils + +from openstackclient.i18n import _ +from openstackclient.network import common +from openstackclient.network import utils as network_utils + +LOG = logging.getLogger(__name__) + + +def _get_columns(item): + column_map = {} + hidden_columns = ['location', 'name', 'revision_number'] + return utils.get_osc_show_columns_for_sdk_resource( + item, column_map, hidden_columns + ) + + +class CreateDefaultSecurityGroupRule( + command.ShowOne, common.NeutronCommandWithExtraArgs +): + _description = _("Create a new default security group rule") + + def get_parser(self, prog_name): + parser = super().get_parser(prog_name) + + parser.add_argument( + '--description', + metavar='', + help=_("Set default security group rule description"), + ) + parser.add_argument( + '--icmp-type', + metavar='', + type=int, + help=_("ICMP type for ICMP IP protocols"), + ) + parser.add_argument( + '--icmp-code', + metavar='', + type=int, + help=_("ICMP code for ICMP IP protocols"), + ) + direction_group = parser.add_mutually_exclusive_group() + direction_group.add_argument( + '--ingress', + action='store_true', + help=_("Rule will apply to incoming network traffic (default)"), + ) + direction_group.add_argument( + '--egress', + action='store_true', + help=_("Rule will apply to outgoing network traffic"), + ) + parser.add_argument( + '--ethertype', + metavar='', + choices=['IPv4', 'IPv6'], + type=network_utils.convert_ipvx_case, + help=_( + "Ethertype of network traffic " + "(IPv4, IPv6; default: based on IP protocol)" + ), + ) + remote_group = parser.add_mutually_exclusive_group() + remote_group.add_argument( + "--remote-ip", + metavar="", + help=_( + "Remote IP address block (may use CIDR notation; " + "default for IPv4 rule: 0.0.0.0/0, " + "default for IPv6 rule: ::/0)" + ), + ) + remote_group.add_argument( + "--remote-group", + metavar="", + help=_("Remote security group (ID)"), + ) + remote_group.add_argument( + "--remote-address-group", + metavar="", + help=_("Remote address group (ID)"), + ) + + parser.add_argument( + '--dst-port', + metavar='', + action=parseractions.RangeAction, + help=_( + "Destination port, may be a single port or a starting and " + "ending port range: 137:139. Required for IP protocols TCP " + "and UDP. Ignored for ICMP IP protocols." + ), + ) + parser.add_argument( + '--protocol', + metavar='', + type=network_utils.convert_to_lowercase, + help=_( + "IP protocol (ah, dccp, egp, esp, gre, icmp, igmp, " + "ipv66-encap, ipv6-frag, ipv6-icmp, ipv6-nonxt, ipv6-opts, " + "ipv6-route, ospf, pgm, rsvp, sctp, tcp, udp, udplite, vrrp " + "and integer representations [0-255] or any; " + "default: any (all protocols))" + ), + ) + parser.add_argument( + '--for-default-sg', + action='store_true', + default=False, + help=_( + "Set this default security group rule to be used in all " + "default security groups created automatically for each " + "project" + ), + ) + parser.add_argument( + '--for-custom-sg', + action='store_true', + default=True, + help=_( + "Set this default security group rule to be used in all " + "custom security groups created manually by users" + ), + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.sdk_connection.network + # Build the create attributes. + attrs = {} + attrs['protocol'] = network_utils.get_protocol(parsed_args) + + if parsed_args.description is not None: + attrs['description'] = parsed_args.description + + # NOTE: A direction must be specified and ingress + # is the default. + if parsed_args.ingress or not parsed_args.egress: + attrs['direction'] = 'ingress' + if parsed_args.egress: + attrs['direction'] = 'egress' + + # NOTE(rtheis): Use ethertype specified else default based + # on IP protocol. + attrs['ethertype'] = network_utils.get_ethertype( + parsed_args, attrs['protocol'] + ) + + # NOTE(rtheis): Validate the port range and ICMP type and code. + # It would be ideal if argparse could do this. + if parsed_args.dst_port and ( + parsed_args.icmp_type or parsed_args.icmp_code + ): + msg = _( + 'Argument --dst-port not allowed with arguments ' + '--icmp-type and --icmp-code' + ) + raise exceptions.CommandError(msg) + if parsed_args.icmp_type is None and parsed_args.icmp_code is not None: + msg = _('Argument --icmp-type required with argument --icmp-code') + raise exceptions.CommandError(msg) + is_icmp_protocol = network_utils.is_icmp_protocol(attrs['protocol']) + if not is_icmp_protocol and ( + parsed_args.icmp_type or parsed_args.icmp_code + ): + msg = _( + 'ICMP IP protocol required with arguments ' + '--icmp-type and --icmp-code' + ) + raise exceptions.CommandError(msg) + # NOTE(rtheis): For backwards compatibility, continue ignoring + # the destination port range when an ICMP IP protocol is specified. + if parsed_args.dst_port and not is_icmp_protocol: + attrs['port_range_min'] = parsed_args.dst_port[0] + attrs['port_range_max'] = parsed_args.dst_port[1] + if parsed_args.icmp_type is not None and parsed_args.icmp_type >= 0: + attrs['port_range_min'] = parsed_args.icmp_type + if parsed_args.icmp_code is not None and parsed_args.icmp_code >= 0: + attrs['port_range_max'] = parsed_args.icmp_code + + if parsed_args.remote_group is not None: + attrs['remote_group_id'] = parsed_args.remote_group + elif parsed_args.remote_address_group is not None: + attrs['remote_address_group_id'] = parsed_args.remote_address_group + elif parsed_args.remote_ip is not None: + attrs['remote_ip_prefix'] = parsed_args.remote_ip + elif attrs['ethertype'] == 'IPv4': + attrs['remote_ip_prefix'] = '0.0.0.0/0' + elif attrs['ethertype'] == 'IPv6': + attrs['remote_ip_prefix'] = '::/0' + + attrs['used_in_default_sg'] = parsed_args.for_default_sg + attrs['used_in_non_default_sg'] = parsed_args.for_custom_sg + + attrs.update( + self._parse_extra_properties(parsed_args.extra_properties) + ) + + # Create and show the security group rule. + obj = client.create_default_security_group_rule(**attrs) + display_columns, columns = _get_columns(obj) + data = utils.get_item_properties(obj, columns) + return (display_columns, data) + + +class DeleteDefaultSecurityGroupRule(command.Command): + _description = _("Delete default security group rule(s)") + + def get_parser(self, prog_name): + parser = super().get_parser(prog_name) + parser.add_argument( + 'rule', + metavar='', + nargs="+", + help=_("Default security group rule(s) to delete (ID only)"), + ) + return parser + + def take_action(self, parsed_args): + result = 0 + client = self.app.client_manager.sdk_connection.network + for r in parsed_args.rule: + try: + obj = client.find_default_security_group_rule( + r, ignore_missing=False + ) + client.delete_default_security_group_rule(obj) + except Exception as e: + result += 1 + LOG.error( + _( + "Failed to delete default SG rule with " + "ID '%(rule_id)s': %(e)s" + ), + {'rule_id': r, 'e': e}, + ) + + if result > 0: + total = len(parsed_args.rule) + msg = _( + "%(result)s of %(total)s default rules failed to delete." + ) % {'result': result, 'total': total} + raise exceptions.CommandError(msg) + + +class ListDefaultSecurityGroupRule(command.Lister): + _description = _("List default security group rules") + + def _format_network_security_group_rule(self, rule): + """Transform the SDK DefaultSecurityGroupRule object to a dict + + The SDK object gets in the way of reformatting columns... + Create port_range column from port_range_min and port_range_max + """ + rule = rule.to_dict() + rule['port_range'] = network_utils.format_network_port_range(rule) + rule['remote_ip_prefix'] = network_utils.format_remote_ip_prefix(rule) + return rule + + def get_parser(self, prog_name): + parser = super().get_parser(prog_name) + + parser.add_argument( + '--protocol', + metavar='', + type=network_utils.convert_to_lowercase, + help=_( + "List rules by the IP protocol (ah, dhcp, egp, esp, gre, " + "icmp, igmp, ipv6-encap, ipv6-frag, ipv6-icmp, " + "ipv6-nonxt, ipv6-opts, ipv6-route, ospf, pgm, rsvp, " + "sctp, tcp, udp, udplite, vrrp and integer " + "representations [0-255] or any; " + "default: any (all protocols))" + ), + ) + parser.add_argument( + '--ethertype', + metavar='', + type=network_utils.convert_to_lowercase, + help=_("List default rules by the Ethertype (IPv4 or IPv6)"), + ) + direction_group = parser.add_mutually_exclusive_group() + direction_group.add_argument( + '--ingress', + action='store_true', + help=_( + "List default rules which will be applied to incoming " + "network traffic" + ), + ) + direction_group.add_argument( + '--egress', + action='store_true', + help=_( + "List default rules which will be applied to outgoing " + "network traffic" + ), + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.sdk_connection.network + column_headers = ( + 'ID', + 'IP Protocol', + 'Ethertype', + 'IP Range', + 'Port Range', + 'Direction', + 'Remote Security Group', + 'Remote Address Group', + 'Used in default Security Group', + 'Used in custom Security Group', + ) + columns = ( + 'id', + 'protocol', + 'ether_type', + 'remote_ip_prefix', + 'port_range', + 'direction', + 'remote_group_id', + 'remote_address_group_id', + 'used_in_default_sg', + 'used_in_non_default_sg', + ) + + # Get the security group rules using the requested query. + query = {} + if parsed_args.ingress: + query['direction'] = 'ingress' + if parsed_args.egress: + query['direction'] = 'egress' + if parsed_args.protocol is not None: + query['protocol'] = parsed_args.protocol + + rules = [ + self._format_network_security_group_rule(r) + for r in client.default_security_group_rules(**query) + ] + + return ( + column_headers, + ( + utils.get_dict_properties( + s, + columns, + ) + for s in rules + ), + ) + + +class ShowDefaultSecurityGroupRule(command.ShowOne): + _description = _("Display default security group rule details") + + def get_parser(self, prog_name): + parser = super().get_parser(prog_name) + parser.add_argument( + 'rule', + metavar="", + help=_("Default security group rule to display (ID only)"), + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.sdk_connection.network + obj = client.find_default_security_group_rule( + parsed_args.rule, ignore_missing=False + ) + # necessary for old rules that have None in this field + if not obj['remote_ip_prefix']: + obj['remote_ip_prefix'] = network_utils.format_remote_ip_prefix( + obj + ) + display_columns, columns = _get_columns(obj) + data = utils.get_item_properties(obj, columns) + return (display_columns, data) diff --git a/openstackclient/network/v2/security_group_rule.py b/openstackclient/network/v2/security_group_rule.py index 24a1fcfb0..5e0de120b 100644 --- a/openstackclient/network/v2/security_group_rule.py +++ b/openstackclient/network/v2/security_group_rule.py @@ -28,48 +28,6 @@ from openstackclient.network import utils as network_utils LOG = logging.getLogger(__name__) -def _format_security_group_rule_show(obj): - data = network_utils.transform_compute_security_group_rule(obj) - return zip(*sorted(data.items())) - - -def _format_network_port_range(rule): - # Display port range or ICMP type and code. For example: - # - ICMP type: 'type=3' - # - ICMP type and code: 'type=3:code=0' - # - ICMP code: Not supported - # - Matching port range: '443:443' - # - Different port range: '22:24' - # - Single port: '80:80' - # - No port range: '' - port_range = '' - if _is_icmp_protocol(rule['protocol']): - if rule['port_range_min']: - port_range += 'type=' + str(rule['port_range_min']) - if rule['port_range_max']: - port_range += ':code=' + str(rule['port_range_max']) - elif rule['port_range_min'] or rule['port_range_max']: - port_range_min = str(rule['port_range_min']) - port_range_max = str(rule['port_range_max']) - if rule['port_range_min'] is None: - port_range_min = port_range_max - if rule['port_range_max'] is None: - port_range_max = port_range_min - port_range = port_range_min + ':' + port_range_max - return port_range - - -def _format_remote_ip_prefix(rule): - remote_ip_prefix = rule['remote_ip_prefix'] - if remote_ip_prefix is None: - ethertype = rule['ether_type'] - if ethertype == 'IPv4': - remote_ip_prefix = '0.0.0.0/0' - elif ethertype == 'IPv6': - remote_ip_prefix = '::/0' - return remote_ip_prefix - - def _get_columns(item): column_map = {} hidden_columns = ['location', 'tenant_id'] @@ -78,29 +36,6 @@ def _get_columns(item): ) -def _convert_to_lowercase(string): - return string.lower() - - -def _convert_ipvx_case(string): - if string.lower() == 'ipv4': - return 'IPv4' - if string.lower() == 'ipv6': - return 'IPv6' - return string - - -def _is_icmp_protocol(protocol): - # NOTE(rtheis): Neutron has deprecated protocol icmpv6. - # However, while the OSC CLI doesn't document the protocol, - # the code must still handle it. In addition, handle both - # protocol names and numbers. - if protocol in ['icmp', 'icmpv6', 'ipv6-icmp', '1', '58']: - return True - else: - return False - - # TODO(abhiraut): Use the SDK resource mapped attribute names once the # OSC minimum requirements include SDK 1.0. class CreateSecurityGroupRule( @@ -188,7 +123,7 @@ class CreateSecurityGroupRule( protocol_group.add_argument( '--protocol', metavar='', - type=_convert_to_lowercase, + type=network_utils.convert_to_lowercase, help=protocol_help, **proto_choices ) @@ -196,7 +131,7 @@ class CreateSecurityGroupRule( protocol_group.add_argument( '--proto', metavar='', - type=_convert_to_lowercase, + type=network_utils.convert_to_lowercase, help=argparse.SUPPRESS, **proto_choices ) @@ -246,7 +181,7 @@ class CreateSecurityGroupRule( '--ethertype', metavar='', choices=['IPv4', 'IPv6'], - type=_convert_ipvx_case, + type=network_utils.convert_ipvx_case, help=self.enhance_help_neutron( _( "Ethertype of network traffic " @@ -264,38 +199,6 @@ class CreateSecurityGroupRule( ) return parser - def _get_protocol(self, parsed_args, default_protocol='any'): - protocol = default_protocol - if parsed_args.protocol is not None: - protocol = parsed_args.protocol - if parsed_args.proto is not None: - protocol = parsed_args.proto - if protocol == 'any': - protocol = None - return protocol - - def _get_ethertype(self, parsed_args, protocol): - ethertype = 'IPv4' - if parsed_args.ethertype is not None: - ethertype = parsed_args.ethertype - elif self._is_ipv6_protocol(protocol): - ethertype = 'IPv6' - return ethertype - - def _is_ipv6_protocol(self, protocol): - # NOTE(rtheis): Neutron has deprecated protocol icmpv6. - # However, while the OSC CLI doesn't document the protocol, - # the code must still handle it. In addition, handle both - # protocol names and numbers. - if ( - protocol is not None - and protocol.startswith('ipv6-') - or protocol in ['icmpv6', '41', '43', '44', '58', '59', '60'] - ): - return True - else: - return False - def take_action_network(self, client, parsed_args): # Get the security group ID to hold the rule. security_group_id = client.find_security_group( @@ -304,7 +207,7 @@ class CreateSecurityGroupRule( # Build the create attributes. attrs = {} - attrs['protocol'] = self._get_protocol(parsed_args) + attrs['protocol'] = network_utils.get_protocol(parsed_args) if parsed_args.description is not None: attrs['description'] = parsed_args.description @@ -318,7 +221,7 @@ class CreateSecurityGroupRule( # NOTE(rtheis): Use ethertype specified else default based # on IP protocol. - attrs['ethertype'] = self._get_ethertype( + attrs['ethertype'] = network_utils.get_ethertype( parsed_args, attrs['protocol'] ) @@ -335,7 +238,7 @@ class CreateSecurityGroupRule( if parsed_args.icmp_type is None and parsed_args.icmp_code is not None: msg = _('Argument --icmp-type required with argument --icmp-code') raise exceptions.CommandError(msg) - is_icmp_protocol = _is_icmp_protocol(attrs['protocol']) + is_icmp_protocol = network_utils.is_icmp_protocol(attrs['protocol']) if not is_icmp_protocol and ( parsed_args.icmp_type or parsed_args.icmp_code ): @@ -390,7 +293,9 @@ class CreateSecurityGroupRule( def take_action_compute(self, client, parsed_args): group = client.api.security_group_find(parsed_args.group) - protocol = self._get_protocol(parsed_args, default_protocol='tcp') + protocol = network_utils.get_protocol( + parsed_args, default_protocol='tcp' + ) if protocol == 'icmp': from_port, to_port = -1, -1 else: @@ -414,7 +319,7 @@ class CreateSecurityGroupRule( remote_ip=remote_ip, remote_group=parsed_args.remote_group, ) - return _format_security_group_rule_show(obj) + return network_utils.format_security_group_rule_show(obj) class DeleteSecurityGroupRule(common.NetworkAndComputeDelete): @@ -451,8 +356,8 @@ class ListSecurityGroupRule(common.NetworkAndComputeLister): Create port_range column from port_range_min and port_range_max """ rule = rule.to_dict() - rule['port_range'] = _format_network_port_range(rule) - rule['remote_ip_prefix'] = _format_remote_ip_prefix(rule) + rule['port_range'] = network_utils.format_network_port_range(rule) + rule['remote_ip_prefix'] = network_utils.format_remote_ip_prefix(rule) return rule def update_parser_common(self, parser): @@ -478,7 +383,7 @@ class ListSecurityGroupRule(common.NetworkAndComputeLister): parser.add_argument( '--protocol', metavar='', - type=_convert_to_lowercase, + type=network_utils.convert_to_lowercase, help=self.enhance_help_neutron( _( "List rules by the IP protocol (ah, dhcp, egp, esp, gre, " @@ -493,7 +398,7 @@ class ListSecurityGroupRule(common.NetworkAndComputeLister): parser.add_argument( '--ethertype', metavar='', - type=_convert_to_lowercase, + type=network_utils.convert_to_lowercase, help=self.enhance_help_neutron( _("List rules by the Ethertype (IPv4 or IPv6)") ), @@ -677,7 +582,9 @@ class ShowSecurityGroupRule(common.NetworkAndComputeShowOne): ) # necessary for old rules that have None in this field if not obj['remote_ip_prefix']: - obj['remote_ip_prefix'] = _format_remote_ip_prefix(obj) + obj['remote_ip_prefix'] = network_utils.format_remote_ip_prefix( + obj + ) display_columns, columns = _get_columns(obj) data = utils.get_item_properties(obj, columns) return (display_columns, data) @@ -704,4 +611,4 @@ class ShowSecurityGroupRule(common.NetworkAndComputeShowOne): raise exceptions.CommandError(msg) # NOTE(rtheis): Format security group rule - return _format_security_group_rule_show(obj) + return network_utils.format_security_group_rule_show(obj) diff --git a/openstackclient/tests/functional/network/v2/test_default_security_group_rule.py b/openstackclient/tests/functional/network/v2/test_default_security_group_rule.py new file mode 100644 index 000000000..76c8053be --- /dev/null +++ b/openstackclient/tests/functional/network/v2/test_default_security_group_rule.py @@ -0,0 +1,69 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random + +from openstackclient.tests.functional.network.v2 import common + + +class SecurityGroupRuleTests(common.NetworkTests): + """Functional tests for security group rule""" + + def setUp(self): + super(SecurityGroupRuleTests, self).setUp() + # Nothing in this class works with Nova Network + if not self.haz_network: + self.skipTest("No Network service present") + if not self.is_extension_enabled("security-groups-default-rules"): + self.skipTest("No security-groups-default-rules extension present") + + self.port = random.randint(1, 65535) + self.protocol = random.choice(["tcp", "udp"]) + self.direction = random.choice(["ingress", "egress"]) + # Create the default security group rule. + cmd_output = self.openstack( + 'default security group rule create ' + '--protocol %(protocol)s ' + '--dst-port %(port)s:%(port)s ' + '--%(direction)s --ethertype IPv4 ' + % { + 'protocol': self.protocol, + 'port': self.port, + 'direction': self.direction, + }, + parse_output=True, + ) + self.addCleanup( + self.openstack, + 'default security group rule delete ' + cmd_output['id'], + ) + self.DEFAULT_SG_RULE_ID = cmd_output['id'] + + def test_security_group_rule_list(self): + cmd_output = self.openstack( + 'default security group rule list ', + parse_output=True, + ) + self.assertIn( + self.DEFAULT_SG_RULE_ID, [rule['ID'] for rule in cmd_output] + ) + + def test_security_group_rule_show(self): + cmd_output = self.openstack( + 'default security group rule show ' + self.DEFAULT_SG_RULE_ID, + parse_output=True, + ) + self.assertEqual(self.DEFAULT_SG_RULE_ID, cmd_output['id']) + self.assertEqual(self.protocol, cmd_output['protocol']) + self.assertEqual(self.port, cmd_output['port_range_min']) + self.assertEqual(self.port, cmd_output['port_range_max']) + self.assertEqual(self.direction, cmd_output['direction']) diff --git a/openstackclient/tests/unit/network/v2/test_default_security_group_rule.py b/openstackclient/tests/unit/network/v2/test_default_security_group_rule.py new file mode 100644 index 000000000..a7a6ec69b --- /dev/null +++ b/openstackclient/tests/unit/network/v2/test_default_security_group_rule.py @@ -0,0 +1,1133 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from unittest import mock +from unittest.mock import call +import uuid + +from openstack.network.v2 import _proxy +from openstack.network.v2 import ( + default_security_group_rule as _default_security_group_rule, +) +from openstack.test import fakes as sdk_fakes +from osc_lib import exceptions + +from openstackclient.network import utils as network_utils +from openstackclient.network.v2 import default_security_group_rule +from openstackclient.tests.unit.network.v2 import fakes as network_fakes +from openstackclient.tests.unit import utils as tests_utils + + +class TestDefaultSecurityGroupRule(network_fakes.TestNetworkV2): + def setUp(self): + super(TestDefaultSecurityGroupRule, self).setUp() + + self.app.client_manager.sdk_connection = mock.Mock() + self.app.client_manager.sdk_connection.network = mock.Mock( + spec=_proxy.Proxy, + ) + self.sdk_client = self.app.client_manager.sdk_connection.network + + +class TestCreateDefaultSecurityGroupRule(TestDefaultSecurityGroupRule): + expected_columns = ( + 'description', + 'direction', + 'ether_type', + 'id', + 'port_range_max', + 'port_range_min', + 'protocol', + 'remote_address_group_id', + 'remote_group_id', + 'remote_ip_prefix', + 'used_in_default_sg', + 'used_in_non_default_sg', + ) + + expected_data = None + + def _setup_default_security_group_rule(self, attrs=None): + default_security_group_rule_attrs = { + 'description': 'default-security-group-rule-description-' + + uuid.uuid4().hex, + 'direction': 'ingress', + 'ether_type': 'IPv4', + 'id': 'default-security-group-rule-id-' + uuid.uuid4().hex, + 'port_range_max': None, + 'port_range_min': None, + 'protocol': None, + 'remote_group_id': None, + 'remote_address_group_id': None, + 'remote_ip_prefix': '0.0.0.0/0', + 'location': 'MUNCHMUNCHMUNCH', + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + attrs = attrs or {} + # Overwrite default attributes. + default_security_group_rule_attrs.update(attrs) + self._default_sg_rule = sdk_fakes.generate_fake_resource( + _default_security_group_rule.DefaultSecurityGroupRule, + **default_security_group_rule_attrs + ) + + self.sdk_client.create_default_security_group_rule.return_value = ( + self._default_sg_rule + ) + self.expected_data = ( + self._default_sg_rule.description, + self._default_sg_rule.direction, + self._default_sg_rule.ether_type, + self._default_sg_rule.id, + self._default_sg_rule.port_range_max, + self._default_sg_rule.port_range_min, + self._default_sg_rule.protocol, + self._default_sg_rule.remote_address_group_id, + self._default_sg_rule.remote_group_id, + self._default_sg_rule.remote_ip_prefix, + self._default_sg_rule.used_in_default_sg, + self._default_sg_rule.used_in_non_default_sg, + ) + + def setUp(self): + super(TestCreateDefaultSecurityGroupRule, self).setUp() + + # Get the command object to test + self.cmd = default_security_group_rule.CreateDefaultSecurityGroupRule( + self.app, self.namespace + ) + + def test_create_all_remote_options(self): + arglist = [ + '--remote-ip', + '10.10.0.0/24', + '--remote-group', + 'test-remote-group-id', + '--remote-address-group', + 'test-remote-address-group-id', + ] + self.assertRaises( + tests_utils.ParserException, + self.check_parser, + self.cmd, + arglist, + [], + ) + + def test_create_bad_ethertype(self): + arglist = [ + '--ethertype', + 'foo', + ] + self.assertRaises( + tests_utils.ParserException, + self.check_parser, + self.cmd, + arglist, + [], + ) + + def test_lowercase_ethertype(self): + arglist = [ + '--ethertype', + 'ipv4', + ] + parsed_args = self.check_parser(self.cmd, arglist, []) + self.assertEqual('IPv4', parsed_args.ethertype) + + def test_lowercase_v6_ethertype(self): + arglist = [ + '--ethertype', + 'ipv6', + ] + parsed_args = self.check_parser(self.cmd, arglist, []) + self.assertEqual('IPv6', parsed_args.ethertype) + + def test_proper_case_ethertype(self): + arglist = [ + '--ethertype', + 'IPv6', + ] + parsed_args = self.check_parser(self.cmd, arglist, []) + self.assertEqual('IPv6', parsed_args.ethertype) + + def test_create_all_port_range_options(self): + arglist = [ + '--dst-port', + '80:80', + '--icmp-type', + '3', + '--icmp-code', + '1', + ] + verifylist = [ + ('dst_port', (80, 80)), + ('icmp_type', 3), + ('icmp_code', 1), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises( + exceptions.CommandError, self.cmd.take_action, parsed_args + ) + + def test_create_default_rule(self): + self._setup_default_security_group_rule( + { + 'protocol': 'tcp', + 'port_range_max': 443, + 'port_range_min': 443, + } + ) + arglist = [ + '--protocol', + 'tcp', + '--dst-port', + str(self._default_sg_rule.port_range_min), + ] + verifylist = [ + ( + 'dst_port', + ( + self._default_sg_rule.port_range_min, + self._default_sg_rule.port_range_max, + ), + ), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'port_range_max': self._default_sg_rule.port_range_max, + 'port_range_min': self._default_sg_rule.port_range_min, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_protocol_any(self): + self._setup_default_security_group_rule( + { + 'protocol': None, + 'remote_ip_prefix': '10.0.2.0/24', + } + ) + arglist = [ + '--protocol', + 'any', + '--remote-ip', + self._default_sg_rule.remote_ip_prefix, + ] + verifylist = [ + ('protocol', 'any'), + ('remote_ip', self._default_sg_rule.remote_ip_prefix), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_remote_address_group(self): + self._setup_default_security_group_rule( + { + 'protocol': 'icmp', + 'remote_address_group_id': 'remote-address-group-id', + } + ) + arglist = [ + '--protocol', + 'icmp', + '--remote-address-group', + self._default_sg_rule.remote_address_group_id, + ] + verifylist = [ + ( + 'remote_address_group', + self._default_sg_rule.remote_address_group_id, + ), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'protocol': self._default_sg_rule.protocol, + 'remote_address_group_id': self._default_sg_rule.remote_address_group_id, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_remote_group(self): + self._setup_default_security_group_rule( + { + 'protocol': 'tcp', + 'port_range_max': 22, + 'port_range_min': 22, + } + ) + arglist = [ + '--protocol', + 'tcp', + '--dst-port', + str(self._default_sg_rule.port_range_min), + '--ingress', + '--remote-group', + 'remote-group-id', + ] + verifylist = [ + ( + 'dst_port', + ( + self._default_sg_rule.port_range_min, + self._default_sg_rule.port_range_max, + ), + ), + ('ingress', True), + ('remote_group', 'remote-group-id'), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'port_range_max': self._default_sg_rule.port_range_max, + 'port_range_min': self._default_sg_rule.port_range_min, + 'protocol': self._default_sg_rule.protocol, + 'remote_group_id': 'remote-group-id', + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_source_group(self): + self._setup_default_security_group_rule( + { + 'remote_group_id': 'remote-group-id', + } + ) + arglist = [ + '--ingress', + '--remote-group', + 'remote-group-id', + ] + verifylist = [ + ('ingress', True), + ('remote_group', 'remote-group-id'), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'protocol': self._default_sg_rule.protocol, + 'remote_group_id': 'remote-group-id', + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_source_ip(self): + self._setup_default_security_group_rule( + { + 'protocol': 'icmp', + 'remote_ip_prefix': '10.0.2.0/24', + } + ) + arglist = [ + '--protocol', + self._default_sg_rule.protocol, + '--remote-ip', + self._default_sg_rule.remote_ip_prefix, + ] + verifylist = [ + ('protocol', self._default_sg_rule.protocol), + ('remote_ip', self._default_sg_rule.remote_ip_prefix), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_remote_ip(self): + self._setup_default_security_group_rule( + { + 'protocol': 'icmp', + 'remote_ip_prefix': '10.0.2.0/24', + } + ) + arglist = [ + '--protocol', + self._default_sg_rule.protocol, + '--remote-ip', + self._default_sg_rule.remote_ip_prefix, + ] + verifylist = [ + ('protocol', self._default_sg_rule.protocol), + ('remote_ip', self._default_sg_rule.remote_ip_prefix), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_tcp_with_icmp_type(self): + arglist = [ + '--protocol', + 'tcp', + '--icmp-type', + '15', + ] + verifylist = [ + ('protocol', 'tcp'), + ('icmp_type', 15), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises( + exceptions.CommandError, self.cmd.take_action, parsed_args + ) + + def test_create_icmp_code(self): + arglist = [ + '--protocol', + '1', + '--icmp-code', + '1', + ] + verifylist = [ + ('protocol', '1'), + ('icmp_code', 1), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + self.assertRaises( + exceptions.CommandError, self.cmd.take_action, parsed_args + ) + + def test_create_icmp_code_zero(self): + self._setup_default_security_group_rule( + { + 'port_range_min': 15, + 'port_range_max': 0, + 'protocol': 'icmp', + 'remote_ip_prefix': '0.0.0.0/0', + } + ) + arglist = [ + '--protocol', + self._default_sg_rule.protocol, + '--icmp-type', + str(self._default_sg_rule.port_range_min), + '--icmp-code', + str(self._default_sg_rule.port_range_max), + ] + verifylist = [ + ('protocol', self._default_sg_rule.protocol), + ('icmp_code', self._default_sg_rule.port_range_max), + ('icmp_type', self._default_sg_rule.port_range_min), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_icmp_code_greater_than_zero(self): + self._setup_default_security_group_rule( + { + 'port_range_min': 15, + 'port_range_max': 18, + 'protocol': 'icmp', + 'remote_ip_prefix': '0.0.0.0/0', + } + ) + arglist = [ + '--protocol', + self._default_sg_rule.protocol, + '--icmp-type', + str(self._default_sg_rule.port_range_min), + '--icmp-code', + str(self._default_sg_rule.port_range_max), + ] + verifylist = [ + ('protocol', self._default_sg_rule.protocol), + ('icmp_type', self._default_sg_rule.port_range_min), + ('icmp_code', self._default_sg_rule.port_range_max), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_icmp_code_negative_value(self): + self._setup_default_security_group_rule( + { + 'port_range_min': 15, + 'port_range_max': None, + 'protocol': 'icmp', + 'remote_ip_prefix': '0.0.0.0/0', + } + ) + arglist = [ + '--protocol', + self._default_sg_rule.protocol, + '--icmp-type', + str(self._default_sg_rule.port_range_min), + '--icmp-code', + '-2', + ] + verifylist = [ + ('protocol', self._default_sg_rule.protocol), + ('icmp_type', self._default_sg_rule.port_range_min), + ('icmp_code', -2), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = self.cmd.take_action(parsed_args) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_icmp_type(self): + self._setup_default_security_group_rule( + { + 'port_range_min': 15, + 'protocol': 'icmp', + 'remote_ip_prefix': '0.0.0.0/0', + } + ) + arglist = [ + '--icmp-type', + str(self._default_sg_rule.port_range_min), + '--protocol', + self._default_sg_rule.protocol, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._default_sg_rule.port_range_min), + ('icmp_code', None), + ('protocol', self._default_sg_rule.protocol), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'port_range_min': self._default_sg_rule.port_range_min, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_icmp_type_zero(self): + self._setup_default_security_group_rule( + { + 'port_range_min': 0, + 'protocol': 'icmp', + 'remote_ip_prefix': '0.0.0.0/0', + } + ) + arglist = [ + '--icmp-type', + str(self._default_sg_rule.port_range_min), + '--protocol', + self._default_sg_rule.protocol, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._default_sg_rule.port_range_min), + ('icmp_code', None), + ('protocol', self._default_sg_rule.protocol), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'port_range_min': self._default_sg_rule.port_range_min, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_icmp_type_greater_than_zero(self): + self._setup_default_security_group_rule( + { + 'port_range_min': 13, # timestamp + 'protocol': 'icmp', + 'remote_ip_prefix': '0.0.0.0/0', + } + ) + arglist = [ + '--icmp-type', + str(self._default_sg_rule.port_range_min), + '--protocol', + self._default_sg_rule.protocol, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._default_sg_rule.port_range_min), + ('icmp_code', None), + ('protocol', self._default_sg_rule.protocol), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'port_range_min': self._default_sg_rule.port_range_min, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_icmp_type_negative_value(self): + self._setup_default_security_group_rule( + { + 'port_range_min': None, # timestamp + 'protocol': 'icmp', + 'remote_ip_prefix': '0.0.0.0/0', + } + ) + arglist = [ + '--icmp-type', + '-13', + '--protocol', + self._default_sg_rule.protocol, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', -13), + ('icmp_code', None), + ('protocol', self._default_sg_rule.protocol), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_ipv6_icmp_type_code(self): + self._setup_default_security_group_rule( + { + 'ether_type': 'IPv6', + 'port_range_min': 139, + 'port_range_max': 2, + 'protocol': 'ipv6-icmp', + 'remote_ip_prefix': '::/0', + } + ) + arglist = [ + '--icmp-type', + str(self._default_sg_rule.port_range_min), + '--icmp-code', + str(self._default_sg_rule.port_range_max), + '--protocol', + self._default_sg_rule.protocol, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._default_sg_rule.port_range_min), + ('icmp_code', self._default_sg_rule.port_range_max), + ('protocol', self._default_sg_rule.protocol), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'port_range_min': self._default_sg_rule.port_range_min, + 'port_range_max': self._default_sg_rule.port_range_max, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_icmpv6_type(self): + self._setup_default_security_group_rule( + { + 'ether_type': 'IPv6', + 'port_range_min': 139, + 'protocol': 'icmpv6', + 'remote_ip_prefix': '::/0', + } + ) + arglist = [ + '--icmp-type', + str(self._default_sg_rule.port_range_min), + '--protocol', + self._default_sg_rule.protocol, + ] + verifylist = [ + ('dst_port', None), + ('icmp_type', self._default_sg_rule.port_range_min), + ('icmp_code', None), + ('protocol', self._default_sg_rule.protocol), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'port_range_min': self._default_sg_rule.port_range_min, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + def test_create_with_description(self): + self._setup_default_security_group_rule( + { + 'description': 'Setting SGR', + } + ) + arglist = [ + '--description', + self._default_sg_rule.description, + ] + verifylist = [ + ('description', self._default_sg_rule.description), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.create_default_security_group_rule.assert_called_once_with( + **{ + 'description': self._default_sg_rule.description, + 'direction': self._default_sg_rule.direction, + 'ethertype': self._default_sg_rule.ether_type, + 'protocol': self._default_sg_rule.protocol, + 'remote_ip_prefix': self._default_sg_rule.remote_ip_prefix, + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, data) + + +class TestDeleteDefaultSecurityGroupRule(TestDefaultSecurityGroupRule): + # The default security group rules to be deleted. + default_security_group_rule_attrs = { + 'direction': 'ingress', + 'ether_type': 'IPv4', + 'port_range_max': None, + 'port_range_min': None, + 'protocol': None, + 'remote_group_id': None, + 'remote_address_group_id': None, + 'remote_ip_prefix': '0.0.0.0/0', + 'location': 'MUNCHMUNCHMUNCH', + 'used_in_default_sg': False, + 'used_in_non_default_sg': True, + } + _default_sg_rules = list( + sdk_fakes.generate_fake_resources( + _default_security_group_rule.DefaultSecurityGroupRule, + count=2, + attrs=default_security_group_rule_attrs, + ) + ) + + def setUp(self): + super(TestDeleteDefaultSecurityGroupRule, self).setUp() + + self.sdk_client.delete_default_security_group_rule.return_value = None + + # Get the command object to test + self.cmd = default_security_group_rule.DeleteDefaultSecurityGroupRule( + self.app, self.namespace + ) + + def test_default_security_group_rule_delete(self): + arglist = [ + self._default_sg_rules[0].id, + ] + verifylist = [ + ('rule', [self._default_sg_rules[0].id]), + ] + self.sdk_client.find_default_security_group_rule.return_value = ( + self._default_sg_rules[0] + ) + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.sdk_client.delete_default_security_group_rule.assert_called_once_with( + self._default_sg_rules[0] + ) + self.assertIsNone(result) + + def test_multi_default_security_group_rules_delete(self): + arglist = [] + verifylist = [] + + for s in self._default_sg_rules: + arglist.append(s.id) + verifylist = [ + ('rule', arglist), + ] + self.sdk_client.find_default_security_group_rule.side_effect = ( + self._default_sg_rules + ) + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for s in self._default_sg_rules: + calls.append(call(s)) + self.sdk_client.delete_default_security_group_rule.assert_has_calls( + calls + ) + self.assertIsNone(result) + + def test_multi_default_security_group_rules_delete_with_exception(self): + arglist = [ + self._default_sg_rules[0].id, + 'unexist_rule', + ] + verifylist = [ + ('rule', [self._default_sg_rules[0].id, 'unexist_rule']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + find_mock_result = [ + self._default_sg_rules[0], + exceptions.CommandError, + ] + self.sdk_client.find_default_security_group_rule = mock.Mock( + side_effect=find_mock_result + ) + + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 default rules failed to delete.', str(e)) + + self.sdk_client.find_default_security_group_rule.assert_any_call( + self._default_sg_rules[0].id, ignore_missing=False + ) + self.sdk_client.find_default_security_group_rule.assert_any_call( + 'unexist_rule', ignore_missing=False + ) + self.sdk_client.delete_default_security_group_rule.assert_called_once_with( + self._default_sg_rules[0] + ) + + +class TestListDefaultSecurityGroupRule(TestDefaultSecurityGroupRule): + # The security group rule to be listed. + _default_sg_rule_tcp = sdk_fakes.generate_fake_resource( + _default_security_group_rule.DefaultSecurityGroupRule, + **{'protocol': 'tcp', 'port_range_max': 80, 'port_range_min': 80} + ) + _default_sg_rule_icmp = sdk_fakes.generate_fake_resource( + _default_security_group_rule.DefaultSecurityGroupRule, + **{'protocol': 'icmp', 'remote_ip_prefix': '10.0.2.0/24'} + ) + _default_sg_rules = [ + _default_sg_rule_tcp, + _default_sg_rule_icmp, + ] + + expected_columns = ( + 'ID', + 'IP Protocol', + 'Ethertype', + 'IP Range', + 'Port Range', + 'Direction', + 'Remote Security Group', + 'Remote Address Group', + 'Used in default Security Group', + 'Used in custom Security Group', + ) + + expected_data = [] + expected_data_no_group = [] + for _default_sg_rule in _default_sg_rules: + expected_data.append( + ( + _default_sg_rule.id, + _default_sg_rule.protocol, + _default_sg_rule.ether_type, + _default_sg_rule.remote_ip_prefix, + network_utils.format_network_port_range(_default_sg_rule), + _default_sg_rule.direction, + _default_sg_rule.remote_group_id, + _default_sg_rule.remote_address_group_id, + _default_sg_rule.used_in_default_sg, + _default_sg_rule.used_in_non_default_sg, + ) + ) + + def setUp(self): + super(TestListDefaultSecurityGroupRule, self).setUp() + + self.sdk_client.default_security_group_rules.return_value = ( + self._default_sg_rules + ) + + # Get the command object to test + self.cmd = default_security_group_rule.ListDefaultSecurityGroupRule( + self.app, self.namespace + ) + + def test_list_default(self): + self._default_sg_rule_tcp.port_range_min = 80 + parsed_args = self.check_parser(self.cmd, [], []) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.default_security_group_rules.assert_called_once_with( + **{} + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, list(data)) + + def test_list_with_protocol(self): + self._default_sg_rule_tcp.port_range_min = 80 + arglist = [ + '--protocol', + 'tcp', + ] + verifylist = [ + ('protocol', 'tcp'), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.default_security_group_rules.assert_called_once_with( + **{ + 'protocol': 'tcp', + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, list(data)) + + def test_list_with_ingress(self): + self._default_sg_rule_tcp.port_range_min = 80 + arglist = [ + '--ingress', + ] + verifylist = [ + ('ingress', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.default_security_group_rules.assert_called_once_with( + **{ + 'direction': 'ingress', + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, list(data)) + + def test_list_with_wrong_egress(self): + self._default_sg_rule_tcp.port_range_min = 80 + arglist = [ + '--egress', + ] + verifylist = [ + ('egress', True), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.default_security_group_rules.assert_called_once_with( + **{ + 'direction': 'egress', + } + ) + self.assertEqual(self.expected_columns, columns) + self.assertEqual(self.expected_data, list(data)) + + +class TestShowDefaultSecurityGroupRule(TestDefaultSecurityGroupRule): + # The default security group rule to be shown. + _default_sg_rule = sdk_fakes.generate_fake_resource( + _default_security_group_rule.DefaultSecurityGroupRule + ) + + columns = ( + 'description', + 'direction', + 'ether_type', + 'id', + 'port_range_max', + 'port_range_min', + 'protocol', + 'remote_address_group_id', + 'remote_group_id', + 'remote_ip_prefix', + 'used_in_default_sg', + 'used_in_non_default_sg', + ) + + data = ( + _default_sg_rule.description, + _default_sg_rule.direction, + _default_sg_rule.ether_type, + _default_sg_rule.id, + _default_sg_rule.port_range_max, + _default_sg_rule.port_range_min, + _default_sg_rule.protocol, + _default_sg_rule.remote_address_group_id, + _default_sg_rule.remote_group_id, + _default_sg_rule.remote_ip_prefix, + _default_sg_rule.used_in_default_sg, + _default_sg_rule.used_in_non_default_sg, + ) + + def setUp(self): + super(TestShowDefaultSecurityGroupRule, self).setUp() + + self.sdk_client.find_default_security_group_rule.return_value = ( + self._default_sg_rule + ) + + # Get the command object to test + self.cmd = default_security_group_rule.ShowDefaultSecurityGroupRule( + self.app, self.namespace + ) + + def test_show_no_options(self): + self.assertRaises( + tests_utils.ParserException, self.check_parser, self.cmd, [], [] + ) + + def test_show_all_options(self): + arglist = [ + self._default_sg_rule.id, + ] + verifylist = [ + ('rule', self._default_sg_rule.id), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.sdk_client.find_default_security_group_rule.assert_called_once_with( + self._default_sg_rule.id, ignore_missing=False + ) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py index 2a1609bac..853920439 100644 --- a/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py +++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_compute.py @@ -49,7 +49,7 @@ class TestCreateSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): ( expected_columns, expected_data, - ) = security_group_rule._format_security_group_rule_show( + ) = network_utils.format_security_group_rule_show( self._security_group_rule ) return expected_columns, expected_data @@ -513,7 +513,7 @@ class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute): # The security group rule to be shown. _security_group_rule = compute_fakes.create_one_security_group_rule() - columns, data = security_group_rule._format_security_group_rule_show( + columns, data = network_utils.format_security_group_rule_show( _security_group_rule ) diff --git a/openstackclient/tests/unit/network/v2/test_security_group_rule_network.py b/openstackclient/tests/unit/network/v2/test_security_group_rule_network.py index 675ba9bd8..7b28ae345 100644 --- a/openstackclient/tests/unit/network/v2/test_security_group_rule_network.py +++ b/openstackclient/tests/unit/network/v2/test_security_group_rule_network.py @@ -16,6 +16,7 @@ from unittest.mock import call from osc_lib import exceptions +from openstackclient.network import utils as network_utils from openstackclient.network.v2 import security_group_rule from openstackclient.tests.unit.identity.v3 import fakes as identity_fakes from openstackclient.tests.unit.network.v2 import fakes as network_fakes @@ -1124,9 +1125,7 @@ class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): _security_group_rule.protocol, _security_group_rule.ether_type, _security_group_rule.remote_ip_prefix, - security_group_rule._format_network_port_range( - _security_group_rule - ), + network_utils.format_network_port_range(_security_group_rule), _security_group_rule.direction, _security_group_rule.remote_group_id, _security_group_rule.remote_address_group_id, @@ -1138,9 +1137,7 @@ class TestListSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork): _security_group_rule.protocol, _security_group_rule.ether_type, _security_group_rule.remote_ip_prefix, - security_group_rule._format_network_port_range( - _security_group_rule - ), + network_utils.format_network_port_range(_security_group_rule), _security_group_rule.direction, _security_group_rule.remote_group_id, _security_group_rule.remote_address_group_id, diff --git a/releasenotes/notes/Add-default-security-group-rule-CRUD-2916568f829ea38c.yaml b/releasenotes/notes/Add-default-security-group-rule-CRUD-2916568f829ea38c.yaml new file mode 100644 index 000000000..955d50155 --- /dev/null +++ b/releasenotes/notes/Add-default-security-group-rule-CRUD-2916568f829ea38c.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + Add ``default security group rule create``, ``default security group rule + delete``, ``default security group rule list`` and ``default security group + rule show`` commands to support Neutron Default Security Group Rule CRUD + operations. + [Bug `1983053 `_] diff --git a/requirements.txt b/requirements.txt index a6b67dcc3..a1b87a042 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 cryptography>=2.7 # BSD/Apache-2.0 cliff>=3.5.0 # Apache-2.0 iso8601>=0.1.11 # MIT -openstacksdk>=1.4.0 # Apache-2.0 +openstacksdk>=2.0.0 # Apache-2.0 osc-lib>=2.3.0 # Apache-2.0 oslo.i18n>=3.15.3 # Apache-2.0 python-keystoneclient>=3.22.0 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index de1350756..26b5ae309 100644 --- a/setup.cfg +++ b/setup.cfg @@ -571,6 +571,11 @@ openstack.network.v2 = security_group_rule_list = openstackclient.network.v2.security_group_rule:ListSecurityGroupRule security_group_rule_show = openstackclient.network.v2.security_group_rule:ShowSecurityGroupRule + default_security_group_rule_create = openstackclient.network.v2.default_security_group_rule:CreateDefaultSecurityGroupRule + default_security_group_rule_delete = openstackclient.network.v2.default_security_group_rule:DeleteDefaultSecurityGroupRule + default_security_group_rule_list = openstackclient.network.v2.default_security_group_rule:ListDefaultSecurityGroupRule + default_security_group_rule_show = openstackclient.network.v2.default_security_group_rule:ShowDefaultSecurityGroupRule + subnet_create = openstackclient.network.v2.subnet:CreateSubnet subnet_delete = openstackclient.network.v2.subnet:DeleteSubnet subnet_list = openstackclient.network.v2.subnet:ListSubnet