diff --git a/src/jsonschema_colander/meta.py b/src/jsonschema_colander/meta.py index 98207bc..b7ccc62 100644 --- a/src/jsonschema_colander/meta.py +++ b/src/jsonschema_colander/meta.py @@ -1,11 +1,13 @@ import abc import colander +import typing as t from functools import cached_property from types import MappingProxyType -import typing as t +from .validators import JS_Schema_Validator try: from deform.schema import default_widget_makers + READONLY_WIDGET = True except ImportError: default_widget_makers = {} @@ -16,16 +18,16 @@ class Path(str): fragments: t.Sequence[str] - def __new__(cls, value: t.Union[str, 'Path']): + def __new__(cls, value: t.Union[str, "Path"]): if isinstance(value, Path): return value # idempotency string = super().__new__(cls, value) - string.fragments = string.split('.') + string.fragments = string.split(".") return string def resolve(self, node: t.Mapping[str, t.Any]) -> t.Any: for stub in self.fragments: - node = node[stub] + node = node[stub] return node def missing(self): @@ -34,11 +36,12 @@ def deferred_missing(node, kw): """in order to work, you need to bind the schema with 'data'. 'data' being the equivalent of the appstruct. """ - if data := kw.get('data'): + if data := kw.get("data"): try: return self.resolve(data) except KeyError: return None + return deferred_missing @classmethod @@ -47,11 +50,11 @@ def create(cls, parent, name: str): return cls(name or "") if name: if parent.__path__: - return cls(f'{parent.__path__}.{name}') + return cls(f"{parent.__path__}.{name}") return cls(name) if parent.__path__: return cls(parent.__path__) - raise NameError('Unnamed field with no parent.') + raise NameError("Unnamed field with no parent.") class DefinitionsHolder: @@ -61,9 +64,17 @@ class DefinitionsHolder: class JSONField(abc.ABC): supported: t.ClassVar[set] ignore: t.ClassVar[set] = { - 'name', 'type', 'title', 'description', 'anyOf', 'if', 'then' + "name", + "type", + "title", + "description", + "anyOf", + "if", + "then", + "dependentSchemas", + "allOf", } - allowed: t.ClassVar[set] = {'default'} + allowed: t.ClassVar[set] = {"default"} type: str name: str @@ -74,25 +85,25 @@ class JSONField(abc.ABC): required: bool readonly: bool __path__: str - parent: t.Optional['JSONField'] = None + parent: t.Optional["JSONField"] = None factory: t.Optional[t.Callable] = None config: t.Optional[t.Mapping] = None - def __init__(self, - type: str, - name: str, - required: bool, - validators: t.List, - attributes: t.Dict, - *, - label: str = '', - description: str = '', - config: t.Optional[t.Mapping] = None, - parent: t.Optional['JSONField'] = None - ): + def __init__( + self, + type: str, + name: str, + required: bool, + validators: t.List, + attributes: t.Dict, + *, + label: str = "", + description: str = "", + config: t.Optional[t.Mapping] = None, + parent: t.Optional["JSONField"] = None, + ): if type not in self.supported: - raise TypeError( - f'{self.__class__} does not support the {type} type.') + raise TypeError(f"{self.__class__} does not support the {type} type.") self.__path__ = Path.create(parent, name) @@ -113,7 +124,7 @@ def __init__(self, 'readonly', self.config.get('readonly', False) ) self.validators = validators - if validators := self.fieldconf.get('validators'): + if validators := self.fieldconf.get("validators"): self.validators.extend(validators) self.attributes = attributes self.parent = parent @@ -135,21 +146,21 @@ def get_options(self): options = { **self.attributes, - 'name': self.name, - 'title': self.label, - 'description': self.description, - 'missing': missing + "name": self.name, + "title": self.label, + "description": self.description, + "missing": missing, +# "oid": self.name, } if len(self.validators) > 1: - options['validator'] = colander.All(*self.validators) + options["validator"] = colander.All(*self.validators) elif len(self.validators) == 1: - options['validator'] = self.validators[0] + options["validator"] = self.validators[0] return options @abc.abstractmethod def get_factory(self): - """Returns the colander type needed for the schema node. - """ + """Returns the colander type needed for the schema node.""" pass def get_widget(self, factory, options): @@ -173,27 +184,35 @@ def extract(cls, params: dict, available: set) -> t.Tuple[t.List, t.Dict]: @classmethod def from_json( - cls, - params: dict, - *, - parent: t.Optional['JSONField'] = None, - name: t.Optional[str] = None, - config: t.Optional[dict] = None, - required: bool = False + cls, + params: dict, + *, + parent: t.Optional["JSONField"] = None, + name: t.Optional[str] = None, + config: t.Optional[dict] = None, + required: bool = False, ): available = set(params.keys()) if illegal := ((available - cls.ignore) - cls.allowed): - raise NotImplementedError( - f'Unsupported attributes: {illegal} for {cls}.') + raise NotImplementedError(f"Unsupported attributes: {illegal} for {cls}.") validators, attributes = cls.extract(params, available) + + if dependentSchemas := params.get('dependentSchemas'): + validators.append( + JS_Schema_Validator('dependentSchemas', dependentSchemas)) + + if allOf := params.get('allOf'): + validators.append( + JS_Schema_Validator('allOf', allOf)) + return cls( - params['type'], + params["type"], name, required, validators, attributes, parent=parent, config=config, - label=params.get('title'), - description=params.get('description') + label=params.get("title"), + description=params.get("description"), ) diff --git a/src/jsonschema_colander/types.py b/src/jsonschema_colander/types.py index 26147a4..fee9995 100644 --- a/src/jsonschema_colander/types.py +++ b/src/jsonschema_colander/types.py @@ -161,7 +161,7 @@ class Array(JSONField): supported = {'array'} allowed = { - 'items', 'minItems', 'maxItems', 'default', 'definitions' + 'items', 'minItems', 'maxItems', 'default', 'definitions', 'enum', 'format', } subfield: Optional[JSONField] = None @@ -210,6 +210,8 @@ def extract(cls, params: Mapping, available: set): min=params.get('minItems', -1), max=params.get('maxItems', -1) )) + if 'enum' in available: + attributes['choices'] = [(v, v) for v in params['enum']] if 'default' in available: attributes['default'] = params['default'] return validators, attributes diff --git a/src/jsonschema_colander/validators.py b/src/jsonschema_colander/validators.py index 90532ac..2bc2c7b 100644 --- a/src/jsonschema_colander/validators.py +++ b/src/jsonschema_colander/validators.py @@ -1,16 +1,18 @@ import math import colander +from jsonschema import validate, ValidationError class NumberRange(colander.Range): - - def __init__(self, - min=None, - max=None, - exclusive_min=None, - exclusive_max=None, - min_err=colander.Range._MIN_ERR, - max_err=colander.Range._MAX_ERR): + def __init__( + self, + min=None, + max=None, + exclusive_min=None, + exclusive_max=None, + min_err=colander.Range._MIN_ERR, + max_err=colander.Range._MAX_ERR, + ): self.exclusive_min = exclusive_min self.exclusive_max = exclusive_max super().__init__(min=min, max=max, min_err=min_err, max_err=max_err) @@ -18,28 +20,83 @@ def __init__(self, def __call__(self, node, value): if value is not None and not math.isnan(value): if self.min is not None and value < self.min: - raise colander.Invalid(node, colander._( - self.min_err, mapping={ - 'val': value, 'min': self.min - } - )) - elif self.exclusive_min is not None and \ - value <= self.exclusive_min: - raise colander.Invalid(node, colander._( - self.min_err, mapping={ - 'val': value, 'min': self.exclusive_min - } - )) + raise colander.Invalid( + node, + colander._(self.min_err, mapping={"val": value, "min": self.min}), + ) + elif self.exclusive_min is not None and value <= self.exclusive_min: + raise colander.Invalid( + node, + colander._( + self.min_err, mapping={"val": value, "min": self.exclusive_min} + ), + ) if self.max is not None and value > self.max: - raise colander.Invalid(node, colander._( - self.max_err, mapping={ - 'val': value, 'max': self.max - } - )) - elif self.exclusive_max is not None and \ - value >= self.exclusive_max: - raise colander.Invalid(node, colander._( - self.max_err, mapping={ - 'val': value, 'max': self.exclusive_max - } - )) + raise colander.Invalid( + node, + colander._(self.max_err, mapping={"val": value, "max": self.max}), + ) + elif self.exclusive_max is not None and value >= self.exclusive_max: + raise colander.Invalid( + node, + colander._( + self.max_err, mapping={"val": value, "max": self.exclusive_max} + ), + ) + + +def node_json_traverser(node, stack): + if not stack: + return node + if children := getattr(node, "children", None): + name, stack = stack[0], stack[1:] + if isinstance(name, str): + for child in children: + if child.name == name: + return node_json_traverser(child, stack) + elif isinstance(name, int): + assert len(children) == 1 + items = children[0] + assert items.name == "items" + if not stack: + return node + return node_json_traverser(items, stack) + + raise LookupError("Node not found") + + +def node_json_error(error, node, stack): + if not stack: + return error + if children := getattr(node, "children", None): + name, stack = stack[0], stack[1:] + if isinstance(name, str): + for num, child in enumerate(children): + if child.name == name: + suberror = colander.Invalid(child) + error.add(suberror, num) + return node_json_error(suberror, child, stack) + elif isinstance(name, int): + assert len(children) == 1 + items = children[0] + assert items.name == "items" + if not stack: + return error + return node_json_error(error, items, stack) + + raise LookupError("Node not found") + + +class JS_Schema_Validator: + def __init__(self, key, jsonschema): + self.jsonschema = {"type": "object", key: jsonschema} + + def __call__(self, node, value, **kwargs): + """Prevent duplicate usernames.""" + try: + validate(value, self.jsonschema) + except ValidationError as e: + base_error = colander.Invalid(node) + error = node_json_error(base_error, node, list(e.path) or e.validator_value) + error.msg = e.message + raise base_error diff --git a/tests/conftest.py b/tests/conftest.py index 028e9f9..2aaa3c5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -42,3 +42,17 @@ def refs_and_defs_schema(request): path = pathlib.Path(__file__).parent / 'refs_defs.json' with path.open('r') as fp: return json.load(fp) + + +@pytest.fixture(scope="session") +def extended_validation_schema(request): + path = pathlib.Path(__file__).parent / 'extended_validation.json' + with path.open('r') as fp: + return json.load(fp) + + +@pytest.fixture(scope="session") +def extended_validation_schema_all_of(request): + path = pathlib.Path(__file__).parent / 'extended_validation_allOf.json' + with path.open('r') as fp: + return json.load(fp) diff --git a/tests/extended_validation.json b/tests/extended_validation.json new file mode 100644 index 0000000..f1d2db2 --- /dev/null +++ b/tests/extended_validation.json @@ -0,0 +1,34 @@ +{ + "type": "object", + "properties": { + "grund": { + "type": "string", + "enum": [ + "Geschäftsaufgabe", + "Betriebsüberweisung an einen anderen Unfallversicherungsträger", + "AGrund" + ], + "format": "radio", + "title": "Warum möchten Sie die Versicherung kündigen?" + }, + "grund2": { + "type": "string", + "format": "textarea", + "title": "Bitte geben Sie einen Grund für Ihre Kündigung an" + } + }, + "dependentSchemas": { + "grund": { + "allOf": [ + { + "if": { + "properties": { + "grund": {"const": "AGrund"} + } + }, + "then": {"required": ["grund2"]} + } + ] + } + } +} diff --git a/tests/extended_validation_allOf.json b/tests/extended_validation_allOf.json new file mode 100644 index 0000000..e48a9b7 --- /dev/null +++ b/tests/extended_validation_allOf.json @@ -0,0 +1,38 @@ +{ + "type": "object", + "properties": { + "familienstand": { + "type": "string", + "enum": [ + "ledig", + "verheiratet" + ], + "format": "radio", + "title": "Familienstand" + }, + "anschrift-ehepartner": { + "type": "string", + "format": "textarea", + "title": "Name und Anschrift des Ehepartners" + } + }, + "allOf": [ + { + "if": { + "properties": { + "familienstand": { + "const": "verheiratet" + } + } + }, + "then": { + "required": [ + "anschrift-ehepartner" + ] + } + } + ], + "required": [ + "familienstand" + ] +} diff --git a/tests/fields/test_array.py b/tests/fields/test_array.py index 180072a..2a75ec1 100644 --- a/tests/fields/test_array.py +++ b/tests/fields/test_array.py @@ -29,6 +29,20 @@ def test_simple_array(): assert factory == colander.SequenceSchema +def test_array_enum(): + + field = Array.from_json({ + "type": "array", + "enum": [ + "A", + "B" + ] + }, name='test', required=True) + + factory = field.get_factory() + assert factory == colander.Set + + def test_array_length(): field = Array.from_json({ diff --git a/tests/test_extended_validation.py b/tests/test_extended_validation.py new file mode 100644 index 0000000..6aa9050 --- /dev/null +++ b/tests/test_extended_validation.py @@ -0,0 +1,74 @@ +import pytest +import hamcrest +import colander +import jsonschema +import jsonschema_colander.types + +""" +We handle object-typed schema. +They can be fields in a form or just a plain form. +They are created just like any other field. +""" + + +def test_extended_validation(extended_validation_schema): + + schema = jsonschema_colander.types.Object.from_json( + extended_validation_schema) + + hamcrest.assert_that( + schema.fields, + hamcrest.has_entries( + { + "grund": hamcrest.instance_of( + jsonschema_colander.types.String), + "grund2": hamcrest.instance_of( + jsonschema_colander.types.String), + } + ), + ) + + assert schema.fields["grund"].required is False + assert schema.fields["grund2"].required is False + + schema = schema().bind() + assert schema.deserialize({}) == {} + assert schema.deserialize({"grund": "test"}) == {"grund": "test"} + with pytest.raises(colander.Invalid): + assert schema.deserialize({"grund": "AGrund"}) == { + "grund": "AGrund" + } + assert schema.deserialize({"grund": "AGrund", "grund2": "test"}) == { + "grund": "AGrund", + "grund2": "test", + } + + +def test_extended_validation_all_of(extended_validation_schema_all_of): + schema = jsonschema_colander.types.Object.from_json( + extended_validation_schema_all_of + ) + hamcrest.assert_that( + schema.fields, + hamcrest.has_entries( + { + "familienstand": hamcrest.instance_of(jsonschema_colander.types.String), + "anschrift-ehepartner": hamcrest.instance_of( + jsonschema_colander.types.String + ), + } + ), + ) + + assert schema.fields["familienstand"].required is True + assert schema.fields["anschrift-ehepartner"].required is False + schema = schema().bind() + with pytest.raises(colander.Invalid): + assert schema.deserialize({}) == {} + with pytest.raises(colander.Invalid): + assert schema.deserialize({"familienstand": "verheiratet"}) == { + "familienstand": "verheiratet" + } + assert schema.deserialize({"familienstand": "verheiratet", "anschrift-ehepartner": 'TEST'}) == { + "familienstand": "verheiratet", "anschrift-ehepartner": "TEST" + }