Improve object fields coverage and fixed error handling

- Fixed bad error handling.
- Removed unreachable code.
- Minor cleanup for consistency.

Change-Id: Ie632f3c1358a6d25a75dfc023fa053b89b32ebf4
This commit is contained in:
Erik Olof Gunnar Andersson 2023-11-04 15:33:37 -07:00
parent 30d8c70814
commit 5c88c50349
2 changed files with 277 additions and 62 deletions

View File

@ -33,9 +33,6 @@ class BooleanField(ovoo_fields.BooleanField):
class PolymorphicObject(ovoo_fields.Object):
def coerce(self, obj, attr, value):
if hasattr(value, '__bases__'):
check_value = value.__bases__[0]
super().coerce(obj, attr, check_value)
return value
@ -74,11 +71,9 @@ class IntegerFields(IntegerField):
return value
if value < self.min:
# return self.min
raise ValueError('Value must be >= {} for field {}'.format(
self.min, attr)
)
raise ValueError(f'Value must be >= {self.min} for field {attr}')
if self.max and value > self.max:
raise ValueError('Value too high for %s' % attr)
raise ValueError(f'Value too high for {attr}')
return value
@ -98,7 +93,7 @@ class StringFields(ovoo_fields.StringField):
else:
value = super().coerce(obj, attr, value)
if self.maxLength and len(value) > self.maxLength:
raise ValueError('Value too long for %s' % attr)
raise ValueError(f'Value too long for {attr}')
return value
@ -182,11 +177,11 @@ class DomainField(StringFields):
domain = value.split('.')
for host in domain:
if len(host) > 63:
raise ValueError("Host %s is too long" % host)
raise ValueError(f'Host {host} is too long')
if not value.endswith('.'):
raise ValueError("Domain %s does not end with a dot" % value)
raise ValueError(f'Domain {value} does not end with a dot')
if not constants.RE_ZONENAME.match(value):
raise ValueError("Domain %s is invalid" % value)
raise ValueError(f'Domain {value} is invalid')
return value
@ -197,10 +192,10 @@ class EmailField(StringFields):
def coerce(self, obj, attr, value):
value = super().coerce(obj, attr, value)
if value.count('@') != 1:
raise ValueError("%s is not an email" % value)
raise ValueError(f'{value} is not an email')
email = value.replace('@', '.')
if not constants.RE_ZONENAME.match("%s." % email):
raise ValueError("Email %s is invalid" % value)
if not constants.RE_ZONENAME.match(f'{email}.'):
raise ValueError(f'Email {value} is invalid')
return value
@ -215,11 +210,11 @@ class HostField(StringFields):
hostname = value.split('.')
for host in hostname:
if len(host) > 63:
raise ValueError("Host %s is too long" % host)
raise ValueError(f'Host {host} is too long')
if value.endswith('.') is False:
raise ValueError("Host name %s does not end with a dot" % value)
raise ValueError(f'Host name {value} does not end with a dot')
if not constants.RE_HOSTNAME.match(value):
raise ValueError("Host name %s is invalid" % value)
raise ValueError(f'Host name {value} is invalid')
return value
@ -234,11 +229,11 @@ class SRVField(StringFields):
srvtype = value.split('.')
for host in srvtype:
if len(host) > 63:
raise ValueError("Host %s is too long" % host)
raise ValueError(f'Host {host} is too long')
if value.endswith('.') is False:
raise ValueError("Host name %s does not end with a dot" % value)
raise ValueError(f'Host name {value} does not end with a dot')
if not constants.RE_SRV_HOST_NAME.match(value):
raise ValueError("Host name %s is not a SRV record" % value)
raise ValueError(f'Host name {value} is not a SRV record')
return value
@ -259,8 +254,8 @@ class Sshfp(StringFields):
def coerce(self, obj, attr, value):
value = super().coerce(obj, attr, value)
if not constants.RE_SSHFP_FINGERPRINT.match("%s" % value):
raise ValueError("Host name %s is not a SSHFP record" % value)
if not constants.RE_SSHFP_FINGERPRINT.match(f'{value}'):
raise ValueError(f'Host name {value} is not a SSHFP record')
return value
@ -271,7 +266,7 @@ class TldField(StringFields):
def coerce(self, obj, attr, value):
value = super().coerce(obj, attr, value)
if not constants.RE_TLDNAME.match(value):
raise ValueError("%s is not a TLD" % value)
raise ValueError(f'{value} is not a TLD')
return value
@ -281,11 +276,15 @@ class NaptrFlagsField(StringFields):
def coerce(self, obj, attr, value):
value = super().coerce(obj, attr, value)
if (len(value) > 255):
raise ValueError("NAPTR record flags field cannot be longer than"
" 255 characters" % value)
if not constants.RE_NAPTR_FLAGS.match("%s" % value):
raise ValueError("NAPTR record flags can be S, A, U and P" % value)
if len(value) > 255:
raise ValueError(
f'NAPTR record {value} flags field cannot be longer than '
'255 characters'
)
if not constants.RE_NAPTR_FLAGS.match(f'{value}'):
raise ValueError(
f'NAPTR record {value} flags can be S, A, U and P'
)
return value
@ -295,11 +294,15 @@ class NaptrServiceField(StringFields):
def coerce(self, obj, attr, value):
value = super().coerce(obj, attr, value)
if (len(value) > 255):
raise ValueError("NAPTR record service field cannot be longer than"
" 255 characters" % value)
if not constants.RE_NAPTR_SERVICE.match("%s" % value):
raise ValueError("%s NAPTR record service is invalid" % value)
if len(value) > 255:
raise ValueError(
f'NAPTR record {value} service field cannot be longer than '
'255 characters'
)
if not constants.RE_NAPTR_SERVICE.match(f'{value}'):
raise ValueError(
f'{value} NAPTR record service is invalid'
)
return value
@ -309,12 +312,14 @@ class NaptrRegexpField(StringFields):
def coerce(self, obj, attr, value):
value = super().coerce(obj, attr, value)
if (len(value) > 255):
raise ValueError("NAPTR record regexp field cannot be longer than"
" 255 characters" % value)
if len(value) > 255:
raise ValueError(
f'NAPTR record {value} regexp field cannot be longer than '
'255 characters'
)
if value:
if not constants.RE_NAPTR_REGEXP.match("%s" % value):
raise ValueError("%s NAPTR record is invalid" % value)
if not constants.RE_NAPTR_REGEXP.match(f'{value}'):
raise ValueError(f'{value} NAPTR record is invalid')
return value
@ -327,20 +332,19 @@ class CaaPropertyField(StringFields):
prpt = value.split(' ', 1)
tag = prpt[0]
val = prpt[1]
if (tag == 'issue' or tag == 'issuewild'):
if tag == 'issue' or tag == 'issuewild':
entries = val.split(';')
idn = entries.pop(0)
domain = idn.split('.')
for host in domain:
if len(host) > 63:
raise ValueError("Host %s is too long" % host)
raise ValueError(f'Host {host} is too long')
idn_with_dot = idn + '.'
if not constants.RE_ZONENAME.match(idn_with_dot):
raise ValueError("Domain %s is invalid" % idn)
raise ValueError(f'Domain {idn} is invalid')
for entry in entries:
if not constants.RE_KVP.match(entry):
raise ValueError("%s is not a valid key-value pair" %
entry)
raise ValueError(f'{entry} is not a valid key-value pair')
elif tag == 'iodef':
if constants.RE_URL_MAIL.match(val):
parts = val.split('@')
@ -348,25 +352,26 @@ class CaaPropertyField(StringFields):
domain = idn.split('.')
for host in domain:
if len(host) > 63:
raise ValueError("Host %s is too long" % host)
raise ValueError(f'Host {host} is too long')
idn_with_dot = idn + '.'
if not constants.RE_ZONENAME.match(idn_with_dot):
raise ValueError("Domain %s is invalid" % idn)
raise ValueError(f'Domain {idn} is invalid')
elif constants.RE_URL_HTTP.match(val):
parts = val.split('/')
idn = parts[2]
domain = idn.split('.')
for host in domain:
if len(host) > 63:
raise ValueError("Host %s is too long" % host)
raise ValueError(f'Host {host} is too long')
idn_with_dot = idn + '.'
if not constants.RE_ZONENAME.match(idn_with_dot):
raise ValueError("Domain %s is invalid" % idn)
raise ValueError(f'Domain {idn} is invalid')
else:
raise ValueError("%s is not a valid URL" % val)
raise ValueError(f'{val} is not a valid URL')
else:
raise ValueError("Property tag %s must be 'issue', 'issuewild'"
" or 'iodef'" % value)
raise ValueError(
f"Property tag {value} must be 'issue', 'issuewild' or 'iodef'"
)
return value
@ -376,9 +381,10 @@ class CertTypeField(StringFields):
def coerce(self, obj, attr, value):
value = super().coerce(obj, attr, value)
if not constants.RE_CERT_TYPE.match("%s" % value):
raise ValueError("Cert type %s is not a valid Mnemonic or "
"value" % value)
if not constants.RE_CERT_TYPE.match(f'{value}'):
raise ValueError(
f'Cert type {value} is not a valid Mnemonic or value'
)
return value
@ -388,9 +394,10 @@ class CertAlgoField(StringFields):
def coerce(self, obj, attr, value):
value = super().coerce(obj, attr, value)
if not constants.RE_CERT_ALGO.match("%s" % value):
raise ValueError("Cert Algo %s is not a valid Mnemonic or "
"value" % value)
if not constants.RE_CERT_ALGO.match(f'{value}'):
raise ValueError(
f'Cert Algo {value} is not a valid Mnemonic or value'
)
return value
@ -407,10 +414,7 @@ class AnyField(ovoo_fields.AutoTypedField):
class BaseObject(ovoo_fields.FieldType):
@staticmethod
def coerce(obj, attr, value):
if isinstance(value, object):
return value
else:
raise ValueError("BaseObject valid values are not valid")
return value
class BaseObjectField(ovoo_fields.AutoTypedField):
@ -429,7 +433,7 @@ class IPOrHost(IPV4AndV6AddressField):
value = super().coerce(obj, attr, value)
except ValueError:
if not constants.RE_ZONENAME.match(value):
raise ValueError("%s is not IP address or host name" % value)
raise ValueError(f'{value} is not IP address or host name')
return value
@ -444,7 +448,7 @@ class DenylistFields(StringFields):
return self._null(obj, attr)
# determine the validity if a regex expression filter has been used.
msg = "%s is not a valid regular expression" % value
msg = f'{value} is not a valid regular expression'
if not len(value):
raise ValueError(msg)
try:

