From def575f595f999e96336ecd9d7cdf2fcaa3aafba Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sat, 10 Aug 2024 02:59:28 +0200 Subject: [PATCH] Zyp: A compact transformation engine A data model and implementation for a compact transformation engine written in Python. - Based on JSON Pointer (RFC 6901), JMESPath, and transon - Implemented using `attrs` and `cattrs` - Includes built-in transformation functions `to_datetime` and `to_unixtime` - Ability to marshall and unmarshall its representation to/from JSON and YAML --- .github/workflows/tests.yml | 64 ++++- CHANGES.md | 2 + README.md | 20 +- doc/backlog.md | 3 + doc/decode.md | 2 +- doc/index.md | 3 +- doc/zyp/backlog.md | 48 ++++ doc/zyp/index.md | 192 +++++++++++++++ doc/zyp/research.md | 25 ++ pyproject.toml | 29 ++- src/commons_codec/model.py | 2 +- src/zyp/__init__.py | 0 src/zyp/function.py | 35 +++ src/zyp/model/__init__.py | 0 src/zyp/model/base.py | 75 ++++++ src/zyp/model/bucket.py | 180 ++++++++++++++ src/zyp/model/collection.py | 36 +++ src/zyp/model/fluent.py | 25 ++ src/zyp/model/moksha.py | 68 ++++++ src/zyp/model/project.py | 32 +++ src/zyp/util/__init__.py | 0 src/zyp/util/data.py | 10 + src/zyp/util/dictx.py | 165 +++++++++++++ src/zyp/util/expression.py | 18 ++ src/zyp/util/locator.py | 37 +++ tests/transform/test_aws_dms.py | 5 + tests/zyp/__init__.py | 0 tests/zyp/conftest.py | 6 + tests/zyp/test_bucket.py | 289 +++++++++++++++++++++++ tests/zyp/test_collection.py | 102 ++++++++ tests/zyp/test_function.py | 41 ++++ tests/zyp/test_locator.py | 18 ++ tests/zyp/test_model.py | 15 ++ tests/zyp/test_moksha.py | 69 ++++++ tests/zyp/test_project.py | 24 ++ tests/zyp/test_util.py | 50 ++++ tests/zyp/transformation-bucket.json | 30 +++ tests/zyp/transformation-collection.yaml | 22 ++ 38 files changed, 1727 insertions(+), 15 deletions(-) create mode 100644 doc/zyp/backlog.md create mode 100644 doc/zyp/index.md create mode 100644 doc/zyp/research.md create mode 100644 src/zyp/__init__.py create mode 100644 src/zyp/function.py create mode 100644 src/zyp/model/__init__.py create mode 100644 src/zyp/model/base.py create mode 100644 src/zyp/model/bucket.py create mode 100644 src/zyp/model/collection.py create mode 100644 src/zyp/model/fluent.py create mode 100644 src/zyp/model/moksha.py create mode 100644 src/zyp/model/project.py create mode 100644 src/zyp/util/__init__.py create mode 100644 src/zyp/util/data.py create mode 100644 src/zyp/util/dictx.py create mode 100644 src/zyp/util/expression.py create mode 100644 src/zyp/util/locator.py create mode 100644 tests/zyp/__init__.py create mode 100644 tests/zyp/conftest.py create mode 100644 tests/zyp/test_bucket.py create mode 100644 tests/zyp/test_collection.py create mode 100644 tests/zyp/test_function.py create mode 100644 tests/zyp/test_locator.py create mode 100644 tests/zyp/test_model.py create mode 100644 tests/zyp/test_moksha.py create mode 100644 tests/zyp/test_project.py create mode 100644 tests/zyp/test_util.py create mode 100644 tests/zyp/transformation-bucket.json create mode 100644 tests/zyp/transformation-collection.yaml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index baf59aa..5489191 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -104,7 +104,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. - pip install --use-pep517 --prefer-binary --editable=.[develop,test,mongodb] + pip install --use-pep517 --prefer-binary --editable=.[mongodb,develop,test] - name: Run linters and software tests run: poe check @@ -120,3 +120,65 @@ jobs: env_vars: OS,PYTHON name: codecov-umbrella fail_ci_if_error: true + + + test-zyp: + name: " + Zyp: Python ${{ matrix.python-version }} + " + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: ['ubuntu-latest'] + python-version: ['3.8', '3.9', '3.12'] + + env: + OS: ${{ matrix.os }} + PYTHON: ${{ matrix.python-version }} + + steps: + + - name: Acquire sources + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + architecture: x64 + cache: 'pip' + cache-dependency-path: + pyproject.toml + + - name: Set up project + run: | + + # `setuptools 0.64.0` adds support for editable install hooks (PEP 660). + # https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400 + pip install "setuptools>=64" --upgrade + + # Install package in editable mode. + pip install --use-pep517 --prefer-binary --editable=.[zyp,develop,test] + + - name: Set timezone + uses: szenius/set-timezone@v2.0 + with: + timezoneLinux: "Europe/Berlin" + timezoneMacos: "Europe/Berlin" + timezoneWindows: "European Standard Time" + + - name: Run linters and software tests + run: poe check + + # https://github.com/codecov/codecov-action + - name: Upload coverage results to Codecov + uses: codecov/codecov-action@v4 + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + with: + files: ./coverage.xml + flags: zyp + env_vars: OS,PYTHON + name: codecov-umbrella + fail_ci_if_error: true diff --git a/CHANGES.md b/CHANGES.md index 5abd9a1..5c4b7d1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ # Changelog ## Unreleased +- Added `BucketTransformation`, a minimal transformation engine + based on JSON Pointer (RFC 6901). - Added documentation using Sphinx and Read the Docs ## 2024/08/05 v0.0.3 diff --git a/README.md b/README.md index eba78af..a77025a 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,6 @@ [![License](https://img.shields.io/pypi/l/commons-codec.svg)](https://pypi.org/project/commons-codec/) ## About -Data decoding, encoding, conversion, and translation utilities. > A codec is a device or computer program that encodes or decodes a data stream or signal. > Codec is a portmanteau of coder/decoder. @@ -21,18 +20,23 @@ Data decoding, encoding, conversion, and translation utilities. > -- https://en.wikipedia.org/wiki/Codec ## What's Inside -- **Decoders:** A collection of reusable utilities with minimal dependencies for - transcoding purposes, mostly collected from other projects like +- [Change Data Capture (CDC)]: **Transformer components** for converging CDC event messages to + SQL statements. + +- A collection of reusable utilities with minimal dependencies for + **decoding and transcoding** purposes, mostly collected from other projects like [Kotori](https://kotori.readthedocs.io/) and [LorryStream](https://lorrystream.readthedocs.io/), in order to provide them per standalone package for broader use cases. -- Transformers for [Change Data Capture (CDC)] messages to SQL statements. +- [Zyp], a generic and compact **transformation engine** written in Python, for data + decoding, encoding, conversion, translation, transformation, and cleansing purposes, + to be used as a pipeline element for data pre- and/or post-processing. ## Installation The package is available from [PyPI] at [commons-codec]. -To install the most recent version, run: +To install the most recent version, including support for MongoDB, and Zyp, run: ```shell -pip install --upgrade commons-codec +pip install --upgrade 'commons-codec[mongodb,zyp]' ``` ## Usage @@ -47,7 +51,7 @@ Kudos to the authors of all the many software components this library is vendoring and building upon. ### Similar Projects -See [prior art]. +See [prior art] and [Zyp research]. ### Contributing The `commons-codec` package is an open source project, and is @@ -69,8 +73,10 @@ within the header sections of relevant files. [Apache Commons Codec]: https://commons.apache.org/proper/commons-codec/ [Change Data Capture (CDC)]: https://en.wikipedia.org/wiki/Change_data_capture [commons-codec]: https://pypi.org/project/commons-codec/ +[Zyp research]: https://commons-codec.readthedocs.io/zyp/research.html [documentation]: https://commons-codec.readthedocs.io/ [examples]: https://github.com/daq-tools/commons-codec/tree/main/examples [managed on GitHub]: https://github.com/daq-tools/commons-codec [prior art]: https://commons-codec.readthedocs.io/prior-art.html [PyPI]: https://pypi.org/ +[Zyp]: https://commons-codec.readthedocs.io/zyp/ diff --git a/doc/backlog.md b/doc/backlog.md index 91b7814..c0ff7b6 100644 --- a/doc/backlog.md +++ b/doc/backlog.md @@ -12,3 +12,6 @@ - [ ] MongoDB: Implement stream resumption using `start_after` - [ ] Feature: Filter by events, e.g. Ignore "delete" events? - [ ] Integration Testing the "example" programs? +- [ ] Improve capabilities of DMS translator + https://github.com/daq-tools/commons-codec/issues/11 +- https://github.com/supabase/pg_replicate diff --git a/doc/decode.md b/doc/decode.md index f3f6c8a..0ae50e8 100644 --- a/doc/decode.md +++ b/doc/decode.md @@ -1,4 +1,4 @@ -# Various Decoders +# Decoder Collection `commons-codec` includes telemetry data decoders for individual popular sensor appliances. diff --git a/doc/index.md b/doc/index.md index 63d66b0..cd3eee1 100644 --- a/doc/index.md +++ b/doc/index.md @@ -23,7 +23,7 @@ | [LorryStream] ```{include} readme.md -:start-line: 12 +:start-line: 11 ``` @@ -34,6 +34,7 @@ cdc/index decode +zyp/index ``` ```{toctree} diff --git a/doc/zyp/backlog.md b/doc/zyp/backlog.md new file mode 100644 index 0000000..dda82f1 --- /dev/null +++ b/doc/zyp/backlog.md @@ -0,0 +1,48 @@ +# Zyp Backlog + +## Iteration +1 +- Refactor module namespace to `zyp` +- Documentation +- CLI interface +- Apply to MongoDB Table Loader in CrateDB Toolkit + +## Iteration +2 +Demonstrate! +- math expressions +- omit key (recursively) +- combine keys +- filter on keys and/or values +- Pathological cases like "Not defined" in typed fields like `TIMESTAMP` +- Use simpleeval, like Meltano, and provide the same built-in functions +- https://sdk.meltano.com/en/v0.39.1/stream_maps.html#other-built-in-functions-and-names +- https://github.com/MeltanoLabs/meltano-map-transform/pull/255 +- https://github.com/MeltanoLabs/meltano-map-transform/issues/252 +- Use JSONPath, see https://sdk.meltano.com/en/v0.39.1/code_samples.html#use-a-jsonpath-expression-to-extract-the-next-page-url-from-a-hateoas-response + +## Iteration +3 +- Moksha transformations on Buckets +- Investigate using JSON Schema +- Fluent API interface +- https://github.com/Halvani/alphabetic +- Mappers do not support external API lookups. + To add external API lookups, you can either (a) land all your data and + then joins using a transformation tool like dbt, or (b) create a custom + mapper plugin with inline lookup logic. + => Example from Luftdatenpumpe, using a reverse geocoder +- [ ] Define schema + https://sdk.meltano.com/en/latest/typing.html +- https://docs.meltano.com/guide/v2-migration/#migrate-to-an-adapter-specific-dbt-transformer +- https://github.com/meltano/sdk/blob/v0.39.1/singer_sdk/mapper.py + +## Fluent API Interface + +```python + +from zyp.model.fluent import FluentTransformation + +transformation = FluentTransformation() +.jmes("records[?starts_with(location, 'B')]") +.rename_fields({"_id": "id"}) +.convert_values({"/id": "int", "/value": "float"}, type="pointer-python") +.jq(".[] |= (.value /= 100)") +``` diff --git a/doc/zyp/index.md b/doc/zyp/index.md new file mode 100644 index 0000000..8e7ee70 --- /dev/null +++ b/doc/zyp/index.md @@ -0,0 +1,192 @@ +# Zyp Transformations + +## About +A data model and implementation for a compact transformation engine written +in [Python], based on [JSON Pointer] (RFC 6901), [JMESPath], and [transon], +implemented using [attrs] and [cattrs]. + +## Ideas +:Conciseness: + Define a multistep data refinement process with as little code as possible. +:Low Footprint: + Doesn't need any infrastructure or pipeline framework. It's just a little library. +:Interoperability: + Marshal transformation recipe definition to/from text-only representations (JSON, + YAML), in order to encourage implementations in other languages. +:Performance: + Well, it is written in Python. Fragments can be re-written in Rust, when applicable. +:Immediate: + Other ETL frameworks and concepts often need to first land your data in the target + system before applying subsequent transformations. Zyp is working directly within + the data pipeline, before data is inserted into the target system. + +## Synopsis I +A basic transformation example for individual data records. + +```python +from zyp.model.bucket import BucketTransformation, FieldRenamer, ValueConverter + +# Consider a slightly messy collection of records. +data_in = [ + {"_id": "123", "name": "device-foo", "reading": "42.42"}, + {"_id": "456", "name": "device-bar", "reading": -84.01}, +] + +# Define a transformation that renames the `_id` field to `id`, +# casts its value to `int`, and casts the `reading` field to `float`. +transformation = BucketTransformation( + names=FieldRenamer().add(old="_id", new="id"), + values=ValueConverter() + .add(pointer="/id", transformer="builtins.int") + .add(pointer="/reading", transformer="builtins.float"), +) + +for record in data_in: + print(transformation.apply(record)) +``` +The result is a transformed data collection. +```json +[ + {"id": 123, "name": "device-foo", "reading": 42.42}, + {"id": 456, "name": "device-bar", "reading": -84.01} +] +``` + +## Synopsis II +A more advanced transformation example for a collection of data records. + +Consider a messy collection of input data. +- The actual collection is nested within the top-level `records` item. +- `_id` fields are conveyed in string format. +- `value` fields include both integer and string values. +- `value` fields are fixed-point values, using a scaling factor of `100`. +- The collection includes invalid `null` records. + Those records usually trip processing when, for example, filtering on object items. +```python +data_in = { + "message-source": "system-3000", + "message-type": "eai-warehouse", + "records": [ + {"_id": "12", "meta": {"name": "foo", "location": "B"}, "data": {"value": "4242"}}, + None, + {"_id": "34", "meta": {"name": "bar", "location": "BY"}, "data": {"value": -8401}}, + {"_id": "56", "meta": {"name": "baz", "location": "NI"}, "data": {"value": 2323}}, + {"_id": "78", "meta": {"name": "qux", "location": "NRW"}, "data": {"value": -580}}, + None, + None, + ], +} +``` + +Consider after applying a corresponding transformation, the expected outcome is a +collection of valid records, optionally filtered, and values adjusted according +to relevant type hints and other conversions. +```python +data_out = [ + {"id": 12, "meta": {"name": "foo", "location": "B"}, "data": {"value": 42.42}}, + {"id": 34, "meta": {"name": "bar", "location": "BY"}, "data": {"value": -84.01}}, +] +``` + +Let's come up with relevant pre-processing rules to cleanse and mangle the shape of the +input collection. In order to make this example more exciting, let's include two special +needs: +- Filter input collection by value of nested element. +- Rename top-level fields starting with underscore `_`. + +Other than those special rules, the fundamental ones to re-shape the data are: +- Unwrap `records` attribute from container dictionary into actual collection. +- Filter collection, both by omitting invalid/empty records, and by applying query + constrains. +- On each record, rename the top-level `_id` field to `id`. +- On each record, adjust the data types of the `id` and `value` fields. +- Postprocess collection, applying a custom scaling factor to the `value` field. + +Zyp let's you concisely write those rules down, using the Python language. + +```python +from zyp.model.bucket import BucketTransformation, FieldRenamer, ValueConverter +from zyp.model.collection import CollectionTransformation +from zyp.model.moksha import MokshaTransformation + +transformation = CollectionTransformation( + pre=MokshaTransformation().jmes("records[?not_null(meta.location) && !starts_with(meta.location, 'N')]"), + bucket=BucketTransformation( + names=FieldRenamer().add(old="_id", new="id"), + values=ValueConverter() + .add(pointer="/id", transformer="builtins.int") + .add(pointer="/data/value", transformer="builtins.float"), + ), + post=MokshaTransformation().jq(".[] |= (.data.value /= 100)"), +) + +data_out = transformation.apply(data_in) +``` +Alternatively, serialize the `zyp-collection` transformation description, +for example into YAML format. +```python +print(transformation.to_yaml()) +``` +```yaml +meta: + version: 1 + type: zyp-collection +pre: + rules: + - expression: records[?not_null(meta.location) && !starts_with(meta.location, 'N')] + type: jmes +bucket: + names: + rules: + - new: id + old: _id + values: + rules: + - pointer: /id + transformer: builtins.int + - pointer: /data/value + transformer: builtins.float +post: + rules: + - expression: .[] |= (.data.value /= 100) + type: jq +``` + + +## Prior Art +- [Singer Transformer] +- [PipelineWise Transformations] +- [singer-transform] +- [Meltano Inline Data Mapping] +- [Meltano Inline Stream Maps] +- [AWS DMS source filter rules] +- [AWS DMS table selection and transformation rules] +- ... and many more. Thanks for the inspirations. + +## Etymology +With kudos to [Kris Zyp] for conceiving [JSON Pointer]. + +## More +```{toctree} +:maxdepth: 1 + +research +backlog +``` + + + +[attrs]: https://www.attrs.org/ +[AWS DMS source filter rules]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.Filters.html +[AWS DMS table selection and transformation rules]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.html +[cattrs]: https://catt.rs/ +[Kris Zyp]: https://github.com/kriszyp +[JMESPath]: https://jmespath.org/ +[JSON Pointer]: https://datatracker.ietf.org/doc/html/rfc6901 +[Meltano Inline Data Mapping]: https://docs.meltano.com/guide/mappers/ +[Meltano Inline Stream Maps]: https://sdk.meltano.com/en/latest/stream_maps.html +[PipelineWise Transformations]: https://transferwise.github.io/pipelinewise/user_guide/transformations.html +[Python]: https://en.wikipedia.org/wiki/Python_(programming_language) +[Singer Transformer]: https://github.com/singer-io/singer-python/blob/master/singer/transform.py +[singer-transform]: https://github.com/dkarzon/singer-transform +[transon]: https://transon-org.github.io/ diff --git a/doc/zyp/research.md b/doc/zyp/research.md new file mode 100644 index 0000000..5780533 --- /dev/null +++ b/doc/zyp/research.md @@ -0,0 +1,25 @@ +(zyp-research)= +# Zyp Research + +## Toolbox +- jq, jsonpointer, jmespath, funcy, morph, boltons, toolz +- json-spec, jdata, jolt, json-document-transforms, transon + + +## Prior Art +- https://pypi.org/project/json-spec/ +- https://pypi.org/project/transon/ +- https://pypi.org/project/jdata/ +- https://github.com/microsoft/json-document-transforms +- https://github.com/Microsoft/json-document-transforms/wiki +- https://github.com/bazaarvoice/jolt +- https://stackoverflow.com/questions/76303733/exploring-jolt-functions-for-json-to-json-transformations-an-overview +- https://github.com/microsoft/JsonToJsonMapper +- https://pypi.org/project/jdt/ +- https://github.com/videntity/json-data-tools +- https://github.com/datavis-tech/json-templates +- https://github.com/google/jsonnet +- https://github.com/jsonata-js/jsonata +- https://github.com/pacifica/python-jsonpath2 +- https://github.com/reagento/adaptix +- https://blog.panoply.io/best-data-transformation-tools diff --git a/pyproject.toml b/pyproject.toml index 6747177..32e1744 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,10 +21,12 @@ keywords = [ "dynamodb", "encode", "i/o", + "jmespath", "json", + "jsonpointer", "luftdaten.info", "map data", - "marshall", + "marshal", "mongodb", "nested data", "sensor.community", @@ -35,11 +37,13 @@ keywords = [ "transcode", "transform", "translate", + "transon", "ttn", "tts", - "unmarshall", + "unmarshal", "unserialize", "utility", + "zyp", ] license = { text = "LGPL 3, EUPL 1.2" } authors = [ @@ -102,9 +106,13 @@ dynamic = [ dependencies = [ "attrs<25", "backports-strenum<1.3; python_version<'3.11'", + "cattrs<24", "simplejson<4", "toolz<0.13", ] +optional-dependencies.all = [ + "commons-codec[mongodb,zyp]", +] optional-dependencies.develop = [ "mypy<1.12", "poethepoet<0.28", @@ -133,6 +141,14 @@ optional-dependencies.test = [ "pytest-cov<6", "pytest-mock<4", ] +optional-dependencies.zyp = [ + "jmespath<1.1", + "jq<1.8", + "jsonpointer<4", + "python-dateutil<2.10", + "pyyaml<7", + "transon==0.0.7", +] urls.Changelog = "https://commons-codec.readthedocs.io/changes.html" urls.Documentation = "https://commons-codec.readthedocs.io/" @@ -215,9 +231,10 @@ pythonpath = [ "src", ] testpaths = [ - "examples", "commons_codec", + "examples", "tests", + "zyp", ] python_files = [ "test_*.py", @@ -238,6 +255,7 @@ source = [ branch = false omit = [ "tests/*", + "src/zyp/util/dictx.py", ] [tool.coverage.report] @@ -250,7 +268,10 @@ exclude_lines = [ [tool.mypy] mypy_path = "src" -packages = [ "commons_codec" ] +packages = [ + "commons_codec", + "zyp", +] exclude = [ ] check_untyped_defs = true diff --git a/src/commons_codec/model.py b/src/commons_codec/model.py index c86f422..b69f808 100644 --- a/src/commons_codec/model.py +++ b/src/commons_codec/model.py @@ -6,7 +6,7 @@ if sys.version_info >= (3, 11): from enum import StrEnum else: - from backports.strenum import StrEnum + from backports.strenum import StrEnum # pragma: no cover from attrs import define diff --git a/src/zyp/__init__.py b/src/zyp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/zyp/function.py b/src/zyp/function.py new file mode 100644 index 0000000..38b6f15 --- /dev/null +++ b/src/zyp/function.py @@ -0,0 +1,35 @@ +import datetime as dt +import logging +import typing as t + +logger = logging.getLogger(__name__) + + +def to_datetime(value: t.Any, on_error: t.Literal["raise", "ignore"] = "ignore") -> t.Union[dt.datetime, None]: + if isinstance(value, dt.datetime): + return value + import dateutil.parser + + try: + return dateutil.parser.parse(value) + except (TypeError, dateutil.parser.ParserError) as ex: + logger.warning(f"Parsing value into datetime failed: {value}. Reason: {ex}") + if on_error == "ignore": + return None + elif on_error == "raise": + raise + + +def to_unixtime(value: t.Any, on_error: t.Literal["raise", "ignore"] = "ignore") -> t.Union[float, None]: + if isinstance(value, float): + return value + if isinstance(value, int): + return float(value) + if value is not None and not isinstance(value, dt.datetime): + value = to_datetime(value, on_error=on_error) + if value is None: + if on_error == "ignore": + return None + elif on_error == "raise": + raise ValueError(f"Converting value to unixtime failed: {value}") + return value.timestamp() diff --git a/src/zyp/model/__init__.py b/src/zyp/model/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/zyp/model/base.py b/src/zyp/model/base.py new file mode 100644 index 0000000..f833ca4 --- /dev/null +++ b/src/zyp/model/base.py @@ -0,0 +1,75 @@ +import typing as t +from collections import OrderedDict + +import attr +from attr import Factory +from attrs import define +from cattrs.preconf.json import make_converter as make_json_converter +from cattrs.preconf.pyyaml import make_converter as make_yaml_converter + +from zyp.util.data import no_privates_no_nulls_no_empties + + +@define +class Metadata: + version: t.Union[int, None] = None + type: t.Union[str, None] = None + + +@define +class SchemaDefinitionRule: + pointer: str + type: str + + +@define +class SchemaDefinition: + rules: t.List[SchemaDefinitionRule] = Factory(list) + _map: t.Dict[str, str] = Factory(dict) + + def add(self, pointer: str, type: str) -> "SchemaDefinition": # noqa: A002 + return self._add(SchemaDefinitionRule(pointer=pointer, type=type)) + + def __attrs_post_init__(self): + if self.rules and not self._map: + for rule in self.rules: + self._add_runtime(rule) + + def _add(self, rule: SchemaDefinitionRule) -> "SchemaDefinition": + self.rules.append(rule) + self._add_runtime(rule) + return self + + def _add_runtime(self, rule: SchemaDefinitionRule) -> "SchemaDefinition": + self._map[rule.pointer] = rule.type + return self + + +@define +class Dumpable: + meta: t.Union[Metadata, None] = None + + def to_dict(self) -> t.Dict[str, t.Any]: + return attr.asdict(self, dict_factory=OrderedDict, filter=no_privates_no_nulls_no_empties) + + def to_json(self) -> str: + converter = make_json_converter(dict_factory=OrderedDict) + return converter.dumps(self.to_dict()) + + def to_yaml(self) -> str: + converter = make_yaml_converter(dict_factory=OrderedDict) + return converter.dumps(self.to_dict()) + + @classmethod + def from_dict(cls, data: t.Dict[str, t.Any]): + return cls(**data) + + @classmethod + def from_json(cls, json_str: str): + converter = make_json_converter(dict_factory=OrderedDict) + return converter.loads(json_str, cls) + + @classmethod + def from_yaml(cls, yaml_str: str): + converter = make_yaml_converter(dict_factory=OrderedDict) + return converter.loads(yaml_str, cls) diff --git a/src/zyp/model/bucket.py b/src/zyp/model/bucket.py new file mode 100644 index 0000000..6e03d51 --- /dev/null +++ b/src/zyp/model/bucket.py @@ -0,0 +1,180 @@ +import importlib +import logging +import typing as t + +import jmespath +import jq +import jsonpointer +import transon +from attr import Factory +from attrs import define +from jsonpointer import JsonPointer + +from zyp.model.base import Dumpable, Metadata, SchemaDefinition +from zyp.util.dictx import OrderedDictX +from zyp.util.locator import swap_node, to_pointer + +logger = logging.getLogger(__name__) + + +Record = t.Dict[str, t.Any] +Collection = t.List[Record] +DictOrList = t.Union[Record, Collection] +TransonTemplate = t.Dict[str, t.Any] + +MokshaTransformer = t.Union[jmespath.parser.ParsedResult, jq._Program, transon.Transformer] + + +@define +class ConverterRuleBase: + def compile(self): + raise NotImplementedError("Please implement this method") + + +@define +class ConverterBase: + rules: t.List[t.Any] = Factory(list) + _runtime_rules: t.List[t.Any] = Factory(list) + + def __attrs_post_init__(self): + if self.rules and not self._runtime_rules: + for rule in self.rules: + self._add_runtime(rule) + + def _add_rule(self, rule): + self.rules.append(rule) + self._add_runtime(rule) + return self + + def _add_runtime(self, rule): + self._runtime_rules.append(rule.compile()) + return self + + +@define +class ValueConverterRule(ConverterRuleBase): + pointer: str + transformer: str + args: t.Union[t.List[t.Any], None] = Factory(list) + + def compile(self): + pointer = to_pointer(self.pointer) + if isinstance(self.transformer, str): + if not self.transformer: + raise ValueError("Empty transformer reference") + transformer_function = self._resolve_fun(self.transformer) + else: + transformer_function = self.transformer + if self.args: + transformer_function = transformer_function(*self.args) + return ValueConverterRuntimeRule(pointer=pointer, transformer=transformer_function) + + @staticmethod + def _resolve_fun(symbol: str) -> t.Callable: + if "." not in symbol: + symbol = f"zyp.function.{symbol}" + modname, symbol = symbol.rsplit(".", 1) + mod = importlib.import_module(modname) + return getattr(mod, symbol) + + +@define +class ValueConverterRuntimeRule: + pointer: jsonpointer.JsonPointer + transformer: t.Callable + + +@define +class ValueConverter(ConverterBase): + rules: t.List[ValueConverterRule] = Factory(list) + _runtime_rules: t.List[ValueConverterRuntimeRule] = Factory(list) + + def add(self, pointer: str, transformer: str, args: t.List[t.Any] = None) -> "ValueConverter": + self._add_rule(ValueConverterRule(pointer=pointer, transformer=transformer, args=args)) + return self + + def apply(self, data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + for rule in self._runtime_rules: + data = t.cast(t.Dict[str, t.Any], swap_node(rule.pointer, data, rule.transformer)) + return data + + +@define +class FieldRenamerRule: + old: str + new: str + + +@define +class FieldRenamer: + rules: t.List[FieldRenamerRule] = Factory(list) + + def add(self, old: str, new: str) -> "FieldRenamer": + self.rules.append(FieldRenamerRule(old=old, new=new)) + return self + + def apply(self, data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + d = OrderedDictX(data) + for rule in self.rules: + d.rename_key(rule.old, rule.new) + return d + + +@define +class TransonRule: + pointer: str + template: TransonTemplate + + def compile(self): + return TransonRuntimeRule(to_pointer(self.pointer), transformer=transon.Transformer(self.template)) + + +@define +class TransonRuntimeRule: + pointer: JsonPointer + transformer: transon.Transformer + + +@define +class TransonTransformation(ConverterBase): + rules: t.List[TransonRule] = Factory(list) + _runtime_rules: t.List[TransonRuntimeRule] = Factory(list) + + def add(self, pointer: str, template: TransonTemplate) -> "TransonTransformation": + self._add_rule(TransonRule(pointer=pointer, template=template)) + return self + + def apply(self, data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + for rule in self._runtime_rules: + data = t.cast(t.Dict[str, t.Any], swap_node(rule.pointer, data, rule.transformer.transform)) + return data + + +@define +class BucketTransformation(Dumpable): + """ + A minimal transformation engine. + + Based on: + - JSON Pointer (RFC 6901) + - Transon + + Documentation: + - https://www.rfc-editor.org/rfc/rfc6901 + - https://transon-org.github.io/ + """ + + meta: Metadata = Metadata(version=1, type="zyp-bucket") + schema: t.Union[SchemaDefinition, None] = None + names: t.Union[FieldRenamer, None] = None + values: t.Union[ValueConverter, None] = None + transon: t.Union[TransonTransformation, None] = None + + def apply(self, data: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + if self.names: + data = self.names.apply(data) + if self.values: + data = self.values.apply(data) + if self.transon: + data = self.transon.apply(data) + return data diff --git a/src/zyp/model/collection.py b/src/zyp/model/collection.py new file mode 100644 index 0000000..9deed1b --- /dev/null +++ b/src/zyp/model/collection.py @@ -0,0 +1,36 @@ +import typing as t + +from attrs import define + +from zyp.model.base import Dumpable, Metadata, SchemaDefinition +from zyp.model.bucket import BucketTransformation, Collection, DictOrList +from zyp.model.moksha import MokshaTransformation + + +@define(frozen=True) +class CollectionAddress: + container: str + name: str + + +@define +class CollectionTransformation(Dumpable): + meta: Metadata = Metadata(version=1, type="zyp-collection") + address: t.Union[CollectionAddress, None] = None + schema: t.Union[SchemaDefinition, None] = None + pre: t.Union[MokshaTransformation, None] = None + bucket: t.Union[BucketTransformation, None] = None + post: t.Union[MokshaTransformation, None] = None + + def apply(self, data: DictOrList) -> Collection: + collection = t.cast(Collection, data) + if self.pre: + collection = t.cast(Collection, self.pre.apply(collection)) + collection_out: Collection = [] + if self.bucket: + for item in collection: + item = self.bucket.apply(item) + collection_out.append(item) + if self.post: + collection_out = t.cast(Collection, self.post.apply(collection_out)) + return collection_out diff --git a/src/zyp/model/fluent.py b/src/zyp/model/fluent.py new file mode 100644 index 0000000..0492aed --- /dev/null +++ b/src/zyp/model/fluent.py @@ -0,0 +1,25 @@ +import typing as t + +from attrs import define + +from zyp.model.bucket import ConverterBase +from zyp.model.moksha import MokshaRule + + +@define +class FluentTransformation(ConverterBase): + rules = t.List[t.Any] + + def jmes(self, expression) -> "FluentTransformation": + self._add_rule(MokshaRule(type="jmes", expression=expression)) + return self + + def jq(self, expression) -> "FluentTransformation": + self._add_rule(MokshaRule(type="jq", expression=expression)) + return self + + def rename_fields(self, definition: t.Dict[str, t.Any]) -> "FluentTransformation": + return self + + def convert_values(self, definition: t.Dict[str, t.Any], type: str) -> "FluentTransformation": # noqa: A002 + return self diff --git a/src/zyp/model/moksha.py b/src/zyp/model/moksha.py new file mode 100644 index 0000000..968e26b --- /dev/null +++ b/src/zyp/model/moksha.py @@ -0,0 +1,68 @@ +import collections +import typing as t + +import jmespath +import jq +import transon +from attr import Factory +from attrs import define + +from zyp.model.bucket import ConverterBase, DictOrList, MokshaTransformer, TransonTemplate +from zyp.util.expression import compile_expression + + +@define +class MokshaRule: + type: str + expression: t.Union[str, TransonTemplate] + + def compile(self): + return MokshaRuntimeRule(self.type, compile_expression(self.type, self.expression)) + + +@define +class MokshaRuntimeRule: + type: str + transformer: MokshaTransformer + + def evaluate(self, data: DictOrList) -> DictOrList: + if isinstance(self.transformer, jmespath.parser.ParsedResult): + return self.transformer.search(data, options=jmespath.Options(dict_cls=collections.OrderedDict)) + elif isinstance(self.transformer, jq._Program): + return self.transformer.input_value(data).first() + elif isinstance(self.transformer, transon.Transformer): + return self.transformer.transform(data) + else: + raise TypeError(f"Evaluation failed. Type must be either jmes or jq or transon: {self.transformer}") + + +@define +class MokshaTransformation(ConverterBase): + rules: t.List[MokshaRule] = Factory(list) + _runtime_rules: t.List[MokshaRuntimeRule] = Factory(list) + + def jmes(self, expression: str) -> "MokshaTransformation": + if not expression: + raise ValueError("JMESPath expression cannot be empty") + + self._add_rule(MokshaRule(type="jmes", expression=expression)) + return self + + def jq(self, expression: str) -> "MokshaTransformation": + if not expression: + raise ValueError("jq expression cannot be empty") + + self._add_rule(MokshaRule(type="jq", expression=expression)) + return self + + def transon(self, expression: TransonTemplate) -> "MokshaTransformation": + if not expression: + raise ValueError("transon expression cannot be empty") + + self._add_rule(MokshaRule(type="transon", expression=expression)) + return self + + def apply(self, data: DictOrList) -> DictOrList: + for rule in self._runtime_rules: + data = rule.evaluate(data) + return data diff --git a/src/zyp/model/project.py b/src/zyp/model/project.py new file mode 100644 index 0000000..7a4d3fe --- /dev/null +++ b/src/zyp/model/project.py @@ -0,0 +1,32 @@ +import typing as t + +from attr import Factory +from attrs import define + +from zyp.model.base import Dumpable, Metadata +from zyp.model.collection import CollectionAddress, CollectionTransformation + + +@define +class TransformationProject(Dumpable): + meta: Metadata = Metadata(version=1, type="zyp-project") + collections: t.List[CollectionTransformation] = Factory(list) + _map: t.Dict[CollectionAddress, CollectionTransformation] = Factory(dict) + + def __attrs_post_init__(self): + if self.collections and not self._map: + for collection in self.collections: + self._add(collection) + + def _add(self, collection: CollectionTransformation) -> "TransformationProject": + if collection is None or collection.address is None: + raise ValueError("CollectionTransformation or address missing") + self._map[collection.address] = collection + return self + + def add(self, collection: CollectionTransformation) -> "TransformationProject": + self.collections.append(collection) + return self._add(collection) + + def get(self, address: CollectionAddress) -> CollectionTransformation: + return self._map[address] diff --git a/src/zyp/util/__init__.py b/src/zyp/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/zyp/util/data.py b/src/zyp/util/data.py new file mode 100644 index 0000000..f45f3f2 --- /dev/null +++ b/src/zyp/util/data.py @@ -0,0 +1,10 @@ +def no_privates_no_nulls_no_empties(key, value) -> bool: + """ + A filter for `attr.asdict`, to suppress private attributes. + """ + is_private = key.name.startswith("_") + is_null = value is None + is_empty = value == [] + if is_private or is_null or is_empty: + return False + return True diff --git a/src/zyp/util/dictx.py b/src/zyp/util/dictx.py new file mode 100644 index 0000000..0459624 --- /dev/null +++ b/src/zyp/util/dictx.py @@ -0,0 +1,165 @@ +""" +OrderedDictX by Zuzu Corneliu. + +For the keeping of order case (the other one is trivial, remove old and add new +one): I was not satisfied with the ordered-dictionary needing reconstruction +(at least partially), obviously for efficiency reasons, so I've put together a +class (OrderedDictX) that extends OrderedDict and allows you to do key changes +efficiently, i.e. in O(1) complexity. The implementation can also be adjusted +for the now-ordered built-in dict class. + +It uses 2 extra dictionaries to remap the changed keys ("external" - i.e. as +they appear externally to the user) to the ones in the underlying OrderedDict +("internal") - the dictionaries will only hold keys that were changed so as +long as no key changing is done they will be empty. + +As expected, the splicing method is extremely slow (didn't expect it to be that +much slower either though) and uses a lot of memory, and the O(N) solution of +@Ashwini Chaudhary (bug-fixed though, del also needed) is also slower, 17X +times in this example. + +Of course, this solution being O(1), compared to the O(N) OrderedDictRaymond +the time difference becomes much more apparent as the dictionary size +increases, e.g. for 5 times more elements (100000), the O(N) is 100X slower. + +https://stackoverflow.com/questions/16475384/rename-a-dictionary-key/75115645#75115645 +""" + +from collections import OrderedDict + + +class OrderedDictX(OrderedDict): + def __init__(self, *args, **kwargs): + # Mappings from new->old (ext2int), old->new (int2ext). + # Only the keys that are changed (internal key doesn't match what the user sees) are contained. + self._keys_ext2int = OrderedDict() + self._keys_int2ext = OrderedDict() + self.update(*args, **kwargs) + + def rename_key(self, k_old, k_new): + # Validate that the old key is part of the dict + if not self.__contains__(k_old): + raise KeyError(f"Cannot rename key {k_old} to {k_new}: {k_old} not existing in dict") + + # Return if no changing is actually to be done + if len(OrderedDict.fromkeys([k_old, k_new])) == 1: + return + + # Validate that the new key would not conflict with another one + if self.__contains__(k_new): + raise KeyError(f"Cannot rename key {k_old} to {k_new}: {k_new} already in dict") + + # Change the key using internal dicts mechanism + if k_old in self._keys_ext2int: + # Revert change temporarily + k_old_int = self._keys_ext2int[k_old] + del self._keys_ext2int[k_old] + k_old = k_old_int + # Check if new key matches the internal key + if len(OrderedDict.fromkeys([k_old, k_new])) == 1: + del self._keys_int2ext[k_old] + return + + # Finalize key change + self._keys_ext2int[k_new] = k_old + self._keys_int2ext[k_old] = k_new + + def __contains__(self, k) -> bool: + if k in self._keys_ext2int: + return True + if not super().__contains__(k): + return False + return k not in self._keys_int2ext + + def __getitem__(self, k): + if not self.__contains__(k): + # Intentionally raise KeyError in ext2int + return self._keys_ext2int[k] + return super().__getitem__(self._keys_ext2int.get(k, k)) + + def __setitem__(self, k, v): + if k in self._keys_ext2int: + return super().__setitem__(self._keys_ext2int[k], v) + # If the key exists in the internal state but was renamed to a k_ext, + # employ this trick: make it such that it appears as if k_ext has also been renamed to k + if k in self._keys_int2ext: + k_ext = self._keys_int2ext[k] + self._keys_ext2int[k] = k_ext + k = k_ext + return super().__setitem__(k, v) + + def __delitem__(self, k): + if not self.__contains__(k): + # Intentionally raise KeyError in ext2int + del self._keys_ext2int[k] + if k in self._keys_ext2int: + k_int = self._keys_ext2int[k] + del self._keys_ext2int[k] + del self._keys_int2ext[k_int] + k = k_int + return super().__delitem__(k) + + def __iter__(self): + yield from self.keys() + + def __reversed__(self): + for k in reversed(super().keys()): + yield self._keys_int2ext.get(k, k) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, dict): + return False + if len(self) != len(other): + return False + for (k, v), (k_other, v_other) in zip(self.items(), other.items()): + if k != k_other or v != v_other: + return False + return True + + def update(self, *args, **kwargs): + for k, v in OrderedDict(*args, **kwargs).items(): + self.__setitem__(k, v) + + def popitem(self, last=True) -> tuple: + if not last: + k = next(iter(self.keys())) + else: + k = next(iter(reversed(self.keys()))) + v = self.__getitem__(k) + self.__delitem__(k) + return k, v + + class OrderedDictXKeysView: + def __init__(self, odx: "OrderedDictX", orig_keys): + self._odx = odx + self._orig_keys = orig_keys + + def __iter__(self): + for k in self._orig_keys: + yield self._odx._keys_int2ext.get(k, k) + + def __reversed__(self): + for k in reversed(self._orig_keys): + yield self._odx._keys_int2ext.get(k, k) + + class OrderedDictXItemsView: + def __init__(self, odx: "OrderedDictX", orig_items): + self._odx = odx + self._orig_items = orig_items + + def __iter__(self): + for k, v in self._orig_items: + yield self._odx._keys_int2ext.get(k, k), v + + def __reversed__(self): + for k, v in reversed(self._orig_items): + yield self._odx._keys_int2ext.get(k, k), v + + def keys(self): + return self.OrderedDictXKeysView(self, super().keys()) + + def items(self): + return self.OrderedDictXItemsView(self, super().items()) + + def copy(self): + return OrderedDictX(self.items()) diff --git a/src/zyp/util/expression.py b/src/zyp/util/expression.py new file mode 100644 index 0000000..af00231 --- /dev/null +++ b/src/zyp/util/expression.py @@ -0,0 +1,18 @@ +import typing as t + +import jmespath +import jq +import transon + +from zyp.model.bucket import MokshaTransformer, TransonTemplate + + +def compile_expression(type: str, expression: t.Union[str, TransonTemplate]) -> MokshaTransformer: # noqa: A002 + if type == "jmes": + return jmespath.compile(expression) + elif type == "jq": + return jq.compile(expression) + elif type == "transon": + return transon.Transformer(expression) + else: + raise TypeError(f"Compilation failed. Type must be either jmes or jq or transon: {type}") diff --git a/src/zyp/util/locator.py b/src/zyp/util/locator.py new file mode 100644 index 0000000..1f0b6d5 --- /dev/null +++ b/src/zyp/util/locator.py @@ -0,0 +1,37 @@ +import logging +import typing as t + +import jsonpointer +from jsonpointer import JsonPointer, JsonPointerException + +logger = logging.getLogger(__name__) + +not_found = object() + + +def swap_node( + pointer: JsonPointer, value: t.Any, fun: t.Callable = None, on_error: t.Literal["ignore", "raise"] = "ignore" +) -> t.Union[JsonPointer, None]: + node = pointer.resolve(value, not_found) + if node is not_found: + msg = f"Element not found: {pointer}" + logger.debug(msg) + if on_error == "raise": + raise JsonPointerException(msg) + return value + if fun is not None: + node = fun(node) + inplace = bool(pointer.parts) + return pointer.set(value, node, inplace=inplace) + + +def to_pointer(pointer: t.Union[str, JsonPointer]) -> JsonPointer: + if isinstance(pointer, str): + try: + return jsonpointer.JsonPointer(pointer) + except JsonPointerException as ex: + raise ValueError(ex) from ex + elif isinstance(pointer, JsonPointer): + return pointer + else: + raise TypeError(f"Value is not of type str or JsonPointer: {type(pointer).__name__}") diff --git a/tests/transform/test_aws_dms.py b/tests/transform/test_aws_dms.py index 3fce7a9..ddf825e 100644 --- a/tests/transform/test_aws_dms.py +++ b/tests/transform/test_aws_dms.py @@ -1,4 +1,5 @@ # ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction +import base64 import json import pytest @@ -276,3 +277,7 @@ def test_decode_cdc_delete_failure(cdc): with pytest.raises(ValueError) as ex: DMSTranslatorCrateDB().to_sql(MSG_DATA_DELETE) assert ex.match("Unable to invoke DML operation without primary key information") + + +if __name__ == "__main__": + print(base64.b64encode(json.dumps(MSG_DATA_INSERT).encode("utf-8"))) # noqa: T201 diff --git a/tests/zyp/__init__.py b/tests/zyp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/zyp/conftest.py b/tests/zyp/conftest.py new file mode 100644 index 0000000..b3556e0 --- /dev/null +++ b/tests/zyp/conftest.py @@ -0,0 +1,6 @@ +# ruff: noqa: E402 +import pytest + +jmespath = pytest.importorskip("jmespath") +jsonpointer = pytest.importorskip("jsonpointer") +transon = pytest.importorskip("transon") diff --git a/tests/zyp/test_bucket.py b/tests/zyp/test_bucket.py new file mode 100644 index 0000000..646b64a --- /dev/null +++ b/tests/zyp/test_bucket.py @@ -0,0 +1,289 @@ +# ruff: noqa: E402 +import datetime as dt +import json +import sys +from copy import deepcopy +from pathlib import Path + +import pytest +from zyp.model.base import SchemaDefinition +from zyp.model.bucket import ( + BucketTransformation, + FieldRenamer, + TransonTransformation, + ValueConverter, +) +from zyp.model.collection import CollectionTransformation + + +class ReadingWithTimestamps: + """ + An example dataset including a variety of timestamps. + + null + 06/30/2023 + 07/31/2022 00:00:00 + 2022-07-07 + Invalid date + """ + + ingress = { + "meta": { + "american_date": "06/30/2023", + "american_date_time": "06/14/2022 12:42:24", + "empty_date": "", + "international_date": "2022-07-07", + "invalid_date": "Invalid date", + "none_date": None, + "null_date": "null", + }, + "data": { + "temperature": 42.42, + "humidity": 84.84, + }, + } + egress = { + "meta": { + "american_date": dt.datetime(2023, 6, 30, 0, 0, 0), + "american_date_time": dt.datetime(2022, 6, 14, 12, 42, 24), + "empty_date": None, + "international_date": dt.datetime(2022, 7, 7), + "invalid_date": None, + "none_date": None, + "null_date": None, + }, + "data": { + "temperature": 42.42, + "humidity": 84.84, + }, + } + + +class BasicReading: + ingress = { + "_id": "foobar", + "meta": { + "date": "06/14/2022 12:42:24", + }, + "data": { + "temperature": 42.42, + "humidity": 84.84, + }, + } + egress = { + "id": "foobar", + "meta": { + "date": 1655203344.0, + }, + "data": { + "temperature": 42.42, + "humidity": 84.84, + }, + } + + +def test_value_converter_datetime_function_reference(): + """ + Verify value conversion with function reference to built-in transformer. + """ + engine = ValueConverter() + engine.add(pointer="/meta/american_date", transformer="to_datetime") + engine.add(pointer="/meta/american_date_time", transformer="to_datetime") + engine.add(pointer="/meta/empty_date", transformer="to_datetime") + engine.add(pointer="/meta/international_date", transformer="to_datetime") + engine.add(pointer="/meta/invalid_date", transformer="to_datetime") + engine.add(pointer="/meta/none_date", transformer="to_datetime") + engine.add(pointer="/meta/null_date", transformer="to_datetime") + + indata = deepcopy(ReadingWithTimestamps.ingress) + outdata = engine.apply(indata) + assert outdata == ReadingWithTimestamps.egress + + +def test_value_converter_datetime_function_callback(): + """ + Verify value conversion with function callback. + + Note: This use-case is discouraged, because an inline callback can't + be serialized into a text representation well. + """ + engine = ValueConverter() + from zyp.function import to_datetime + + engine.add(pointer="/meta/american_date", transformer=to_datetime) + indata = deepcopy(ReadingWithTimestamps.ingress) + outdata = engine.apply(indata) + assert outdata["meta"]["american_date"] == ReadingWithTimestamps.egress["meta"]["american_date"] + + +def test_value_converter_root_node_yaml_dump(): + """ + Converting values on the root level of the document. + """ + engine = ValueConverter() + engine.add(pointer="", transformer="yaml.dump") + assert engine.apply({"value": 42}) == "value: 42\n" + + +def test_value_converter_root_node_extract_and_convert(): + """ + Converting values on the root level of the document. + """ + engine = ValueConverter() + engine.add(pointer="", transformer="operator.itemgetter", args=["value"]) + engine.add(pointer="", transformer="builtins.str") + assert engine.apply({"value": 42}) == "42" + + +def test_value_converter_path_invalid(): + """ + Converting values with an invalid location pointer fails. + """ + engine = ValueConverter() + with pytest.raises(ValueError) as ex: + engine.add(pointer="---", transformer="to_datetime") + assert ex.match("Location must start with /") + + +def test_value_converter_transformer_empty(): + """ + Converting values with an empty transformer reference fails. + """ + engine = ValueConverter() + with pytest.raises(ValueError) as ex: + engine.add(pointer="/foo", transformer="") + assert ex.match("Empty transformer reference") + + +def test_value_converter_transformer_unknown_module(): + """ + Converting values with an unknown transformer module fails. + """ + engine = ValueConverter() + with pytest.raises(ImportError) as ex: + engine.add(pointer="/foo", transformer="foo.to_unknown") + assert ex.match("No module named 'foo'") + + +def test_value_converter_transformer_unknown_symbol(): + """ + Converting values with an unknown transformer symbol fails. + """ + engine = ValueConverter() + with pytest.raises(AttributeError) as ex: + engine.add(pointer="/foo", transformer="to_unknown") + assert ex.match("module 'zyp.function' has no attribute 'to_unknown'") + + +def test_bucket_transformation_success(): + """ + Converting values with a complete transformation description. + """ + transformation = BucketTransformation( + names=FieldRenamer().add(old="_id", new="id"), + values=ValueConverter().add(pointer="/meta/date", transformer="to_unixtime"), + ) + result = transformation.apply(deepcopy(BasicReading.ingress)) + assert result == BasicReading.egress + + +def test_bucket_transformation_transon_compute(): + """ + Converting documents using a `transon` transformation. + https://transon-org.github.io/ + """ + transformation = BucketTransformation( + transon=TransonTransformation().add( + pointer="/abc", template={"$": "call", "name": "str", "value": {"$": "expr", "op": "mul", "value": 2}} + ), + ) + result = transformation.apply({"abc": 123}) + assert result == {"abc": "246"} + + +def test_bucket_transformation_transon_filter(): + """ + Converting documents using a `transon` transformation. + https://transon-org.github.io/ + """ + transformation = BucketTransformation( + transon=TransonTransformation().add( + pointer="", template={"$": "filter", "cond": {"$": "expr", "op": "!=", "values": [{"$": "key"}, "baz"]}} + ), + ) + result = transformation.apply({"foo": "bar", "baz": "qux", "123": "456"}) + assert result == {"foo": "bar", "123": "456"} + + +def test_bucket_transformation_success_2(): + """ + Running a transformation without any manipulations yields the original input value. + """ + transformation = BucketTransformation() + result = transformation.apply(deepcopy(BasicReading.ingress)) + assert result == BasicReading.ingress + + +def test_bucket_transformation_serialize(): + """ + A transformation description can be serialized to a data structure and back. + """ + transformation = BucketTransformation( + schema=SchemaDefinition().add(pointer="/meta/date", type="DATETIME"), + names=FieldRenamer().add(old="_id", new="id"), + values=ValueConverter().add(pointer="/meta/date", transformer="to_unixtime"), + ) + transformation_dict = { + "meta": {"version": 1, "type": "zyp-bucket"}, + "schema": {"rules": [{"pointer": "/meta/date", "type": "DATETIME"}]}, + "names": {"rules": [{"new": "id", "old": "_id"}]}, + "values": {"rules": [{"pointer": "/meta/date", "transformer": "to_unixtime"}]}, + } + result = transformation.to_dict() + assert result == transformation_dict + + result = transformation.to_json() + assert json.loads(result) == transformation_dict + + +def test_bucket_transformation_serialize_args(): + """ + Check if transformer args are also serialized. + """ + transformation = BucketTransformation( + values=ValueConverter().add(pointer="", transformer="operator.itemgetter", args=["value"]), + ) + result = transformation.to_dict() + transformation_dict = { + "meta": {"version": 1, "type": "zyp-bucket"}, + "values": {"rules": [{"pointer": "", "transformer": "operator.itemgetter", "args": ["value"]}]}, + } + assert result == transformation_dict + + +def test_bucket_transformation_load_and_apply(): + """ + Verify transformation can be loaded from JSON and applied again. + """ + payload = Path("tests/zyp/transformation-bucket.json").read_text() + transformation = BucketTransformation.from_json(payload) + result = transformation.apply(deepcopy(BasicReading.ingress)) + assert result == BasicReading.egress + + +@pytest.mark.skipif(sys.version_info < (3, 9), reason="Does not work on Python 3.8 and earlier") +def test_bucket_transon_marshal(): + """ + Verify transformation can be loaded from JSON and applied again. + """ + transformation = BucketTransformation( + transon=TransonTransformation().add( + pointer="/abc", template={"$": "call", "name": "str", "value": {"$": "expr", "op": "mul", "value": 2}} + ), + ) + BucketTransformation.from_yaml(transformation.to_yaml()) + + +def test_from_dict(): + assert isinstance(BucketTransformation.from_dict({}), BucketTransformation) + assert isinstance(CollectionTransformation.from_dict({}), CollectionTransformation) diff --git a/tests/zyp/test_collection.py b/tests/zyp/test_collection.py new file mode 100644 index 0000000..e17e50c --- /dev/null +++ b/tests/zyp/test_collection.py @@ -0,0 +1,102 @@ +from copy import deepcopy +from pathlib import Path + +import yaml +from zyp.model.bucket import BucketTransformation, FieldRenamer, ValueConverter +from zyp.model.collection import CollectionTransformation +from zyp.model.moksha import MokshaTransformation + + +class ComplexRecipe: + """ + It executes the following steps, in order of appearance: + + - Unwrap `records` attribute from container dictionary into actual collection. + - Filter collection, both by omitting invalid/empty records, and by applying query constrains. + - On each record, rename the top-level `_id` field to `id`. + - On each record, apply value conversions to two nested data values. + - Postprocess collection, applying a custom value scaling factor. + """ + + # Define a messy input data collection. + data_in = { + "message-source": "system-3000", + "message-type": "eai-warehouse", + "records": [ + {"_id": "12", "meta": {"name": "foo", "location": "B"}, "data": {"value": "4242"}}, + None, + {"_id": "34", "meta": {"name": "bar", "location": "BY"}, "data": {"value": -8401}}, + {"_id": "56", "meta": {"name": "baz", "location": "NI"}, "data": {"value": 2323}}, + {"_id": "78", "meta": {"name": "qux", "location": "NRW"}, "data": {"value": -580}}, + None, + None, + ], + } + + # Define expectation of the cleansed data collection. + data_out = [ + {"id": 12, "meta": {"name": "foo", "location": "B"}, "data": {"value": 42.42}}, + {"id": 34, "meta": {"name": "bar", "location": "BY"}, "data": {"value": -84.01}}, + ] + + # Define transformation. + transformation = CollectionTransformation( + pre=MokshaTransformation().jmes("records[?not_null(meta.location) && !starts_with(meta.location, 'N')]"), + bucket=BucketTransformation( + names=FieldRenamer().add(old="_id", new="id"), + values=ValueConverter() + .add(pointer="/id", transformer="builtins.int") + .add(pointer="/data/value", transformer="builtins.float"), + ), + post=MokshaTransformation().jq(".[] |= (.data.value /= 100)"), + ) + + +def test_collection_transformation_success(): + """ + Verify transformation recipe for re-shaping a collection of records. + """ + assert ComplexRecipe.transformation.apply(ComplexRecipe.data_in) == ComplexRecipe.data_out + + +def test_collection_transformation_serialize(): + """ + Verify collection transformation description can be serialized to a data structure and back. + """ + transformation = ComplexRecipe.transformation + transformation_dict = { + "meta": {"version": 1, "type": "zyp-collection"}, + "pre": { + "rules": [ + {"type": "jmes", "expression": "records[?not_null(meta.location) && !starts_with(meta.location, 'N')]"} + ] + }, + "bucket": { + "meta": {"version": 1, "type": "zyp-bucket"}, + "names": {"rules": [{"old": "_id", "new": "id"}]}, + "values": { + "rules": [ + {"pointer": "/id", "transformer": "builtins.int"}, + {"pointer": "/data/value", "transformer": "builtins.float"}, + ] + }, + }, + "post": {"rules": [{"type": "jq", "expression": ".[] |= (.data.value /= 100)"}]}, + } + dict_result = transformation.to_dict() + assert dict_result == transformation_dict + return + + yaml_result = transformation.to_yaml() + assert yaml.full_load(yaml_result) == transformation_dict + CollectionTransformation.from_yaml(yaml_result) + + +def test_collection_transformation_load_and_apply(): + """ + Verify transformation can be loaded from JSON and applied again. + """ + payload = Path("tests/zyp/transformation-collection.yaml").read_text() + transformation = CollectionTransformation.from_yaml(payload) + result = transformation.apply(deepcopy(ComplexRecipe.data_in)) + assert result == ComplexRecipe.data_out diff --git a/tests/zyp/test_function.py b/tests/zyp/test_function.py new file mode 100644 index 0000000..c65df0e --- /dev/null +++ b/tests/zyp/test_function.py @@ -0,0 +1,41 @@ +import datetime as dt + +import pytest +from dateutil.parser import ParserError +from zyp.function import to_datetime, to_unixtime + +stddate = dt.datetime(2023, 6, 30) + + +def test_to_datetime_success(): + assert to_datetime("06/30/2023") == stddate + assert to_datetime("06/05/2023") == dt.datetime(2023, 6, 5) + assert to_datetime(stddate) == stddate + assert to_datetime("---") is None + assert to_datetime(None) is None + + +def test_to_datetime_failure(): + with pytest.raises(ParserError) as ex: + to_datetime("---", on_error="raise") + assert ex.match("String does not contain a date: ---") + + +def test_to_unixtime_success(): + assert to_unixtime("06/30/2023") == 1688076000.0 + assert to_unixtime("06/05/2023") == 1685916000.0 + assert to_unixtime(stddate) == 1688076000.0 + assert to_unixtime("---") is None + assert to_unixtime(123) == 123 + assert to_unixtime(123.45) == 123.45 + assert to_unixtime(None) is None + + +def test_to_unixtime_failure(): + with pytest.raises(ParserError) as ex: + to_unixtime("---", on_error="raise") + assert ex.match("String does not contain a date: ---") + + with pytest.raises(ValueError) as ex: + to_unixtime(None, on_error="raise") + assert ex.match("Converting value to unixtime failed: None") diff --git a/tests/zyp/test_locator.py b/tests/zyp/test_locator.py new file mode 100644 index 0000000..49bf7fd --- /dev/null +++ b/tests/zyp/test_locator.py @@ -0,0 +1,18 @@ +import pytest +from jsonpointer import JsonPointer, JsonPointerException +from zyp.util.locator import swap_node + + +def test_swap_node_not_found_raise(): + data = {"abc": "def"} + pointer = JsonPointer("/foo") + with pytest.raises(JsonPointerException) as ex: + swap_node(pointer, data, on_error="raise") + assert ex.match("Element not found: /foo") + + +def test_swap_node_not_found_ignore(): + data = {"abc": "def"} + pointer = JsonPointer("/foo") + new_data = swap_node(pointer, data, on_error="ignore") + assert new_data is data diff --git a/tests/zyp/test_model.py b/tests/zyp/test_model.py new file mode 100644 index 0000000..16e8560 --- /dev/null +++ b/tests/zyp/test_model.py @@ -0,0 +1,15 @@ +from zyp.model.fluent import FluentTransformation + + +def test_fluent_transformation(): + """ + FIXME: Fluent transformations are not implemented yet. + """ + transformation = ( + FluentTransformation() + .jmes("records[?starts_with(location, 'B')]") + .rename_fields({"_id": "id"}) + .convert_values({"/id": "int", "/value": "float"}, type="pointer-python") + .jq(".[] |= (.value /= 100)") + ) + assert len(transformation.rules) == 2 diff --git a/tests/zyp/test_moksha.py b/tests/zyp/test_moksha.py new file mode 100644 index 0000000..1ec6f18 --- /dev/null +++ b/tests/zyp/test_moksha.py @@ -0,0 +1,69 @@ +import pytest +from jmespath.exceptions import ParseError +from zyp.model.moksha import MokshaRule, MokshaTransformation + + +def test_moksha_jq_compute_nested(): + """ + Verify updating deeply nested field with value, using moksha/jq. + https://stackoverflow.com/a/65822084 + """ + transformation = MokshaTransformation().jq(".[] |= (.data.abc *= 2)") + assert transformation.apply([{"data": {"abc": 123}}]) == [{"data": {"abc": 246}}] + + +def test_transon_duplicate_records(): + """ + Verify record duplication works well. + """ + transformation = MokshaTransformation().transon({"$": "expr", "op": "mul", "value": 42}) + assert transformation.apply([{"foo": "bar", "baz": "qux"}]) == [{"foo": "bar", "baz": "qux"}] * 42 + + +def test_transon_idempotency(): + """ + Verify record duplication works well. + """ + transformation = MokshaTransformation().transon({"$": "this"}) + assert transformation.apply([{"foo": "bar"}, {"baz": "qux"}]) == [{"foo": "bar"}, {"baz": "qux"}] + + +def test_moksha_rule(): + moksha = MokshaRule(type="jmes", expression="@").compile() + assert moksha.transformer.expression == "@" + assert moksha.transformer.parsed == {"type": "current", "children": []} + + +def test_moksha_runtime_rule_success(): + assert MokshaRule(type="jmes", expression="@").compile().evaluate(42.42) == 42.42 + + +def test_moksha_runtime_rule_syntax_error(): + with pytest.raises(ParseError) as ex: + MokshaRule(type="jmes", expression="@foo").compile() + assert ex.match("Unexpected token: foo") + + +def test_moksha_runtime_rule_invalid_transformer(): + rule = MokshaRule(type="jmes", expression="@").compile() + rule.transformer = "foo" + with pytest.raises(TypeError) as ex: + rule.evaluate(42.42) + assert ex.match("Evaluation failed. Type must be either jmes or jq or transon: foo") + + +def test_moksha_empty(): + """ + Empty JSON Pointer expression means "root node". + """ + with pytest.raises(ValueError) as ex: + MokshaTransformation().jmes("") + assert ex.match("JMESPath expression cannot be empty") + + with pytest.raises(ValueError) as ex: + MokshaTransformation().jq("") + assert ex.match("jq expression cannot be empty") + + with pytest.raises(ValueError) as ex: + MokshaTransformation().transon("") + assert ex.match("transon expression cannot be empty") diff --git a/tests/zyp/test_project.py b/tests/zyp/test_project.py new file mode 100644 index 0000000..1a7f755 --- /dev/null +++ b/tests/zyp/test_project.py @@ -0,0 +1,24 @@ +import sys + +import pytest +from zyp.model.collection import CollectionAddress, CollectionTransformation +from zyp.model.project import TransformationProject + + +@pytest.mark.skipif(sys.version_info < (3, 9), reason="Does not work on Python 3.8 and earlier") +def test_project_success(): + address = CollectionAddress(container="foo", name="bar") + ct = CollectionTransformation(address=address) + pt = TransformationProject().add(ct) + pt.to_yaml() + + pt = TransformationProject(collections=[ct]) + pt.to_yaml() + + assert pt.get(address) is ct + + +def test_project_failure(): + with pytest.raises(ValueError) as ex: + TransformationProject().add(CollectionTransformation()) + assert ex.match("CollectionTransformation or address missing") diff --git a/tests/zyp/test_util.py b/tests/zyp/test_util.py new file mode 100644 index 0000000..c5aa6ba --- /dev/null +++ b/tests/zyp/test_util.py @@ -0,0 +1,50 @@ +import jmespath +import jq +import jsonpointer +import pytest +import transon +from zyp.util.expression import compile_expression +from zyp.util.locator import to_pointer + + +def test_to_pointer_string(): + assert to_pointer("/") == jsonpointer.JsonPointer("/") + assert to_pointer("") == jsonpointer.JsonPointer("") + + +def test_to_pointer_jsonpointer(): + assert to_pointer(jsonpointer.JsonPointer("/")) == jsonpointer.JsonPointer("/") + + +def test_to_pointer_none(): + with pytest.raises(TypeError) as ex: + to_pointer(None) + assert ex.match("Value is not of type str or JsonPointer: NoneType") + + +def test_to_pointer_int(): + with pytest.raises(TypeError) as ex: + to_pointer(42) + assert ex.match("Value is not of type str or JsonPointer: int") + + +def test_compile_expression_jmes(): + transformer: jmespath.parser.ParsedResult = compile_expression(type="jmes", expression="@") + assert transformer.expression == "@" + assert transformer.parsed == {"type": "current", "children": []} + + +def test_compile_expression_jq(): + transformer: jq._Program = compile_expression(type="jq", expression=".") + assert transformer.program_string == "." + + +def test_compile_expression_transon(): + transformer: transon.Transformer = compile_expression(type="transon", expression={"$": "this"}) + assert transformer.template == {"$": "this"} + + +def test_compile_expression_unknown(): + with pytest.raises(TypeError) as ex: + compile_expression(type="foobar", expression=None) + assert ex.match("Compilation failed. Type must be either jmes or jq or transon: foobar") diff --git a/tests/zyp/transformation-bucket.json b/tests/zyp/transformation-bucket.json new file mode 100644 index 0000000..193ce40 --- /dev/null +++ b/tests/zyp/transformation-bucket.json @@ -0,0 +1,30 @@ +{ + "meta": { + "version": 1, + "type": "zyp-bucket" + }, + "schema": { + "rules": [ + { + "pointer": "/meta/date", + "type": "DATETIME" + } + ] + }, + "names": { + "rules": [ + { + "old": "_id", + "new": "id" + } + ] + }, + "values": { + "rules": [ + { + "pointer": "/meta/date", + "transformer": "to_unixtime" + } + ] + } +} diff --git a/tests/zyp/transformation-collection.yaml b/tests/zyp/transformation-collection.yaml new file mode 100644 index 0000000..5000aff --- /dev/null +++ b/tests/zyp/transformation-collection.yaml @@ -0,0 +1,22 @@ +meta: + version: 1 + type: zyp-collection +pre: + rules: + - expression: records[?not_null(meta.location) && !starts_with(meta.location, 'N')] + type: jmes +bucket: + names: + rules: + - new: id + old: _id + values: + rules: + - pointer: /id + transformer: builtins.int + - pointer: /data/value + transformer: builtins.float +post: + rules: + - expression: .[] |= (.data.value /= 100) + type: jq