perf: Validate records with fastjsonschema #2066

Closed
Changes from 40 commits
Commits
53 commits
58ba902
added fastjsonschema
BuzzCutNorman Nov 13, 2023
56a1a02
working version bypassing errors
BuzzCutNorman Nov 14, 2023
538c44d
Merge branch 'feat-validate-json-schema-w-fastjsonschema' into 2045-v…
BuzzCutNorman Nov 22, 2023
4ac5475
updated benchmark to work with fastjsonschema
BuzzCutNorman Nov 22, 2023
cb793f2
update fastjsonschema to 2.19.0
BuzzCutNorman Nov 22, 2023
01be771
log jsonschema validation errors to info level
BuzzCutNorman Nov 22, 2023
0099672
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 2045…
BuzzCutNorman Nov 28, 2023
bb07997
appended -00:00 to deleted_at element of bench_record
BuzzCutNorman Nov 28, 2023
0a89fbc
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 2045…
BuzzCutNorman Nov 28, 2023
d64605a
attempt to match pr1471 Support custom JSON schema validation
BuzzCutNorman Nov 29, 2023
39d900c
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
edgarrmondragon Nov 29, 2023
e788c20
Ignore missing fastjsonschema types
edgarrmondragon Nov 29, 2023
e7a7d5b
Apply suggestions from code review on 11/29
BuzzCutNorman Nov 30, 2023
dd69bea
remove -00:00 from deleted_at element of bench_record
BuzzCutNorman Nov 30, 2023
cd732a1
add test coverage for JsonSchemaValueException
BuzzCutNorman Nov 30, 2023
54a7376
ran poerty lock
BuzzCutNorman Nov 30, 2023
dc7cfd8
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
edgarrmondragon Dec 1, 2023
bb0e5b2
change CHECK_RECORD_FORMATS to validate_field_string_format
BuzzCutNorman Dec 1, 2023
7c719de
update references to CHECK_RECORD_FORMATS to validate_field_string_fo…
BuzzCutNorman Dec 1, 2023
5a94ce6
added test for type checking on _validator initialization
BuzzCutNorman Dec 1, 2023
657969b
Apply suggestions from code review 12/1
BuzzCutNorman Dec 1, 2023
d1a8895
comment added to record_validator
BuzzCutNorman Dec 1, 2023
57f65d1
record_validator comment removed
BuzzCutNorman Dec 1, 2023
4e97ee0
first attempt at RecordValidator
BuzzCutNorman Dec 1, 2023
c532f3e
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 2045…
BuzzCutNorman Dec 4, 2023
fc6a4af
Apply suggestions from first review of validator interface.
BuzzCutNorman Dec 4, 2023
f36f0d9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 4, 2023
7df254d
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
BuzzCutNorman Dec 4, 2023
10445a6
Apply suggestions from code of validator interface.
BuzzCutNorman Dec 4, 2023
bf410b3
jsonschema validator interface class version 1
BuzzCutNorman Dec 4, 2023
59d9309
removed unnecessary check in _validate_and_parse
BuzzCutNorman Dec 4, 2023
4487f56
Move things around
edgarrmondragon Dec 4, 2023
6dbfba2
Update tests/core/sinks/test_type_checker.py
edgarrmondragon Dec 4, 2023
19f66ed
Add some `@overrides`
edgarrmondragon Dec 4, 2023
d2996fe
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
BuzzCutNorman Dec 5, 2023
ebf3bfe
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 2045…
BuzzCutNorman Dec 5, 2023
65d0578
added stop on field validation exception flag
BuzzCutNorman Dec 5, 2023
07c3339
Fix a few typos and rename flag
edgarrmondragon Dec 5, 2023
b2380b5
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
edgarrmondragon Dec 5, 2023
a5be4c6
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
edgarrmondragon Dec 6, 2023
5c1e944
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 2045…
BuzzCutNorman Dec 11, 2023
eb591dc
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
edgarrmondragon Jan 3, 2024
d79a096
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
edgarrmondragon Jan 3, 2024
0a6d725
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 2045…
BuzzCutNorman Jan 11, 2024
89ef305
working version post pr2136
BuzzCutNorman Jan 11, 2024
00829ac
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
BuzzCutNorman Jan 11, 2024
21b48e0
change FastJSONSchemaValidator to JSONSchemaValidator
BuzzCutNorman Jan 11, 2024
e255fd5
raise a more generic message on schema validation error
BuzzCutNorman Jan 11, 2024
8adfd7c
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
edgarrmondragon Jan 17, 2024
a727cae
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 2045…
BuzzCutNorman Jan 22, 2024
fe43b36
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
edgarrmondragon Jan 23, 2024
f055237
Merge branch 'main' of https://github.com/BuzzCutNorman/sdk into 2045…
BuzzCutNorman Jan 29, 2024
9d4fb94
Merge branch 'main' into 2045-validate-records-with-fastjsonschema
edgarrmondragon Feb 9, 2024
30 changes: 22 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -43,6 +43,7 @@ backoff = { version = ">=2.0.0", python = "<4" }
 backports-datetime-fromisoformat = { version = ">=2.0.1", python = "<3.11" }
 click = "~=8.0"
 cryptography = ">=3.4.6"
+fastjsonschema = ">=2.19.0"
 fs = ">=2.4.16"
 importlib-metadata = {version = "<7.0.0", python = "<3.12"}
 importlib-resources = {version = ">=5.12.0", markers = "python_version < \"3.9\""}
@@ -216,6 +217,7 @@ warn_unused_ignores = true
 ignore_missing_imports = true
 module = [
     "backports.datetime_fromisoformat.*",
+    "fastjsonschema.*",
     "joblib.*",  # TODO: Remove when https://github.com/joblib/joblib/issues/1516 is shipped
     "jsonpath_ng.*",
     "pyarrow.*",  # TODO: Remove when https://github.com/apache/arrow/issues/32609 if implemented and released
10 changes: 9 additions & 1 deletion singer_sdk/exceptions.py
@@ -137,4 +137,12 @@ class ConformedNameClashException(Exception):


 class MissingKeyPropertiesError(Exception):
-    """Raised when a recieved (and/or transformed) record is missing key properties."""
+    """Raised when a received (and/or transformed) record is missing key properties."""
+
+
+class InvalidJSONSchema(Exception):
+    """Raised when a JSON schema is invalid."""
+
+
+class InvalidRecord(Exception):
+    """Raised when a stream record is invalid according to its declared schema."""
4 changes: 2 additions & 2 deletions singer_sdk/io_base.py
@@ -25,10 +25,10 @@ class SingerReader(metaclass=abc.ABCMeta):
     def listen(self, file_input: t.IO[str] | None = None) -> None:
         """Read from input until all messages are processed.

+        This method is internal to the SDK and should not need to be overridden.
+
         Args:
             file_input: Readable stream of messages. Defaults to standard in.
-
-        This method is internal to the SDK and should not need to be overridden.
         """
         if not file_input:
             file_input = sys.stdin
125 changes: 117 additions & 8 deletions singer_sdk/sinks/core.py
@@ -13,9 +13,14 @@
 from gzip import open as gzip_open
 from types import MappingProxyType

-from jsonschema import Draft7Validator
+import fastjsonschema
+from typing_extensions import override

-from singer_sdk.exceptions import MissingKeyPropertiesError
+from singer_sdk.exceptions import (
+    InvalidJSONSchema,
+    InvalidRecord,
+    MissingKeyPropertiesError,
+)
 from singer_sdk.helpers._batch import (
     BaseBatchFileEncoding,
     BatchConfig,
@@ -39,7 +44,76 @@

     from singer_sdk.target_base import Target

-JSONSchemaValidator = Draft7Validator
+
+class BaseJSONSchemaValidator(abc.ABC):
+    """Abstract base class for JSONSchema validator."""
+
+    def __init__(self, schema: dict[str, t.Any]) -> None:
+        """Initialize the record validator.
+
+        Args:
+            schema: Schema of the stream to sink.
+        """
+        self.schema = schema
+
+    @abc.abstractmethod
+    def validate(self, record: dict[str, t.Any]) -> None:
+        """Validate a record message.
+
+        This method MUST raise an ``InvalidRecord`` exception if the record is invalid.
+
+        Args:
+            record: Record message to validate.
+        """
+
+
+class FastJSONSchemaValidator(BaseJSONSchemaValidator):
+    """Validate records using the ``fastjsonschema`` library."""
+
+    def __init__(
+        self,
+        schema: dict,
+        *,
+        validate_formats: bool = False,
+        format_validators: dict[str, t.Callable] | None = None,
+    ):
+        """Initialize the validator.
+
+        Args:
+            schema: Schema of the stream to sink.
+            validate_formats: Whether JSON string formats (e.g. ``date-time``) should
+                be validated.
+            format_validators: User-defined format validators.
+
+        Raises:
+            InvalidJSONSchema: If the schema provided from tap or mapper is invalid.
+        """
+        super().__init__(schema)
+        try:
+            self.validator = fastjsonschema.compile(
+                self.schema,
+                use_formats=validate_formats,
+                formats=format_validators or {},
+            )
+        except fastjsonschema.JsonSchemaDefinitionException as e:
+            error_message = f"Schema Validation Error: {e}"
+            raise InvalidJSONSchema(error_message) from e
+
+    @override
+    def validate(self, record: dict):  # noqa: ANN201
+        """Validate a record message.
+
+        Args:
+            record: Record message to validate.
+
+        Raises:
+            InvalidRecord: If the record is invalid.
+        """
+        try:
+            self.validator(record)
+        except fastjsonschema.JsonSchemaValueException as e:
+            error_message = f"Record Message Validation Error: {e.message}"
+            raise InvalidRecord(error_message) from e


 class Sink(metaclass=abc.ABCMeta):
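
Note on the validator interface: the only contract `BaseJSONSchemaValidator` imposes is that `validate` raises `InvalidRecord` for a bad record and returns `None` otherwise. A minimal sketch of a custom subclass follows. The two classes at the top are local stand-ins so the snippet runs without the SDK installed, and `RequiredKeysValidator` is a hypothetical name that is not part of this PR.

```python
from __future__ import annotations

import abc
import typing as t


class InvalidRecord(Exception):
    """Local stand-in for ``singer_sdk.exceptions.InvalidRecord``."""


class BaseJSONSchemaValidator(abc.ABC):
    """Local stand-in mirroring the abstract base class in this diff."""

    def __init__(self, schema: dict[str, t.Any]) -> None:
        self.schema = schema

    @abc.abstractmethod
    def validate(self, record: dict[str, t.Any]) -> None:
        """Raise ``InvalidRecord`` if the record is invalid."""


class RequiredKeysValidator(BaseJSONSchemaValidator):
    """Hypothetical validator that only checks the schema's ``required`` list."""

    def validate(self, record: dict[str, t.Any]) -> None:
        missing = [key for key in self.schema.get("required", []) if key not in record]
        if missing:
            msg = f"Record is missing required properties: {missing}"
            raise InvalidRecord(msg)


validator = RequiredKeysValidator({"type": "object", "required": ["id"]})
validator.validate({"id": 1})  # valid record: returns None, raises nothing

try:
    validator.validate({"name": "no id"})
except InvalidRecord as exc:
    error_text = str(exc)  # message names the missing property
```

Because callers only depend on `InvalidRecord`, a subclass like this could wrap any validation library without the sink needing to know which one is in use.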
@@ -51,6 +125,15 @@ class Sink(metaclass=abc.ABCMeta):

     MAX_SIZE_DEFAULT = 10000

+    validate_schema = True
+    """Enable JSON schema record validation."""
+
+    validate_field_string_format = False
+    """Enable JSON schema format validation, for example `date-time` string fields."""
+
+    fail_on_record_validation_exception: bool = True
+    """Interrupt the target execution when a record fails schema validation."""
+
     def __init__(
         self,
         target: Target,
@@ -95,10 +178,24 @@ def __init__(
         self._batch_records_read: int = 0
         self._batch_dupe_records_merged: int = 0

-        self._validator = Draft7Validator(
-            schema,
-            format_checker=Draft7Validator.FORMAT_CHECKER,
-        )
+        self._validator: BaseJSONSchemaValidator | None = self.get_validator()
+
+    def get_validator(self) -> BaseJSONSchemaValidator | None:
+        """Get a record validator for this sink.
+
+        Override this method to use a custom format validator, or disable record
+        validation by returning `None`.
+
+        Returns:
+            An instance of a subclass of ``BaseJSONSchemaValidator``.
+        """
+        if self.validate_schema:
+            return FastJSONSchemaValidator(
+                self.schema,
+                validate_formats=self.validate_field_string_format,
+                format_validators={},
+            )
+        return None

     def _get_context(self, record: dict) -> dict:  # noqa: ARG002
         """Return an empty dictionary by default.
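
Note on `get_validator`: as written, there appear to be two ways for a sink subclass to turn validation off, either by setting `validate_schema = False` or by overriding the hook to return `None`. The sketch below illustrates both with a stripped-down stand-in for `Sink`. `StubValidator`, `FlagDisabledSink` and `OverrideDisabledSink` are hypothetical names used only for illustration, and the real `Sink.__init__` takes more arguments than shown here.

```python
from __future__ import annotations

import typing as t


class StubValidator:
    """Hypothetical validator that accepts every record."""

    def __init__(self, schema: dict[str, t.Any]) -> None:
        self.schema = schema

    def validate(self, record: dict[str, t.Any]) -> None:
        return None


class Sink:
    """Stand-in for the SDK ``Sink`` that keeps only the validator hook."""

    validate_schema = True

    def __init__(self, schema: dict[str, t.Any]) -> None:
        self.schema = schema
        # The hook runs last, so everything it reads must already be set.
        self._validator = self.get_validator()

    def get_validator(self) -> StubValidator | None:
        if self.validate_schema:
            return StubValidator(self.schema)
        return None


class FlagDisabledSink(Sink):
    """Disables validation through the class attribute."""

    validate_schema = False


class OverrideDisabledSink(Sink):
    """Disables validation by overriding the hook."""

    def get_validator(self) -> StubValidator | None:
        return None


default_sink = Sink({"type": "object"})
flag_sink = FlagDisabledSink({"type": "object"})
override_sink = OverrideDisabledSink({"type": "object"})
```

Since the hook is called from `__init__`, an override that depends on instance attributes other than `schema` would need those attributes to be assigned before the base constructor reaches that line.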
@@ -328,8 +425,20 @@ def _validate_and_parse(self, record: dict) -> dict:

         Returns:
             TODO
+
+        Raises:
+            InvalidRecord: If the record is invalid.
         """
-        self._validator.validate(record)
+        if self._validator is not None:
+            # TODO: Check the performance impact of this try/except block. It runs
+            # on every record, so it's probably bad and should be moved up the stack.
+            try:
+                self._validator.validate(record)
+            except InvalidRecord as e:
+                if self.fail_on_record_validation_exception:
+                    raise InvalidRecord(e) from e
+                self.logger.exception("Record validation failed %s", e)
+
         self._parse_timestamps_in_record(
             record=record,
             schema=self.schema,
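
Note on `fail_on_record_validation_exception`: reading the hunk above, when the flag is `False` the failure is logged and execution falls through to timestamp parsing, so the invalid record is still processed rather than skipped. The sketch below mirrors that control flow with standard-library pieces only. `validate_positive_id` and `validate_all` are hypothetical helpers, and the sketch deliberately deviates from the diff by re-raising with a bare `raise` instead of wrapping the exception again.

```python
from __future__ import annotations

import logging

logger = logging.getLogger("sink_sketch")


class InvalidRecord(Exception):
    """Local stand-in for ``singer_sdk.exceptions.InvalidRecord``."""


def validate_positive_id(record: dict) -> None:
    """Hypothetical validator: ``id`` must be a positive integer."""
    value = record.get("id")
    if not isinstance(value, int) or value <= 0:
        msg = f"invalid id: {value!r}"
        raise InvalidRecord(msg)


def validate_all(records: list[dict], *, fail_on_error: bool) -> list[dict]:
    """Return the records that continue downstream, mirroring the flag's two modes."""
    processed = []
    for record in records:
        try:
            validate_positive_id(record)
        except InvalidRecord:
            if fail_on_error:
                raise  # interrupt the run on the first invalid record
            # Logger.exception records the active exception and its traceback.
            logger.exception("Record validation failed")
        # As in the diff, a logged failure does not drop the record.
        processed.append(record)
    return processed
```

If dropping invalid records is the intended behavior when the flag is off, the loop would need an explicit `continue` after logging. Whether that is desired for the SDK is a design question this sketch does not answer.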