Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added an extensible API for stream schema sources #2876

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions samples/sample_tap_gitlab/gitlab_rest_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from singer_sdk.authenticators import SimpleAuthenticator
from singer_sdk.pagination import SimpleHeaderPaginator
from singer_sdk.schema import LocalSchemaSource
from singer_sdk.streams.rest import RESTStream
from singer_sdk.typing import (
ArrayType,
Expand All @@ -19,7 +20,7 @@
StringType,
)

SCHEMAS_DIR = importlib.resources.files(__package__) / "schemas"
LOCAL_SCHEMAS = LocalSchemaSource(importlib.resources.files(__package__) / "schemas")

DEFAULT_URL_BASE = "https://gitlab.com/api/v4"

Expand Down Expand Up @@ -103,7 +104,7 @@ class ProjectsStream(ProjectBasedStream):
primary_keys = ("id",)
replication_key = "last_activity_at"
is_sorted = True
schema_filepath = SCHEMAS_DIR / "projects.json"
schema = LOCAL_SCHEMAS("projects")


class ReleasesStream(ProjectBasedStream):
Expand All @@ -113,7 +114,7 @@ class ReleasesStream(ProjectBasedStream):
path = "/projects/{project_id}/releases"
primary_keys = ("project_id", "tag_name")
replication_key = None
schema_filepath = SCHEMAS_DIR / "releases.json"
schema = LOCAL_SCHEMAS("releases")


class IssuesStream(ProjectBasedStream):
Expand All @@ -124,7 +125,7 @@ class IssuesStream(ProjectBasedStream):
primary_keys = ("id",)
replication_key = "updated_at"
is_sorted = False
schema_filepath = SCHEMAS_DIR / "issues.json"
schema = LOCAL_SCHEMAS("issues")


class CommitsStream(ProjectBasedStream):
Expand All @@ -137,7 +138,7 @@ class CommitsStream(ProjectBasedStream):
primary_keys = ("id",)
replication_key = "created_at"
is_sorted = False
schema_filepath = SCHEMAS_DIR / "commits.json"
schema = LOCAL_SCHEMAS("commits")


class EpicsStream(ProjectBasedStream):
Expand Down Expand Up @@ -202,7 +203,7 @@ class EpicIssuesStream(GitlabStream):
path = "/groups/{group_id}/epics/{epic_iid}/issues"
primary_keys = ("id",)
replication_key = None
schema_filepath = SCHEMAS_DIR / "epic_issues.json"
schema = LOCAL_SCHEMAS("epic_issues")
parent_stream_type = EpicsStream # Stream should wait for parents to complete.

