# Source code for monk.validators

# coding: utf-8
#
#    Monk is an unobtrusive data modeling, manipulation and validation library.
#    Copyright © 2011—2014  Andrey Mikhaylenko
#
#    This file is part of Monk.
#
#    Monk is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as published
#    by the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    Monk is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public License
#    along with Monk.  If not, see <http://gnu.org/licenses/>.
"""
~~~~~~~~~~
Validators
~~~~~~~~~~
"""
# Public API of this module: the names exported by a star-import.
__all__ = [
    'BaseValidator',

    # combinators
    'BaseCombinator',
    'All',
    'Any',

    # requirements
    'BaseRequirement',
    'Anything',
    'Exists',
    'IsA',
    'HasAttr',
    'Equals',
    'Contains',
    'InRange',
    'Length',
    'ListOf',
    'ListOfAll',
    'ListOfAny',
    'DictOf',

    # functions
    'translate',

    # special objects
    'MISSING',
]


import copy

from . import compat
from .errors import (
    CombinedValidationError, AtLeastOneFailed, AllFailed, ValidationError,
    NoDefaultValue, InvalidKeys, MissingKeys, StructureSpecificationError,
    DictValueError,
)


#: Item strategy: the value is valid if any of its items passes validation.
#: Selected by :class:`ListOfAny`.
ITEM_STRATEGY_ANY = 'any'
#: Item strategy: the value is valid if all of its items pass validation.
#: Selected by :class:`ListOfAll`.
ITEM_STRATEGY_ALL = 'all'


class MISSING:
    """
    Placeholder that stands for an absent value (e.g. a dictionary key
    that is not present).

    The class object itself is passed to validators in place of a real
    value; :class:`Exists` fails on it, so its negation passes.
    """
def _reluctantly_translate(spec):
    """
    Returns `spec` unchanged if it is already a validator; otherwise
    delegates to :func:`translate`.
    """
    # `translate()` can do it itself but some validators have the `implies`
    # attribute which can trigger instantiation of a BaseValidator subclass
    # before the translation function is ready.
    #
    # We don't want to defer its usage as far as we can because it is best
    # to fully build the validator in order to fail early.
    #
    # So this function is just a small barrier that prevents NameError
    # in some cases.
    if isinstance(spec, BaseValidator):
        return spec
    else:
        return translate(spec)


class BaseValidator(object):
    """
    Base class for all validators.

    A validator is a callable: ``validator(value)`` returns `None` if the
    value is valid and raises :attr:`error_class` otherwise.  Validators can
    be combined with ``&`` (:class:`All`) and ``|`` (:class:`Any`) and
    negated with ``~``.

    Subclasses must implement :meth:`_check`.
    """
    # exception raised by `_raise_error()`
    error_class = ValidationError
    # `NotImplemented` means "this validator provides no default value"
    _default = NotImplemented
    # class-level default; `__invert__()` stores the flag on the clone
    negated = False

    def _combine(self, other, combinator):
        """
        Wraps `self` and `other` into given combinator class.

        :raises TypeError: if `other` is a validator class rather than
            an instance.
        """
        # XXX should we flatten same-logic one-item combs?
        if isinstance(other, type) and issubclass(other, BaseValidator):
            # e.g. Exists instead of Exists()
            raise TypeError('got {cls} class instead of its instance'
                            .format(cls=other.__name__))
        return combinator([self, _reluctantly_translate(other)])

    def _merge(self, value):
        """
        Returns the default value for a missing (`None`) value.

        :raises NoDefaultValue: if `value` is not `None` or the validator
            has no default.
        """
        if value is not None:
            raise NoDefaultValue('value is not None')
        if self._default is NotImplemented:
            raise NoDefaultValue('self._default is not implemented')
        return self._default

    def __and__(self, other):
        return self._combine(other, All)

    def __or__(self, other):
        return self._combine(other, Any)

    def _state(self):
        """
        Returns the attributes that define the identity of this validator.

        `negated` is always included.  It is a class attribute until
        :meth:`__invert__` stores it on a clone, so comparing raw
        `__dict__` values would make ``~~v`` differ from ``v``.
        """
        state = dict(self.__dict__)
        state['negated'] = self.negated
        return state

    def __eq__(self, other):
        return (isinstance(other, type(self))
                and self._state() == other._state())

    def __ne__(self, other):
        # Python 2 does not derive `!=` from `__eq__`.
        return not (self == other)

    def __invert__(self):
        """
        Returns a deep copy of the validator with the opposite `negated` flag.
        """
        clone = copy.deepcopy(self)
        clone.negated = not self.negated
        return clone

    def __call__(self, value):
        """
        Validates the value.

        A negated validator swallows the :class:`ValidationError` raised by
        :meth:`_check` and raises its own error if the check passes.
        """
        try:
            self._check(value)
        except ValidationError:
            if self.negated:
                return
            else:
                raise
        else:
            if self.negated:
                self._raise_error(value)

    def __hash__(self):
        # TODO think this over and check Python docs
        #return hash(((k,v) for k,v in self.__dict__.items()))
        # Attribute names are sorted so that validators which compare equal
        # hash equally regardless of attribute assignment order.
        items = sorted(self._state().items(), key=lambda item: item[0])
        return hash('validator_' + str(items))

    def get_default_for(self, value, silent=True):
        """
        Returns the default for given value, as computed by :meth:`_merge`.

        :param silent: if `True` (default), returns `value` unchanged when
            there is no default; otherwise lets :class:`NoDefaultValue`
            propagate.
        """
        try:
            return self._merge(value)
        except NoDefaultValue:
            if silent:
                return value
            else:
                raise

    def _check(self, value):
        """
        Performs the actual validation; must be implemented by subclasses.
        """
        raise NotImplementedError

    def _raise_error(self, value):
        """
        Raises :attr:`error_class` with ``repr(self)`` as the message.
        `value` is accepted but not used here.
        """
        raise self.error_class(repr(self))


