# coding: utf-8
import re
import sre_constants
import itertools

from . import registry
from .scope import ResolutionScope
from ._compat import iteritems, iterkeys, itervalues, string_types, OrderedDict


def _validate_regex(regex):
    :type regex: str
    :raises: ValueError
    except sre_constants.error as e:
        raise ValueError('Invalid regular expression: {0}'.format(e))

[docs]class BaseField(object): """A base class for fields in a JSL :class:`.document.Document`. Instances of this class may be added to a document to define its properties. :param required: If the field is required, defaults to False. """ def __init__(self, required=False): self.required = required def get_definitions_and_schema(self, scope=ResolutionScope(), ordered=False, ref_documents=None): # pragma: no cover """Returns a tuple of two elements. The second element is a JSON schema of the data described by this field, and the first is a dictionary containing definitions that are referenced from the field schema. :arg ordered: If True, the resulting schema is an OrderedDict and its properties are ordered in a sensible way, which makes it more readable. :arg scope: Current resolution scope. :type scope: :class:`.scope.ResolutionScope` :arg ref_documents: If subclass of :class:`Document` is in this set, all :class:`DocumentField` s pointing to it will be resolved to a reference: ``{"$ref": "#/definitions/..."}``. Note: resulting definitions will not contain schema for this document. :type ref_documents: set :rtype: (dict, dict) """ raise NotImplementedError()
[docs] def get_schema(self): """Returns a JSON schema (draft v4) of the data described by this field. :arg ordered: If True, the resulting schema is an OrderedDict and its properties are ordered in a sensible way, which makes it more readable. """ definitions, schema = self.get_definitions_and_schema() if definitions: schema['definitions'] = definitions return schema
def iter_fields(self): return iter([]) def walk(self, through_document_fields=False, visited_documents=frozenset()): """Yields nested fields in a DFS order.""" yield self for field in self.iter_fields(): for field_ in field.walk(through_document_fields=through_document_fields, visited_documents=visited_documents): yield field_
[docs]class BaseSchemaField(BaseField): """A base class for fields that directly map to JSON Schema validator. :param required: If the field is required, defaults to False. :param default: The default value for this field. May be a callable. :param enum: A list of valid choices. May be a callable. :param title: A short explanation about the purpose of the data described by this field. :param description: A detailed explanation about the purpose of the data described by this field. """ def __init__(self, id='', default=None, enum=None, title=None, description=None, **kwargs): = id self.title = title self.description = description self._enum = enum self._default = default super(BaseSchemaField, self).__init__(**kwargs) @property def enum(self): enum = self._enum if callable(self._enum): enum = self._enum() return enum @property def default(self): default = self._default if callable(self._default): default = self._default() return default def _update_schema_with_common_fields(self, schema, id=''): if id: schema['id'] = id if self.title is not None: schema['title'] = self.title if self.description is not None: schema['description'] = self.description if self.enum: schema['enum'] = list(self.enum) if self._default is not None: schema['default'] = self.default return schema
[docs]class BooleanField(BaseSchemaField): """A boolean field.""" def get_definitions_and_schema(self, scope=ResolutionScope(), ordered=False, ref_documents=None): schema = (OrderedDict if ordered else dict)(type='boolean') id, scope = scope.alter( schema = self._update_schema_with_common_fields(schema, id=id) return {}, schema
[docs]class StringField(BaseSchemaField): """A string field. :param pattern: A regular expression (ECMA 262) that a string value must match. :type pattern: string :param format: A semantic format of the string (for example, "date-time", "email", or "uri"). :type format: string :param min_length: A minimum length. :type min_length: int :param max_length: A maximum length. :type max_length: int """ _FORMAT = None def __init__(self, pattern=None, format=None, min_length=None, max_length=None, **kwargs): self.pattern = pattern if self.pattern is not None: _validate_regex(self.pattern) self.format = format or self._FORMAT self.max_length = max_length self.min_length = min_length super(StringField, self).__init__(**kwargs) def get_definitions_and_schema(self, scope=ResolutionScope(), ordered=False, ref_documents=None): schema = (OrderedDict if ordered else dict)(type='string') id, scope = scope.alter( schema = self._update_schema_with_common_fields(schema, id=id) if self.pattern: schema['pattern'] = self.pattern if self.min_length is not None: schema['minLength'] = self.min_length if self.max_length is not None: schema['maxLength'] = self.max_length if self.format is not None: schema['format'] = self.format return {}, schema
[docs]class EmailField(StringField): """An email field.""" _FORMAT = 'email'
[docs]class IPv4Type(StringField): """An IPv4 field.""" _FORMAT = 'ipv4'
[docs]class DateTimeField(StringField): """An ISO 8601 formatted date-time field.""" _FORMAT = 'date-time'
[docs]class UriField(StringField): """A URI field.""" _FORMAT = 'uri'
[docs]class NumberField(BaseSchemaField): """A number field. :param multiple_of: A value must be a multiple of this factor. :param minimum: A minimum allowed value. :param exclusive_minimum: Whether a value is allowed to exactly equal the minimum. :param maximum: A maximum allowed value. :param exclusive_maximum: Whether a value is allowed to exactly equal the maximum. """ _NUMBER_TYPE = 'number' def __init__(self, multiple_of=None, minimum=None, maximum=None, exclusive_minimum=False, exclusive_maximum=False, **kwargs): self.multiple_of = multiple_of self.minimum = minimum self.exclusive_minimum = exclusive_minimum self.maximum = maximum self.exclusive_maximum = exclusive_maximum super(NumberField, self).__init__(**kwargs) def get_definitions_and_schema(self, scope=ResolutionScope(), ordered=False, ref_documents=None): schema = (OrderedDict if ordered else dict)(type=self._NUMBER_TYPE) id, scope = scope.alter( schema = self._update_schema_with_common_fields(schema, id=id) if self.multiple_of is not None: schema['multipleOf'] = self.multiple_of if self.minimum is not None: schema['minimum'] = self.minimum if self.exclusive_minimum: schema['exclusiveMinumum'] = True if self.maximum is not None: schema['maximum'] = self.maximum if self.exclusive_maximum: schema['exclusiveMaximum'] = True return {}, schema
[docs]class IntField(NumberField): """An integer field.""" _NUMBER_TYPE = 'integer'
[docs]class ArrayField(BaseSchemaField): """An array field. :param items: Either of the following: * :class:`BaseField` -- all items of the array must match the field schema; * a list or a tuple of :class:`BaseField` s -- all items of the array must be valid according to the field schema at the corresponding index (tuple typing). :param min_items: A minimum length of an array. :type min_items: int :param max_items: A maximum length of an array. :type max_items: int :param unique_items: Whether all the values in the array must be distinct. :type unique_items: bool :param additional_items: If the value of ``items`` is a list or a tuple, and the array length is larger than the number of fields in ``items``, then the additional items are described by the :class:`BaseField` passed using this argument. :type unique_items: bool or :class:`BaseField` """ def __init__(self, items, min_items=None, max_items=None, unique_items=False, additional_items=None, **kwargs): self.items = items self.min_items = min_items self.max_items = max_items self.unique_items = unique_items self.additional_items = additional_items super(ArrayField, self).__init__(**kwargs) def get_definitions_and_schema(self, scope=ResolutionScope(), ordered=False, ref_documents=None): id, scope = scope.alter( if isinstance(self.items, (list, tuple)): nested_definitions = {} nested_schema = [] for item in self.items: item_definitions, item_schema = item.get_definitions_and_schema( scope=scope, ordered=ordered, ref_documents=ref_documents) nested_definitions.update(item_definitions) nested_schema.append(item_schema) else: nested_definitions, nested_schema = self.items.get_definitions_and_schema( scope=scope, ordered=ordered, ref_documents=ref_documents) schema = (OrderedDict if ordered else dict)(type='array') schema = self._update_schema_with_common_fields(schema, id=id) schema['items'] = nested_schema if self.additional_items is not None: if isinstance(self.additional_items, bool): schema['additionalItems'] = self.additional_items else: items_definitions, items_schema = self.additional_items.get_definitions_and_schema( scope=scope, ordered=ordered, ref_documents=ref_documents) schema['additionalItems'] = items_schema nested_definitions.update(items_definitions) if self.min_items is not None: schema['minItems'] = self.min_items if self.max_items is not None: schema['maxItems'] = self.max_items if self.unique_items: schema['uniqueItems'] = True return nested_definitions, schema def iter_fields(self): if isinstance(self.items, (list, tuple)): for field in self.items: yield field else: yield self.items if isinstance(self.additional_items, BaseField): yield self.additional_items
[docs]class DictField(BaseSchemaField): """A dictionary field. :param properties: A dictionary containing fields. :type properties: dict from str to :class:`BaseField` :param pattern_properties: A dictionary whose keys are regular expressions (ECMA 262). Properties match against these regular expressions, and for any that match, the property is described by the corresponding field schema. :type pattern_properties: dict from str to :class:`BaseField` :param additional_properties: Describes properties that are not described by the ``properties`` or ``pattern_properties``. :type additional_properties: bool or :class:`BaseField` :param min_properties: A minimum number of properties. :param max_properties: A maximum number of properties """ def __init__(self, properties=None, pattern_properties=None, additional_properties=None, min_properties=None, max_properties=None, **kwargs): = properties self.pattern_properties = pattern_properties self.additional_properties = additional_properties self.min_properties = min_properties self.max_properties = max_properties super(DictField, self).__init__(**kwargs) def _process_properties(self, properties, scope, ordered=False, ref_documents=None): nested_definitions = {} schema = OrderedDict() if ordered else {} required = [] for prop, field in iteritems(properties): field_definitions, field_schema = field.get_definitions_and_schema( scope=scope, ordered=ordered, ref_documents=ref_documents) if field.required: required.append(prop) schema[prop] = field_schema nested_definitions.update(field_definitions) return nested_definitions, required, schema def get_definitions_and_schema(self, scope=ResolutionScope(), ordered=False, ref_documents=None): nested_definitions = {} schema = (OrderedDict if ordered else dict)(type='object') id, scope = scope.alter( schema = self._update_schema_with_common_fields(schema, id=id) if is not None: properties_definitions, properties_required, properties_schema = self._process_properties(, scope, ordered=ordered, ref_documents=ref_documents) schema['properties'] = properties_schema if properties_required: schema['required'] = properties_required nested_definitions.update(properties_definitions) if self.pattern_properties is not None: for key in iterkeys(self.pattern_properties): _validate_regex(key) properties_definitions, _, properties_schema = self._process_properties( self.pattern_properties, scope, ordered=ordered, ref_documents=ref_documents) schema['patternProperties'] = properties_schema nested_definitions.update(properties_definitions) if self.additional_properties is not None: if isinstance(self.additional_properties, bool): schema['additionalProperties'] = self.additional_properties else: properties_definitions, properties_schema = self.additional_properties.get_definitions_and_schema( scope=scope, ordered=ordered, ref_documents=ref_documents) schema['additionalProperties'] = properties_schema nested_definitions.update(properties_definitions) if self.min_properties is not None: schema['minProperties'] = self.min_properties if self.max_properties is not None: schema['maxProperties'] = self.max_properties return nested_definitions, schema def iter_fields(self): fields_to_visit = [] if is not None: fields_to_visit.append(itervalues( if self.pattern_properties is not None: fields_to_visit.append(itervalues(self.pattern_properties)) if self.additional_properties is not None and isinstance(self.additional_properties, BaseField): fields_to_visit.append([self.additional_properties]) return itertools.chain(*fields_to_visit)
class BaseOfField(BaseSchemaField): _KEYWORD = None def __init__(self, fields, **kwargs): self.fields = list(fields) super(BaseOfField, self).__init__(**kwargs) def get_definitions_and_schema(self, scope=ResolutionScope(), ordered=False, ref_documents=None): nested_definitions = {} one_of = [] id, scope = scope.alter( for field in self.fields: field_definitions, field_schema = field.get_definitions_and_schema( scope=scope, ordered=ordered, ref_documents=ref_documents) nested_definitions.update(field_definitions) one_of.append(field_schema) schema = OrderedDict() if ordered else {} schema[self._KEYWORD] = one_of schema = self._update_schema_with_common_fields(schema, id=id) return nested_definitions, schema def iter_fields(self): return iter(self.fields)
[docs]class OneOfField(BaseOfField): """ :param fields: a list of fields, exactly one of which describes the data :type fields: list of :class:`BaseField` """ _KEYWORD = 'oneOf'
[docs]class AnyOfField(BaseOfField): """ :param fields: a list of fields, at least one of which describes the data :type fields: list of :class:`BaseField` """ _KEYWORD = 'anyOf'
[docs]class AllOfField(BaseOfField): """ :param fields: a list of fields, all of which describe the data :type fields: list of :class:`BaseField` """ _KEYWORD = 'allOf'
[docs]class NotField(BaseSchemaField): """ :param field: a field to negate :type field: :class:`BaseField` """ def __init__(self, field, **kwargs): self.field = field super(NotField, self).__init__(**kwargs) def get_definitions_and_schema(self, scope=ResolutionScope(), ordered=False, ref_documents=None): id, scope = scope.alter( field_definitions, field_schema = self.field.get_definitions_and_schema( scope=scope, ordered=ordered, ref_documents=ref_documents) schema = OrderedDict() if ordered else {} schema['not'] = field_schema schema = self._update_schema_with_common_fields(schema, id=id) return field_definitions, schema
[docs]class DocumentField(BaseField): """A reference to a nested document. :param document_cls: A string (dot-separated path to document class, i.e. 'app.resources.User'), :data:`RECURSIVE_REFERENCE_CONSTANT` or a :class:`Document` :param as_ref: If true, ``document_cls``'s schema is placed into the definitions section, and the field schema is just a reference to it: ``{"$ref": "#/definitions/..."}``. Makes a resulting schema more readable. """ def __init__(self, document_cls, as_ref=False, **kwargs): self._document_cls = document_cls self.owner_cls = None self.as_ref = as_ref super(DocumentField, self).__init__(**kwargs) def iter_fields(self): return self.document_cls.iter_fields() def walk(self, through_document_fields=False, visited_documents=frozenset()): yield self if through_document_fields and self.document_cls not in visited_documents: visited_documents = visited_documents | set([self.document_cls]) for field in super(DocumentField, self).walk(through_document_fields=through_document_fields, visited_documents=visited_documents): yield field def get_definitions_and_schema(self, scope=ResolutionScope(), ordered=False, ref_documents=None): definition_id = self.document_cls._get_definition_id() if ref_documents and self.document_cls in ref_documents: return {}, scope.create_ref(definition_id) else: document_definitions, document_schema = self.document_cls.get_definitions_and_schema( scope=scope, ordered=ordered, ref_documents=ref_documents) if self.as_ref: document_definitions[definition_id] = document_schema return document_definitions, scope.create_ref(definition_id) else: return document_definitions, document_schema def set_owner(self, owner_cls): self.owner_cls = owner_cls @property def document_cls(self): if isinstance(self._document_cls, string_types): if self._document_cls == RECURSIVE_REFERENCE_CONSTANT: if self.owner_cls is None: raise ValueError('owner_cls is not set') return self.owner_cls else: try: return registry.get_document(self._document_cls) except KeyError: if self.owner_cls is None: raise ValueError('owner_cls is not set') return registry.get_document(self._document_cls, module=self.owner_cls.__module__) else: return self._document_cls