def get_url_params(
Expand Down
108 changes: 108 additions & 0 deletions singer_sdk/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""Schema sources."""

from __future__ import annotations

import functools
import json
import sys
import typing as t
from pathlib import Path

import requests

from singer_sdk._singerlib import resolve_schema_references

if sys.version_info < (3, 12):
from importlib.abc import Traversable
else:
from importlib.resources.abc import Traversable


class BaseSchemaSource:
"""Base schema source."""

def __init__(self) -> None:
"""Initialize the schema source."""
self._registry: dict[str, dict] = {}

def get_schema(self, *args: t.Any, **kwargs: t.Any) -> dict:
"""Get schema from reference.

Raises:
NotImplementedError: If the method is not implemented by the subclass.
"""
msg = "Subclasses must implement this method."
raise NotImplementedError(msg)

def __call__(self, *args: t.Any, **kwargs: t.Any) -> dict:
"""Get schema for the given stream name or reference.

Returns:
The schema dictionary.
"""
return self.get_schema(*args, **kwargs)


class LocalSchemaSource(BaseSchemaSource):
"""Local schema source."""

def __init__(self, path: Path | Traversable) -> None:
"""Initialize the schema source."""
super().__init__()
self.path = path

def get_schema(self, name: str) -> dict:
"""Get schema from reference.

Args:
name: Name of the stream.

Returns:
The schema dictionary.
"""
if name not in self._registry:
schema_path = self.path / f"{name}.json"
self._registry[name] = json.loads(schema_path.read_text())

return self._registry[name]


class OpenAPISchemaSource(BaseSchemaSource):
"""OpenAPI schema source."""

def __init__(self, path: str | Path | Traversable) -> None:
"""Initialize the schema source."""
super().__init__()
self.path = path

@functools.cached_property
def spec_dict(self) -> dict:
"""OpenAPI spec dictionary.

Raises:
ValueError: If the path type is not supported.
"""
if isinstance(self.path, (Path, Traversable)):
return json.loads(self.path.read_text()) # type: ignore[no-any-return]

if self.path.startswith("http"):
return requests.get(self.path, timeout=10).json() # type: ignore[no-any-return]

msg = f"Unsupported path type: {self.path}"
raise ValueError(msg)

def get_schema(self, ref: str) -> dict:
"""Get schema from reference.

Args:
ref: Reference to the schema.

Returns:
The schema dictionary.
"""
if ref not in self._registry:
schema = {"$ref": f"#/components/schemas/{ref}"}
schema["components"] = self.spec_dict["components"]
self._registry[ref] = resolve_schema_references(schema)

return self._registry[ref]
34 changes: 34 additions & 0 deletions tests/_singerlib/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,40 @@
}


def test_simple_schema():
simple_schema = {
"title": "Longitude and Latitude Values",
"description": "A geographical coordinate.",
"required": ["latitude", "longitude"],
"type": "object",
"properties": {
"latitude": {"type": "number", "minimum": -90, "maximum": 90},
"longitude": {"type": "number", "minimum": -180, "maximum": 180},
},
}

schema_plus = Schema.from_dict(simple_schema)
assert schema_plus.to_dict() == simple_schema
assert schema_plus.required == ["latitude", "longitude"]
assert isinstance(schema_plus.properties["latitude"], Schema)
latitude = schema_plus.properties["latitude"]
assert latitude.type == "number"


def test_schema_with_items():
schema = {
"description": "A representation of a person, company, organization, or place",
"type": "object",
"properties": {"fruits": {"type": "array", "items": {"type": "string"}}},
}
schema_plus = Schema.from_dict(schema)
assert schema_plus.to_dict() == schema
assert isinstance(schema_plus.properties["fruits"], Schema)
fruits = schema_plus.properties["fruits"]
assert isinstance(fruits.items, Schema)
assert fruits.items.type == "string"


@pytest.mark.parametrize(
"schema,expected",
[
Expand Down
86 changes: 24 additions & 62 deletions tests/core/test_schema.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,32 @@
"""
Testing that Schema can convert schemas lossless from and to dicts.

Schemas are taken from these examples;
https://json-schema.org/learn/miscellaneous-examples.html

NOTE: The following properties are not currently supported;
pattern
unevaluatedProperties
propertyNames
minProperties
maxProperties
prefixItems
contains
minContains
maxContains
minItems
maxItems
uniqueItems
enum
const
contentMediaType
contentEncoding
allOf
oneOf
not

Some of these could be trivially added (if they are SIMPLE_PROPERTIES.
Some might need more thinking if they can contain schemas (though, note that we also
treat 'additionalProperties', 'anyOf' and' patternProperties' as SIMPLE even though they
can contain schemas.
"""
"""Test the schema sources."""

from __future__ import annotations

from singer_sdk._singerlib import Schema
import typing as t

from singer_sdk.schema import LocalSchemaSource, OpenAPISchemaSource

if t.TYPE_CHECKING:
import pytest

def test_simple_schema():
simple_schema = {
"title": "Longitude and Latitude Values",
"description": "A geographical coordinate.",
"required": ["latitude", "longitude"],
"type": "object",
"properties": {
"latitude": {"type": "number", "minimum": -90, "maximum": 90},
"longitude": {"type": "number", "minimum": -180, "maximum": 180},
},
}

schema_plus = Schema.from_dict(simple_schema)
assert schema_plus.to_dict() == simple_schema
assert schema_plus.required == ["latitude", "longitude"]
assert isinstance(schema_plus.properties["latitude"], Schema)
latitude = schema_plus.properties["latitude"]
assert latitude.type == "number"
def test_local_schema_source(pytestconfig: pytest.Config):
schema_dir = pytestconfig.rootpath / "tests/fixtures/schemas"
schema_source = LocalSchemaSource(schema_dir)
schema = schema_source("user")
assert isinstance(schema, dict)
assert schema["type"] == "object"
assert "items" not in schema
assert "properties" in schema
assert "id" in schema["properties"]


def test_schema_with_items():
schema = {
"description": "A representation of a person, company, organization, or place",
"type": "object",
"properties": {"fruits": {"type": "array", "items": {"type": "string"}}},
}
schema_plus = Schema.from_dict(schema)
assert schema_plus.to_dict() == schema
assert isinstance(schema_plus.properties["fruits"], Schema)
fruits = schema_plus.properties["fruits"]
assert isinstance(fruits.items, Schema)
assert fruits.items.type == "string"
def test_openapi_schema_source(pytestconfig: pytest.Config):
openapi_path = pytestconfig.rootpath / "tests/fixtures/openapi.json"
schema_source = OpenAPISchemaSource(openapi_path)
schema = schema_source("ProjectListItem")
assert isinstance(schema, dict)
assert schema["type"] == "object"
assert "items" not in schema
assert "properties" in schema
assert "id" in schema["properties"]
Loading