diff --git a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
index 33d526d07cb7..ce132d3c909a 100644
--- a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
@@ -180,6 +180,37 @@ std::shared_ptr<arrow::Array> ArrowTypeAsYqlTimestamp(const std::shared_ptr<arr
     return builder.Build(true).make_array();
 }
 
+template <bool isOptional, typename TArrowType>
+std::shared_ptr<arrow::Array> ArrowTypeAsYqlString(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, ui64 multiplier, const TString& format = {}) {
+    ::NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
+    ::NYql::NUdf::TFixedSizeBlockReader<TArrowType, isOptional> reader;
+    for (i64 i = 0; i < value->length(); ++i) {
+        const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
+        if constexpr (isOptional) {
+            if (!item) {
+                builder.Add(item);
+                continue;
+            }
+        } else if (!item) {
+            throw parquet::ParquetException(TStringBuilder() << "null value for timestamp could not be represented in non-optional type");
+        }
+
+        const TArrowType baseValue = item.As<TArrowType>();
+        if (baseValue < 0 || baseValue > static_cast<i64>(::NYql::NUdf::MAX_TIMESTAMP)) {
+            throw parquet::ParquetException(TStringBuilder() << "timestamp in parquet is out of range [0, " << ::NYql::NUdf::MAX_TIMESTAMP << "]: " << baseValue);
+        }
+
+        if (static_cast<ui64>(baseValue) > ::NYql::NUdf::MAX_TIMESTAMP / multiplier) {
+            throw parquet::ParquetException(TStringBuilder() << "timestamp in parquet is out of range [0, " << ::NYql::NUdf::MAX_TIMESTAMP << "] after transformation: " << baseValue);
+        }
+
+        const ui64 v = baseValue * multiplier;
+        TString result = format ? TInstant::FromValue(v).FormatGmTime(format.c_str()) : TInstant::FromValue(v).ToString();
+        builder.Add(NUdf::TBlockItem(NUdf::TStringRef(result.c_str(), result.Size())));
+    }
+    return builder.Build(true).make_array();
+}
+
 template <bool isOptional>
 std::shared_ptr<arrow::Array> ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
     ::NYql::NUdf::TFixedSizeArrayBuilder<ui64, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
@@ -373,6 +404,30 @@ TColumnConverter ArrowTimestampAsYqlTimestamp(const std::shared_ptr<arrow::Data
     };
 }
 
+TColumnConverter ArrowTimestampAsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::TimeUnit::type timeUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(timeUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlString<true, i64>(targetType, value, multiplier)
+            : ArrowTypeAsYqlString<false, i64>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowDate64AsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForDatetime(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlString<true, i64>(targetType, value, multiplier, "%Y-%m-%d")
+            : ArrowTypeAsYqlString<false, i64>(targetType, value, multiplier, "%Y-%m-%d");
+    };
+}
+
+TColumnConverter ArrowDate32AsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlString<true, i32>(targetType, value, multiplier, "%Y-%m-%d")
+            : ArrowTypeAsYqlString<false, i32>(targetType, value, multiplier, "%Y-%m-%d");
+    };
+}
+
 TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit unit) {
     if (unit == arrow::DateUnit::MILLI) {
         throw parquet::ParquetException(TStringBuilder() << "millisecond accuracy does not fit into the date");
@@ -457,6 +512,9 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
                     return ArrowDate32AsYqlDatetime(targetType, isOptional, dateType.unit());
                 case NUdf::EDataSlot::Timestamp:
                     return ArrowDate32AsYqlTimestamp(targetType, isOptional, dateType.unit());
+                case NUdf::EDataSlot::String:
+                case NUdf::EDataSlot::Utf8:
+                    return ArrowDate32AsYqlString(targetType, isOptional, dateType.unit());
                 default:
                     return {};
             }
@@ -469,6 +527,9 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
                     return ArrowDate64AsYqlDatetime(targetType, isOptional, dateType.unit());
                 case NUdf::EDataSlot::Timestamp:
                     return ArrowDate64AsYqlTimestamp(targetType, isOptional, dateType.unit());
+                case NUdf::EDataSlot::String:
+                case NUdf::EDataSlot::Utf8:
+                    return ArrowDate64AsYqlString(targetType, isOptional, dateType.unit());
                 default:
                     return {};
             }
@@ -480,10 +541,14 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
                     return ArrowTimestampAsYqlDatetime(targetType, isOptional, timestampType.unit());
                 case NUdf::EDataSlot::Timestamp:
                     return ArrowTimestampAsYqlTimestamp(targetType, isOptional, timestampType.unit());
+                case NUdf::EDataSlot::String:
+                case NUdf::EDataSlot::Utf8:
+                    return ArrowTimestampAsYqlString(targetType, isOptional, timestampType.unit());
                 default:
                     return {};
             }
         }
+        case arrow::Type::STRING:
         case arrow::Type::BINARY: {
             switch (slotItem) {
                 case NUdf::EDataSlot::Datetime:
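Note: a rough Python model of what ArrowTypeAsYqlString produces for the fixture values used in the tests below. It assumes TInstant::ToString() renders UTC with six fractional digits and a trailing 'Z', and that FormatGmTime("%Y-%m-%d") keeps only the date; this is a sketch for sanity-checking the expected strings, not the production code path:

    from datetime import datetime, timezone

    def yql_string(epoch_us, fmt=None):
        # the converter first normalizes the raw value to microseconds
        t = datetime.fromtimestamp(epoch_us / 1_000_000, tz=timezone.utc)
        return t.strftime(fmt) if fmt else t.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

    assert yql_string(1712059260 * 1_000_000) == '2024-04-02T12:01:00.000000Z'
    assert yql_string(19815 * 86_400 * 1_000_000, '%Y-%m-%d') == '2024-04-02'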
diff --git a/ydb/tests/fq/s3/test_format_setting.py b/ydb/tests/fq/s3/test_format_setting.py
index bbdb61e04d52..4fd8161a9bad 100644
--- a/ydb/tests/fq/s3/test_format_setting.py
+++ b/ydb/tests/fq/s3/test_format_setting.py
@@ -914,8 +914,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         client.create_storage_connection(storage_connection_name, "fbucket")
 
         sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
+            SELECT
+                `fruit`, CAST(`ts` as String)
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
@@ -923,18 +923,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
                 `fruit` Utf8 NOT NULL,
                 `ts` Timestamp
                 ));
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
             '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
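Note: because the column is now CAST to String (a byte string in YQL), the value arrives in bytes_value, while the Utf8 `fruit` column arrives in text_value. The same three-line assert block recurs below for every source type; it could be factored into a small helper along these lines (hypothetical, not part of this change; `check_row` is an illustrative name):

    def check_row(data, expected_ts: bytes):
        # one row, fixed fruit, and the rendered timestamp as raw bytes
        rows = data.result.result_set.rows
        assert len(rows) == 1, "invalid count rows"
        assert rows[0].items[0].text_value == "apple"
        assert rows[0].items[1].bytes_value == expected_ts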
count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" + + # string -> Timestamp + + # 2024-04-02T12:01:00.000Z + data = [['apple'], ['2024-04-02 12:01:00']] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.string()) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" + + # utf8 -> Timestamp + + # 2024-04-02T12:01:00.000Z + data = [['apple'], ['2024-04-02 12:01:00']] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.utf8()) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # timestamp[s] -> Timestamp @@ -975,7 +1021,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # timestamp[ns] -> Timestamp @@ -996,7 +1045,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # date64 -> Timestamp @@ -1013,27 +1065,13 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client): pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) - sql = f''' - $result = SELECT - `fruit`, `ts` - FROM - `{storage_connection_name}`.`/{filename}` - WITH (FORMAT="parquet", - SCHEMA=( - `fruit` Utf8 NOT NULL, - `ts` Timestamp - )); - 
 
         # timestamp[s] -> Timestamp
 
@@ -975,7 +1021,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
 
         # timestamp[ns] -> Timestamp
 
@@ -996,7 +1045,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
 
         # date64 -> Timestamp
 
@@ -1013,27 +1065,13 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         pq.write_table(table, yatest_common.work_path(filename))
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
-        sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
-            FROM
-                `{storage_connection_name}`.`/{filename}`
-            WITH (FORMAT="parquet",
-                SCHEMA=(
-                `fruit` Utf8 NOT NULL,
-                `ts` Timestamp
-                ));
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
-            '''
-
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z"
 
         # date32 -> Timestamp
 
@@ -1053,7 +1091,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z"
 
         # int32 [UNIX_TIME_SECONDS] -> Timestamp
 
@@ -1071,8 +1112,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
         sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
+            SELECT
+                `fruit`, CAST(`ts` as String)
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
@@ -1081,17 +1122,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
                 `fruit` Utf8 NOT NULL,
                 `ts` Timestamp
                 ), `data.timestamp.format_name`="UNIX_TIME_SECONDS");
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
             '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
 
         # int64 [UNIX_TIME_SECONDS] -> Timestamp
 
@@ -1109,8 +1148,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
         sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
+            SELECT
+                `fruit`, CAST(`ts` as String)
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
@@ -1119,17 +1158,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
                 `fruit` Utf8 NOT NULL,
                 `ts` Timestamp
                 ), `data.timestamp.format_name`="UNIX_TIME_SECONDS");
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
             '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
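Note: the `data.timestamp.format_name` variants differ only in the scale of the stored integer; each raw value used below denotes the same instant once normalized to microseconds, the converter's internal unit:

    base_us = 1712059260 * 1_000_000                  # 2024-04-02T12:01:00Z
    assert 1712059260       * 1_000_000 == base_us    # UNIX_TIME_SECONDS
    assert 1712059260000    * 1_000     == base_us    # UNIX_TIME_MILLISECONDS
    assert 1712059260000000 * 1         == base_us    # UNIX_TIME_MICROSECONDS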
 
         # int64 [UNIX_TIME_MILLISECONDS] -> Timestamp
 
@@ -1147,8 +1184,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
         sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
+            SELECT
+                `fruit`, CAST(`ts` as String)
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
@@ -1157,17 +1194,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
                 `fruit` Utf8 NOT NULL,
                 `ts` Timestamp
                 ), `data.timestamp.format_name`="UNIX_TIME_MILLISECONDS");
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
             '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
 
         # int64 [UNIX_TIME_MICROSECONDS] -> Timestamp
 
@@ -1185,8 +1220,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
         sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
+            SELECT
+                `fruit`, CAST(`ts` as String)
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
@@ -1195,17 +1230,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
                 `fruit` Utf8 NOT NULL,
                 `ts` Timestamp
                 ), `data.timestamp.format_name`="UNIX_TIME_MICROSECONDS");
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
             '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
 
         # uint32 [UNIX_TIME_SECONDS] -> Timestamp
 
@@ -1223,8 +1256,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
         sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
+            SELECT
+                `fruit`, CAST(`ts` as String)
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
@@ -1233,17 +1266,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
                 `fruit` Utf8 NOT NULL,
                 `ts` Timestamp
                 ), `data.timestamp.format_name`="UNIX_TIME_SECONDS");
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
             '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
 
         # uint64 [UNIX_TIME_SECONDS] -> Timestamp
 
@@ -1260,28 +1291,13 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         pq.write_table(table, yatest_common.work_path(filename))
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
-        sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
-            FROM
-                `{storage_connection_name}`.`/{filename}`
-            WITH (FORMAT="parquet",
-                SCHEMA=(
-                `fruit` Utf8 NOT NULL,
-                `ts` Timestamp
-                ), `data.timestamp.format_name`="UNIX_TIME_SECONDS");
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
-            '''
-
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
 
         # uint64 [UNIX_TIME_MILLISECONDS] -> Timestamp
 
@@ -1299,8 +1315,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
         sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
+            SELECT
+                `fruit`, CAST(`ts` as String)
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
@@ -1309,17 +1325,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
                 `fruit` Utf8 NOT NULL,
                 `ts` Timestamp
                 ), `data.timestamp.format_name`="UNIX_TIME_MILLISECONDS");
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
             '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
 
         # uint64 [UNIX_TIME_MICROSECONDS] -> Timestamp
 
@@ -1337,8 +1351,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
         sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
+            SELECT
+                `fruit`, CAST(`ts` as String)
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
@@ -1347,17 +1361,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
                 `fruit` Utf8 NOT NULL,
                 `ts` Timestamp
                 ), `data.timestamp.format_name`="UNIX_TIME_MICROSECONDS");
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
             '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z"
 
         # uint16 [default] -> Timestamp
 
@@ -1375,8 +1387,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
         sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
+            SELECT
+                `fruit`, CAST(`ts` as String)
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
@@ -1384,17 +1396,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client):
                 `fruit` Utf8 NOT NULL,
                 `ts` Timestamp
                 ));
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
             '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z"
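Note: the date-like sources (date32, and uint16 in the default mode) store a day count rather than seconds, which is why their expected value is midnight:

    from datetime import date, timedelta
    assert date(1970, 1, 1) + timedelta(days=19815) == date(2024, 4, 2)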
"invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z" # date32 -> Timestamp @@ -1561,7 +1551,9 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z" # int32 -> Timestamp @@ -1578,32 +1570,16 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client): pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) - sql = f''' - $result = SELECT - `fruit`, `ts` - FROM - `{storage_connection_name}`.`/{filename}` - WITH (FORMAT="parquet", - SCHEMA=( - `fruit` Utf8 NOT NULL, - `ts` Datetime - )); - - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00Z" as Datetime), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; - ''' - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z" # int64 -> Timestamp - # 2024-04-02T12:01:00.000Z + # 2024-04-02T00:00:00.000Z data = [['apple'], [1712059260]] # Define the schema for the data @@ -1639,11 +1615,13 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z" # uint64 -> Timestamp - # 2024-04-02T12:01:00.000Z + # 2024-04-02T00:00:00.000Z data = [['apple'], [1712059260]] # Define the schema for the data @@ -1659,7 +1637,9 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z" # uint16 [default] -> Timestamp @@ -1676,24 +1656,326 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client): pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + 
 
         # uint16 [default] -> Timestamp
 
@@ -1676,24 +1656,326 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client):
         pq.write_table(table, yatest_common.work_path(filename))
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z"
+
+    @yq_all
+    def test_parquet_converters_to_string(self, kikimr, s3, client):
+        unique_prefix = str(uuid.uuid4())
+
+        # timestamp[ms] -> String
+        # 2024-04-02T12:01:00.000000Z
+        data = [['apple'], [1712059260000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('ms'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        filename = 'test_parquet_converters_to_string.parquet'
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        kikimr.control_plane.wait_bootstrap(1)
+        storage_connection_name = unique_prefix + "hcpp"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
         sql = f'''
-            $result = SELECT
+            SELECT
                 `fruit`, `ts`
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
                 SCHEMA=(
                 `fruit` Utf8 NOT NULL,
-                `ts` Datetime
+                `ts` String
                 ));
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00Z" as Datetime),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
             '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00.000000Z"
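Note: the expected string now carries six fractional digits. In test_parquet_converters_to_timestamp the value goes through CAST(Timestamp as String), which omits a zero fraction; here the converter itself renders the text via TInstant::ToString(), which prints microseconds. A quick illustration of the two renderings, assuming both print UTC:

    from datetime import datetime, timezone
    t = datetime.fromtimestamp(1712059260, tz=timezone.utc)
    assert t.strftime('%Y-%m-%dT%H:%M:%SZ') == '2024-04-02T12:01:00Z'            # CAST path
    assert t.strftime('%Y-%m-%dT%H:%M:%S.%fZ') == '2024-04-02T12:01:00.000000Z'  # converter path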
rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00.000000Z" + + # timestamp[ns] -> String + + # 2024-04-02T12:01:00.000000Z + data = [['apple'], [1712059260000000000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.timestamp('ns')) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00.000000Z" + + # date64 -> String + + # 2024-04-02 + data = [['apple'], [1712016000000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.date64()) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02" + + # date32 -> String + + # 2024-04-02 + data = [['apple'], [19815]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.date32()) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02" + + @yq_all + def test_parquet_converters_to_utf8(self, kikimr, s3, client): + unique_prefix = str(uuid.uuid4()) + + # timestamp[ms] -> Utf8 + # 2024-04-02T12:01:00.000000Z + data = [['apple'], [1712059260000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.timestamp('ms')) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + filename = 'test_parquet_converters_to_utf8.parquet' + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "hcpp" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = f''' + SELECT + `fruit`, `ts` + FROM + `{storage_connection_name}`.`/{filename}` + WITH (FORMAT="parquet", + SCHEMA=( + `fruit` Utf8 NOT NULL, + `ts` Utf8 + )); + ''' 
+
+    @yq_all
+    def test_parquet_converters_to_utf8(self, kikimr, s3, client):
+        unique_prefix = str(uuid.uuid4())
+
+        # timestamp[ms] -> Utf8
+        # 2024-04-02T12:01:00.000000Z
+        data = [['apple'], [1712059260000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('ms'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        filename = 'test_parquet_converters_to_utf8.parquet'
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        kikimr.control_plane.wait_bootstrap(1)
+        storage_connection_name = unique_prefix + "hcpp"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        sql = f'''
+            SELECT
+                `fruit`, `ts`
+            FROM
+                `{storage_connection_name}`.`/{filename}`
+            WITH (FORMAT="parquet",
+                SCHEMA=(
+                `fruit` Utf8 NOT NULL,
+                `ts` Utf8
+                ));
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].text_value == "2024-04-02T12:01:00.000000Z"
+
+        # timestamp[us] -> Utf8
+
+        # 2024-04-02T12:01:00.000000Z
+        data = [['apple'], [1712059260000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('us'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].text_value == "2024-04-02T12:01:00.000000Z"
+
+        # timestamp[s] -> Utf8
+
+        # 2024-04-02T12:01:00.000000Z
+        data = [['apple'], [1712059260]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('s'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].text_value == "2024-04-02T12:01:00.000000Z"
+
+        # timestamp[ns] -> Utf8
+
+        # 2024-04-02T12:01:00.000000Z
+        data = [['apple'], [1712059260000000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.timestamp('ns'))
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].text_value == "2024-04-02T12:01:00.000000Z"
+
+        # date64 -> Utf8
+
+        # 2024-04-02
+        data = [['apple'], [1712016000000]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.date64())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].text_value == "2024-04-02"
+
+        # date32 -> Utf8
+
+        # 2024-04-02
+        data = [['apple'], [19815]]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.date32())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].text_value == "2024-04-02"
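Note: a standalone sketch for reproducing one of the fixtures locally with nothing but pyarrow (the file name and the final str() rendering of the Date32 scalar are illustrative assumptions, not part of the test suite):

    import pyarrow as pa
    import pyarrow.parquet as pq

    # 19815 days since the epoch -> 2024-04-02
    table = pa.Table.from_arrays(
        [pa.array(['apple']), pa.array([19815], type=pa.date32())],
        names=['fruit', 'ts'])
    pq.write_table(table, 'date32.parquet')
    assert str(pq.read_table('date32.parquet').column('ts')[0]) == '2024-04-02'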