Add validation to eliminate URIReference.is_valid
Even though URIReference.is_valid does the same thing (roughly), this adds to our existing Validator framework the ability to check that a URI is in fact compliant with the specification.
This commit is contained in:
parent
a3bca13363
commit
2cdfd8ed33
|
@ -90,3 +90,22 @@ class PasswordForbidden(ValidationError):
|
|||
)
|
||||
)
|
||||
self.uri = uri
|
||||
|
||||
|
||||
class InvalidComponentsError(ValidationError):
|
||||
"""Exception raised when one or more components are invalid."""
|
||||
|
||||
def __init__(self, uri, *component_names):
|
||||
"""Initialize the error with the invalid component name(s)."""
|
||||
verb = 'was'
|
||||
if len(component_names) > 1:
|
||||
verb = 'were'
|
||||
|
||||
self.uri = uri
|
||||
self.components = sorted(component_names)
|
||||
components = ', '.join(self.components)
|
||||
super(InvalidComponentsError, self).__init__(
|
||||
"{} {} found to be invalid".format(components, verb),
|
||||
uri,
|
||||
self.components,
|
||||
)
|
||||
|
|
|
@ -70,6 +70,7 @@ class Validator(object):
|
|||
'query': False,
|
||||
'fragment': False,
|
||||
}
|
||||
self.validated_components = self.required_components.copy()
|
||||
|
||||
def allow_schemes(self, *schemes):
|
||||
"""Require the scheme to be one of the provided schemes.
|
||||
|
@ -147,9 +148,36 @@ class Validator(object):
|
|||
self.allow_password = False
|
||||
return self
|
||||
|
||||
def check_validity_of(self, *components):
|
||||
"""Check the validity of the components provided.
|
||||
|
||||
This can be specified repeatedly.
|
||||
|
||||
.. versionadded:: 1.1
|
||||
|
||||
:param components:
|
||||
Names of components from :attr:`Validator.COMPONENT_NAMES`.
|
||||
:returns:
|
||||
The validator instance.
|
||||
:rtype:
|
||||
Validator
|
||||
"""
|
||||
components = [c.lower() for c in components]
|
||||
for component in components:
|
||||
if component not in self.COMPONENT_NAMES:
|
||||
raise ValueError(
|
||||
'"{}" is not a valid component'.format(component)
|
||||
)
|
||||
self.validated_components.update({
|
||||
component: True for component in components
|
||||
})
|
||||
return self
|
||||
|
||||
def require_presence_of(self, *components):
|
||||
"""Require the components provided.
|
||||
|
||||
This can be specified repeatedly.
|
||||
|
||||
.. versionadded:: 1.0
|
||||
|
||||
:param components:
|
||||
|
@ -186,6 +214,8 @@ class Validator(object):
|
|||
:raises PasswordForbidden:
|
||||
When a password is present in the userinfo component but is
|
||||
not permitted by configuration.
|
||||
:raises InvalidComponentsError:
|
||||
When a component was found to be invalid.
|
||||
"""
|
||||
if not self.allow_password:
|
||||
check_password(uri)
|
||||
|
@ -195,8 +225,15 @@ class Validator(object):
|
|||
for component, required in self.required_components.items()
|
||||
if required
|
||||
]
|
||||
validated_components = [
|
||||
component
|
||||
for component, required in self.validated_components.items()
|
||||
if required
|
||||
]
|
||||
if required_components:
|
||||
ensure_required_components_exist(uri, required_components)
|
||||
if validated_components:
|
||||
ensure_components_are_valid(uri, validated_components)
|
||||
|
||||
ensure_one_of(self.allowed_schemes, uri, 'scheme')
|
||||
ensure_one_of(self.allowed_hosts, uri, 'host')
|
||||
|
@ -337,3 +374,52 @@ def valid_ipv4_host_address(host):
|
|||
# If the host exists, and it might be IPv4, check each byte in the
|
||||
# address.
|
||||
return all([0 <= int(byte, base=10) <= 255 for byte in host.split('.')])
|
||||
|
||||
|
||||
_COMPONENT_VALIDATORS = {
|
||||
'scheme': scheme_is_valid,
|
||||
'path': path_is_valid,
|
||||
'query': query_is_valid,
|
||||
'fragment': fragment_is_valid,
|
||||
}
|
||||
|
||||
_SUBAUTHORITY_VALIDATORS = set(['userinfo', 'host', 'port'])
|
||||
|
||||
|
||||
def subauthority_component_is_valid(uri, component):
|
||||
"""Determine if the userinfo, host, and port are valid."""
|
||||
try:
|
||||
subauthority_dict = uri.authority_info()
|
||||
except exceptions.InvalidAuthority:
|
||||
return False
|
||||
|
||||
# If we can parse the authority into sub-components and we're not
|
||||
# validating the port, we can assume it's valid.
|
||||
if component != 'port':
|
||||
return True
|
||||
|
||||
try:
|
||||
port = int(subauthority_dict['port'])
|
||||
except TypeError:
|
||||
# If the port wasn't provided it'll be None and int(None) raises a
|
||||
# TypeError
|
||||
return True
|
||||
|
||||
return (0 <= port <= 65535)
|
||||
|
||||
|
||||
def ensure_components_are_valid(uri, validated_components):
|
||||
"""Assert that all components are valid in the URI."""
|
||||
invalid_components = set([])
|
||||
for component in validated_components:
|
||||
if component in _SUBAUTHORITY_VALIDATORS:
|
||||
if not subauthority_component_is_valid(uri, component):
|
||||
invalid_components.add(component)
|
||||
continue
|
||||
|
||||
validator = _COMPONENT_VALIDATORS[component]
|
||||
if not validator(getattr(uri, component)):
|
||||
invalid_components.add(component)
|
||||
|
||||
if invalid_components:
|
||||
raise exceptions.InvalidComponentsError(uri, *invalid_components)
|
||||
|
|
|
@ -52,6 +52,12 @@ def test_requiring_invalid_component():
|
|||
validators.Validator().require_presence_of('frob')
|
||||
|
||||
|
||||
def test_checking_validity_of_component():
|
||||
"""Verify that we validate components we're validating."""
|
||||
with pytest.raises(ValueError):
|
||||
validators.Validator().check_validity_of('frob')
|
||||
|
||||
|
||||
def test_use_of_password():
|
||||
"""Verify the behaviour of {forbid,allow}_use_of_password."""
|
||||
validator = validators.Validator()
|
||||
|
@ -182,6 +188,22 @@ def test_allowed_hosts_and_schemes(uri, failed_component):
|
|||
rfc3986.uri_reference('ssh://git.openstack.org/sigmavirus24'),
|
||||
rfc3986.uri_reference('ssh://ssh@git.openstack.org:22/sigmavirus24'),
|
||||
rfc3986.uri_reference('https://git.openstack.org:443/sigmavirus24'),
|
||||
rfc3986.uri_reference(
|
||||
'ssh://ssh@git.openstack.org:22/sigmavirus24?foo=bar#fragment'
|
||||
),
|
||||
rfc3986.uri_reference(
|
||||
'ssh://git.openstack.org:22/sigmavirus24?foo=bar#fragment'
|
||||
),
|
||||
rfc3986.uri_reference('ssh://git.openstack.org:22/?foo=bar#fragment'),
|
||||
rfc3986.uri_reference('ssh://git.openstack.org:22/sigmavirus24#fragment'),
|
||||
rfc3986.uri_reference('ssh://git.openstack.org:22/#fragment'),
|
||||
rfc3986.uri_reference('ssh://git.openstack.org:22/'),
|
||||
rfc3986.uri_reference('ssh://ssh@git.openstack.org:22/?foo=bar#fragment'),
|
||||
rfc3986.uri_reference(
|
||||
'ssh://ssh@git.openstack.org:22/sigmavirus24#fragment'
|
||||
),
|
||||
rfc3986.uri_reference('ssh://ssh@git.openstack.org:22/#fragment'),
|
||||
rfc3986.uri_reference('ssh://ssh@git.openstack.org:22/'),
|
||||
])
|
||||
def test_successful_complex_validation(uri):
|
||||
"""Verify we do not raise ValidationErrors for good URIs."""
|
||||
|
@ -193,4 +215,23 @@ def test_successful_complex_validation(uri):
|
|||
'22', '443',
|
||||
).require_presence_of(
|
||||
'scheme', 'host', 'path',
|
||||
).check_validity_of(
|
||||
'scheme', 'userinfo', 'host', 'port', 'path', 'query', 'fragment',
|
||||
).validate(uri)
|
||||
|
||||
|
||||
def test_invalid_uri_generates_error(invalid_uri):
|
||||
"""Verify we catch invalid URIs."""
|
||||
uri = rfc3986.uri_reference(invalid_uri)
|
||||
with pytest.raises(exceptions.InvalidComponentsError):
|
||||
validators.Validator().check_validity_of('host').validate(uri)
|
||||
|
||||
|
||||
def test_invalid_uri_with_invalid_path(invalid_uri):
|
||||
"""Verify we catch multiple invalid components."""
|
||||
uri = rfc3986.uri_reference(invalid_uri)
|
||||
uri = uri.copy_with(path='#foobar')
|
||||
with pytest.raises(exceptions.InvalidComponentsError):
|
||||
validators.Validator().check_validity_of(
|
||||
'host', 'path',
|
||||
).validate(uri)
|
||||
|
|
Loading…
Reference in New Issue