security-group-rule-list: show all info of rules briefly

Previously security-group-rule-list does not display full
information of rules by default (e.g., port ranges, ether types)
and users need to use security-group-rule-show to check details.
It is not convenient.

This commit introduces some aggregated columns ("protocol/port" and
"remote") to show infomration briefly and as a result full attributes
of rules will be displayed.

Closes-Bug: #1182629
Change-Id: I047bf9a1ccba5b023d66f22ef5256f7786196113
This commit is contained in:
Akihiro Motoki 2015-03-01 19:37:40 +09:00
parent 5a6e608926
commit 4829e25b6a
3 changed files with 194 additions and 69 deletions

View File

@ -21,6 +21,16 @@ from neutronclient.i18n import _
from neutronclient.neutron import v2_0 as neutronV20
def _get_remote(rule):
if rule['remote_ip_prefix']:
remote = '%s (CIDR)' % rule['remote_ip_prefix']
elif rule['remote_group_id']:
remote = '%s (group)' % rule['remote_group_id']
else:
remote = None
return remote
def _get_protocol_port(rule):
proto = rule['protocol']
port_min = rule['port_range_min']
@ -157,10 +167,19 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
"""List security group rules that belong to a given tenant."""
resource = 'security_group_rule'
list_columns = ['id', 'security_group_id', 'direction', 'protocol',
'remote_ip_prefix', 'remote_group_id']
list_columns = ['id', 'security_group_id', 'direction',
'ethertype', 'protocol/port', 'remote']
# replace_rules: key is an attribute name in Neutron API and
# corresponding value is a display name shown by CLI.
replace_rules = {'security_group_id': 'security_group',
'remote_group_id': 'remote_group'}
digest_fields = {
'remote': {
'method': _get_remote,
'depends_on': ['remote_ip_prefix', 'remote_group_id']},
'protocol/port': {
'method': _get_protocol_port,
'depends_on': ['protocol', 'port_range_min', 'port_range_max']}}
pagination_support = True
sorting_support = True
@ -177,19 +196,28 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
rules = dict((rules[k], k) for k in rules.keys())
return [rules.get(col, col) for col in cols]
def get_required_fields(self, fields):
fields = self.replace_columns(fields, self.replace_rules, reverse=True)
for field, digest_fields in self.digest_fields.items():
if field in fields:
fields += digest_fields['depends_on']
fields.remove(field)
return fields
def retrieve_list(self, parsed_args):
parsed_args.fields = self.replace_columns(parsed_args.fields,
self.replace_rules,
reverse=True)
parsed_args.fields = self.get_required_fields(parsed_args.fields)
return super(ListSecurityGroupRule, self).retrieve_list(parsed_args)
def extend_list(self, data, parsed_args):
if parsed_args.no_nameconv:
return
def _get_sg_name_dict(self, data, page_size, no_nameconv):
"""Get names of security groups referred in the retrieved rules.
:return: a dict from secgroup ID to secgroup name
"""
if no_nameconv:
return {}
neutron_client = self.get_client()
search_opts = {'fields': ['id', 'name']}
if self.pagination_support:
page_size = parsed_args.page_size
if page_size:
search_opts.update({'limit': page_size})
sec_group_ids = set()
@ -222,14 +250,28 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
secgroups.extend(
_get_sec_group_list(sec_group_ids[i: i + chunk_size]))
sg_dict = dict([(sg['id'], sg['name'])
for sg in secgroups if sg['name']])
return dict([(sg['id'], sg['name'])
for sg in secgroups if sg['name']])
@staticmethod
def _has_fileds(rule, required_fileds):
return all([key in rule for key in required_fileds])
def extend_list(self, data, parsed_args):
sg_dict = self._get_sg_name_dict(data, parsed_args.page_size,
parsed_args.no_nameconv)
for rule in data:
# Replace security group UUID with its name.
for key in self.replace_rules:
if key in rule:
rule[key] = sg_dict.get(rule[key], rule[key])
for field, digest_rule in self.digest_fields.items():
if self._has_fileds(rule, digest_rule['depends_on']):
rule[field] = digest_rule['method'](rule) or 'any'
def setup_columns(self, info, parsed_args):
# Translate the specified columns from the command line
# into field names used in "info".
parsed_args.columns = self.replace_columns(parsed_args.columns,
self.replace_rules,
reverse=True)
@ -240,6 +282,7 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
parsed_args)
cols = info[0]
if not parsed_args.no_nameconv:
# Replace column names in the header line (in info[0])
cols = self.replace_columns(info[0], self.replace_rules)
parsed_args.columns = cols
return (cols, info[1])

