Skip to content

Commit

Permalink
Enable low-code CDK users to specify schema in the manifest (#20375)
Browse files Browse the repository at this point in the history
Enable low-code CDK users to specify schema in the manifest

Also update documentation:
* Add inline schema loader info to yaml-overview.md
* Include inline schema info in tutorial
  • Loading branch information
clnoll authored Dec 13, 2022
1 parent ef624e8 commit 9dae098
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 6 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.13.2
Low-code: Enable low-code CDK users to specify schema inline in the manifest

## 0.13.1
Low-code: Add `SessionTokenAuthenticator`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
},
{
"$ref": "#/definitions/DefaultSchemaLoader"
},
{
"$ref": "#/definitions/InlineSchemaLoader"
}
]
},
Expand All @@ -93,6 +96,9 @@
},
{
"$ref": "#/definitions/DefaultSchemaLoader"
},
{
"$ref": "#/definitions/InlineSchemaLoader"
}
]
},
Expand Down Expand Up @@ -1609,6 +1615,11 @@
],
"description": "\n Loads a schema from the default location or returns an empty schema for streams that have not defined their schema file yet.\n\n Attributes:\n config (Config): The user-provided configuration as specified by the source's spec\n options (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed\n "
},
"InlineSchemaLoader": {
"type": "object",
"properties": {},
"description": "Loads a schema from the manifest, if provided."
},
"AddFields": {
"allOf": [
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ConcreteDeclarativeSource(JsonSchemaMixin):
class ManifestDeclarativeSource(DeclarativeSource):
"""Declarative source defined by a manifest of low-code components that define source connector behavior"""

VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}
VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "schemas", "spec", "streams", "version"}

def __init__(self, source_config: ConnectionDefinition, debug: bool = False):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import OffsetIncrement
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.spec import Spec
from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
Expand Down Expand Up @@ -66,6 +67,7 @@
"DpathExtractor": DpathExtractor,
"ExponentialBackoffStrategy": ExponentialBackoffStrategy,
"HttpRequester": HttpRequester,
"InlineSchemaLoader": InlineSchemaLoader,
"InterpolatedBoolean": InterpolatedBoolean,
"InterpolatedString": InterpolatedString,
"JsonSchema": JsonFileSchemaLoader, # todo remove after hacktoberfest and update connectors to use JsonFileSchemaLoader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
#

from airbyte_cdk.sources.declarative.schema.default_schema_loader import DefaultSchemaLoader
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader

__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader"]
__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader", "InlineSchemaLoader"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Dict, Mapping

from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader


@dataclass
class InlineSchemaLoader(SchemaLoader):
"""Describes a stream's schema"""

schema: Dict[str, Any]
options: InitVar[Mapping[str, Any]]

def get_json_schema(self) -> Mapping[str, Any]:
return self.schema
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.13.1",
version="0.13.2",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pytest
from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader


@pytest.mark.parametrize(
"test_name, input_schema, expected_schema",
[
("schema", {"k": "string"}, {"k": "string"}),
("empty_schema", {}, {}),
],
)
def test_static_schema_loads(test_name, input_schema, expected_schema):
schema_loader = InlineSchemaLoader(input_schema, {})

assert schema_loader.get_json_schema() == expected_schema
9 changes: 9 additions & 0 deletions docs/connector-development/config-based/source_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ definitions:
"$ref": "#/definitions/RecordTransformation"
checkpoint_interval:
type: integer
InlineSchemaLoader:
type: object
required:
- schema
properties:
"$options":
"$ref": "#/definitions/$options"
schema:
type: object
PrimaryKey:
type: string
Retriever:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ rm source_exchange_rates_tutorial/schemas/customers.json
rm source_exchange_rates_tutorial/schemas/employees.json
```

As an alternative to storing the stream's data schema to the `schemas/` directory, we can store it inline in the YAML file, by including the optional `schema_loader` key and associated schema in the entry for each stream. More information on how to define a stream's schema in the YAML file can be found [here](../understanding-the-yaml-file/yaml-overview.md).

Reading from the source can be done by running the `read` operation

```bash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ The low-code framework involves editing a boilerplate [YAML file](../low-code-cd
Streams define the schema of the data to sync, as well as how to read it from the underlying API source.
A stream generally corresponds to a resource within the API. They are analogous to tables for a relational database source.

A stream's schema will can defined as a [JSONSchema](https://json-schema.org/) file in `<source_connector_name>/schemas/<stream_name>.json`.
By default, the schema of a stream's data is defined as a [JSONSchema](https://json-schema.org/) file in `<source_connector_name>/schemas/<stream_name>.json`.

Alternately, the stream's data schema can be stored in YAML format inline in the YAML file, by including the optional `schema_loader` key. If the data schema is provided inline, any schema on disk for that stream will be ignored.

More information on how to define a stream's schema can be found [here](../source_schema.yaml)

The schema of a stream object is:
The stream object is represented in the YAML file as:

```yaml
Stream:
Expand All @@ -34,6 +37,8 @@ The schema of a stream object is:
"$ref": "#/definitions/RecordTransformation"
checkpoint_interval:
type: integer
schema_loader:
"$ref": "#/definitions/InlineSchemaLoader"
```
More details on streams and sources can be found in the [basic concepts section](../../cdk-python/basic-concepts.md).
Expand Down Expand Up @@ -99,4 +104,4 @@ More information on `DatetimeStreamSlicer` can be found in the [stream slicers](
## More readings

- [Requester](./requester.md)
- [Stream slicers](./stream-slicers.md)
- [Stream slicers](./stream-slicers.md)

0 comments on commit 9dae098

Please sign in to comment.