class BaseCombinator(BaseValidator):
    """
    Base class for validators that combine a list of nested validators.

    Subclasses decide via :meth:`can_tolerate` whether the errors collected
    from the nested validators are acceptable.
    """
    error_class = CombinedValidationError
    # if True, the first nested error is re-raised as is (not wrapped)
    break_on_first_fail = False
    _repr_tmpl = '{not_}({items})'
    _repr_items_sep = '; '

    def __init__(self, specs, default=None, first_is_default=False):
        """
        :param specs: non-empty sequence of validators or specs accepted
            by :func:`translate`.
        :param default: explicit default value for the combination.
        :param first_is_default: if several nested validators provide
            a default, use the first one instead of giving up.
        """
        assert specs
        self._specs = [_reluctantly_translate(s) for s in specs]
        self._default = default
        self._first_is_default = first_is_default

    def _check(self, value):
        errors = []
        for spec in self._specs:
            # TODO: group errors by exception type
            # TODO: try recursive validators after all flat ones are OK
            #       (may be not a good idea because the order may matter)
            #if spec.is_recursive and errors:
            #    # Don't collect nested errors if we already have one here.
            #    # Another optimized strategy would be to fail early instead of
            #    # trying to collect all exceptions for the node.
            #    continue
            try:
                spec(value)
            except ValidationError as e:
                if self.break_on_first_fail:
                    # don't even wrap the error
                    raise
                errors.append(e)
        if not self.can_tolerate(errors):
            raise self.error_class(*errors)

    def __repr__(self):
        return self._repr_tmpl.format(
            cls=self.__class__.__name__,
            items=self._repr_items_sep.join(map(str, self._specs)),
            not_='not ' if self.negated else '')

    def can_tolerate(self, errors):
        """
        Returns a true value if given list of nested errors is acceptable;
        must be implemented by subclasses.
        """
        raise NotImplementedError

    def _merge(self, value):
        """
        Returns the explicit default if there is one; otherwise collects
        the defaults of the nested validators.  An unambiguous default (or
        the first one, if `first_is_default` is set) is returned; otherwise
        `value` is returned unchanged.
        """
        # NOTE(review): a falsy explicit default (0, '', False) is skipped
        # by this truthiness test; confirm whether that is intended.
        if self._default:
            return self._default

        defaults = []
        for choice in self._specs:
            try:
                default = choice.get_default_for(value, silent=False)
            except NoDefaultValue:
                pass
            else:
                defaults.append(default)

        if not defaults:
            return value
        if len(defaults) == 1:
            return defaults[0]
        else:
            if self._first_is_default:
                return defaults[0]
            else:
                return value
