Refactor CIDR field to use netaddr.IPNetwork

This makes the CIDR field become the IPNetwork field (and associate
versioned fields). It also uses the netaddr.IPNetwork class, which
is what should have been done in the first place.

Wire format is unchanged, so we can make this without any version
bumps or upgrade impact.

Related to blueprint nova-network-objects

Change-Id: I1a3ab6a5f4c624f3da0ab1c78c75c6f1e474541f
This commit is contained in:
Dan Smith 2014-01-14 07:14:29 -08:00
parent 8633ab069d
commit 3994512ac6
5 changed files with 83 additions and 38 deletions

View File

@ -296,6 +296,33 @@ class IPV6Address(IPAddress):
return result
class IPNetwork(IPAddress):
@staticmethod
def coerce(obj, attr, value):
try:
return netaddr.IPNetwork(value)
except netaddr.AddrFormatError as e:
raise ValueError(str(e))
class IPV4Network(IPNetwork):
@staticmethod
def coerce(obj, attr, value):
try:
return netaddr.IPNetwork(value, version=4)
except netaddr.AddrFormatError as e:
raise ValueError(str(e))
class IPV6Network(IPNetwork):
@staticmethod
def coerce(obj, attr, value):
try:
return netaddr.IPNetwork(value, version=6)
except netaddr.AddrFormatError as e:
raise ValueError(str(e))
class CompoundFieldType(FieldType):
def __init__(self, element_type, **field_args):
self._element_type = Field(element_type, **field_args)
@ -397,30 +424,6 @@ class NetworkModel(FieldType):
return network_model.NetworkInfo.hydrate(value)
class CIDR(FieldType):
@staticmethod
def coerce(obj, attr, value):
try:
network, length = value.split('/')
except (ValueError, AttributeError):
raise ValueError(_('CIDR "%s" is not in proper form') % value)
try:
network = netaddr.IPAddress(network)
except netaddr.AddrFormatError:
raise ValueError(_('Network "%s" is not valid') % network)
try:
length = int(length)
assert (length >= 0)
except (ValueError, AssertionError):
raise ValueError(_('Netmask length "%s" is not valid') % length)
if ((network.version == 4 and length > 32) or
(network.version == 6 and length > 128)):
raise ValueError(_('Netmask length "%(length)s" is not valid '
'for IPv%(version)i address') %
{'length': length, 'version': network.version})
return value
class AutoTypedField(Field):
AUTO_TYPE = None
@ -464,6 +467,18 @@ class IPV6AddressField(AutoTypedField):
AUTO_TYPE = IPV6Address()
class IPNetworkField(AutoTypedField):
AUTO_TYPE = IPNetwork()
class IPV4NetworkField(AutoTypedField):
AUTO_TYPE = IPV4Network()
class IPV6NetworkField(AutoTypedField):
AUTO_TYPE = IPV6Network()
class DictOfStringsField(AutoTypedField):
AUTO_TYPE = Dict(String())

View File

@ -29,7 +29,7 @@ class SecurityGroupRule(base.NovaPersistentObject, base.NovaObject):
'protocol': fields.StringField(nullable=True),
'from_port': fields.IntegerField(nullable=True),
'to_port': fields.IntegerField(nullable=True),
'cidr': fields.Field(fields.CIDR(), nullable=True),
'cidr': fields.IPNetworkField(nullable=True),
'parent_group': fields.ObjectField('SecurityGroup', nullable=True),
'grantee_group': fields.ObjectField('SecurityGroup', nullable=True),
}

View File

@ -251,16 +251,43 @@ class TestNetworkModel(TestField):
self.from_primitive_values = [(model.json(), model)]
class TestCIDR(TestField):
class TestIPNetwork(TestField):
def setUp(self):
super(TestCIDR, self).setUp()
self.field = fields.Field(fields.CIDR())
good = ['192.168.0.1/24', '192.168.0.0/16', '192.168.0.0/8',
'192.168.0.0/0', '1.2.3.4/32', '1.2.3.4/22', '0/0',
'::1/128', '::1/64', '::1/0']
self.coerce_good_values = [(x, x) for x in good]
self.coerce_bad_values = ['192.168.0.0', '192.168.0.0/f',
'192.168.0.0/foo', '192.168.0.0/33',
super(TestIPNetwork, self).setUp()
self.field = fields.Field(fields.IPNetwork())
good = ['192.168.1.0/24', '0.0.0.0/0', '::1/128', '::1/64', '::1/0']
self.coerce_good_values = [(x, netaddr.IPNetwork(x)) for x in good]
self.coerce_bad_values = ['192.168.0.0/f', '192.168.0.0/foo',
'::1/129', '192.168.0.0/-1']
self.to_primitive_values = [(x, x) for x in good]
self.from_primitive_values = self.to_primitive_values
self.to_primitive_values = [(netaddr.IPNetwork(x), x)
for x in good]
self.from_primitive_values = [(x, netaddr.IPNetwork(x))
for x in good]
class TestIPV4Network(TestField):
def setUp(self):
super(TestIPV4Network, self).setUp()
self.field = fields.Field(fields.IPV4Network())
good = ['192.168.1.0/24', '0.0.0.0/0']
self.coerce_good_values = [(x, netaddr.IPNetwork(x)) for x in good]
self.coerce_bad_values = ['192.168.0.0/f', '192.168.0.0/foo',
'::1/129', '192.168.0.0/-1']
self.to_primitive_values = [(netaddr.IPNetwork(x), x)
for x in good]
self.from_primitive_values = [(x, netaddr.IPNetwork(x))
for x in good]
class TestIPV6Network(TestField):
def setUp(self):
super(TestIPV6Network, self).setUp()
self.field = fields.Field(fields.IPV6Network())
good = ['::1/128', '::1/64', '::1/0']
self.coerce_good_values = [(x, netaddr.IPNetwork(x)) for x in good]
self.coerce_bad_values = ['192.168.0.0/f', '192.168.0.0/foo',
'::1/129', '192.168.0.0/-1']
self.to_primitive_values = [(netaddr.IPNetwork(x), x)
for x in good]
self.from_primitive_values = [(x, netaddr.IPNetwork(x))
for x in good]

View File

@ -40,7 +40,10 @@ class _TestSecurityGroupRuleObject(object):
rule = security_group_rule.SecurityGroupRule.get_by_id(
self.context, 1)
for field in fake_rule:
self.assertEqual(fake_rule[field], rule[field])
if field == 'cidr':
self.assertEqual(fake_rule[field], str(rule[field]))
else:
self.assertEqual(fake_rule[field], rule[field])
sgrg.assert_called_with(self.context, 1)
def test_get_by_security_group(self):

View File

@ -400,7 +400,7 @@ class IptablesFirewallDriver(FirewallDriver):
args += self._build_icmp_rule(rule, version)
if rule['cidr']:
LOG.debug('Using cidr %r', rule['cidr'], instance=instance)
args += ['-s', rule['cidr']]
args += ['-s', str(rule['cidr'])]
fw_rules += [' '.join(args)]
else:
if rule['grantee_group']: