diff --git a/meltano.yml b/meltano.yml index eda078c..20d5b4d 100644 --- a/meltano.yml +++ b/meltano.yml @@ -1,13 +1,13 @@ version: 1 send_anonymous_usage_stats: true -project_id: "tap-jaffle-shop" +project_id: tap-jaffle-shop default_environment: test environments: - name: test plugins: extractors: - - name: "tap-jaffle-shop" - namespace: "tap_jaffle_shop" + - name: tap-jaffle-shop + namespace: tap_jaffle_shop pip_url: -e . capabilities: - state @@ -15,16 +15,31 @@ plugins: - discover - about - stream-maps - config: - start_date: '2010-01-01T00:00:00Z' settings: - # TODO: To configure using Meltano, declare settings and their types here: - - name: username - - name: password - kind: password - - name: start_date - value: '2010-01-01T00:00:00Z' + - name: years + kind: integer + - name: stream_name_prefix + kind: string + - name: stream_maps + kind: object + - name: stream_map_config + kind: object + - name: batch_config + kind: object + # Uncomment to enable batch messaging: + # config: + # batch_config: + # encoding: + # format: jsonl + # compression: gzip + # storage: + # root: ./output/temp/ loaders: - name: target-jsonl variant: andyh1203 pip_url: target-jsonl + - name: target-duckdb + variant: jwills + pip_url: target-duckdb~=0.4 + config: + filepath: outdb.duckdb diff --git a/tap_jaffle_shop/client.py b/tap_jaffle_shop/client.py index a13e45e..ab19089 100644 --- a/tap_jaffle_shop/client.py +++ b/tap_jaffle_shop/client.py @@ -17,6 +17,8 @@ class JaffleShopStream(PandasStream, metaclass=abc.ABCMeta): for streams that are built on Pandas data frames. """ + batch_size = 100000 + def __init__( self, tap: Tap, diff --git a/tap_jaffle_shop/pandas.py b/tap_jaffle_shop/pandas.py index fbebfd3..4164759 100644 --- a/tap_jaffle_shop/pandas.py +++ b/tap_jaffle_shop/pandas.py @@ -28,6 +28,7 @@ def __init__( data to the target. (Optional.) """ self._dataframe: pd.DataFrame | None = None + self._schema: dict | None = None # type: ignore # TODO: Fix in SDK upstream super().__init__(tap=tap, name=name, schema=None) @abc.abstractmethod @@ -71,6 +72,8 @@ def schema(self) -> dict: Returns: A dictionary object describing the stream's JSON Schema. """ + if self._schema is not None: + return self._schema def _get_type(col: str) -> th.JSONTypeHelper: if str(col).endswith("_at"): @@ -82,9 +85,10 @@ def _get_type(col: str) -> th.JSONTypeHelper: self.dataframe.dtypes[col] )() - return th.PropertiesList( + self._schema = th.PropertiesList( *[th.Property(col, _get_type(col)) for col in self.dataframe.columns] ).to_dict() + return self._schema @classmethod def pandas_dtype_to_jsonschema_type(cls, dtype: str) -> Type[th.JSONTypeHelper]: diff --git a/tap_jaffle_shop/tap.py b/tap_jaffle_shop/tap.py index a15bd14..dc308c5 100644 --- a/tap_jaffle_shop/tap.py +++ b/tap_jaffle_shop/tap.py @@ -34,6 +34,39 @@ class TapJaffleShop(Tap): "between schema and table name." ), ), + th.Property( + "batch_config", + description=( + "If provided, specifies the storage and encoding parameters for " + "batch messaging to the target." + ), + wrapped=th.ObjectType( + th.Property( + "encoding", + description=( + "Specifies the format and compression of the batch files." + ), + wrapped=th.ObjectType( + th.Property("format", th.StringType, allowed_values=["jsonl"]), + th.Property( + "compression", + th.StringType, + allowed_values=["gzip", "none"], + ), + ), + ), + th.Property( + "storage", + description=( + "Defines the storage layer to use when writing batch files" + ), + wrapped=th.ObjectType( + th.Property("root", th.StringType), + th.Property("prefix", th.StringType), + ), + ), + ), + ), ).to_dict() def create_simulation(self):