Refactor security group rule delete to use SDK

Refactored the 'os security group rule delete' command to use the
SDK when neutron is enabled, but continue to use the nova client
when nova network is enabled.

This patch set also introduces new FakeSecurityGroupRule classes
for testing network and compute security group rules. And fixes
were made to the network FakeSecurityGroup class.

Change-Id: I8d0917925aa464e8255defae95a2a2adfb6cfb75
Partial-Bug: #1519512
Related-to: blueprint neutron-client
This commit is contained in:
Richard Theis 2016-02-05 09:02:27 -06:00
parent 624c39ab1b
commit a29c9732d7
7 changed files with 268 additions and 35 deletions

View File

@ -169,24 +169,6 @@ class CreateSecurityGroupRule(command.ShowOne):
return zip(*sorted(six.iteritems(info)))
class DeleteSecurityGroupRule(command.Command):
"""Delete a security group rule"""
def get_parser(self, prog_name):
parser = super(DeleteSecurityGroupRule, self).get_parser(prog_name)
parser.add_argument(
'rule',
metavar='<rule>',
help='Security group rule to delete (ID only)',
)
return parser
def take_action(self, parsed_args):
compute_client = self.app.client_manager.compute
compute_client.security_group_rules.delete(parsed_args.rule)
class ListSecurityGroup(command.Lister):
"""List security groups"""

View File

@ -19,7 +19,13 @@ from openstackclient.common import command
@six.add_metaclass(abc.ABCMeta)
class NetworkAndComputeCommand(command.Command):
"""Network and Compute Command"""
"""Network and Compute Command
Command class for commands that support implementation via
the network or compute endpoint. Such commands have different
implementations for take_action() and may even have different
arguments.
"""
def take_action(self, parsed_args):
if self.app.client_manager.is_network_endpoint_enabled():

View File

@ -0,0 +1,35 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Security Group Rule action implementations"""
from openstackclient.network import common
class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
"""Delete a security group rule"""
def update_parser_common(self, parser):
parser.add_argument(
'rule',
metavar='<rule>',
help='Security group rule to delete (ID only)',
)
return parser
def take_action_network(self, client, parsed_args):
obj = client.find_security_group_rule(parsed_args.rule)
client.delete_security_group_rule(obj)
def take_action_compute(self, client, parsed_args):
client.security_group_rules.delete(parsed_args.rule)

View File

@ -228,6 +228,68 @@ class FakeHypervisor(object):
return hypervisors
class FakeSecurityGroupRule(object):
"""Fake one or more security group rules."""
@staticmethod
def create_one_security_group_rule(attrs={}, methods={}):
"""Create a fake security group rule.
:param Dictionary attrs:
A dictionary with all attributes
:param Dictionary methods:
A dictionary with all methods
:return:
A FakeResource object, with id, etc.
"""
# Set default attributes.
security_group_rule_attrs = {
'from_port': -1,
'group': {},
'id': 'security-group-rule-id-' + uuid.uuid4().hex,
'ip_protocol': 'icmp',
'ip_range': {'cidr': '0.0.0.0/0'},
'parent_group_id': 'security-group-id-' + uuid.uuid4().hex,
'to_port': -1,
}
# Overwrite default attributes.
security_group_rule_attrs.update(attrs)
# Set default methods.
security_group_rule_methods = {}
# Overwrite default methods.
security_group_rule_methods.update(methods)
security_group_rule = fakes.FakeResource(
info=copy.deepcopy(security_group_rule_attrs),
methods=copy.deepcopy(security_group_rule_methods),
loaded=True)
return security_group_rule
@staticmethod
def create_security_group_rules(attrs={}, methods={}, count=2):
"""Create multiple fake security group rules.
:param Dictionary attrs:
A dictionary with all attributes
:param Dictionary methods:
A dictionary with all methods
:param int count:
The number of security group rules to fake
:return:
A list of FakeResource objects faking the security group rules
"""
security_group_rules = []
for i in range(0, count):
security_group_rules.append(
FakeSecurityGroupRule.create_one_security_group_rule(
attrs, methods))
return security_group_rules
class FakeServer(object):
"""Fake one or more compute servers."""

View File

@ -463,28 +463,76 @@ class FakeSecurityGroup(object):
security_groups = []
for i in range(0, count):
security_groups.append(
FakeRouter.create_one_security_group(attrs, methods))
FakeSecurityGroup.create_one_security_group(attrs, methods))
return security_groups
class FakeSecurityGroupRule(object):
"""Fake one or more security group rules."""
@staticmethod
def get_security_groups(security_groups=None, count=2):
"""Get an iterable MagicMock object with a list of faked security groups.
def create_one_security_group_rule(attrs={}, methods={}):
"""Create a fake security group rule.
If security group list is provided, then initialize the Mock object
with the list. Otherwise create one.
:param List security groups:
A list of FakeResource objects faking security groups
:param int count:
The number of security groups to fake
:param Dictionary attrs:
A dictionary with all attributes
:param Dictionary methods:
A dictionary with all methods
:return:
An iterable Mock object with side_effect set to a list of faked
security groups
A FakeResource object, with id, name, etc.
"""
if security_groups is None:
security_groups = FakeRouter.create_security_groups(count)
return mock.MagicMock(side_effect=security_groups)
# Set default attributes.
security_group_rule_attrs = {
'description': 'security-group-rule-desc-' + uuid.uuid4().hex,
'direction': 'ingress',
'ethertype': 'IPv4',
'id': 'security-group-rule-id-' + uuid.uuid4().hex,
'name': 'security-group-rule-name-' + uuid.uuid4().hex,
'port_range_max': None,
'port_range_min': None,
'protocol': None,
'remote_group_id': 'remote-security-group-id-' + uuid.uuid4().hex,
'remote_ip_prefix': None,
'security_group_id': 'security-group-id-' + uuid.uuid4().hex,
'tenant_id': 'project-id-' + uuid.uuid4().hex,
}
# Overwrite default attributes.
security_group_rule_attrs.update(attrs)
# Set default methods.
security_group_rule_methods = {}
# Overwrite default methods.
security_group_rule_methods.update(methods)
security_group_rule = fakes.FakeResource(
info=copy.deepcopy(security_group_rule_attrs),
methods=copy.deepcopy(security_group_rule_methods),
loaded=True)
return security_group_rule
@staticmethod
def create_security_group_rules(attrs={}, methods={}, count=2):
"""Create multiple fake security group rules.
:param Dictionary attrs:
A dictionary with all attributes
:param Dictionary methods:
A dictionary with all methods
:param int count:
The number of security group rules to fake
:return:
A list of FakeResource objects faking the security group rules
"""
security_group_rules = []
for i in range(0, count):
security_group_rules.append(
FakeSecurityGroupRule.create_one_security_group_rule(
attrs, methods))
return security_group_rules
class FakeSubnet(object):