class All(BaseCombinator):
    """
    Requires that the value passes all nested validators.
    """
    error_class = AtLeastOneFailed
    break_on_first_fail = True
    _repr_items_sep = ' and '

    def can_tolerate(self, errors):
        """
        Returns `True` if no nested validator failed, `False` otherwise.

        :param errors: list of errors raised by nested validators.
        """
        # TODO: fail early, work as `or` does
        # (this also enables basic if-then in the schema)
        return not errors
class Any(BaseCombinator):
    """
    Requires that the value passes at least one of nested validators.
    """
    error_class = AllFailed
    _repr_items_sep = ' or '

    def can_tolerate(self, errors):
        """
        Returns `True` if at least one nested validator did not fail,
        `False` otherwise.

        :param errors: list of errors raised by nested validators.
        """
        return len(errors) < len(self._specs)
class BaseRequirement(BaseValidator):
    """
    Base class for validators that express a single requirement.
    """
    # a hint for combinators, see their code
    is_recursive = False
    # optional validator that is called before the requirement itself;
    # `NotImplemented` means there is none
    implies = NotImplemented

    def __call__(self, value):
        """
        Runs the implied validator (if any), then the requirement itself.
        """
        prerequisite = self.implies
        if prerequisite is not NotImplemented:
            prerequisite(value)
        super(BaseRequirement, self).__call__(value)

    def _represent(self):
        """
        Returns the object that is shown between the parentheses
        in :meth:`__repr__`; by default, the instance attributes.
        """
        return self.__dict__

    def __repr__(self):
        prefix = '~' if self.negated else ''
        return '{negated}{cls}({rep})'.format(
            negated=prefix,
            cls=self.__class__.__name__,
            rep=self._represent())
class Anything(BaseRequirement):
    """
    Accepts any value: the check never fails.
    """
    def _check(self, value):
        # nothing to verify
        return None

    def _represent(self):
        # there are no parameters to show
        return ''
class IsA(BaseRequirement):
    """
    Requires that the value is an instance of given type.

    :param expected_type: a type or, as supported by :func:`isinstance`,
        a (possibly nested) tuple of types.
    :param default: the default value for a missing value.
    """
    def __init__(self, expected_type, default=None):
        self.expected_type = expected_type
        self._default = default

    def _check(self, value):
        if not isinstance(value, self.expected_type):
            self._raise_error(value)

    @classmethod
    def _describe(cls, expected):
        """
        Returns a readable name for a type or a tuple of types.
        """
        if isinstance(expected, tuple):
            # `isinstance()` accepts tuples; a tuple has no `__name__`
            return ' or '.join(cls._describe(item) for item in expected)
        return getattr(expected, '__name__', repr(expected))

    def __repr__(self):
        s = 'must be {pattern_}'.format(
            pattern_=self._describe(self.expected_type))
        if self.negated:
            s = 'not ({s})'.format(s=s)
        return s
class Equals(BaseRequirement):
    """
    Requires that the value is equal to the expected one.

    The expected value also serves as the default value.
    """
    def __init__(self, expected_value):
        self._expected_value = expected_value

    @property
    def _default(self):
        return self._expected_value

    def _check(self, value):
        mismatch = self._expected_value != value
        if mismatch:
            self._raise_error(value)

    def __repr__(self):
        text = 'must equal {pattern_!r}'.format(pattern_=self._expected_value)
        return 'not ({s})'.format(s=text) if self.negated else text
class Contains(BaseRequirement):
    """
    Requires that the expected value is found in the validated value
    (tested with the ``in`` operator).
    """
    def __init__(self, expected_value):
        self._expected_value = expected_value

    @property
    def _default(self):
        # NOTE(review): the default is the expected *item*, not a container
        # holding it (same as in `Equals`); confirm this is intended.
        return self._expected_value

    def _check(self, value):
        absent = self._expected_value not in value
        if absent:
            self._raise_error(value)

    def __repr__(self):
        text = 'must contain {pattern_!r}'.format(
            pattern_=self._expected_value)
        return 'not ({s})'.format(s=text) if self.negated else text