View File

@ -115,9 +115,8 @@ class SimpleReadOnlyNeutronClientTest(base.ClientTestBase):
security_grp = self.parser.listing(self.neutron
('security-group-rule-list'))
self.assertTableStruct(security_grp, ['id', 'security_group',
'direction', 'protocol',
'remote_ip_prefix',
'remote_group'])
'direction', 'ethertype',
'protocol/port', 'remote'])
def test_neutron_subnet_list(self):
subnet_list = self.parser.listing(self.neutron('subnet-list'))

View File

@ -332,9 +332,9 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
self._test_show_resource(resource, cmd, self.test_id,
args, ['id'])
def _test_list_security_group_rules_extend(self, data=None, expected=None,
def _test_list_security_group_rules_extend(self, api_data, expected,
args=(), conv=True,
query_field=False):
query_fields=None):
def setup_list_stub(resources, data, query):
reses = {resources: data}
resstr = self.client.serialize(reses)
@ -349,38 +349,22 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
headers=mox.ContainsKeyValue(
'X-Auth-Token', test_cli20.TOKEN)).AndReturn(resp)
# Setup the default data
_data = {'cols': ['id', 'security_group_id', 'remote_group_id'],
'data': [('ruleid1', 'myid1', 'myid1'),
('ruleid2', 'myid2', 'myid3'),
('ruleid3', 'myid2', 'myid2')]}
_expected = {'cols': ['id', 'security_group', 'remote_group'],
'data': [('ruleid1', 'group1', 'group1'),
('ruleid2', 'group2', 'group3'),
('ruleid3', 'group2', 'group2')]}
if data is None:
data = _data
list_data = [dict(zip(data['cols'], d)) for d in data['data']]
if expected is None:
expected = {}
expected['cols'] = expected.get('cols', _expected['cols'])
expected['data'] = expected.get('data', _expected['data'])
cmd = securitygroup.ListSecurityGroupRule(
test_cli20.MyApp(sys.stdout), None)
self.mox.StubOutWithMock(cmd, 'get_client')
self.mox.StubOutWithMock(self.client.httpclient, 'request')
cmd.get_client().AndReturn(self.client)
query = ''
if query_field:
query = '&'.join(['fields=' + f for f in data['cols']])
setup_list_stub('security_group_rules', list_data, query)
if query_fields:
query = '&'.join(['fields=' + f for f in query_fields])
setup_list_stub('security_group_rules', api_data, query)
if conv:
cmd.get_client().AndReturn(self.client)
sec_ids = set()
for n in data['data']:
sec_ids.add(n[1])
sec_ids.add(n[2])
for n in api_data:
sec_ids.add(n['security_group_id'])
if n.get('remote_group_id'):
sec_ids.add(n['remote_group_id'])
filters = ''
for id in sec_ids:
filters = filters + "&id=%s" % id
@ -397,50 +381,131 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
self.mox.VerifyAll()
self.mox.UnsetStubs()
# Check columns
self.assertEqual(result[0], expected['cols'])
self.assertEqual(expected['cols'], result[0])
# Check data
_result = [x for x in result[1]]
self.assertEqual(len(_result), len(expected['data']))
for res, exp in zip(_result, expected['data']):
self.assertEqual(len(res), len(exp))
self.assertEqual(res, exp)
self.assertEqual(len(exp), len(res))
self.assertEqual(exp, res)
def test_list_security_group_rules_extend_source_id(self):
self._test_list_security_group_rules_extend()
def _test_list_security_group_rules_extend_sg_name(
self, expected_mode=None, args=(), conv=True, query_field=False):
if query_field:
field_filters = ['id', 'security_group_id',
'remote_ip_prefix', 'remote_group_id']
else:
field_filters = None
def test_list_security_group_rules_extend_no_nameconv(self):
expected = {'cols': ['id', 'security_group_id', 'remote_group_id'],
'data': [('ruleid1', 'myid1', 'myid1'),
('ruleid2', 'myid2', 'myid3'),
('ruleid3', 'myid2', 'myid2')]}
args = ['--no-nameconv']
self._test_list_security_group_rules_extend(expected=expected,
args=args, conv=False)
data = [self._prepare_rule(rule_id='ruleid1', sg_id='myid1',
remote_group_id='myid1',
filters=field_filters),
self._prepare_rule(rule_id='ruleid2', sg_id='myid2',
remote_group_id='myid3',
filters=field_filters),
self._prepare_rule(rule_id='ruleid3', sg_id='myid2',
remote_group_id='myid2',
filters=field_filters),
]
def test_list_security_group_rules_extend_with_columns(self):
if expected_mode == 'noconv':
expected = {'cols': ['id', 'security_group_id', 'remote_group_id'],
'data': [('ruleid1', 'myid1', 'myid1'),
('ruleid2', 'myid2', 'myid3'),
('ruleid3', 'myid2', 'myid2')]}
elif expected_mode == 'remote_group_id':
expected = {'cols': ['id', 'security_group', 'remote_group'],
'data': [('ruleid1', 'group1', 'group1'),
('ruleid2', 'group2', 'group3'),
('ruleid3', 'group2', 'group2')]}
else:
expected = {'cols': ['id', 'security_group', 'remote'],
'data': [('ruleid1', 'group1', 'group1 (group)'),
('ruleid2', 'group2', 'group3 (group)'),
('ruleid3', 'group2', 'group2 (group)')]}
self._test_list_security_group_rules_extend(
data, expected, args=args, conv=conv, query_fields=field_filters)
def test_list_security_group_rules_extend_remote_sg_name(self):
args = '-c id -c security_group -c remote'.split()
self._test_list_security_group_rules_extend_sg_name(args=args)
def test_list_security_group_rules_extend_sg_name_noconv(self):
args = '--no-nameconv -c id -c security_group_id -c remote_group_id'
args = args.split()
self._test_list_security_group_rules_extend_sg_name(
expected_mode='noconv', args=args, conv=False)
def test_list_security_group_rules_extend_sg_name_with_columns(self):
args = '-c id -c security_group_id -c remote_group_id'.split()
self._test_list_security_group_rules_extend(args=args)
self._test_list_security_group_rules_extend_sg_name(
expected_mode='remote_group_id', args=args)
def test_list_security_group_rules_extend_with_columns_no_id(self):
def test_list_security_group_rules_extend_sg_name_with_columns_no_id(self):
args = '-c id -c security_group -c remote_group'.split()
self._test_list_security_group_rules_extend(args=args)
self._test_list_security_group_rules_extend_sg_name(
expected_mode='remote_group_id', args=args)
def test_list_security_group_rules_extend_with_fields(self):
args = '-F id -F security_group_id -F remote_group_id'.split()
self._test_list_security_group_rules_extend(args=args,
query_field=True)
def test_list_security_group_rules_extend_sg_name_with_fields(self):
# NOTE: remote_ip_prefix is required to show "remote" column
args = ('-F id -F security_group_id '
'-F remote_ip_prefix -F remote_group_id').split()
self._test_list_security_group_rules_extend_sg_name(
args=args, query_field=True)
def test_list_security_group_rules_extend_with_fields_no_id(self):
args = '-F id -F security_group -F remote_group'.split()
self._test_list_security_group_rules_extend(args=args,
query_field=True)
def test_list_security_group_rules_extend_sg_name_with_fields_no_id(self):
# NOTE: remote_ip_prefix is required to show "remote" column
args = ('-F id -F security_group '
'-F remote_ip_prefix -F remote_group').split()
self._test_list_security_group_rules_extend_sg_name(args=args,
query_field=True)
def _prepare_rule(self, direction=None, ethertype=None,
def test_list_security_group_rules_extend_remote(self):
args = '-c id -c security_group -c remote'.split()
data = [self._prepare_rule(rule_id='ruleid1', sg_id='myid1',
remote_ip_prefix='172.16.18.0/24'),
self._prepare_rule(rule_id='ruleid2', sg_id='myid2',
remote_ip_prefix='172.16.20.0/24'),
self._prepare_rule(rule_id='ruleid3', sg_id='myid2',
remote_group_id='myid3')]
expected = {'cols': ['id', 'security_group', 'remote'],
'data': [('ruleid1', 'group1', '172.16.18.0/24 (CIDR)'),
('ruleid2', 'group2', '172.16.20.0/24 (CIDR)'),
('ruleid3', 'group2', 'group3 (group)')]}
self._test_list_security_group_rules_extend(data, expected, args)
def test_list_security_group_rules_extend_proto_port(self):
data = [self._prepare_rule(rule_id='ruleid1', sg_id='myid1',
protocol='tcp',
port_range_min=22, port_range_max=22),
self._prepare_rule(rule_id='ruleid2', sg_id='myid2',
direction='egress', ethertype='IPv6',
protocol='udp',
port_range_min=80, port_range_max=81),
self._prepare_rule(rule_id='ruleid3', sg_id='myid2',
protocol='icmp',
remote_ip_prefix='10.2.0.0/16')]
expected = {
'cols': ['id', 'security_group', 'direction', 'ethertype',
'protocol/port', 'remote'],
'data': [
('ruleid1', 'group1', 'ingress', 'IPv4', '22/tcp', 'any'),
('ruleid2', 'group2', 'egress', 'IPv6', '80-81/udp', 'any'),
('ruleid3', 'group2', 'ingress', 'IPv4', 'icmp',
'10.2.0.0/16 (CIDR)')
]}
self._test_list_security_group_rules_extend(data, expected)
def _prepare_rule(self, rule_id=None, sg_id=None, tenant_id=None,
direction=None, ethertype=None,
protocol=None, port_range_min=None, port_range_max=None,
remote_ip_prefix=None, remote_group_id=None):
return {'id': str(uuid.uuid4()),
'tenant_id': str(uuid.uuid4()),
'security_group_id': str(uuid.uuid4()),
remote_ip_prefix=None, remote_group_id=None,
filters=None):
rule = {'id': rule_id or str(uuid.uuid4()),
'tenant_id': tenant_id or str(uuid.uuid4()),
'security_group_id': sg_id or str(uuid.uuid4()),
'direction': direction or 'ingress',
'ethertype': ethertype or 'IPv4',
'protocol': protocol,
@ -448,6 +513,24 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
'port_range_max': port_range_max,
'remote_ip_prefix': remote_ip_prefix,
'remote_group_id': remote_group_id}
if filters:
return dict([(k, v) for k, v in rule.items() if k in filters])
else:
return rule
def test__get_remote_both_unspecified(self):
sg_rule = self._prepare_rule(remote_ip_prefix=None,
remote_group_id=None)
self.assertIsNone(securitygroup._get_remote(sg_rule))
def test__get_remote_remote_ip_prefix_specified(self):
sg_rule = self._prepare_rule(remote_ip_prefix='172.16.18.0/24')
self.assertEqual('172.16.18.0/24 (CIDR)',
securitygroup._get_remote(sg_rule))
def test__get_remote_remote_group_specified(self):
sg_rule = self._prepare_rule(remote_group_id='sg_id1')
self.assertEqual('sg_id1 (group)', securitygroup._get_remote(sg_rule))
def test__get_protocol_port_all_none(self):
sg_rule = self._prepare_rule()