Add "security group rule show" command

Add the "os security group rule show" command which will use
the SDK when neutron is enabled, and use the nova client when
nova network is enabled.

Change-Id: I41efaa4468ec15e4e86d74144cc72edc25a29024
Partial-Bug: #1519512
Implements: blueprint neutron-client
This commit is contained in:
Richard Theis 2016-02-19 10:19:28 -06:00 committed by Steve Martinelli
parent 20f86465af
commit dccde70c57
7 changed files with 236 additions and 4 deletions

View File

@ -67,3 +67,18 @@ List security group rules
.. describe:: <group>
List all rules in this security group (name or ID)
security group rule show
------------------------
Display security group rule details
.. program:: security group rule show
.. code:: bash
os security group rule show
<rule>
.. describe:: <rule>
Security group rule to display (ID only)

View File

@ -57,3 +57,10 @@ class SecurityGroupRuleTests(test.TestCase):
self.SECURITY_GROUP_NAME +
opts)
self.assertIn(self.SECURITY_GROUP_RULE_ID, raw_output)
def test_security_group_rule_show(self):
opts = self.get_show_opts(self.ID_FIELD)
raw_output = self.openstack('security group rule show ' +
self.SECURITY_GROUP_RULE_ID +
opts)
self.assertEqual(self.SECURITY_GROUP_RULE_ID + "\n", raw_output)

View File

@ -13,9 +13,54 @@
"""Security Group Rule action implementations"""
import six
from openstackclient.common import exceptions
from openstackclient.common import utils
from openstackclient.network import common
def _xform_security_group_rule(sgroup):
info = {}
info.update(sgroup)
from_port = info.pop('from_port')
to_port = info.pop('to_port')
if isinstance(from_port, int) and isinstance(to_port, int):
port_range = {'port_range': "%u:%u" % (from_port, to_port)}
elif from_port is None and to_port is None:
port_range = {'port_range': ""}
else:
port_range = {'port_range': "%s:%s" % (from_port, to_port)}
info.update(port_range)
if 'cidr' in info['ip_range']:
info['ip_range'] = info['ip_range']['cidr']
else:
info['ip_range'] = ''
if info['ip_protocol'] is None:
info['ip_protocol'] = ''
elif info['ip_protocol'].lower() == 'icmp':
info['port_range'] = ''
group = info.pop('group')
if 'name' in group:
info['remote_security_group'] = group['name']
else:
info['remote_security_group'] = ''
return info
def _format_security_group_rule_show(obj):
data = _xform_security_group_rule(obj)
return zip(*sorted(six.iteritems(data)))
def _get_columns(item):
columns = item.keys()
if 'tenant_id' in columns:
columns.remove('tenant_id')
columns.append('project_id')
return tuple(sorted(columns))
class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
"""Delete a security group rule"""
@ -33,3 +78,44 @@ class DeleteSecurityGroupRule(common.NetworkAndComputeCommand):
def take_action_compute(self, client, parsed_args):
client.security_group_rules.delete(parsed_args.rule)
class ShowSecurityGroupRule(common.NetworkAndComputeShowOne):
"""Display security group rule details"""
def update_parser_common(self, parser):
parser.add_argument(
'rule',
metavar="<rule>",
help="Security group rule to display (ID only)"
)
return parser
def take_action_network(self, client, parsed_args):
obj = client.find_security_group_rule(parsed_args.rule,
ignore_missing=False)
columns = _get_columns(obj)
data = utils.get_item_properties(obj, columns)
return (columns, data)
def take_action_compute(self, client, parsed_args):
# NOTE(rtheis): Unfortunately, compute does not have an API
# to get or list security group rules so parse through the
# security groups to find all accessible rules in search of
# the requested rule.
obj = None
security_group_rules = []
for security_group in client.security_groups.list():
security_group_rules.extend(security_group.rules)
for security_group_rule in security_group_rules:
if parsed_args.rule == str(security_group_rule.get('id')):
obj = security_group_rule
break
if obj is None:
msg = "Could not find security group rule " \
"with ID %s" % parsed_args.rule
raise exceptions.CommandError(msg)
# NOTE(rtheis): Format security group rule
return _format_security_group_rule_show(obj)

View File