class Exists(BaseRequirement):
    """
    Requires that the value exists, i.e. is not :class:`MISSING`.

    This only makes sense in special cases like dictionary keys; otherwise
    there is simply nothing to validate.  Note that this is *not* a check
    against `None` or `False`.

    :param default: the default value for a missing value.
    """
    def __init__(self, default=None):
        self._default = default

    def _check(self, value):
        if value is not MISSING:
            return
        self._raise_error(value)

    def __repr__(self):
        return 'must not exist' if self.negated else 'must exist'
class BaseListOf(BaseRequirement):
    """
    The base class for validating lists.  Supports different error toleration
    strategies which can be selected by subclasses.  In many aspects this is
    similar to :class:`BaseCombinator`.

    :param validator: a validator (or a spec accepted by :func:`translate`)
        that is applied to each item of the list.
    :param default: the default value for a missing value.
    """
    implies = IsA(list)
    # one of the `ITEM_STRATEGY_*` constants; set by subclasses
    item_strategy = NotImplemented
    error_class = CombinedValidationError
    is_recursive = True

    def __init__(self, validator, default=None):
        self._nested_validator = translate(validator)
        self._default = default

    def _check(self, value):
        if not value:
            # An empty list is only acceptable if the item validator
            # tolerates a missing value.
            try:
                self._nested_validator(MISSING)
            except ValidationError as e:
                raise ValidationError('lacks item: {error}'
                                      .format(error=e))

        errors = []
        for i, nested_value in enumerate(value):
            try:
                self._nested_validator(nested_value)
            except ValidationError as e:
                annotated_error = ValidationError(
                    'item #{elem}: {error}'.format(elem=i, error=e))
                if self.item_strategy == ITEM_STRATEGY_ALL:
                    # fail on the first invalid item
                    raise annotated_error
                errors.append(annotated_error)

        if self.can_tolerate(errors, value):
            return

        raise self.error_class(*errors)

    def can_tolerate(self, errors, value):
        """
        Returns `True` if given item errors are acceptable for given value
        under the current item strategy, `False` otherwise.

        :raises ValueError: if the item strategy is unknown.
        """
        if self.item_strategy == ITEM_STRATEGY_ALL:
            return not errors
        elif self.item_strategy == ITEM_STRATEGY_ANY:
            if not value:
                # No items means no failed items.  Whether the list may be
                # empty at all is decided by `_check()` before this call;
                # without this branch `0 < 0` would reject the empty list
                # with an error that has no message.
                return True
            return len(errors) < len(value)
        else:
            raise ValueError('unknown strategy')

    def _represent(self):
        return repr(self._nested_validator)

    def _merge(self, value):
        """
        Returns a list based on `value`:

        * missing required value is converted to an empty list;
        * missing required items are never created;
        * nested items are merged recursively.
        """
        if value is None:
            return []

        if not isinstance(value, list):
            # bogus value (including falsy ones like `''` or `0`);
            # will not pass validation but should be preserved
            return value

        item_spec = self._nested_validator
        return [x if x is None else item_spec.get_default_for(x)
                for x in value]
class ListOfAll(BaseListOf):
    """
    Requires that the value is a `list` and that every item of it matches
    given validator.

    Usage::

        >>> v = ListOfAll(IsA(int) | IsA(str))

        >>> v([123, 'hello'])

        >>> v([123, 'hello', 5.5])
        Traceback (most recent call last):
        ...
        ValidationError: item #2: must be int or must be str

    """
    item_strategy = ITEM_STRATEGY_ALL
    error_class = AtLeastOneFailed
class ListOfAny(BaseListOf):
    """
    Like :class:`ListOfAll`, except that invalid items are tolerated
    as long as at least one item of the list is valid.
    """
    item_strategy = ITEM_STRATEGY_ANY
    error_class = AllFailed
#: Alias of :class:`ListOfAll`.
ListOf = ListOfAll


