diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index f86f441d6e1c..1974eeed8635 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -1406,7 +1406,7 @@
 - name: S3
   sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
   dockerRepository: airbyte/source-s3
-  dockerImageTag: 0.1.26
+  dockerImageTag: 0.1.27
   documentationUrl: https://docs.airbyte.com/integrations/sources/s3
   icon: s3.svg
   sourceType: file
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index 2d5d9ca1621e..afd8e162220e 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -12506,7 +12506,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-s3:0.1.26"
+- dockerImage: "airbyte/source-s3:0.1.27"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
     changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
@@ -12639,6 +12639,8 @@
                   \ during schema detection, increasing this should solve it. Beware\
                   \ of raising this too high as you could hit OOM errors."
                 default: 10000
+                minimum: 1
+                maximum: 2147483647
                 order: 9
                 type: "integer"
             - title: "Parquet"
diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile
index c38be4e387ef..e97e475cc4a2 100644
--- a/airbyte-integrations/connectors/source-s3/Dockerfile
+++ b/airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@
 COPY source_s3 ./source_s3
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
-LABEL io.airbyte.version=0.1.26
+LABEL io.airbyte.version=0.1.27
 LABEL io.airbyte.name=airbyte/source-s3
diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml
index c85b3712cc75..e652a72ea4b8 100644
--- a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml
@@ -4,6 +4,8 @@ connector_image: airbyte/source-s3:dev
 tests:
   spec:
     - spec_path: "integration_tests/spec.json"
+      backward_compatibility_tests_config:
+        disable_for_version: "0.1.26"
   connection:
     # for CSV format
     - config_path: "secrets/config.json"
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
index 1a0863be9c8b..6198d09c452e 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
@@ -6,10 +6,13 @@
     "aws_access_key_id": "123456",
     "aws_secret_access_key": "123456key",
     "path_prefix": "",
-    "endpoint": "http://10.0.229.255:9000"
+    "endpoint": "http://10.0.210.197:9000"
   },
   "format": {
-    "filetype": "csv"
+    "filetype": "csv",
+    "delimiter": ",",
+    "quote_char": "'",
+    "encoding": "utf8"
   },
   "path_pattern": "*.csv",
   "schema": "{}"
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json
index c551bfeb6685..574f5fb000e5 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.template.json
@@ -9,7 +9,10 @@
     "endpoint": "http://:9000"
   },
   "format": {
-    "filetype": "csv"
+    "filetype": "csv",
+    "delimiter": ",",
+    "quote_char": "'",
+    "encoding": "utf8"
   },
   "path_pattern": "*.csv",
   "schema": "{}"
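Side note: the `minimum`/`maximum` bounds added to Block Size in `source_specs.yaml` above (and mirrored in `integration_tests/spec.json` below) are plain JSON Schema keywords, so their effect can be checked in isolation. A minimal sketch, assuming the `jsonschema` package is available; the schema fragment is paraphrased from the spec, not copied:

```python
import jsonschema

# Paraphrased fragment of the Block Size property with the new bounds.
block_size_schema = {
    "type": "integer",
    "default": 10000,
    "minimum": 1,
    "maximum": 2147483647,  # int32 max, matching the connector spec
}

jsonschema.validate(10000, block_size_schema)  # ok: within [1, 2147483647]

try:
    jsonschema.validate(0, block_size_schema)  # rejected: below the new minimum
except jsonschema.ValidationError as err:
    print(err.message)  # "0 is less than the minimum of 1"
```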
diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
index 417c48058501..2fcd27e2bcf9 100644
--- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
+++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json
@@ -110,6 +110,8 @@
             "title": "Block Size",
             "description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
             "default": 10000,
+            "minimum": 1,
+            "maximum": 2147483647,
             "order": 9,
             "type": "integer"
           }
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
index 44579b5a0999..84dc292e96b0 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/abstract_file_parser.py
@@ -108,3 +108,6 @@ def json_schema_to_pyarrow_schema(cls, schema: Mapping[str, Any], reverse: bool
         :return: converted schema dict
         """
         return {column: cls.json_type_to_pyarrow_type(json_type, reverse=reverse) for column, json_type in schema.items()}
+
+    def _validate_config(self, config: Mapping[str, Any]):
+        pass
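The no-op `_validate_config` above makes config validation an overridable hook on the parser base class: formats that need no checks inherit the pass-through, while the CSV parser (next file) overrides it. A minimal sketch of the pattern using hypothetical `DemoParser`/`DemoCsvParser` classes, not the connector's real ones:

```python
from typing import Any, Mapping


class DemoParser:
    """Stand-in for AbstractFileParser: validation is a no-op by default."""

    def _validate_config(self, config: Mapping[str, Any]) -> None:
        pass  # format-specific subclasses override this to reject bad options


class DemoCsvParser(DemoParser):
    """Stand-in for CsvParser: overrides the hook with a real check."""

    def _validate_config(self, config: Mapping[str, Any]) -> None:
        delimiter = config.get("format", {}).get("delimiter", ",")
        if len(delimiter) != 1:
            raise ValueError("delimiter should contain 1 character only")


DemoParser()._validate_config({"format": {"delimiter": "anything"}})  # accepted: base is a no-op
DemoCsvParser()._validate_config({"format": {"delimiter": ";"}})      # accepted: one character
```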
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
index 9b832653b932..adb87b467220 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_parser.py
@@ -2,6 +2,7 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
+import codecs
 import csv
 import json
 import tempfile
@@ -50,6 +51,23 @@ def format(self) -> CsvFormat:
             self.format_model = CsvFormat.parse_obj(self._format)
         return self.format_model
 
+    def _validate_field_len(self, config: Mapping[str, Any], field_name: str):
+        if len(config.get("format", {}).get(field_name)) != 1:
+            raise ValueError(f"{field_name} should contain 1 character only")
+
+    def _validate_config(self, config: Mapping[str, Any]):
+        if config.get("format", {}).get("filetype") == "csv":
+            self._validate_field_len(config, "delimiter")
+            if config.get("format", {}).get("delimiter") in ("\r", "\n"):
+                raise ValueError("Delimiter cannot be \r or \n")
+
+            self._validate_field_len(config, "quote_char")
+
+            if config.get("format", {}).get("escape_char"):
+                self._validate_field_len(config, "escape_char")
+
+            codecs.lookup(config.get("format", {}).get("encoding"))
+
     def _read_options(self) -> Mapping[str, str]:
         """
         https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py
index 34da5af3e6df..f2c82d7a86c0 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/csv_spec.py
@@ -73,6 +73,8 @@ class Config:
     )
     block_size: int = Field(
         default=10000,
+        ge=1,
+        le=2_147_483_647,  # int32_t max
         description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.",
         order=9,
     )
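To see what the new checks actually reject: `_validate_field_len` enforces single-character values for `delimiter`, `quote_char`, and `escape_char`, and the bare `codecs.lookup` call raises `LookupError` for unknown encodings, which is exactly the error surfaced to the user. A standalone sketch using only the standard library (the `ge`/`le` bounds on `block_size` are enforced separately by pydantic when the spec model is parsed):

```python
import codecs

# Single-character rule: multi-character delimiters or quote chars now fail fast.
for value in (",", "||"):
    status = "ok" if len(value) == 1 else "rejected: should contain 1 character only"
    print(repr(value), status)

# Encoding rule: codecs.lookup() raises LookupError for unknown codecs.
codecs.lookup("utf8")  # ok, returns a CodecInfo
try:
    codecs.lookup("utf888")
except LookupError as err:
    print(err)  # unknown encoding: utf888
```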
""" try: - for file_info in self.stream_class(**config).filepath_iterator(): + stream = self.stream_class(**config) + stream.fileformatparser_class(stream._format)._validate_config(config) + for file_info in stream.filepath_iterator(): # TODO: will need to split config.get("path_pattern") up by stream once supporting multiple streams # test that matching on the pattern doesn't error globmatch(file_info.key, config.get("path_pattern"), flags=GLOBSTAR | SPLIT) diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py index f556412f77b0..680d303e0a36 100644 --- a/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_source.py @@ -38,13 +38,57 @@ def test_check_connection_exception(config): assert error_msg +@pytest.mark.parametrize( + "delimiter, quote_char, escape_char, encoding, error_type", + [ + ("string", "'", None, "utf8", ValueError), + ("\n", "'", None, "utf8", ValueError), + (",", ";,", None, "utf8", ValueError), + (",", "'", "escape", "utf8", ValueError), + (",", "'", None, "utf888", LookupError) + ], + ids=[ + "long_delimiter", + "forbidden_delimiter_symbol", + "long_quote_char", + "long_escape_char", + "unknown_encoding" + ], +) +def test_check_connection_csv_validation_exception(delimiter, quote_char, escape_char, encoding, error_type): + config = { + "dataset": "test", + "provider": { + "storage": "S3", + "bucket": "test-source-s3", + "aws_access_key_id": "key_id", + "aws_secret_access_key": "access_key", + "path_prefix": "" + }, + "path_pattern": "simple_test*.csv", + "schema": "{}", + "format": { + "filetype": "csv", + "delimiter": delimiter, + "quote_char": quote_char, + "escape_char": escape_char, + "encoding": encoding, + } + } + ok, error_msg = SourceS3().check_connection(logger, config=config) + + assert not ok + assert error_msg + assert isinstance(error_msg, error_type) + + def test_check_connection(config): instance = SourceS3() with patch.object(instance.stream_class, "filepath_iterator", MagicMock()): ok, error_msg = instance.check_connection(logger, config=config) - assert ok - assert not error_msg + assert not ok + assert error_msg def test_streams(config): diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 401288e206e8..5b039fdd4b9e 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -209,7 +209,8 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------| -| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option | +| 0.1.27 | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format | +| 0.1.26 | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option | | 0.1.24 | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file | | 0.1.23 | 2022-10-10 | [17991](https://github.com/airbytehq/airbyte/pull/17991) | Fix pyarrow to JSON schema type conversion for arrays | | 0.1.23 | 2022-10-10 | 
diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md
index 401288e206e8..5b039fdd4b9e 100644
--- a/docs/integrations/sources/s3.md
+++ b/docs/integrations/sources/s3.md
@@ -209,7 +209,8 @@ The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is suppo
 | Version | Date       | Pull Request                                             | Subject                                                                |
 |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------|
-| 0.1.26  | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option                                        |
+| 0.1.27  | 2022-12-08 | [20262](https://github.com/airbytehq/airbyte/pull/20262) | Check config settings for CSV file format                              |
+| 0.1.26  | 2022-11-08 | [19006](https://github.com/airbytehq/airbyte/pull/19006) | Add virtual-hosted-style option                                        |
 | 0.1.24  | 2022-10-28 | [18602](https://github.com/airbytehq/airbyte/pull/18602) | Wrap errors into AirbyteTracedException pointing to a problem file     |
 | 0.1.23  | 2022-10-10 | [17991](https://github.com/airbytehq/airbyte/pull/17991) | Fix pyarrow to JSON schema type conversion for arrays                  |
 | 0.1.23  | 2022-10-10 | [17800](https://github.com/airbytehq/airbyte/pull/17800) | Deleted `use_ssl` and `verify_ssl_cert` flags and hardcoded to `True`  |