View File

@ -0,0 +1,100 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
from openstackclient.network.v2 import security_group_rule
from openstackclient.tests.compute.v2 import fakes as compute_fakes
from openstackclient.tests.network.v2 import fakes as network_fakes
class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2):
def setUp(self):
super(TestSecurityGroupRuleNetwork, self).setUp()
# Get a shortcut to the network client
self.network = self.app.client_manager.network
class TestSecurityGroupRuleCompute(compute_fakes.TestComputev2):
def setUp(self):
super(TestSecurityGroupRuleCompute, self).setUp()
# Get a shortcut to the network client
self.compute = self.app.client_manager.compute
class TestDeleteSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
# The security group rule to be deleted.
_security_group_rule = \
network_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
def setUp(self):
super(TestDeleteSecurityGroupRuleNetwork, self).setUp()
self.network.delete_security_group_rule = mock.Mock(return_value=None)
self.network.find_security_group_rule = mock.Mock(
return_value=self._security_group_rule)
# Get the command object to test
self.cmd = security_group_rule.DeleteSecurityGroupRule(
self.app, self.namespace)
def test_security_group_rule_delete(self):
arglist = [
self._security_group_rule.id,
]
verifylist = [
('rule', self._security_group_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.delete_security_group_rule.assert_called_with(
self._security_group_rule)
self.assertEqual(None, result)
class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
# The security group rule to be deleted.
_security_group_rule = \
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
def setUp(self):
super(TestDeleteSecurityGroupRuleCompute, self).setUp()
self.app.client_manager.network_endpoint_enabled = False
# Get the command object to test
self.cmd = security_group_rule.DeleteSecurityGroupRule(self.app, None)
def test_security_group_rule_delete(self):
arglist = [
self._security_group_rule.id,
]
verifylist = [
('rule', self._security_group_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.compute.security_group_rules.delete.assert_called_with(
self._security_group_rule.id)
self.assertEqual(None, result)

View File

@ -107,7 +107,6 @@ openstack.compute.v2 =
security_group_set = openstackclient.compute.v2.security_group:SetSecurityGroup
security_group_show = openstackclient.compute.v2.security_group:ShowSecurityGroup
security_group_rule_create = openstackclient.compute.v2.security_group:CreateSecurityGroupRule
security_group_rule_delete = openstackclient.compute.v2.security_group:DeleteSecurityGroupRule
security_group_rule_list = openstackclient.compute.v2.security_group:ListSecurityGroupRule
server_add_security_group = openstackclient.compute.v2.server:AddServerSecurityGroup
@ -340,6 +339,7 @@ openstack.network.v2 =
router_set = openstackclient.network.v2.router:SetRouter
router_show = openstackclient.network.v2.router:ShowRouter
security_group_delete = openstackclient.network.v2.security_group:DeleteSecurityGroup
security_group_rule_delete = openstackclient.network.v2.security_group_rule:DeleteSecurityGroupRule
subnet_list = openstackclient.network.v2.subnet:ListSubnet
openstack.object_store.v1 =