#@requirement(implies=[IsA(dict)], is_recursive=True, vars=['key', 'req'])
#def dict_contains(ctx, value):
#    nested_value = value[ctx['key']]
#    ctx['req'](nested_value)
class DictOf(BaseRequirement):
    """
    Requires that the value is a `dict` which items match given patterns.

    :param pairs: a sequence of ``(key_validator, value_validator)`` pairs.

    Usage::

        >>> v = DictOf([
        ...     # key "name" must exist; its value must be a `str`
        ...     (Equals('name'), IsA(str)),
        ...     # key "age" may not exist; its value must be an `int`
        ...     (Equals('age') | ~Exists(), IsA(int)),
        ...     # there may be other `str` keys with `str` or `int` values
        ...     (IsA(str), IsA(str) | IsA(int)),
        ... ])

        >>> v({'name': 'John'})

        >>> v({'name': 'John', 'age': 25})

        >>> v({'name': 'John', 'age': 25.5})
        Traceback (most recent call last):
        ...
        DictValueError: 'age' value must be int

        >>> v({'name': 'John', 'age': 25, 'note': 'custom field'})

        >>> v({'name': 'John', 'age': 25, 'note': 5.5})
        Traceback (most recent call last):
        ...
        DictValueError: 'note' value must be str or must be int

    Note that this validator supports :class:`Exists` to mark keys that can
    be missing.
    """
    implies = IsA(dict)

    def __init__(self, pairs):
        self._pairs = pairs

    def _represent(self):
        return repr(self._pairs)

    def _check(self, value):
        """
        Validates keys and values of the dictionary.

        :raises DictValueError: if a value does not match the value
            validator paired with the key validator its key matched.
        :raises InvalidKeys: if some keys match no key validator.
        :raises MissingKeys: if a key validator matched no key and does not
            tolerate :class:`MISSING`.
        """
        value = value or {}
        # Dictionary keys are hashable, so a set gives constant-time
        # membership tests in the nested loop below.
        validated_data_keys = set()
        missing_key_specs = []
        for k_validator, v_validator in self._pairs:
            # NOTE kspec.datatype can be None => any key of any datatype
            # NOTE kspec.default can be None => any key of given datatype

            # gather data keys that match given kspec;
            # then validate them against vspec
            matched = False
            for k, v in value.items():
                if k in validated_data_keys:
                    continue

                # check if this key is described by current key validator;
                # if it isn't, just skip it (and try another validator
                # on it later on)
                try:
                    k_validator(k)
                except (TypeError, ValidationError):
                    continue

                # this key *is* described by current value validator;
                # validate the value (it *must* validate)
                try:
                    v_validator(v)
                except (ValidationError, TypeError) as e:
                    if isinstance(e, DictValueError):
                        msg = 'in {k!r} ({e})'
                    else:
                        msg = '{k!r} value {e}'
                    raise DictValueError(msg.format(k=k, e=e))

                validated_data_keys.add(k)
                matched = True

            # if not matched and not k_validator.optional:
            if not matched:
                try:
                    k_validator(MISSING)
                except ValidationError:
                    missing_key_specs.append(k_validator)

        # TODO document that unknown keys are checked before missing ones

        # check if there are data keys that did not match any key spec;
        # if yes, raise InvalidKey for them
        invalid_keys = set(value) - validated_data_keys
        if invalid_keys:
            raise InvalidKeys(*invalid_keys)

        if missing_key_specs:
            # XXX this prints validators, not keys as strings;
            # one exception is the Equals validator from which we get
            # the expected value via internal API.  And that's gross.
            reprs = (spec._expected_value if isinstance(spec, Equals) else spec
                     for spec in missing_key_specs)
            raise MissingKeys(*reprs)

    def _merge(self, value):
        """
        Returns a dictionary based on `value` with each value recursively
        merged with `spec`.
        """
        if value is not None and not isinstance(value, dict):
            # bogus value; will not pass validation but should be preserved
            return value

        if not self._pairs:
            return {}

        collected = {}
#        collected.update(value)
        for k_validator, v_validator in self._pairs:
            k_default = k_validator.get_default_for(None)
            if k_default is None:
                # the key validator names no concrete key
                continue

            # even None is ok
            if value:
                v_for_this_k = value.get(k_default)
            else:
                v_for_this_k = None
            v_default = v_validator.get_default_for(v_for_this_k)
            collected[k_default] = v_default

        if value:
            # keep the items that were not produced from the defaults
            for k, v in value.items():
                if k not in collected:
                    collected[k] = v

        return collected
