bilean/bilean/common/schema.py

431 lines
13 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import numbers
import six
from oslo_log import log as logging
from oslo_utils import strutils
from bilean.common import exception
from bilean.common.i18n import _
LOG = logging.getLogger(__name__)
class AnyIndexDict(collections.Mapping):
'''Convenience schema for a list.'''
def __init__(self, value):
self.value = value
def __getitem__(self, key):
if key != '*' and not isinstance(key, six.integer_types):
raise KeyError(_('Invalid key %s') % str(key))
return self.value
def __iter__(self):
yield '*'
def __len__(self):
return 1
class Schema(collections.Mapping):
'''Class for validating rule specifications.'''
KEYS = (
TYPE, DESCRIPTION, DEFAULT, REQUIRED, SCHEMA, UPDATABLE,
CONSTRAINTS, READONLY,
) = (
'type', 'description', 'default', 'required', 'schema', 'updatable',
'constraints', 'readonly',
)
TYPES = (
INTEGER, STRING, NUMBER, BOOLEAN, MAP, LIST,
) = (
'Integer', 'String', 'Number', 'Boolean', 'Map', 'List',
)
def __init__(self, description=None, default=None,
required=False, schema=None, updatable=False,
readonly=False, constraints=None):
if schema is not None:
if type(self) not in (List, Map):
msg = _('Schema valid only for List or Map, not '
'"%s"') % self[self.TYPE]
raise exception.InvalidSchemaError(message=msg)
if self[self.TYPE] == self.LIST:
self.schema = AnyIndexDict(schema)
else:
self.schema = schema
self.description = description
self.default = default
self.required = required
self.updatable = updatable
self.constraints = constraints or []
self.readonly = readonly
self._len = None
def has_default(self):
return self.default is not None
def get_default(self):
return self.resolve(self.default)
def _validate_default(self, context):
if self.default is None:
return
try:
self.validate(self.default, context)
except (ValueError, TypeError) as exc:
raise exception.InvalidSchemaError(
message=_('Invalid default %(default)s (%(exc)s)') %
dict(default=self.default, exc=exc))
def validate(self, context=None):
'''Validates the schema.
This method checks if the schema itself is valid.
'''
self._validate_default(context)
# validated nested schema: List or Map
if self.schema:
if isinstance(self.schema, AnyIndexDict):
self.schema.value.validate(context)
else:
for nested_schema in self.schema.values():
nested_schema.validate(context)
def validate_constraints(self, value, context=None, skipped=None):
if not skipped:
skipped = []
try:
for constraint in self.constraints:
if type(constraint) not in skipped:
constraint.validate(value, context)
except ValueError as ex:
raise exception.SpecValidationFailed(message=six.text_type(ex))
def __getitem__(self, key):
if key == self.DESCRIPTION:
if self.description is not None:
return self.description
elif key == self.DEFAULT:
if self.default is not None:
return self.default
elif key == self.SCHEMA:
if self.schema is not None:
return dict((n, dict(s)) for n, s in self.schema.items())
elif key == self.REQUIRED:
return self.required
elif key == self.READONLY:
return self.readonly
elif key == self.CONSTRAINTS:
if self.constraints:
return [dict(c) for c in self.constraints]
raise KeyError(key)
def __iter__(self):
for k in self.KEYS:
try:
self[k]
except KeyError:
pass
else:
yield k
def __len__(self):
if self._len is None:
self._len = len(list(iter(self)))
return self._len
class Boolean(Schema):
def __getitem__(self, key):
if key == self.TYPE:
return self.BOOLEAN
else:
return super(Boolean, self).__getitem__(key)
def to_schema_type(self, value):
return strutils.bool_from_string(str(value), strict=True)
def resolve(self, value):
if str(value).lower() not in ('true', 'false'):
msg = _('The value "%s" is not a valid Boolean') % value
raise exception.SpecValidationFailed(message=msg)
return strutils.bool_from_string(value, strict=True)
def validate(self, value, context=None):
if isinstance(value, bool):
return
self.resolve(value)
class Integer(Schema):
def __getitem__(self, key):
if key == self.TYPE:
return self.INTEGER
else:
return super(Integer, self).__getitem__(key)
def to_schema_type(self, value):
if isinstance(value, six.integer_types):
return value
try:
num = int(value)
except ValueError:
raise ValueError(_('%s is not an intger.') % num)
return num
def resolve(self, value):
try:
return int(value)
except (TypeError, ValueError):
msg = _('The value "%s" cannot be converted into an '
'integer.') % value
raise exception.SpecValidationFailed(message=msg)
def validate(self, value, context=None):
if not isinstance(value, six.integer_types):
value = self.resolve(value)
self.validate_constraints(value, self, context)
class String(Schema):
def __getitem__(self, key):
if key == self.TYPE:
return self.STRING
else:
return super(String, self).__getitem__(key)
def to_schema_type(self, value):
return str(value)
def resolve(self, value):
try:
return str(value)
except (TypeError, ValueError):
raise
def validate(self, value, context=None):
if not isinstance(value, six.string_types):
msg = _('The value "%s" cannot be converted into a '
'string.') % value
raise exception.SpecValidationFailed(message=msg)
self.resolve(value)
self.validate_constraints(value, self, context)
class Number(Schema):
def __getitem__(self, key):
if key == self.TYPE:
return self.NUMBER
else:
return super(Number, self).__getitem__(key)
def to_schema_type(self, value):
if isinstance(value, numbers.Number):
return value
try:
return int(value)
except ValueError:
return float(value)
def resolve(self, value):
if isinstance(value, numbers.Number):
return value
try:
return int(value)
except ValueError:
return float(value)
def validate(self, value, context=None):
if isinstance(value, numbers.Number):
return
self.resolve(value)
self.resolve_constraints(value, self, context)
class List(Schema):
def __getitem__(self, key):
if key == self.TYPE:
return self.LIST
else:
return super(List, self).__getitem__(key)
def _get_children(self, values, keys, context):
sub_schema = self.schema
if sub_schema is not None:
# We have a child schema specified for list elements
# Fake a dict of array elements, since we have only one schema
schema_arr = dict((k, sub_schema[k]) for k in keys)
subspec = Spec(schema_arr, dict(values))
subspec.validate()
return ((k, subspec[k]) for k in keys)
else:
return values
def get_default(self):
if not isinstance(self.default, collections.Sequence):
raise TypeError(_('"%s" is not a List') % self.default)
return self.default
def resolve(self, value, context=None):
if not isinstance(value, collections.Sequence):
raise TypeError(_('"%s" is not a List') % value)
return [v[1] for v in self._get_children(enumerate(value),
list(range(len(value))),
context)]
def validate(self, value, context=None):
if not isinstance(value, collections.Mapping):
raise TypeError(_('"%s" is not a Map') % value)
for key, child in self.schema.items():
item_value = value.get(key)
child.validate(item_value, context)
class Map(Schema):
def __getitem__(self, key):
if key == self.TYPE:
return self.MAP
else:
return super(Map, self).__getitem__(key)
def _get_children(self, values, context=None):
# There are cases where the Map is not specified to the very detailed
# levels, we treat them as valid specs as well.
if self.schema is None:
return values
sub_schema = self.schema
if sub_schema is not None:
# sub_schema shoud be a dict here
subspec = Spec(sub_schema, dict(values))
subspec.validate()
return ((k, subspec[k]) for k in sub_schema)
else:
return values
def get_default(self):
if not isinstance(self.default, collections.Mapping):
raise TypeError(_('"%s" is not a Map') % self.default)
return self.default
def resolve(self, value, context=None):
if not isinstance(value, collections.Mapping):
raise TypeError(_('"%s" is not a Map') % value)
return dict(self._get_children(six.iteritems(value), context))
def validate(self, value, context=None):
if not isinstance(value, collections.Mapping):
raise TypeError(_('"%s" is not a Map') % value)
for key, child in self.schema.items():
item_value = value.get(key)
child.validate(item_value, context)
class Spec(collections.Mapping):
'''A class that contains all spec items.'''
def __init__(self, schema, data):
self._schema = schema
self._data = data
def validate(self):
'''Validate the schema.'''
for (k, s) in self._schema.items():
try:
# validate through resolve
self.resolve_value(k)
except (TypeError, ValueError) as err:
msg = _('Spec validation error (%(key)s): %(err)s') % dict(
key=k, err=six.text_type(err))
raise exception.SpecValidationFailed(message=msg)
for key in self._data:
if key not in self._schema:
msg = _('Unrecognizable spec item "%s"') % key
raise exception.SpecValidationFailed(message=msg)
def resolve_value(self, key):
if key not in self:
raise KeyError(_('Invalid spec item: "%s"') % key)
schema_item = self._schema[key]
if key in self._data:
raw_value = self._data[key]
return schema_item.resolve(raw_value)
elif schema_item.has_default():
return schema_item.get_default()
elif schema_item.required:
raise ValueError(_('Required spec item "%s" not assigned') % key)
def __getitem__(self, key):
'''Lazy evaluation for spec items.'''
return self.resolve_value(key)
def __len__(self):
'''Number of items in the spec.
A spec always contain all keys though some may be not specified.
'''
return len(self._schema)
def __contains__(self, key):
return key in self._schema
def __iter__(self):
return iter(self._schema)
def get_spec_version(spec):
if not isinstance(spec, dict):
msg = _('The provided spec is not a map.')
raise exception.SpecValidationFailed(message=msg)
if 'type' not in spec:
msg = _("The 'type' key is missing from the provided spec map.")
raise exception.SpecValidationFailed(message=msg)
if 'version' not in spec:
msg = _("The 'version' key is missing from the provided spec map.")
raise exception.SpecValidationFailed(message=msg)
return (spec['type'], spec['version'])