Merge "Stop mixing IPs and domains"

This commit is contained in:
Jenkins 2015-08-13 09:13:22 +00:00 committed by Gerrit Code Review
commit 27dfeb252c
3 changed files with 84 additions and 190 deletions

View File

@ -14,7 +14,6 @@
from __future__ import absolute_import
import logging
import socket
import netaddr
@ -67,83 +66,53 @@ def iter_alternative_names(csr, types, fail_other_types=True):
"'%s'" % (parts[1], parts[0]))
def check_networks(domain, allowed_networks):
"""Check the domain resolves to an IP that is within an allowed network
def check_networks(ip, allowed_networks):
"""Check the IP is within an allowed network."""
if not isinstance(ip, netaddr.IPAddress):
raise TypeError("ip must be a netaddr ip address")
Resolve all of the IP addresses for 'domain' and ensure that
at least one of the IP addresses is listed in allowed_networks for the
deployment.
"""
if not allowed_networks:
# no valid networks were provided, so we can't make any assertions
logger.warning("No valid network IP ranges were given, skipping")
return True
try:
networks = socket.gethostbyname_ex(domain)
except socket.gaierror:
# the domain is not a valid ip address
return False
for possible_network in networks[2]:
ip = netaddr.IPAddress(possible_network)
if any(ip in netaddr.IPNetwork(net) for net in allowed_networks):
return True
if any(ip in netaddr.IPNetwork(net) for net in allowed_networks):
return True
return False
def check_networks_strict(domain, allowed_networks):
"""Check the domain resolves to an IP that is within an allowed network
Resolve all of the IP addresses for 'domain' and ensure that
at each of the IP addresses is listed in allowed_networks for the
deployment. This is the stricter form of check_networks.
"""
try:
networks = socket.gethostbyname_ex(domain)[2]
except socket.gaierror:
# the domain is not a valid ip address
return False
for possible_network in networks:
ip = netaddr.IPAddress(possible_network)
if not any(ip in netaddr.IPNetwork(net) for net in allowed_networks):
return False
return True
def common_name(csr, allowed_domains=[], allowed_networks=[], **kwargs):
"""Check the CN entry is a known domain.
Refuse requests for certificates if they contain multiple CN
entries, or the domain does not match the list of known suffixes
or network ranges.
entries, or the domain does not match the list of known suffixes.
"""
alt_present = any(ext.get_name() == "subjectAltName"
for ext in csr.get_extensions())
CNs = csr.get_subject().get_entries_by_nid(x509_name.NID_commonName)
if alt_present:
if len(CNs) > 1:
raise ValidationError("Too many CNs in the request")
else:
if len(CNs) > 1:
raise ValidationError("Too many CNs in the request")
if not alt_present:
# rfc5280#section-4.2.1.6 says so
if len(csr.get_subject()) == 0:
if len(CNs) == 0:
raise ValidationError("Alt subjects have to exist if the main"
" subject doesn't")
if len(CNs) > 0:
cn = csr_get_cn(csr)
if not (check_domains(cn, allowed_domains)):
raise ValidationError("Domain '%s' not allowed (does not match"
" known domains)" % cn)
if not (check_networks(cn, allowed_networks)):
raise ValidationError("Network '%s' not allowed (does not match"
" known networks)" % cn)
try:
# is it an IP rather than domain?
ip = netaddr.IPAddress(cn)
if not (check_networks(ip, allowed_networks)):
raise ValidationError("Address '%s' not allowed (does not "
"match known networks)" % cn)
except netaddr.AddrFormatError:
if not (check_domains(cn, allowed_domains)):
raise ValidationError("Domain '%s' not allowed (does not "
"match known domains)" % cn)
def alternative_names(csr, allowed_domains=[], **kwargs):
@ -156,7 +125,7 @@ def alternative_names(csr, allowed_domains=[], **kwargs):
for name_type, name in iter_alternative_names(csr, ['DNS']):
if not check_domains(name, allowed_domains):
raise ValidationError("Domain '%s' not allowed (doesn't"
" match known domains or networks)"
" match known domains)"
% name)
@ -169,11 +138,14 @@ def alternative_names_ip(csr, allowed_domains=[], allowed_networks=[],
"""
for name_type, name in iter_alternative_names(csr, ['DNS', 'IP Address']):
if not (check_domains(name, allowed_domains) or
check_networks(name, allowed_networks)):
if name_type == 'DNS' and not check_domains(name, allowed_domains):
raise ValidationError("Domain '%s' not allowed (doesn't"
" match known domains or networks)"
% name)
" match known domains)" % name)
if name_type == 'IP Address':
ip = netaddr.IPAddress(name)
if not check_networks(ip, allowed_networks):
raise ValidationError("Address '%s' not allowed (doesn't"
" match known networks)" % name)
def blacklist_names(csr, domains=[], **kwargs):

View File

@ -14,11 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import socket
import textwrap
import unittest
import mock
import netaddr
from anchor import validators
from anchor.X509 import signing_request
@ -100,58 +99,17 @@ class TestBaseValidators(unittest.TestCase):
self.assertTrue(validators.check_domains(test_domain, test_allowed))
self.assertFalse(validators.check_domains('gmail.com', test_allowed))
@mock.patch('socket.gethostbyname_ex')
def test_check_networks_bad_domain(self, gethostbyname_ex):
gethostbyname_ex.side_effect = socket.gaierror()
bad_domain = 'bad!$domain'
allowed_networks = ['127/8', '10/8']
self.assertFalse(validators.check_networks(
bad_domain, allowed_networks))
def test_check_networks(self):
good_ip = netaddr.IPAddress('10.2.3.4')
bad_ip = netaddr.IPAddress('88.2.3.4')
test_allowed = ['10/8']
self.assertTrue(validators.check_networks(good_ip, test_allowed))
self.assertFalse(validators.check_networks(bad_ip, test_allowed))
@mock.patch('socket.gethostbyname_ex')
def test_check_networks_both(self, gethostbyname_ex):
allowed_networks = ['15/8', '74.125/16']
gethostbyname_ex.return_value = (
'example.com',
[],
[
'74.125.224.64',
'74.125.224.67',
'74.125.224.68',
'74.125.224.70',
]
)
self.assertTrue(validators.check_networks(
'example.com', allowed_networks))
self.assertTrue(validators.check_networks_strict(
'example.com', allowed_networks))
def test_check_networks_invalid(self):
with self.assertRaises(TypeError):
validators.check_networks('1.2.3.4', ['10/8'])
gethostbyname_ex.return_value = ('example.com', [], ['12.2.2.2'])
self.assertFalse(validators.check_networks(
'example.com', allowed_networks))
gethostbyname_ex.return_value = (
'example.com',
[],
[
'15.8.2.2',
'15.8.2.1',
'16.1.1.1',
]
)
self.assertFalse(validators.check_networks_strict(
'example.com', allowed_networks))
@mock.patch('socket.gethostbyname_ex')
def test_check_networks_exception(self, gethostbyname_ex):
gethostbyname_ex.side_effect = socket.gaierror()
self.assertFalse(
validators.check_networks('mock', ['mock']),
)
@mock.patch('socket.gethostbyname_ex')
def test_check_networks_strict_exception(self, gethostbyname_ex):
gethostbyname_ex.side_effect = socket.gaierror()
self.assertFalse(
validators.check_networks_strict('mock', ['mock']),
)
def test_check_networks_passthrough(self):
good_ip = netaddr.IPAddress('10.2.3.4')
self.assertTrue(validators.check_networks(good_ip, []))

View File

@ -17,6 +17,7 @@
import unittest
import mock
import netaddr
from anchor import validators
from anchor.X509 import name as x509_name
@ -29,46 +30,17 @@ class TestValidators(unittest.TestCase):
def tearDown(self):
super(TestValidators, self).tearDown()
@mock.patch('socket.gethostbyname_ex')
def test_check_networks_good(self, gethostbyname_ex):
def test_check_networks_good(self):
allowed_networks = ['15/8', '74.125/16']
gethostbyname_ex.return_value = (
'example.com',
[],
[
'74.125.224.64',
'74.125.224.67',
'74.125.224.68',
'74.125.224.70',
]
)
self.assertTrue(validators.check_networks(
'example.com', allowed_networks))
self.assertTrue(validators.check_networks_strict(
'example.com', allowed_networks))
netaddr.IPAddress('74.125.224.64'), allowed_networks))
@mock.patch('socket.gethostbyname_ex')
def test_check_networks_bad(self, gethostbyname_ex):
def test_check_networks_bad(self):
allowed_networks = ['15/8', '74.125/16']
gethostbyname_ex.return_value = ('example.com', [], ['12.2.2.2'])
self.assertFalse(validators.check_networks(
'example.com', allowed_networks))
netaddr.IPAddress('12.2.2.2'), allowed_networks))
gethostbyname_ex.return_value = (
'example.com',
['mock.mock'],
[
'15.8.2.2',
'15.8.2.1',
'16.1.1.1',
]
)
self.assertFalse(validators.check_networks_strict(
'example.com', allowed_networks))
@mock.patch('socket.gethostbyname_ex')
def test_check_domains_empty(self, gethostbyname_ex):
gethostbyname_ex.return_value = ('example.com', [], ['12.2.2.2'])
def test_check_domains_empty(self):
self.assertTrue(validators.check_domains(
'example.com', []))
@ -106,10 +78,7 @@ class TestValidators(unittest.TestCase):
self.assertEqual("Alt subjects have to exist if the main subject"
" doesn't", str(e.exception))
@mock.patch('socket.gethostbyname_ex')
def test_common_name_good_CN(self, gethostbyname_ex):
gethostbyname_ex.return_value = ('master.test.com', [], ['10.0.0.1'])
def test_common_name_good_CN(self):
cn_mock = mock.MagicMock()
cn_mock.get_value.return_value = 'master.test.com'
@ -125,14 +94,10 @@ class TestValidators(unittest.TestCase):
validators.common_name(
csr=csr_mock,
allowed_domains=['.test.com'],
allowed_networks=['10/8']
)
)
@mock.patch('socket.gethostbyname_ex')
def test_common_name_bad_CN(self, gethostbyname_ex):
gethostbyname_ex.return_value = ('master.test.com', [], ['10.0.0.1'])
def test_common_name_bad_CN(self):
name = x509_name.X509Name()
name.add_name_entry(x509_name.NID_commonName, 'test.baddomain.com')
@ -142,34 +107,29 @@ class TestValidators(unittest.TestCase):
with self.assertRaises(validators.ValidationError) as e:
validators.common_name(
csr=csr_mock,
allowed_domains=['.test.com'],
allowed_networks=['10/8'])
allowed_domains=['.test.com'])
self.assertEqual("Domain 'test.baddomain.com' not allowed (does not "
"match known domains)", str(e.exception))
def test_common_name_good_ip_CN(self):
cn_mock = mock.MagicMock()
cn_mock.get_value.return_value = '10.0.0.1'
def test_common_name_ip_good(self):
name = x509_name.X509Name()
name.add_name_entry(x509_name.NID_commonName, '10.1.1.1')
csr_config = {
'get_subject.return_value.__len__.return_value': 1,
'get_subject.return_value.get_entries_by_nid.return_value':
[cn_mock],
}
csr_mock = mock.MagicMock(**csr_config)
csr_mock = mock.MagicMock()
csr_mock.get_subject.return_value = name
self.assertEqual(
None,
validators.common_name(
csr=csr_mock,
allowed_domains=[],
allowed_domains=['.test.com'],
allowed_networks=['10/8']
)
)
def test_common_name_bad_ip_CN(self):
def test_common_name_ip_bad(self):
name = x509_name.X509Name()
name.add_name_entry(x509_name.NID_commonName, '12.0.0.1')
name.add_name_entry(x509_name.NID_commonName, '15.1.1.1')
csr_mock = mock.MagicMock()
csr_mock.get_subject.return_value = name
@ -177,15 +137,12 @@ class TestValidators(unittest.TestCase):
with self.assertRaises(validators.ValidationError) as e:
validators.common_name(
csr=csr_mock,
allowed_domains=[],
allowed_domains=['.test.com'],
allowed_networks=['10/8'])
self.assertEqual("Network '12.0.0.1' not allowed (does not match "
"known networks)", str(e.exception))
@mock.patch('socket.gethostbyname_ex')
def test_alternative_names_good_domain(self, gethostbyname_ex):
gethostbyname_ex.return_value = ('master.test.com', [], ['10.0.0.1'])
self.assertEqual("Address '15.1.1.1' not allowed (does not "
"match known networks)", str(e.exception))
def test_alternative_names_good_domain(self):
ext_mock = mock.MagicMock()
ext_mock.get_value.return_value = 'DNS:master.test.com'
ext_mock.get_name.return_value = 'subjectAltName'
@ -200,10 +157,7 @@ class TestValidators(unittest.TestCase):
)
)
@mock.patch('socket.gethostbyname_ex')
def test_alternative_names_bad_domain(self, gethostbyname_ex):
gethostbyname_ex.return_value = ('master.test.com', [], ['10.0.0.1'])
def test_alternative_names_bad_domain(self):
ext_mock = mock.MagicMock()
ext_mock.get_value.return_value = 'DNS:test.baddomain.com'
ext_mock.get_name.return_value = 'subjectAltName'
@ -216,7 +170,7 @@ class TestValidators(unittest.TestCase):
csr=csr_mock,
allowed_domains=['.test.com'])
self.assertEqual("Domain 'test.baddomain.com' not allowed (doesn't "
"match known domains or networks)", str(e.exception))
"match known domains)", str(e.exception))
def test_alternative_names_ext(self):
ext_mock = mock.MagicMock()
@ -233,10 +187,7 @@ class TestValidators(unittest.TestCase):
self.assertEqual("Alt name should have 2 parts, but found: 'BAD'",
str(e.exception))
@mock.patch('socket.gethostbyname_ex')
def test_alternative_names_ip_good(self, gethostbyname_ex):
gethostbyname_ex.return_value = ('master.test.com', [], ['10.0.0.1'])
def test_alternative_names_ip_good(self):
ext_mock = mock.MagicMock()
ext_mock.get_value.return_value = 'IP Address:10.1.1.1'
ext_mock.get_name.return_value = 'subjectAltName'
@ -253,9 +204,7 @@ class TestValidators(unittest.TestCase):
)
)
@mock.patch('socket.gethostbyname_ex')
def test_alternative_names_ip_bad(self, gethostbyname_ex):
gethostbyname_ex.return_value = ('master.test.com', [], ['10.0.0.1'])
def test_alternative_names_ip_bad(self):
ext_mock = mock.MagicMock()
ext_mock.get_value.return_value = 'IP Address:10.1.1.1'
@ -269,8 +218,23 @@ class TestValidators(unittest.TestCase):
csr=csr_mock,
allowed_domains=['.test.com'],
allowed_networks=['99/8'])
self.assertEqual("Domain '10.1.1.1' not allowed (doesn't match known "
"domains or networks)", str(e.exception))
self.assertEqual("Address '10.1.1.1' not allowed (doesn't match known "
"networks)", str(e.exception))
def test_alternative_names_ip_bad_domain(self):
ext_mock = mock.MagicMock()
ext_mock.get_value.return_value = 'DNS:test.baddomain.com'
ext_mock.get_name.return_value = 'subjectAltName'
csr_mock = mock.MagicMock()
csr_mock.get_extensions.return_value = [ext_mock]
with self.assertRaises(validators.ValidationError) as e:
validators.alternative_names_ip(
csr=csr_mock,
allowed_domains=['.test.com'])
self.assertEqual("Domain 'test.baddomain.com' not allowed (doesn't "
"match known domains)", str(e.exception))
def test_alternative_names_ip_ext(self):
ext_mock = mock.MagicMock()