@ -479,15 +479,13 @@ class FakeSecurityGroupRule(object):
:param Dictionary methods:
A dictionary with all methods
:return:
A FakeResource object, with id, name, etc.
A FakeResource object, with id, etc.
"""
# Set default attributes.
security_group_rule_attrs = {
'description': 'security-group-rule-desc-' + uuid.uuid4().hex,
'direction': 'ingress',
'ethertype': 'IPv4',
'id': 'security-group-rule-id-' + uuid.uuid4().hex,
'name': 'security-group-rule-name-' + uuid.uuid4().hex,
'port_range_max': None,
'port_range_min': None,
'protocol': None,
@ -501,7 +499,11 @@ class FakeSecurityGroupRule(object):
security_group_rule_attrs.update(attrs)
# Set default methods.
security_group_rule_methods = {}
security_group_rule_methods = {
'keys': ['direction', 'ethertype', 'id', 'port_range_max',
'port_range_min', 'protocol', 'remote_group_id',
'remote_ip_prefix', 'security_group_id', 'tenant_id'],
}
# Overwrite default methods.
security_group_rule_methods.update(methods)
@ -510,6 +512,10 @@ class FakeSecurityGroupRule(object):
info=copy.deepcopy(security_group_rule_attrs),
methods=copy.deepcopy(security_group_rule_methods),
loaded=True)
# Set attributes with special mappings.
security_group_rule.project_id = security_group_rule_attrs['tenant_id']
return security_group_rule
@staticmethod

View File

@ -11,11 +11,14 @@
# under the License.
#
import copy
import mock
from openstackclient.network.v2 import security_group_rule
from openstackclient.tests.compute.v2 import fakes as compute_fakes
from openstackclient.tests import fakes
from openstackclient.tests.network.v2 import fakes as network_fakes
from openstackclient.tests import utils as tests_utils
class TestSecurityGroupRuleNetwork(network_fakes.TestNetworkV2):
@ -98,3 +101,112 @@ class TestDeleteSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
self.compute.security_group_rules.delete.assert_called_with(
self._security_group_rule.id)
self.assertIsNone(result)
class TestShowSecurityGroupRuleNetwork(TestSecurityGroupRuleNetwork):
# The security group rule to be shown.
_security_group_rule = \
network_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
columns = (
'direction',
'ethertype',
'id',
'port_range_max',
'port_range_min',
'project_id',
'protocol',
'remote_group_id',
'remote_ip_prefix',
'security_group_id',
)
data = (
_security_group_rule.direction,
_security_group_rule.ethertype,
_security_group_rule.id,
_security_group_rule.port_range_max,
_security_group_rule.port_range_min,
_security_group_rule.project_id,
_security_group_rule.protocol,
_security_group_rule.remote_group_id,
_security_group_rule.remote_ip_prefix,
_security_group_rule.security_group_id,
)
def setUp(self):
super(TestShowSecurityGroupRuleNetwork, self).setUp()
self.network.find_security_group_rule = mock.Mock(
return_value=self._security_group_rule)
# Get the command object to test
self.cmd = security_group_rule.ShowSecurityGroupRule(
self.app, self.namespace)
def test_show_no_options(self):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, [], [])
def test_show_all_options(self):
arglist = [
self._security_group_rule.id,
]
verifylist = [
('rule', self._security_group_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.find_security_group_rule.assert_called_with(
self._security_group_rule.id, ignore_missing=False)
self.assertEqual(tuple(self.columns), columns)
self.assertEqual(self.data, data)
class TestShowSecurityGroupRuleCompute(TestSecurityGroupRuleCompute):
# The security group rule to be shown.
_security_group_rule = \
compute_fakes.FakeSecurityGroupRule.create_one_security_group_rule()
columns, data = \
security_group_rule._format_security_group_rule_show(
_security_group_rule._info)
def setUp(self):
super(TestShowSecurityGroupRuleCompute, self).setUp()
self.app.client_manager.network_endpoint_enabled = False
# Build a security group fake customized for this test.
security_group_rules = [self._security_group_rule._info]
security_group = fakes.FakeResource(
info=copy.deepcopy({'rules': security_group_rules}),
loaded=True)
security_group.rules = security_group_rules
self.compute.security_groups.list.return_value = [security_group]
# Get the command object to test
self.cmd = security_group_rule.ShowSecurityGroupRule(self.app, None)
def test_show_no_options(self):
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, [], [])
def test_show_all_options(self):
arglist = [
self._security_group_rule.id,
]
verifylist = [
('rule', self._security_group_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.compute.security_groups.list.assert_called_with()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)

View File

@ -0,0 +1,5 @@
---
features:
- |
Add support for ``security group rule show`` command.
[Bug `1519512 <https://bugs.launchpad.net/bugs/1519512>`_]

View File

@ -340,6 +340,7 @@ openstack.network.v2 =
router_show = openstackclient.network.v2.router:ShowRouter
security_group_delete = openstackclient.network.v2.security_group:DeleteSecurityGroup
security_group_rule_delete = openstackclient.network.v2.security_group_rule:DeleteSecurityGroupRule
security_group_rule_show = openstackclient.network.v2.security_group_rule:ShowSecurityGroupRule
subnet_list = openstackclient.network.v2.subnet:ListSubnet
subnet_pool_delete = openstackclient.network.v2.subnet_pool:DeleteSubnetPool
subnet_pool_list = openstackclient.network.v2.subnet_pool:ListSubnetPool