security-group-rule-list: show all info of rules briefly
Previously security-group-rule-list does not display full information of rules by default (e.g., port ranges, ether types) and users need to use security-group-rule-show to check details. It is not convenient. This commit introduces some aggregated columns ("protocol/port" and "remote") to show infomration briefly and as a result full attributes of rules will be displayed. Closes-Bug: #1182629 Change-Id: I047bf9a1ccba5b023d66f22ef5256f7786196113
This commit is contained in:
parent
5a6e608926
commit
4829e25b6a
|
@ -21,6 +21,16 @@ from neutronclient.i18n import _
|
|||
from neutronclient.neutron import v2_0 as neutronV20
|
||||
|
||||
|
||||
def _get_remote(rule):
|
||||
if rule['remote_ip_prefix']:
|
||||
remote = '%s (CIDR)' % rule['remote_ip_prefix']
|
||||
elif rule['remote_group_id']:
|
||||
remote = '%s (group)' % rule['remote_group_id']
|
||||
else:
|
||||
remote = None
|
||||
return remote
|
||||
|
||||
|
||||
def _get_protocol_port(rule):
|
||||
proto = rule['protocol']
|
||||
port_min = rule['port_range_min']
|
||||
|
@ -157,10 +167,19 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
|
|||
"""List security group rules that belong to a given tenant."""
|
||||
|
||||
resource = 'security_group_rule'
|
||||
list_columns = ['id', 'security_group_id', 'direction', 'protocol',
|
||||
'remote_ip_prefix', 'remote_group_id']
|
||||
list_columns = ['id', 'security_group_id', 'direction',
|
||||
'ethertype', 'protocol/port', 'remote']
|
||||
# replace_rules: key is an attribute name in Neutron API and
|
||||
# corresponding value is a display name shown by CLI.
|
||||
replace_rules = {'security_group_id': 'security_group',
|
||||
'remote_group_id': 'remote_group'}
|
||||
digest_fields = {
|
||||
'remote': {
|
||||
'method': _get_remote,
|
||||
'depends_on': ['remote_ip_prefix', 'remote_group_id']},
|
||||
'protocol/port': {
|
||||
'method': _get_protocol_port,
|
||||
'depends_on': ['protocol', 'port_range_min', 'port_range_max']}}
|
||||
pagination_support = True
|
||||
sorting_support = True
|
||||
|
||||
|
@ -177,19 +196,28 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
|
|||
rules = dict((rules[k], k) for k in rules.keys())
|
||||
return [rules.get(col, col) for col in cols]
|
||||
|
||||
def get_required_fields(self, fields):
|
||||
fields = self.replace_columns(fields, self.replace_rules, reverse=True)
|
||||
for field, digest_fields in self.digest_fields.items():
|
||||
if field in fields:
|
||||
fields += digest_fields['depends_on']
|
||||
fields.remove(field)
|
||||
return fields
|
||||
|
||||
def retrieve_list(self, parsed_args):
|
||||
parsed_args.fields = self.replace_columns(parsed_args.fields,
|
||||
self.replace_rules,
|
||||
reverse=True)
|
||||
parsed_args.fields = self.get_required_fields(parsed_args.fields)
|
||||
return super(ListSecurityGroupRule, self).retrieve_list(parsed_args)
|
||||
|
||||
def extend_list(self, data, parsed_args):
|
||||
if parsed_args.no_nameconv:
|
||||
return
|
||||
def _get_sg_name_dict(self, data, page_size, no_nameconv):
|
||||
"""Get names of security groups referred in the retrieved rules.
|
||||
|
||||
:return: a dict from secgroup ID to secgroup name
|
||||
"""
|
||||
if no_nameconv:
|
||||
return {}
|
||||
neutron_client = self.get_client()
|
||||
search_opts = {'fields': ['id', 'name']}
|
||||
if self.pagination_support:
|
||||
page_size = parsed_args.page_size
|
||||
if page_size:
|
||||
search_opts.update({'limit': page_size})
|
||||
sec_group_ids = set()
|
||||
|
@ -222,14 +250,28 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
|
|||
secgroups.extend(
|
||||
_get_sec_group_list(sec_group_ids[i: i + chunk_size]))
|
||||
|
||||
sg_dict = dict([(sg['id'], sg['name'])
|
||||
for sg in secgroups if sg['name']])
|
||||
return dict([(sg['id'], sg['name'])
|
||||
for sg in secgroups if sg['name']])
|
||||
|
||||
@staticmethod
|
||||
def _has_fileds(rule, required_fileds):
|
||||
return all([key in rule for key in required_fileds])
|
||||
|
||||
def extend_list(self, data, parsed_args):
|
||||
sg_dict = self._get_sg_name_dict(data, parsed_args.page_size,
|
||||
parsed_args.no_nameconv)
|
||||
for rule in data:
|
||||
# Replace security group UUID with its name.
|
||||
for key in self.replace_rules:
|
||||
if key in rule:
|
||||
rule[key] = sg_dict.get(rule[key], rule[key])
|
||||
for field, digest_rule in self.digest_fields.items():
|
||||
if self._has_fileds(rule, digest_rule['depends_on']):
|
||||
rule[field] = digest_rule['method'](rule) or 'any'
|
||||
|
||||
def setup_columns(self, info, parsed_args):
|
||||
# Translate the specified columns from the command line
|
||||
# into field names used in "info".
|
||||
parsed_args.columns = self.replace_columns(parsed_args.columns,
|
||||
self.replace_rules,
|
||||
reverse=True)
|
||||
|
@ -240,6 +282,7 @@ class ListSecurityGroupRule(neutronV20.ListCommand):
|
|||
parsed_args)
|
||||
cols = info[0]
|
||||
if not parsed_args.no_nameconv:
|
||||
# Replace column names in the header line (in info[0])
|
||||
cols = self.replace_columns(info[0], self.replace_rules)
|
||||
parsed_args.columns = cols
|
||||
return (cols, info[1])
|
||||
|
|
|
@ -115,9 +115,8 @@ class SimpleReadOnlyNeutronClientTest(base.ClientTestBase):
|
|||
security_grp = self.parser.listing(self.neutron
|
||||
('security-group-rule-list'))
|
||||
self.assertTableStruct(security_grp, ['id', 'security_group',
|
||||
'direction', 'protocol',
|
||||
'remote_ip_prefix',
|
||||
'remote_group'])
|
||||
'direction', 'ethertype',
|
||||
'protocol/port', 'remote'])
|
||||
|
||||
def test_neutron_subnet_list(self):
|
||||
subnet_list = self.parser.listing(self.neutron('subnet-list'))
|
||||
|
|
|
@ -332,9 +332,9 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
|
|||
self._test_show_resource(resource, cmd, self.test_id,
|
||||
args, ['id'])
|
||||
|
||||
def _test_list_security_group_rules_extend(self, data=None, expected=None,
|
||||
def _test_list_security_group_rules_extend(self, api_data, expected,
|
||||
args=(), conv=True,
|
||||
query_field=False):
|
||||
query_fields=None):
|
||||
def setup_list_stub(resources, data, query):
|
||||
reses = {resources: data}
|
||||
resstr = self.client.serialize(reses)
|
||||
|
@ -349,38 +349,22 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
|
|||
headers=mox.ContainsKeyValue(
|
||||
'X-Auth-Token', test_cli20.TOKEN)).AndReturn(resp)
|
||||
|
||||
# Setup the default data
|
||||
_data = {'cols': ['id', 'security_group_id', 'remote_group_id'],
|
||||
'data': [('ruleid1', 'myid1', 'myid1'),
|
||||
('ruleid2', 'myid2', 'myid3'),
|
||||
('ruleid3', 'myid2', 'myid2')]}
|
||||
_expected = {'cols': ['id', 'security_group', 'remote_group'],
|
||||
'data': [('ruleid1', 'group1', 'group1'),
|
||||
('ruleid2', 'group2', 'group3'),
|
||||
('ruleid3', 'group2', 'group2')]}
|
||||
if data is None:
|
||||
data = _data
|
||||
list_data = [dict(zip(data['cols'], d)) for d in data['data']]
|
||||
if expected is None:
|
||||
expected = {}
|
||||
expected['cols'] = expected.get('cols', _expected['cols'])
|
||||
expected['data'] = expected.get('data', _expected['data'])
|
||||
|
||||
cmd = securitygroup.ListSecurityGroupRule(
|
||||
test_cli20.MyApp(sys.stdout), None)
|
||||
self.mox.StubOutWithMock(cmd, 'get_client')
|
||||
self.mox.StubOutWithMock(self.client.httpclient, 'request')
|
||||
cmd.get_client().AndReturn(self.client)
|
||||
query = ''
|
||||
if query_field:
|
||||
query = '&'.join(['fields=' + f for f in data['cols']])
|
||||
setup_list_stub('security_group_rules', list_data, query)
|
||||
if query_fields:
|
||||
query = '&'.join(['fields=' + f for f in query_fields])
|
||||
setup_list_stub('security_group_rules', api_data, query)
|
||||
if conv:
|
||||
cmd.get_client().AndReturn(self.client)
|
||||
sec_ids = set()
|
||||
for n in data['data']:
|
||||
sec_ids.add(n[1])
|
||||
sec_ids.add(n[2])
|
||||
for n in api_data:
|
||||
sec_ids.add(n['security_group_id'])
|
||||
if n.get('remote_group_id'):
|
||||
sec_ids.add(n['remote_group_id'])
|
||||
filters = ''
|
||||
for id in sec_ids:
|
||||
filters = filters + "&id=%s" % id
|
||||
|
@ -397,50 +381,131 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
|
|||
self.mox.VerifyAll()
|
||||
self.mox.UnsetStubs()
|
||||
# Check columns
|
||||
self.assertEqual(result[0], expected['cols'])
|
||||
self.assertEqual(expected['cols'], result[0])
|
||||
# Check data
|
||||
_result = [x for x in result[1]]
|
||||
self.assertEqual(len(_result), len(expected['data']))
|
||||
for res, exp in zip(_result, expected['data']):
|
||||
self.assertEqual(len(res), len(exp))
|
||||
self.assertEqual(res, exp)
|
||||
self.assertEqual(len(exp), len(res))
|
||||
self.assertEqual(exp, res)
|
||||
|
||||
def test_list_security_group_rules_extend_source_id(self):
|
||||
self._test_list_security_group_rules_extend()
|
||||
def _test_list_security_group_rules_extend_sg_name(
|
||||
self, expected_mode=None, args=(), conv=True, query_field=False):
|
||||
if query_field:
|
||||
field_filters = ['id', 'security_group_id',
|
||||
'remote_ip_prefix', 'remote_group_id']
|
||||
else:
|
||||
field_filters = None
|
||||
|
||||
def test_list_security_group_rules_extend_no_nameconv(self):
|
||||
expected = {'cols': ['id', 'security_group_id', 'remote_group_id'],
|
||||
'data': [('ruleid1', 'myid1', 'myid1'),
|
||||
('ruleid2', 'myid2', 'myid3'),
|
||||
('ruleid3', 'myid2', 'myid2')]}
|
||||
args = ['--no-nameconv']
|
||||
self._test_list_security_group_rules_extend(expected=expected,
|
||||
args=args, conv=False)
|
||||
data = [self._prepare_rule(rule_id='ruleid1', sg_id='myid1',
|
||||
remote_group_id='myid1',
|
||||
filters=field_filters),
|
||||
self._prepare_rule(rule_id='ruleid2', sg_id='myid2',
|
||||
remote_group_id='myid3',
|
||||
filters=field_filters),
|
||||
self._prepare_rule(rule_id='ruleid3', sg_id='myid2',
|
||||
remote_group_id='myid2',
|
||||
filters=field_filters),
|
||||
]
|
||||
|
||||
def test_list_security_group_rules_extend_with_columns(self):
|
||||
if expected_mode == 'noconv':
|
||||
expected = {'cols': ['id', 'security_group_id', 'remote_group_id'],
|
||||
'data': [('ruleid1', 'myid1', 'myid1'),
|
||||
('ruleid2', 'myid2', 'myid3'),
|
||||
('ruleid3', 'myid2', 'myid2')]}
|
||||
elif expected_mode == 'remote_group_id':
|
||||
expected = {'cols': ['id', 'security_group', 'remote_group'],
|
||||
'data': [('ruleid1', 'group1', 'group1'),
|
||||
('ruleid2', 'group2', 'group3'),
|
||||
('ruleid3', 'group2', 'group2')]}
|
||||
else:
|
||||
expected = {'cols': ['id', 'security_group', 'remote'],
|
||||
'data': [('ruleid1', 'group1', 'group1 (group)'),
|
||||
('ruleid2', 'group2', 'group3 (group)'),
|
||||
('ruleid3', 'group2', 'group2 (group)')]}
|
||||
|
||||
self._test_list_security_group_rules_extend(
|
||||
data, expected, args=args, conv=conv, query_fields=field_filters)
|
||||
|
||||
def test_list_security_group_rules_extend_remote_sg_name(self):
|
||||
args = '-c id -c security_group -c remote'.split()
|
||||
self._test_list_security_group_rules_extend_sg_name(args=args)
|
||||
|
||||
def test_list_security_group_rules_extend_sg_name_noconv(self):
|
||||
args = '--no-nameconv -c id -c security_group_id -c remote_group_id'
|
||||
args = args.split()
|
||||
self._test_list_security_group_rules_extend_sg_name(
|
||||
expected_mode='noconv', args=args, conv=False)
|
||||
|
||||
def test_list_security_group_rules_extend_sg_name_with_columns(self):
|
||||
args = '-c id -c security_group_id -c remote_group_id'.split()
|
||||
self._test_list_security_group_rules_extend(args=args)
|
||||
self._test_list_security_group_rules_extend_sg_name(
|
||||
expected_mode='remote_group_id', args=args)
|
||||
|
||||
def test_list_security_group_rules_extend_with_columns_no_id(self):
|
||||
def test_list_security_group_rules_extend_sg_name_with_columns_no_id(self):
|
||||
args = '-c id -c security_group -c remote_group'.split()
|
||||
self._test_list_security_group_rules_extend(args=args)
|
||||
self._test_list_security_group_rules_extend_sg_name(
|
||||
expected_mode='remote_group_id', args=args)
|
||||
|
||||
def test_list_security_group_rules_extend_with_fields(self):
|
||||
args = '-F id -F security_group_id -F remote_group_id'.split()
|
||||
self._test_list_security_group_rules_extend(args=args,
|
||||
query_field=True)
|
||||
def test_list_security_group_rules_extend_sg_name_with_fields(self):
|
||||
# NOTE: remote_ip_prefix is required to show "remote" column
|
||||
args = ('-F id -F security_group_id '
|
||||
'-F remote_ip_prefix -F remote_group_id').split()
|
||||
self._test_list_security_group_rules_extend_sg_name(
|
||||
args=args, query_field=True)
|
||||
|
||||
def test_list_security_group_rules_extend_with_fields_no_id(self):
|
||||
args = '-F id -F security_group -F remote_group'.split()
|
||||
self._test_list_security_group_rules_extend(args=args,
|
||||
query_field=True)
|
||||
def test_list_security_group_rules_extend_sg_name_with_fields_no_id(self):
|
||||
# NOTE: remote_ip_prefix is required to show "remote" column
|
||||
args = ('-F id -F security_group '
|
||||
'-F remote_ip_prefix -F remote_group').split()
|
||||
self._test_list_security_group_rules_extend_sg_name(args=args,
|
||||
query_field=True)
|
||||
|
||||
def _prepare_rule(self, direction=None, ethertype=None,
|
||||
def test_list_security_group_rules_extend_remote(self):
|
||||
args = '-c id -c security_group -c remote'.split()
|
||||
|
||||
data = [self._prepare_rule(rule_id='ruleid1', sg_id='myid1',
|
||||
remote_ip_prefix='172.16.18.0/24'),
|
||||
self._prepare_rule(rule_id='ruleid2', sg_id='myid2',
|
||||
remote_ip_prefix='172.16.20.0/24'),
|
||||
self._prepare_rule(rule_id='ruleid3', sg_id='myid2',
|
||||
remote_group_id='myid3')]
|
||||
expected = {'cols': ['id', 'security_group', 'remote'],
|
||||
'data': [('ruleid1', 'group1', '172.16.18.0/24 (CIDR)'),
|
||||
('ruleid2', 'group2', '172.16.20.0/24 (CIDR)'),
|
||||
('ruleid3', 'group2', 'group3 (group)')]}
|
||||
self._test_list_security_group_rules_extend(data, expected, args)
|
||||
|
||||
def test_list_security_group_rules_extend_proto_port(self):
|
||||
data = [self._prepare_rule(rule_id='ruleid1', sg_id='myid1',
|
||||
protocol='tcp',
|
||||
port_range_min=22, port_range_max=22),
|
||||
self._prepare_rule(rule_id='ruleid2', sg_id='myid2',
|
||||
direction='egress', ethertype='IPv6',
|
||||
protocol='udp',
|
||||
port_range_min=80, port_range_max=81),
|
||||
self._prepare_rule(rule_id='ruleid3', sg_id='myid2',
|
||||
protocol='icmp',
|
||||
remote_ip_prefix='10.2.0.0/16')]
|
||||
expected = {
|
||||
'cols': ['id', 'security_group', 'direction', 'ethertype',
|
||||
'protocol/port', 'remote'],
|
||||
'data': [
|
||||
('ruleid1', 'group1', 'ingress', 'IPv4', '22/tcp', 'any'),
|
||||
('ruleid2', 'group2', 'egress', 'IPv6', '80-81/udp', 'any'),
|
||||
('ruleid3', 'group2', 'ingress', 'IPv4', 'icmp',
|
||||
'10.2.0.0/16 (CIDR)')
|
||||
]}
|
||||
self._test_list_security_group_rules_extend(data, expected)
|
||||
|
||||
def _prepare_rule(self, rule_id=None, sg_id=None, tenant_id=None,
|
||||
direction=None, ethertype=None,
|
||||
protocol=None, port_range_min=None, port_range_max=None,
|
||||
remote_ip_prefix=None, remote_group_id=None):
|
||||
return {'id': str(uuid.uuid4()),
|
||||
'tenant_id': str(uuid.uuid4()),
|
||||
'security_group_id': str(uuid.uuid4()),
|
||||
remote_ip_prefix=None, remote_group_id=None,
|
||||
filters=None):
|
||||
rule = {'id': rule_id or str(uuid.uuid4()),
|
||||
'tenant_id': tenant_id or str(uuid.uuid4()),
|
||||
'security_group_id': sg_id or str(uuid.uuid4()),
|
||||
'direction': direction or 'ingress',
|
||||
'ethertype': ethertype or 'IPv4',
|
||||
'protocol': protocol,
|
||||
|
@ -448,6 +513,24 @@ class CLITestV20SecurityGroupsJSON(test_cli20.CLITestV20Base):
|
|||
'port_range_max': port_range_max,
|
||||
'remote_ip_prefix': remote_ip_prefix,
|
||||
'remote_group_id': remote_group_id}
|
||||
if filters:
|
||||
return dict([(k, v) for k, v in rule.items() if k in filters])
|
||||
else:
|
||||
return rule
|
||||
|
||||
def test__get_remote_both_unspecified(self):
|
||||
sg_rule = self._prepare_rule(remote_ip_prefix=None,
|
||||
remote_group_id=None)
|
||||
self.assertIsNone(securitygroup._get_remote(sg_rule))
|
||||
|
||||
def test__get_remote_remote_ip_prefix_specified(self):
|
||||
sg_rule = self._prepare_rule(remote_ip_prefix='172.16.18.0/24')
|
||||
self.assertEqual('172.16.18.0/24 (CIDR)',
|
||||
securitygroup._get_remote(sg_rule))
|
||||
|
||||
def test__get_remote_remote_group_specified(self):
|
||||
sg_rule = self._prepare_rule(remote_group_id='sg_id1')
|
||||
self.assertEqual('sg_id1 (group)', securitygroup._get_remote(sg_rule))
|
||||
|
||||
def test__get_protocol_port_all_none(self):
|
||||
sg_rule = self._prepare_rule()
|
||||
|
|
Loading…
Reference in New Issue