Source code for jsl.fields

# coding: utf-8
from __future__ import unicode_literals

import re
import sre_constants
import itertools

from . import registry


# Sentinel value for ``DocumentField``'s ``document_cls`` argument: when a
# field is created with this string, its ``document_cls`` property resolves
# to the field's ``owner_cls`` (the class passed to ``set_owner``).
RECURSIVE_REFERENCE_CONSTANT = 'self'


def _validate_regex(regex):
    """
    :type regex: str
    :raises: ValueError
    :return:
    """
    try:
        re.compile(regex)
    except sre_constants.error as e:
        raise ValueError('Invalid regular expression: {0}'.format(e))


class BaseField(object):
    """Common ancestor of every field usable in a JSL
    :class:`.document.Document`.

    Field instances are attached to a document to describe its properties.

    :param required: Whether the field is required; ``False`` by default.
    """

    def __init__(self, required=False):
        self.required = required

    def get_definitions_and_schema(self, ref_documents=None):  # pragma: no cover
        """Build the definitions and the JSON schema for this field.

        Subclasses must override this method; the base implementation
        always raises.

        :arg ref_documents: Forwarded unchanged to nested fields.
            :class:`DocumentField` returns a bare ``$ref`` (instead of the
            document's schema) for a document class contained in it.
        :returns: A ``(definitions, schema)`` pair -- the definitions that the
            schema refers to, followed by the schema itself.
        :rtype: (dict, dict)
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def get_schema(self):
        """Return the JSON schema (draft v4) of the data this field describes.

        Non-empty definitions are embedded into the schema under the
        ``definitions`` key.
        """
        defs, field_schema = self.get_definitions_and_schema()
        if not defs:
            return field_schema
        field_schema['definitions'] = defs
        return field_schema

    def walk(self, through_document_fields=False, visited_documents=frozenset()):
        """Iterate over nested fields depth-first.

        A plain field has nothing nested inside it, so only the field itself
        is produced and both arguments are ignored.
        """
        yield self
class BaseSchemaField(BaseField):
    """Base class of the fields that map directly onto a JSON Schema
    validator.

    :param required: Whether the field is required; ``False`` by default.
    :param default: The default value of the field. May be a callable.
    :param enum: The valid choices. May be a callable.
    :param title: A short explanation of the purpose of the described data.
    :param description: A detailed explanation of the purpose of the
        described data.
    """

    def __init__(self, default=None, enum=None, title=None, description=None, **kwargs):
        self._default = default
        self._enum = enum
        self.description = description
        self.title = title
        super(BaseSchemaField, self).__init__(**kwargs)

    @property
    def enum(self):
        """The valid choices; a callable ``enum`` is invoked on every access."""
        if callable(self._enum):
            return self._enum()
        return self._enum

    @property
    def default(self):
        """The default value; a callable ``default`` is invoked on every access."""
        if callable(self._default):
            return self._default()
        return self._default

    def _get_common_schema_fields(self):
        """Collect the keywords shared by all schema fields.

        ``title`` and ``description`` are included when they are not ``None``,
        ``enum`` when it is truthy, and ``default`` when the raw (unresolved)
        default is not ``None``.
        """
        common = {}
        for keyword in ('title', 'description'):
            value = getattr(self, keyword)
            if value is not None:
                common[keyword] = value
        if self.enum:
            common['enum'] = list(self.enum)
        # The check is made against the raw attribute, so a callable default
        # is only invoked when the keyword is actually emitted.
        if self._default is not None:
            common['default'] = self.default
        return common
class BooleanField(BaseSchemaField):
    """A field describing a boolean value."""

    def get_definitions_and_schema(self, ref_documents=None):
        """Return empty definitions and a ``boolean`` schema carrying the
        common schema keywords. ``ref_documents`` is not used.
        """
        common = self._get_common_schema_fields()
        boolean_schema = {'type': 'boolean'}
        boolean_schema.update(common)
        return {}, boolean_schema
class StringField(BaseSchemaField):
    """A field describing a string value.

    :param pattern: A regular expression (ECMA 262) the string must match.
    :type pattern: string
    :param format: A semantic format of the string (for example,
        "date-time", "email", or "uri").
    :type format: string
    :param min_length: The minimum length.
    :type min_length: int
    :param max_length: The maximum length.
    :type max_length: int
    """

    # Fallback format used when no truthy ``format`` argument is given.
    _FORMAT = None

    def __init__(self, pattern=None, format=None, min_length=None, max_length=None, **kwargs):
        self.pattern = pattern
        if self.pattern is not None:
            # Raises ValueError for a pattern that does not compile.
            _validate_regex(self.pattern)
        self.format = format if format else self._FORMAT
        self.min_length = min_length
        self.max_length = max_length
        super(StringField, self).__init__(**kwargs)

    def get_definitions_and_schema(self, ref_documents=None):
        """Return empty definitions and a ``string`` schema.

        ``pattern`` is emitted when truthy; ``minLength``, ``maxLength`` and
        ``format`` are emitted when the matching attribute is not ``None``.
        ``ref_documents`` is not used.
        """
        string_schema = {'type': 'string'}
        string_schema.update(self._get_common_schema_fields())
        if self.pattern:
            string_schema['pattern'] = self.pattern
        optional_keywords = (
            ('minLength', self.min_length),
            ('maxLength', self.max_length),
            ('format', self.format),
        )
        for keyword, value in optional_keywords:
            if value is not None:
                string_schema[keyword] = value
        return {}, string_schema
class EmailField(StringField):
    """A string field whose default ``format`` is ``"email"``."""

    _FORMAT = 'email'
class IPv4Type(StringField):
    """A string field whose default ``format`` is ``"ipv4"``."""

    _FORMAT = 'ipv4'
class DateTimeField(StringField):
    """A string field for ISO 8601 date-times; its default ``format`` is
    ``"date-time"``.
    """

    _FORMAT = 'date-time'
class UriField(StringField):
    """A string field whose default ``format`` is ``"uri"``."""

    _FORMAT = 'uri'
class NumberField(BaseSchemaField):
    """A number field.

    :param multiple_of: A value must be a multiple of this factor.
    :param minimum: A minimum allowed value.
    :param exclusive_minimum: If true, ``exclusiveMinimum: true`` is added to
        the schema, i.e. a value must be strictly greater than ``minimum``.
    :param maximum: A maximum allowed value.
    :param exclusive_maximum: If true, ``exclusiveMaximum: true`` is added to
        the schema, i.e. a value must be strictly less than ``maximum``.
    """

    # JSON Schema type name emitted by this field; subclasses override it.
    _NUMBER_TYPE = 'number'

    def __init__(self, multiple_of=None, minimum=None, maximum=None,
                 exclusive_minimum=False, exclusive_maximum=False, **kwargs):
        self.multiple_of = multiple_of
        self.minimum = minimum
        self.exclusive_minimum = exclusive_minimum
        self.maximum = maximum
        self.exclusive_maximum = exclusive_maximum
        super(NumberField, self).__init__(**kwargs)

    def get_definitions_and_schema(self, ref_documents=None):
        """Return empty definitions and a numeric schema.

        ``multipleOf``, ``minimum`` and ``maximum`` are emitted when the
        matching attribute is not ``None``; the two ``exclusive*`` keywords
        are emitted (as ``True``) when the matching flag is truthy.
        ``ref_documents`` is not used.
        """
        schema = {'type': self._NUMBER_TYPE}
        schema.update(self._get_common_schema_fields())
        if self.multiple_of is not None:
            schema['multipleOf'] = self.multiple_of
        if self.minimum is not None:
            schema['minimum'] = self.minimum
        if self.exclusive_minimum:
            # Previously misspelled as 'exclusiveMinumum', a keyword that
            # validators do not recognise.
            schema['exclusiveMinimum'] = True
        if self.maximum is not None:
            schema['maximum'] = self.maximum
        if self.exclusive_maximum:
            schema['exclusiveMaximum'] = True
        return {}, schema
class IntField(NumberField):
    """A number field whose schema ``type`` is ``"integer"``."""

    _NUMBER_TYPE = 'integer'
class ArrayField(BaseSchemaField):
    """An array field.

    :param items:
        Either of the following:

        * :class:`BaseField` -- all items of the array must match the field
          schema;
        * a list or a tuple of :class:`BaseField` s -- all items of the array
          must be valid according to the field schema at the corresponding
          index (tuple typing).

    :param min_items: A minimum length of an array.
    :type min_items: int
    :param max_items: A maximum length of an array.
    :type max_items: int
    :param unique_items: Whether all the values in the array must be distinct.
    :type unique_items: bool
    :param additional_items: If the value of ``items`` is a list or a tuple,
        and the array length is larger than the number of fields in
        ``items``, then the additional items are described by the schema in
        this property.
    :type additional_items: bool or :class:`BaseField`
    """

    def __init__(self, items, min_items=None, max_items=None, unique_items=False,
                 additional_items=None, **kwargs):
        self.items = items
        self.min_items = min_items
        self.max_items = max_items
        self.unique_items = unique_items
        self.additional_items = additional_items
        super(ArrayField, self).__init__(**kwargs)

    def get_definitions_and_schema(self, ref_documents=None):
        """Return the merged definitions of all nested fields and an
        ``array`` schema.

        ``ref_documents`` is forwarded unchanged to every nested field.
        """
        if isinstance(self.items, (list, tuple)):
            # Tuple typing: one schema per position.
            nested_definitions = {}
            nested_schema = []
            for item in self.items:
                item_definitions, item_schema = item.get_definitions_and_schema(
                    ref_documents=ref_documents)
                nested_definitions.update(item_definitions)
                nested_schema.append(item_schema)
        else:
            nested_definitions, nested_schema = self.items.get_definitions_and_schema(
                ref_documents=ref_documents)

        schema = {
            'type': 'array',
            'items': nested_schema,
        }
        schema.update(self._get_common_schema_fields())
        if self.min_items is not None:
            schema['minItems'] = self.min_items
        if self.max_items is not None:
            schema['maxItems'] = self.max_items
        if self.unique_items:
            schema['uniqueItems'] = True
        if self.additional_items is not None:
            if isinstance(self.additional_items, bool):
                schema['additionalItems'] = self.additional_items
            else:
                items_definitions, items_schema = (
                    self.additional_items.get_definitions_and_schema(
                        ref_documents=ref_documents))
                schema['additionalItems'] = items_schema
                nested_definitions.update(items_definitions)
        return nested_definitions, schema

    def walk(self, through_document_fields=False, visited_documents=frozenset()):
        """Yield this field, then the fields nested in ``items`` and in
        ``additional_items`` (when it is a field), in DFS order.
        """
        yield self
        if isinstance(self.items, (list, tuple)):
            nested_fields = list(self.items)
        else:
            nested_fields = [self.items]
        # ``additional_items`` may hold a field as well; visit it like
        # ``DictField.walk`` visits ``additional_properties``.
        if (self.additional_items is not None and
                not isinstance(self.additional_items, bool)):
            nested_fields.append(self.additional_items)
        for field in nested_fields:
            for field_ in field.walk(through_document_fields=through_document_fields,
                                     visited_documents=visited_documents):
                yield field_
class DictField(BaseSchemaField):
    """A dictionary field.

    :param properties: A dictionary containing fields.
    :type properties: dict from str to :class:`BaseField`
    :param pattern_properties: A dictionary whose keys are regular
        expressions (ECMA 262). Properties match against these regular
        expressions, and for any that match, the property is described by the
        corresponding field schema.
    :type pattern_properties: dict from str to :class:`BaseField`
    :param additional_properties: Describes properties that are not described
        by the ``properties`` or ``pattern_properties``.
    :type additional_properties: bool or :class:`BaseField`
    :param min_properties: A minimum number of properties.
    :param max_properties: A maximum number of properties
    """

    def __init__(self, properties=None, pattern_properties=None, additional_properties=None,
                 min_properties=None, max_properties=None, **kwargs):
        self.properties = properties
        self.pattern_properties = pattern_properties
        self.additional_properties = additional_properties
        self.min_properties = min_properties
        self.max_properties = max_properties
        super(DictField, self).__init__(**kwargs)

    @staticmethod
    def _process_properties(properties, ref_documents=None):
        """Build the schemas of a name-to-field mapping.

        :returns: A ``(definitions, required, schema)`` triple -- the merged
            definitions of all fields, the names of the fields whose
            ``required`` flag is truthy, and a name-to-schema dictionary.
        """
        nested_definitions = {}
        schema = {}
        required = []
        # ``items()`` (rather than ``iteritems()``) exists on Python 2 and 3.
        for prop, field in properties.items():
            field_definitions, field_schema = field.get_definitions_and_schema(
                ref_documents=ref_documents)
            if field.required:
                required.append(prop)
            schema[prop] = field_schema
            nested_definitions.update(field_definitions)
        return nested_definitions, required, schema

    def get_definitions_and_schema(self, ref_documents=None):
        """Return the merged definitions of all nested fields and an
        ``object`` schema.

        ``ref_documents`` is forwarded unchanged to every nested field.

        :raises ValueError: If a key of ``pattern_properties`` is not a valid
            regular expression.
        """
        nested_definitions = {}
        schema = {'type': 'object'}
        schema.update(self._get_common_schema_fields())

        if self.properties is not None:
            properties_definitions, properties_required, properties_schema = (
                self._process_properties(self.properties, ref_documents=ref_documents))
            schema['properties'] = properties_schema
            if properties_required:
                schema['required'] = properties_required
            nested_definitions.update(properties_definitions)

        if self.pattern_properties is not None:
            # Every key is validated before any schema is generated.
            for key in self.pattern_properties:
                _validate_regex(key)
            # The ``required`` flag has no meaning for pattern properties.
            properties_definitions, _, properties_schema = self._process_properties(
                self.pattern_properties, ref_documents=ref_documents)
            schema['patternProperties'] = properties_schema
            nested_definitions.update(properties_definitions)

        if self.additional_properties is not None:
            if isinstance(self.additional_properties, bool):
                schema['additionalProperties'] = self.additional_properties
            else:
                properties_definitions, properties_schema = (
                    self.additional_properties.get_definitions_and_schema(
                        ref_documents=ref_documents))
                schema['additionalProperties'] = properties_schema
                nested_definitions.update(properties_definitions)

        if self.min_properties is not None:
            schema['minProperties'] = self.min_properties
        if self.max_properties is not None:
            schema['maxProperties'] = self.max_properties

        return nested_definitions, schema

    def walk(self, through_document_fields=False, visited_documents=frozenset()):
        """Yield this field, then the fields nested in ``properties``,
        ``pattern_properties`` and ``additional_properties`` (when it is a
        field), in DFS order.
        """
        fields_to_visit = []
        # ``values()`` (rather than ``itervalues()``) exists on Python 2 and 3.
        if self.properties is not None:
            fields_to_visit.append(self.properties.values())
        if self.pattern_properties is not None:
            fields_to_visit.append(self.pattern_properties.values())
        if (self.additional_properties is not None and
                not isinstance(self.additional_properties, bool)):
            fields_to_visit.append([self.additional_properties])

        yield self
        for field in itertools.chain(*fields_to_visit):
            for field_ in field.walk(through_document_fields=through_document_fields,
                                     visited_documents=visited_documents):
                yield field_
class BaseOfField(BaseSchemaField):
    """Shared implementation of the fields that combine several sub-fields
    under a single schema keyword.

    :param fields: An iterable of fields; it is copied into a list.
    """

    # Schema keyword under which the sub-schemas are listed; set by subclasses.
    _KEYWORD = None

    def __init__(self, fields, **kwargs):
        self.fields = list(fields)
        super(BaseOfField, self).__init__(**kwargs)

    def get_definitions_and_schema(self, ref_documents=None):
        """Return the merged definitions of the sub-fields and a schema that
        lists their schemas under ``_KEYWORD``.

        ``ref_documents`` is forwarded unchanged to every sub-field.
        """
        merged_definitions = {}
        sub_schemas = []
        for sub_field in self.fields:
            pair = sub_field.get_definitions_and_schema(ref_documents=ref_documents)
            sub_definitions, sub_schema = pair
            merged_definitions.update(sub_definitions)
            sub_schemas.append(sub_schema)
        combined = {self._KEYWORD: sub_schemas}
        combined.update(self._get_common_schema_fields())
        return merged_definitions, combined

    def walk(self, through_document_fields=False, visited_documents=frozenset()):
        """Yield this field, then everything nested in each sub-field, in DFS
        order.
        """
        yield self
        for sub_field in self.fields:
            nested = sub_field.walk(through_document_fields=through_document_fields,
                                    visited_documents=visited_documents)
            for descendant in nested:
                yield descendant
class OneOfField(BaseOfField):
    """Combines sub-fields under the ``oneOf`` keyword.

    :param fields: a list of fields, exactly one of which describes the data
    :type fields: list of :class:`BaseField`
    """

    _KEYWORD = 'oneOf'
class AnyOfField(BaseOfField):
    """Combines sub-fields under the ``anyOf`` keyword.

    :param fields: a list of fields, at least one of which describes the data
    :type fields: list of :class:`BaseField`
    """

    _KEYWORD = 'anyOf'
class AllOfField(BaseOfField):
    """Combines sub-fields under the ``allOf`` keyword.

    :param fields: a list of fields, all of which describe the data
    :type fields: list of :class:`BaseField`
    """

    _KEYWORD = 'allOf'
class NotField(BaseSchemaField):
    """A field that negates another field via the ``not`` keyword.

    :param field: a field to negate
    :type field: :class:`BaseField`
    """

    def __init__(self, field, **kwargs):
        self.field = field
        super(NotField, self).__init__(**kwargs)

    def get_definitions_and_schema(self, ref_documents=None):
        """Return the definitions of the negated field and a schema that
        wraps its schema in ``not``.

        ``ref_documents`` is forwarded unchanged to the negated field.
        """
        field_definitions, field_schema = self.field.get_definitions_and_schema(
            ref_documents=ref_documents)
        schema = {'not': field_schema}
        schema.update(self._get_common_schema_fields())
        return field_definitions, schema

    def walk(self, through_document_fields=False, visited_documents=frozenset()):
        """Yield this field, then everything nested in the negated field, in
        DFS order.

        Without this override the negated field would never be visited,
        unlike the fields nested in the other composite field types.
        """
        yield self
        for field_ in self.field.walk(through_document_fields=through_document_fields,
                                      visited_documents=visited_documents):
            yield field_
class DocumentField(BaseField):
    """A reference to a nested document.

    :param document_cls: A string (dot-separated path to document class, i.e.
        'app.resources.User'), :data:`RECURSIVE_REFERENCE_CONSTANT` or a
        :class:`Document`
    :param as_ref: If true, ``document_cls``'s schema is placed into the
        definitions section, and the field schema is just a reference to it:
        ``{"$ref": "#/definitions/..."}``. Makes a resulting schema more
        readable.
    """

    # String types accepted as a document reference. ``basestring`` only
    # exists on Python 2; fall back to ``str`` on Python 3.
    try:
        _STRING_TYPES = (basestring,)  # noqa: F821
    except NameError:
        _STRING_TYPES = (str,)

    def __init__(self, document_cls, as_ref=False, **kwargs):
        self._document_cls = document_cls
        # Set later through ``set_owner``; needed to resolve the recursive
        # reference constant and names not found by the first registry lookup.
        self.owner_cls = None
        self.as_ref = as_ref
        super(DocumentField, self).__init__(**kwargs)

    def walk(self, through_document_fields=False, visited_documents=frozenset()):
        """Yield this field and, when ``through_document_fields`` is true,
        the fields of the referenced document.

        A document already present in ``visited_documents`` is not entered
        again, which stops the traversal on recursive references.
        """
        yield self
        if through_document_fields and self.document_cls not in visited_documents:
            for field in self.document_cls.walk(
                    through_document_fields=through_document_fields,
                    visited_documents=visited_documents | set([self.document_cls])):
                yield field

    def get_definitions_and_schema(self, ref_documents=None):
        """Return the definitions and the schema of the referenced document.

        * If the document is in ``ref_documents``, only a ``$ref`` to its
          definition is returned, with no definitions.
        * Otherwise, if ``as_ref`` is true, the document schema is stored in
          the definitions and a ``$ref`` to it is returned.
        * Otherwise the document schema is returned as is.
        """
        definition_id = self.document_cls._get_definition_id()
        if ref_documents and self.document_cls in ref_documents:
            return {}, {'$ref': '#/definitions/{0}'.format(definition_id)}

        document_definitions, document_schema = self.document_cls.get_definitions_and_schema(
            ref_documents=ref_documents)
        if not self.as_ref:
            return document_definitions, document_schema
        document_definitions[definition_id] = document_schema
        return document_definitions, {'$ref': '#/definitions/{0}'.format(definition_id)}

    def set_owner(self, owner_cls):
        """Remember the class that owns this field."""
        self.owner_cls = owner_cls

    @property
    def document_cls(self):
        """The document class this field refers to.

        A non-string ``document_cls`` argument is returned unchanged. The
        recursive reference constant resolves to ``owner_cls``. Any other
        string is looked up in the registry, first as given and, on
        ``KeyError``, again with the owner's module passed as ``module``.

        :raises ValueError: If ``owner_cls`` is needed but has not been set.
        """
        if not isinstance(self._document_cls, self._STRING_TYPES):
            return self._document_cls

        if self._document_cls == RECURSIVE_REFERENCE_CONSTANT:
            if self.owner_cls is None:
                raise ValueError('owner_cls is not set')
            return self.owner_cls

        try:
            return registry.get_document(self._document_cls)
        except KeyError:
            if self.owner_cls is None:
                raise ValueError('owner_cls is not set')
            # NOTE(review): presumably resolves a name relative to the
            # owner's module -- confirm against ``registry.get_document``.
            return registry.get_document(self._document_cls,
                                         module=self.owner_cls.__module__)