View File

@ -0,0 +1,211 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import oslotest.base
from designate.objects import fields
class EnumTest(oslotest.base.BaseTestCase):
def test_get_schema(self):
result = fields.Enum('valid').get_schema()
self.assertEqual('valid', result['enum'])
self.assertEqual('any', result['type'])
class DomainFieldTest(oslotest.base.BaseTestCase):
def test_not_valid_domain_field(self):
sub_domain = 'a' * 64
self.assertRaisesRegex(
ValueError,
f'Host {sub_domain} is too long',
fields.DomainField().coerce, mock.Mock(), mock.Mock(),
f'{sub_domain}.example.org.'
)
class EmailFieldTest(oslotest.base.BaseTestCase):
def test_not_valid_email_field(self):
self.assertRaisesRegex(
ValueError,
'- is not an email',
fields.EmailField().coerce, mock.Mock(), mock.Mock(),
'-'
)
self.assertRaisesRegex(
ValueError,
'Email @ is invalid',
fields.EmailField().coerce, mock.Mock(), mock.Mock(),
'@'
)
class HostFieldTest(oslotest.base.BaseTestCase):
def test_is_none(self):
self.assertIsNone(
fields.HostField(nullable=True).coerce(
mock.Mock(), mock.Mock(), None
)
)
def test_not_valid_host_field(self):
sub_domain = 'a' * 64
self.assertRaisesRegex(
ValueError,
f'Host {sub_domain} is too long',
fields.HostField().coerce, mock.Mock(), mock.Mock(),
f'{sub_domain}.example.org.'
)
self.assertRaisesRegex(
ValueError,
'Host name example.org does not end with a dot',
fields.HostField().coerce, mock.Mock(), mock.Mock(),
'example.org'
)
class SRVFieldTest(oslotest.base.BaseTestCase):
def test_is_none(self):
self.assertIsNone(
fields.SRVField(nullable=True).coerce(
mock.Mock(), mock.Mock(), None
)
)
def test_not_valid_srv_field(self):
sub_domain = 'a' * 64
self.assertRaisesRegex(
ValueError,
f'Host {sub_domain} is too long',
fields.SRVField().coerce, mock.Mock(), mock.Mock(),
f'{sub_domain}.example.org.'
)
self.assertRaisesRegex(
ValueError,
'Host name example.org does not end with a dot',
fields.SRVField().coerce, mock.Mock(), mock.Mock(),
'example.org'
)
class TxtFieldTest(oslotest.base.BaseTestCase):
def test_not_valid_txt(self):
self.assertRaisesRegex(
ValueError,
r"Do NOT put '\\' into end of TXT record",
fields.TxtField().coerce, mock.Mock(), mock.Mock(),
" \\"
)
class SshfpTest(oslotest.base.BaseTestCase):
def test_not_valid_sshfp(self):
self.assertRaisesRegex(
ValueError,
'Host name is not a SSHFP record',
fields.Sshfp().coerce, mock.Mock(), mock.Mock(),
''
)
class NaptrFlagsFieldTest(oslotest.base.BaseTestCase):
def test_not_valid_naptr_flags(self):
record = 'A' * 256
self.assertRaisesRegex(
ValueError,
f'NAPTR record {record} flags field cannot be longer than 255 '
'characters',
fields.NaptrFlagsField().coerce, mock.Mock(), mock.Mock(),
record
)
self.assertRaisesRegex(
ValueError,
'NAPTR record record flags can be S, A, U and P',
fields.NaptrFlagsField().coerce, mock.Mock(), mock.Mock(),
'record'
)
class NaptrServiceFieldTest(oslotest.base.BaseTestCase):
def test_not_valid_naptr_service(self):
record = 'A' * 256
self.assertRaisesRegex(
ValueError,
f'NAPTR record {record} service field cannot be longer than 255 '
'characters',
fields.NaptrServiceField().coerce, mock.Mock(), mock.Mock(),
record
)
self.assertRaisesRegex(
ValueError,
'_ NAPTR record service is invalid',
fields.NaptrServiceField().coerce, mock.Mock(), mock.Mock(),
'_'
)
class NaptrRegexpFieldTest(oslotest.base.BaseTestCase):
def test_not_valid_naptr_regexp(self):
record = 'A' * 256
self.assertRaisesRegex(
ValueError,
f'NAPTR record {record} regexp field cannot be longer than 255 '
'characters',
fields.NaptrRegexpField().coerce, mock.Mock(), mock.Mock(),
record
)
self.assertRaisesRegex(
ValueError,
'_ NAPTR record is invalid',
fields.NaptrRegexpField().coerce, mock.Mock(), mock.Mock(),
'_'
)
class CertTypeFieldTest(oslotest.base.BaseTestCase):
def test_not_valid_cert_type(self):
self.assertRaisesRegex(
ValueError,
'Cert type _ is not a valid Mnemonic or value',
fields.CertTypeField().coerce, mock.Mock(), mock.Mock(),
'_'
)
class CertAlgoFieldTest(oslotest.base.BaseTestCase):
def test_not_valid_cert_algo(self):
self.assertRaisesRegex(
ValueError,
'Cert Algo _ is not a valid Mnemonic or value',
fields.CertAlgoField().coerce, mock.Mock(), mock.Mock(),
'_'
)
class IPOrHostTest(oslotest.base.BaseTestCase):
def test_valid(self):
self.assertEqual(
'example.org.',
fields.IPOrHost().coerce(mock.Mock(), mock.Mock(), 'example.org.')
)
self.assertEqual(
'192.0.2.1',
fields.IPOrHost().coerce(mock.Mock(), mock.Mock(), '192.0.2.1')
)
def test_not_valid_ip_or_host(self):
self.assertRaisesRegex(
ValueError,
'example.org is not IP address or host name',
fields.IPOrHost().coerce, mock.Mock(), mock.Mock(), 'example.org'
)