class InRange(BaseRequirement):
    """
    Requires that the numeric value lies within given boundaries.
    Both boundaries are inclusive; a boundary of `None` is not checked.

    :param min: the lower boundary.
    :param max: the upper boundary.
    :param default: the default value; if omitted, the class-level
        default is left in place.
    """
    implies = IsA(int) | IsA(float)

    def __init__(self, min=None, max=None, default=NotImplemented):
        self._min, self._max = min, max
        if default is not NotImplemented:
            self._default = default

    def _check(self, value):
        lower, upper = self._min, self._max
        if ((lower is not None and lower > value)
                or (upper is not None and upper < value)):
            self._raise_error(value)

    def __repr__(self):
        # an unset boundary is shown as an empty string
        lower = '' if self._min is None else self._min
        upper = '' if self._max is None else self._max
        return '{must} belong to {min_}..{max_}'.format(
            must='must not' if self.negated else 'must',
            min_=lower,
            max_=upper)
class HasAttr(BaseRequirement):
    """
    Requires that the value has an attribute with given name
    (tested with :func:`hasattr`).
    """
    def __init__(self, attr_name):
        self._attr_name = attr_name

    def _check(self, value):
        if hasattr(value, self._attr_name):
            return
        self._raise_error(value)

    def __repr__(self):
        return '{must} have attribute {name!r}'.format(
            must='must not' if self.negated else 'must',
            name=self._attr_name)
class Length(InRange):
    """
    Requires that the length of the value lies within given boundaries.
    """
    implies = HasAttr('__len__')

    def _check(self, value):
        size = len(value)
        try:
            super(Length, self)._check(size)
        except ValidationError:
            # raise the error for the value itself
            self._raise_error(value)

    def __repr__(self):
        # an unset boundary is shown as an empty string
        lower = '' if self._min is None else self._min
        upper = '' if self._max is None else self._max
        return '{must} have length of {min_}..{max_}'.format(
            must='must not' if self.negated else 'must',
            min_=lower,
            max_=upper)
def translate(value):
    """
    Translates given schema from "pythonic" syntax to a validator.

    * a validator is returned as is;
    * `None` becomes :class:`Anything`;
    * a type becomes :class:`IsA`;
    * an object whose type is in `compat.func_types` is called and its
      result is treated as the default value;
    * ``[]`` becomes ``IsA(list)``; a list with a single item becomes
      :class:`ListOf` with that item as the spec for each element;
    * ``{}`` becomes ``IsA(dict)``; a non-empty dictionary becomes
      :class:`DictOf`;
    * any other value becomes :class:`IsA` for its type with the value
      itself as the default.

    :raises StructureSpecificationError: if a list contains more than
        one item.

    Usage::

        >>> translate(str)
        must be str

        >>> translate('hello')
        must be str

    """
    if isinstance(value, BaseValidator):
        return value

    if value is None:
        return Anything()

    if isinstance(value, type):
        return IsA(value)

    if type(value) in compat.func_types:
        produced = value()
        return IsA(type(produced), default=produced)

    if isinstance(value, list):
        if value == []:
            # no inner spec, just an empty list as the default value
            return IsA(list)
        if len(value) != 1:
            raise StructureSpecificationError(
                'Expected a list containing exactly 1 item; '
                'got {cnt}: {spec}'.format(cnt=len(value), spec=value))
        # the only item as spec for each item of the collection
        return ListOf(translate(value[0]))

    if isinstance(value, dict):
        if not value:
            return IsA(dict)

        pairs = []
        for key_spec, value_spec in value.items():
            key_validator = key_spec
            if not isinstance(key_spec, BaseValidator):
                key_validator = translate(key_spec)
                # a key spec that has a default stands for that exact key
                key_default = key_validator.get_default_for(None)
                if key_default is not None:
                    key_validator = Equals(key_default)
            pairs.append((key_validator, translate(value_spec)))
        return DictOf(pairs)

    return IsA(type(value), default=value)