From 0cad95dfdee63aa2cf8b8874a9be03c4dbe4aeab Mon Sep 17 00:00:00 2001
From: Oleg Doronin
Date: Mon, 3 Jun 2024 20:27:20 +0000
Subject: [PATCH 1/3] new converters have been added

---
 .../s3/actors/yql_arrow_column_converters.cpp |  65 ++++
 ydb/tests/fq/s3/test_format_setting.py        | 362 ++++++++++++++++++
 2 files changed, 427 insertions(+)

diff --git a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
index 0c379dfbc91b..f877d2481a71 100644
--- a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
@@ -182,6 +182,37 @@ std::shared_ptr<arrow::Array> ArrowTypeAsYqlTimestamp(const std::shared_ptr<arrow::DataType>
 
+template <bool isOptional, typename TArrowType>
+std::shared_ptr<arrow::Array> ArrowTypeAsYqlString(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, ui64 multiplier) {
+    ::NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
+    ::NYql::NUdf::TFixedSizeBlockReader<TArrowType, isOptional> reader;
+    for (i64 i = 0; i < value->length(); ++i) {
+        const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
+        if constexpr (isOptional) {
+            if (!item) {
+                builder.Add(item);
+                continue;
+            }
+        } else if (!item) {
+            throw parquet::ParquetException(TStringBuilder() << "null value for timestamp could not be represented in non-optional type");
+        }
+
+        const TArrowType baseValue = item.As<TArrowType>();
+        if (baseValue < 0 || baseValue > static_cast<TArrowType>(::NYql::NUdf::MAX_TIMESTAMP)) {
+            throw parquet::ParquetException(TStringBuilder() << "timestamp in parquet is out of range [0, " << ::NYql::NUdf::MAX_TIMESTAMP << "]: " << baseValue);
+        }
+
+        if (static_cast<ui64>(baseValue) > ::NYql::NUdf::MAX_TIMESTAMP / multiplier) {
+            throw parquet::ParquetException(TStringBuilder() << "timestamp in parquet is out of range [0, " << ::NYql::NUdf::MAX_TIMESTAMP << "] after transformation: " << baseValue);
+        }
+
+        const ui64 v = baseValue * multiplier;
+        TString result = TInstant::FromValue(v).ToString();
+        builder.Add(NUdf::TBlockItem(NUdf::TStringRef(result.c_str(), result.Size())));
+    }
+    return builder.Build(true).make_array();
+}
+
 template <bool isOptional>
 std::shared_ptr<arrow::Array> ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
     ::NYql::NUdf::TFixedSizeArrayBuilder<ui64, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
@@ -375,6 +406,30 @@ TColumnConverter ArrowTimestampAsYqlTimestamp(const std::shared_ptr<arrow::DataType>
 
+TColumnConverter ArrowTimestampAsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::TimeUnit::type timeUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(timeUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlString<true, i64>(targetType, value, multiplier)
+            : ArrowTypeAsYqlString<false, i64>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowDate64AsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForDatetime(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlString<true, i64>(targetType, value, multiplier)
+            : ArrowTypeAsYqlString<false, i64>(targetType, value, multiplier);
+    };
+}
+
+TColumnConverter ArrowDate32AsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
+    return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
+        return isOptional
+            ? ArrowTypeAsYqlString<true, i32>(targetType, value, multiplier)
+            : ArrowTypeAsYqlString<false, i32>(targetType, value, multiplier);
+    };
+}
+
 TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit unit) {
     if (unit == arrow::DateUnit::MILLI) {
         throw parquet::ParquetException(TStringBuilder() << "millisecond accuracy does not fit into the date");
     }
@@ -459,6 +514,9 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& originalType
             return ArrowDate32AsYqlDatetime(targetType, isOptional, dateType.unit());
         case NUdf::EDataSlot::Timestamp:
             return ArrowDate32AsYqlTimestamp(targetType, isOptional, dateType.unit());
+        case NUdf::EDataSlot::String:
+        case NUdf::EDataSlot::Utf8:
+            return ArrowDate32AsYqlString(targetType, isOptional, dateType.unit());
         default:
             return {};
         }
@@ -471,6 +529,9 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& originalType
             return ArrowDate64AsYqlDatetime(targetType, isOptional, dateType.unit());
         case NUdf::EDataSlot::Timestamp:
             return ArrowDate64AsYqlTimestamp(targetType, isOptional, dateType.unit());
+        case NUdf::EDataSlot::String:
+        case NUdf::EDataSlot::Utf8:
+            return ArrowDate64AsYqlString(targetType, isOptional, dateType.unit());
         default:
             return {};
         }
@@ -482,10 +543,14 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& originalType
             return ArrowTimestampAsYqlDatetime(targetType, isOptional, timestampType.unit());
         case NUdf::EDataSlot::Timestamp:
             return ArrowTimestampAsYqlTimestamp(targetType, isOptional, timestampType.unit());
+        case NUdf::EDataSlot::String:
+        case NUdf::EDataSlot::Utf8:
+            return ArrowTimestampAsYqlString(targetType, isOptional, timestampType.unit());
         default:
             return {};
         }
     }
+    case arrow::Type::STRING:
     case arrow::Type::BINARY: {
         switch (slotItem) {
         case NUdf::EDataSlot::Datetime:
diff --git a/ydb/tests/fq/s3/test_format_setting.py b/ydb/tests/fq/s3/test_format_setting.py
index 1b8a77481011..2c63e83dfeb7 100644
--- a/ydb/tests/fq/s3/test_format_setting.py
+++ b/ydb/tests/fq/s3/test_format_setting.py
@@ -972,6 +972,46 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix
         data = client.get_result_data(query_id, limit=50)
         assert len(data.result.result_set.rows) == 1, "invalid count rows"
 
+        # string -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], ['2024-04-02 12:01:00']]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.string())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+
+        # utf8 -> Timestamp
+
+        # 2024-04-02T12:01:00.000Z
+        data = [['apple'], ['2024-04-02 12:01:00']]
+
+        # Define the schema for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.utf8())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data =
client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + # timestamp[s] -> Timestamp # 2024-04-02T12:01:00.000Z @@ -1709,3 +1749,325 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix) client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) assert len(data.result.result_set.rows) == 1, "invalid count rows" + + @yq_all + def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix): + # timestamp[ms] -> String + # 2024-04-02T12:01:00.000000Z + data = [['apple'], [1712059260000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.timestamp('ms')) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + filename = 'test_parquet_converters_to_string.parquet' + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "hcpp" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = f''' + $result = SELECT + `fruit`, `ts` + FROM + `{storage_connection_name}`.`/{filename}` + WITH (FORMAT="parquet", + SCHEMA=( + `fruit` Utf8 NOT NULL, + `ts` String + )); + + SELECT Ensure( + 0, + `fruit` = "apple" and `ts` = "2024-04-02T12:01:00.000000Z", + "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) + ) AS value FROM $result; + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + # timestamp[us] -> String + + # 2024-04-02T12:01:00.000000Z + data = [['apple'], [1712059260000000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.timestamp('us')) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + # timestamp[s] -> String + + # 2024-04-02T12:01:00.000000Z + data = [['apple'], [1712059260]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.timestamp('s')) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + # timestamp[ns] -> String + + # 2024-04-02T12:01:00.000000Z + data = [['apple'], [1712059260000000000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.timestamp('ns')) + ]) + + 
table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + # date64 -> String + + # 2024-04-02T00:00:00.000000Z + data = [['apple'], [1712016000000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.date64()) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + sql = f''' + $result = SELECT + `fruit`, `ts` + FROM + `{storage_connection_name}`.`/{filename}` + WITH (FORMAT="parquet", + SCHEMA=( + `fruit` Utf8 NOT NULL, + `ts` String + )); + SELECT Ensure( + 0, + `fruit` = "apple" and `ts` = "2024-04-02T00:00:00.000000Z", + "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) + ) AS value FROM $result; + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + # date32 -> String + + # 2024-04-02T00:00:00.000000Z + data = [['apple'], [19815]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.date32()) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + @yq_all + def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix): + # timestamp[ms] -> Utf8 + # 2024-04-02T12:01:00.000000Z + data = [['apple'], [1712059260000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.timestamp('ms')) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + filename = 'test_parquet_converters_to_utf8.parquet' + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "hcpp" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = f''' + $result = SELECT + `fruit`, `ts` + FROM + `{storage_connection_name}`.`/{filename}` + WITH (FORMAT="parquet", + SCHEMA=( + `fruit` Utf8 NOT NULL, + `ts` Utf8 + )); + + SELECT Ensure( + 0, + `fruit` = "apple" and `ts` = "2024-04-02T12:01:00.000000Z", + "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) + ) AS value FROM $result; + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + 
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + # timestamp[us] -> Utf8 + + # 2024-04-02T12:01:00.000000Z + data = [['apple'], [1712059260000000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.timestamp('us')) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + # timestamp[s] -> Utf8 + + # 2024-04-02T12:01:00.000000Z + data = [['apple'], [1712059260]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.timestamp('s')) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + # timestamp[ns] -> Utf8 + + # 2024-04-02T12:01:00.000000Z + data = [['apple'], [1712059260000000000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.timestamp('ns')) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + # date64 -> Utf8 + + # 2024-04-02T00:00:00.000000Z + data = [['apple'], [1712016000000]] + + # Define the schema for the data + schema = pa.schema([ + ('fruit', pa.string()), + ('ts', pa.date64()) + ]) + + table = pa.Table.from_arrays(data, schema=schema) + pq.write_table(table, yatest_common.work_path(filename)) + s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) + + sql = f''' + $result = SELECT + `fruit`, `ts` + FROM + `{storage_connection_name}`.`/{filename}` + WITH (FORMAT="parquet", + SCHEMA=( + `fruit` Utf8 NOT NULL, + `ts` Utf8 + )); + SELECT Ensure( + 0, + `fruit` = "apple" and `ts` = "2024-04-02T00:00:00.000000Z", + "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) + ) AS value FROM $result; + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + data = client.get_result_data(query_id, limit=50) + assert len(data.result.result_set.rows) == 1, "invalid count rows" + + # date32 -> Utf8 + + # 2024-04-02T00:00:00.000000Z + data = [['apple'], [19815]] + + # Define the schema 
for the data
+        schema = pa.schema([
+            ('fruit', pa.string()),
+            ('ts', pa.date32())
+        ])
+
+        table = pa.Table.from_arrays(data, schema=schema)
+        pq.write_table(table, yatest_common.work_path(filename))
+        s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+        data = client.get_result_data(query_id, limit=50)
+        assert len(data.result.result_set.rows) == 1, "invalid count rows"

From 5031027ce739084915f09aacc1738e43ed21c83e Mon Sep 17 00:00:00 2001
From: Oleg Doronin
Date: Wed, 5 Jun 2024 15:06:58 +0000
Subject: [PATCH 2/3] cleanup

---
 .../s3/actors/yql_arrow_column_converters.cpp | 12 ++++++------
 ydb/tests/fq/s3/test_format_setting.py        | 12 ++++++------
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
index f877d2481a71..a3fada9666ab 100644
--- a/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_arrow_column_converters.cpp
@@ -183,7 +183,7 @@ std::shared_ptr<arrow::Array> ArrowTypeAsYqlTimestamp(const std::shared_ptr<arrow::DataType>
 
 template <bool isOptional, typename TArrowType>
-std::shared_ptr<arrow::Array> ArrowTypeAsYqlString(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, ui64 multiplier) {
+std::shared_ptr<arrow::Array> ArrowTypeAsYqlString(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, ui64 multiplier, const TString& format = {}) {
     ::NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
     ::NYql::NUdf::TFixedSizeBlockReader<TArrowType, isOptional> reader;
     for (i64 i = 0; i < value->length(); ++i) {
@@ -207,7 +207,7 @@ std::shared_ptr<arrow::Array> ArrowTypeAsYqlString(const std::shared_ptr<arrow::DataType>
         }
 
         const ui64 v = baseValue * multiplier;
-        TString result = TInstant::FromValue(v).ToString();
+        TString result = format ? TInstant::FromValue(v).FormatGmTime(format.c_str()) : TInstant::FromValue(v).ToString();
         builder.Add(NUdf::TBlockItem(NUdf::TStringRef(result.c_str(), result.Size())));
     }
     return builder.Build(true).make_array();
@@ -406,16 +406,16 @@ TColumnConverter ArrowTimestampAsYqlString(const std::shared_ptr<arrow::DataType>
 TColumnConverter ArrowDate64AsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
     return [targetType, isOptional, multiplier=GetMultiplierForDatetime(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
         return isOptional
-            ? ArrowTypeAsYqlString<true, i64>(targetType, value, multiplier)
-            : ArrowTypeAsYqlString<false, i64>(targetType, value, multiplier);
+            ? ArrowTypeAsYqlString<true, i64>(targetType, value, multiplier, "%Y-%m-%d")
+            : ArrowTypeAsYqlString<false, i64>(targetType, value, multiplier, "%Y-%m-%d");
     };
 }
 
 TColumnConverter ArrowDate32AsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
     return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
         return isOptional
-            ? ArrowTypeAsYqlString<true, i32>(targetType, value, multiplier)
-            : ArrowTypeAsYqlString<false, i32>(targetType, value, multiplier);
+            ? ArrowTypeAsYqlString<true, i32>(targetType, value, multiplier, "%Y-%m-%d")
+            : ArrowTypeAsYqlString<false, i32>(targetType, value, multiplier, "%Y-%m-%d");
     };
 }
 
diff --git a/ydb/tests/fq/s3/test_format_setting.py b/ydb/tests/fq/s3/test_format_setting.py
index 2c63e83dfeb7..3386839457a8 100644
--- a/ydb/tests/fq/s3/test_format_setting.py
+++ b/ydb/tests/fq/s3/test_format_setting.py
@@ -1856,7 +1856,7 @@ def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix):
 
         # date64 -> String
 
-        # 2024-04-02T00:00:00.000000Z
+        # 2024-04-02
         data = [['apple'], [1712016000000]]
 
         # Define the schema for the data
@@ -1881,7 +1881,7 @@ def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix):
                 ));
             SELECT Ensure(
                 0,
-                `fruit` = "apple" and `ts` = "2024-04-02T00:00:00.000000Z",
+                `fruit` = "apple" and `ts` = "2024-04-02",
                 "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
             ) AS value FROM $result;
         '''
@@ -1893,7 +1893,7 @@ def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix):
 
         # date32 -> String
 
-        # 2024-04-02T00:00:00.000000Z
+        # 2024-04-02
         data = [['apple'], [19815]]
 
         # Define the schema for the data
@@ -2017,7 +2017,7 @@ def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix):
 
         # date64 -> Utf8
 
-        # 2024-04-02T00:00:00.000000Z
+        # 2024-04-02
         data = [['apple'], [1712016000000]]
 
         # Define the schema for the data
@@ -2042,7 +2042,7 @@ def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix):
                 ));
             SELECT Ensure(
                 0,
-                `fruit` = "apple" and `ts` = "2024-04-02T00:00:00.000000Z",
+                `fruit` = "apple" and `ts` = "2024-04-02",
                 "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
             ) AS value FROM $result;
         '''
@@ -2054,7 +2054,7 @@ def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix):
 
         # date32 -> Utf8
 
-        # 2024-04-02T00:00:00.000000Z
+        # 2024-04-02
         data = [['apple'], [19815]]
 
         # Define the schema for the data

From 59ca9b04863fad6cfb21cc5f634e88819b7ce3b5 Mon Sep 17 00:00:00 2001
From: Oleg Doronin
Date: Wed, 5 Jun 2024 18:48:33 +0000
Subject: [PATCH 3/3] cleanup

---
 ydb/tests/fq/s3/test_format_setting.py | 404 ++++++++++---------------
 1 file changed, 160 insertions(+), 244 deletions(-)

diff --git a/ydb/tests/fq/s3/test_format_setting.py b/ydb/tests/fq/s3/test_format_setting.py
index 3386839457a8..85ae10ae2e8a 100644
--- a/ydb/tests/fq/s3/test_format_setting.py
+++ b/ydb/tests/fq/s3/test_format_setting.py
@@ -930,8 +930,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix
         client.create_storage_connection(storage_connection_name, "fbucket")
 
         sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
+            SELECT
+                `fruit`, CAST(`ts` as String)
             FROM
                 `{storage_connection_name}`.`/{filename}`
             WITH (FORMAT="parquet",
                 SCHEMA=(
                 `fruit` Utf8 NOT NULL,
                 `ts` Timestamp
                 ));
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
         '''
 
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+
assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # timestamp[us] -> Timestamp @@ -970,7 +967,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # string -> Timestamp @@ -990,7 +990,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # utf8 -> Timestamp @@ -1010,7 +1013,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # timestamp[s] -> Timestamp @@ -1030,7 +1036,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # timestamp[ns] -> Timestamp @@ -1050,7 +1059,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # date64 -> Timestamp @@ -1067,27 +1079,13 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) - sql = f''' - $result = SELECT - `fruit`, `ts` - FROM - `{storage_connection_name}`.`/{filename}` - WITH (FORMAT="parquet", - SCHEMA=( - `fruit` Utf8 NOT NULL, - `ts` 
Timestamp - )); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00.000Z" as Timestamp), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; - ''' - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z" # date32 -> Timestamp @@ -1107,7 +1105,10 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z" # int32 [UNIX_TIME_SECONDS] -> Timestamp @@ -1125,8 +1126,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) sql = f''' - $result = SELECT - `fruit`, `ts` + SELECT + `fruit`, CAST(`ts` as String) FROM `{storage_connection_name}`.`/{filename}` WITH (FORMAT="parquet", @@ -1135,17 +1136,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix `ts` Timestamp ), `data.timestamp.format_name`="UNIX_TIME_SECONDS"); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # int64 [UNIX_TIME_SECONDS] -> Timestamp @@ -1163,8 +1162,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) sql = f''' - $result = SELECT - `fruit`, `ts` + SELECT + `fruit`, CAST(`ts` as String) FROM `{storage_connection_name}`.`/{filename}` WITH (FORMAT="parquet", @@ -1173,17 +1172,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix `ts` Timestamp ), `data.timestamp.format_name`="UNIX_TIME_SECONDS"); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = 
client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # int64 [UNIX_TIME_MILLISECONDS] -> Timestamp @@ -1201,8 +1198,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) sql = f''' - $result = SELECT - `fruit`, `ts` + SELECT + `fruit`, CAST(`ts` as String) FROM `{storage_connection_name}`.`/{filename}` WITH (FORMAT="parquet", @@ -1211,17 +1208,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix `ts` Timestamp ), `data.timestamp.format_name`="UNIX_TIME_MILLISECONDS"); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # int64 [UNIX_TIME_MICROSECONDS] -> Timestamp @@ -1239,8 +1234,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) sql = f''' - $result = SELECT - `fruit`, `ts` + SELECT + `fruit`, CAST(`ts` as String) FROM `{storage_connection_name}`.`/{filename}` WITH (FORMAT="parquet", @@ -1249,17 +1244,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix `ts` Timestamp ), `data.timestamp.format_name`="UNIX_TIME_MICROSECONDS"); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # uint32 [UNIX_TIME_SECONDS] -> Timestamp @@ -1277,8 +1270,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) sql = f''' - $result = SELECT - `fruit`, `ts` + SELECT + `fruit`, CAST(`ts` as String) FROM `{storage_connection_name}`.`/{filename}` WITH (FORMAT="parquet", @@ -1287,17 +1280,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix `ts` Timestamp ), `data.timestamp.format_name`="UNIX_TIME_SECONDS"); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp), - "invalid row: " || 
Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # uint64 [UNIX_TIME_SECONDS] -> Timestamp @@ -1314,28 +1305,13 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) - sql = f''' - $result = SELECT - `fruit`, `ts` - FROM - `{storage_connection_name}`.`/{filename}` - WITH (FORMAT="parquet", - SCHEMA=( - `fruit` Utf8 NOT NULL, - `ts` Timestamp - ), - `data.timestamp.format_name`="UNIX_TIME_SECONDS"); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; - ''' - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # uint64 [UNIX_TIME_MILLISECONDS] -> Timestamp @@ -1353,8 +1329,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) sql = f''' - $result = SELECT - `fruit`, `ts` + SELECT + `fruit`, CAST(`ts` as String) FROM `{storage_connection_name}`.`/{filename}` WITH (FORMAT="parquet", @@ -1363,17 +1339,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix `ts` Timestamp ), `data.timestamp.format_name`="UNIX_TIME_MILLISECONDS"); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # uint64 [UNIX_TIME_MICROSECONDS] -> Timestamp @@ -1391,8 +1365,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) sql = f''' - $result = SELECT - `fruit`, `ts` + SELECT + `fruit`, CAST(`ts` as String) FROM `{storage_connection_name}`.`/{filename}` WITH (FORMAT="parquet", @@ -1401,17 +1375,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, 
s3, client, unique_prefix `ts` Timestamp ), `data.timestamp.format_name`="UNIX_TIME_MICROSECONDS"); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00.000Z" as Timestamp), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00Z" # uint16 [default] -> Timestamp @@ -1429,8 +1401,8 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) sql = f''' - $result = SELECT - `fruit`, `ts` + SELECT + `fruit`, CAST(`ts` as String) FROM `{storage_connection_name}`.`/{filename}` WITH (FORMAT="parquet", @@ -1438,17 +1410,15 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix `fruit` Utf8 NOT NULL, `ts` Timestamp )); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00.000Z" as Timestamp), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z" @yq_all def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix): @@ -1472,8 +1442,8 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix) client.create_storage_connection(storage_connection_name, "fbucket") sql = f''' - $result = SELECT - `fruit`, `ts` + SELECT + `fruit`, CAST(`ts` as String) FROM `{storage_connection_name}`.`/{filename}` WITH (FORMAT="parquet", @@ -1481,12 +1451,6 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix) `fruit` Utf8 NOT NULL, `ts` Datetime )); - - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00Z" as Datetime), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id @@ -1573,27 +1537,13 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix) pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) - sql = f''' - $result = SELECT - `fruit`, `ts` - FROM - `{storage_connection_name}`.`/{filename}` - WITH (FORMAT="parquet", - SCHEMA=( - `fruit` Utf8 NOT NULL, - `ts` Datetime - )); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00Z" as Datetime), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; - ''' - query_id = 
client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z"
 
         # date32 -> Timestamp
 
@@ -1613,7 +1563,10 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix)
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z"
 
         # int32 -> Timestamp
 
@@ -1630,32 +1582,17 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix)
         pq.write_table(table, yatest_common.work_path(filename))
         s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path())
 
-        sql = f'''
-            $result = SELECT
-                `fruit`, `ts`
-            FROM
-                `{storage_connection_name}`.`/{filename}`
-            WITH (FORMAT="parquet",
-                SCHEMA=(
-                    `fruit` Utf8 NOT NULL,
-                    `ts` Datetime
-                ));
-
-            SELECT Ensure(
-                0,
-                `fruit` = "apple" and `ts` = CAST("2024-04-02T12:01:00Z" as Datetime),
-                "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String))
-            ) AS value FROM $result;
-        '''
-
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z"
 
         # int64 -> Timestamp
 
-        # 2024-04-02T12:01:00.000Z
+        # 2024-04-02T00:00:00.000Z
         data = [['apple'], [1712059260]]
 
         # Define the schema for the data
@@ -1691,11 +1627,14 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix)
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z"
 
         # uint64 -> Timestamp
 
-        # 2024-04-02T12:01:00.000Z
+        # 2024-04-02T00:00:00.000Z
         data = [['apple'], [1712059260]]
 
         # Define the schema for the data
@@ -1711,7 +1649,10 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix)
         query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
         client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
         data = client.get_result_data(query_id, limit=50)
-        assert len(data.result.result_set.rows) == 1, "invalid count rows"
+        rows = data.result.result_set.rows
+        assert len(rows) == 1, "invalid count rows"
+        assert rows[0].items[0].text_value == "apple"
+        assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z"
 
         # uint16 [default] ->
Timestamp @@ -1728,27 +1668,13 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix) pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) - sql = f''' - $result = SELECT - `fruit`, `ts` - FROM - `{storage_connection_name}`.`/{filename}` - WITH (FORMAT="parquet", - SCHEMA=( - `fruit` Utf8 NOT NULL, - `ts` Datetime - )); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = CAST("2024-04-02T00:00:00Z" as Datetime), - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; - ''' - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T00:00:00Z" @yq_all def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix): @@ -1772,7 +1698,7 @@ def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix): client.create_storage_connection(storage_connection_name, "fbucket") sql = f''' - $result = SELECT + SELECT `fruit`, `ts` FROM `{storage_connection_name}`.`/{filename}` @@ -1781,18 +1707,15 @@ def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix): `fruit` Utf8 NOT NULL, `ts` String )); - - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = "2024-04-02T12:01:00.000000Z", - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00.000000Z" # timestamp[us] -> String @@ -1812,7 +1735,10 @@ def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00.000000Z" # timestamp[s] -> String @@ -1832,7 +1758,10 @@ def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00.000000Z" # 
timestamp[ns] -> String @@ -1852,7 +1781,10 @@ def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02T12:01:00.000000Z" # date64 -> String @@ -1869,27 +1801,13 @@ def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix): pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) - sql = f''' - $result = SELECT - `fruit`, `ts` - FROM - `{storage_connection_name}`.`/{filename}` - WITH (FORMAT="parquet", - SCHEMA=( - `fruit` Utf8 NOT NULL, - `ts` String - )); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = "2024-04-02", - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; - ''' - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02" # date32 -> String @@ -1909,7 +1827,10 @@ def test_parquet_converters_to_string(self, kikimr, s3, client, unique_prefix): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].bytes_value == b"2024-04-02" @yq_all def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix): @@ -1933,7 +1854,7 @@ def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix): client.create_storage_connection(storage_connection_name, "fbucket") sql = f''' - $result = SELECT + SELECT `fruit`, `ts` FROM `{storage_connection_name}`.`/{filename}` @@ -1942,18 +1863,15 @@ def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix): `fruit` Utf8 NOT NULL, `ts` Utf8 )); - - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = "2024-04-02T12:01:00.000000Z", - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].text_value == "2024-04-02T12:01:00.000000Z" # timestamp[us] -> Utf8 @@ -1973,7 +1891,10 @@ def 
test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].text_value == "2024-04-02T12:01:00.000000Z" # timestamp[s] -> Utf8 @@ -1993,7 +1914,10 @@ def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].text_value == "2024-04-02T12:01:00.000000Z" # timestamp[ns] -> Utf8 @@ -2013,7 +1937,10 @@ def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].text_value == "2024-04-02T12:01:00.000000Z" # date64 -> Utf8 @@ -2030,27 +1957,13 @@ def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix): pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) - sql = f''' - $result = SELECT - `fruit`, `ts` - FROM - `{storage_connection_name}`.`/{filename}` - WITH (FORMAT="parquet", - SCHEMA=( - `fruit` Utf8 NOT NULL, - `ts` Utf8 - )); - SELECT Ensure( - 0, - `fruit` = "apple" and `ts` = "2024-04-02", - "invalid row: " || Unwrap(`fruit`) || " " || Unwrap(CAST(`ts` as String)) - ) AS value FROM $result; - ''' - query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].text_value == "2024-04-02" # date32 -> Utf8 @@ -2070,4 +1983,7 @@ def test_parquet_converters_to_utf8(self, kikimr, s3, client, unique_prefix): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id, limit=50) - assert len(data.result.result_set.rows) == 1, "invalid count rows" + rows = data.result.result_set.rows + assert len(rows) == 1, "invalid count rows" + assert rows[0].items[0].text_value == "apple" + assert rows[0].items[1].text_value == "2024-04-02"
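
All of the integer literals used across these tests encode the same calendar date, 2024-04-02, expressed in different Arrow units, which is why the expected strings in the assertions are predictable from section to section. The following standalone snippet double-checks those constants; it is plain Python with only the standard library, included here as an illustration and not part of the patch series itself:

    from datetime import datetime, timezone

    EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

    # timestamp[s] literal: 2024-04-02T12:01:00Z
    assert datetime.fromtimestamp(1712059260, tz=timezone.utc) == datetime(2024, 4, 2, 12, 1, tzinfo=timezone.utc)

    # timestamp[ms] / date64 literal: milliseconds for 2024-04-02T00:00:00Z
    assert datetime.fromtimestamp(1712016000000 // 1000, tz=timezone.utc) == datetime(2024, 4, 2, tzinfo=timezone.utc)

    # date32 literal: whole days since the Unix epoch
    assert (datetime(2024, 4, 2, tzinfo=timezone.utc) - EPOCH).days == 19815

    # the finer timestamp units are plain scalings of the seconds value
    assert 1712059260000000 == 1712059260 * 10**6       # timestamp[us]
    assert 1712059260000000000 == 1712059260 * 10**9    # timestamp[ns]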