From e946a1ba7294935ff7fbb8cf7570b2bec454ad3a Mon Sep 17 00:00:00 2001 From: Aleksandr Chizhevskii Date: Thu, 25 Apr 2024 15:55:27 +0000 Subject: [PATCH 1/4] no runtime --- .../providers/s3/actors/yql_s3_read_actor.cpp | 36 +++++++++---------- .../s3/provider/yql_s3_dq_integration.cpp | 10 +++--- .../s3/provider/yql_s3_logical_opt.cpp | 11 ++++-- ydb/tests/fq/s3/test_size_limit.py | 27 ++++++++++---- 4 files changed, 51 insertions(+), 33 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 520af40c7465..9ce5fd815ff6 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -160,12 +160,12 @@ struct TEvS3FileQueue { EvEnd }; - static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE), + static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE), "expect EvEnd < EventSpaceEnd(TEvents::ES_S3_FILE_QUEUE)"); - + struct TEvUpdateConsumersCount : public TEventPB { - + explicit TEvUpdateConsumersCount(ui64 consumersCountDelta = 0) { Record.SetConsumersCountDelta(consumersCountDelta); } @@ -173,7 +173,7 @@ struct TEvS3FileQueue { struct TEvAck : public TEventPB { - + TEvAck() = default; explicit TEvAck(const TMessageTransportMeta& transportMeta) { @@ -595,7 +595,7 @@ class TS3FileQueueActor : public TActorBootstrapped { Send(ev->Sender, new TEvS3FileQueue::TEvObjectPathReadError(*MaybeIssues, ev->Get()->Record.GetTransportMeta())); TryFinish(ev->Sender, ev->Get()->Record.GetTransportMeta().GetSeqNo()); } - + void HandleUpdateConsumersCount(TEvS3FileQueue::TEvUpdateConsumersCount::TPtr& ev) { if (!UpdatedConsumers.contains(ev->Sender)) { LOG_D( @@ -649,7 +649,7 @@ class TS3FileQueueActor : public TActorBootstrapped { LOG_T("TS3FileQueueActor", "SendObjects Sending " << result.size() << " objects to consumer with id " << consumer); Send(consumer, new TEvS3FileQueue::TEvObjectPathBatch(std::move(result), HasNoMoreItems(), transportMeta)); - + if (HasNoMoreItems()) { TryFinish(consumer, transportMeta.GetSeqNo()); } @@ -671,7 +671,7 @@ class TS3FileQueueActor : public TActorBootstrapped { } bool CanSendToConsumer(const TActorId& consumer) { - return !UseRuntimeListing || RoundRobinStageFinished || + return !UseRuntimeListing || RoundRobinStageFinished || (StartedConsumers.size() < ConsumersCount && !StartedConsumers.contains(consumer)); } @@ -749,7 +749,7 @@ class TS3FileQueueActor : public TActorBootstrapped { } }); } - + void ScheduleRequest(const TActorId& consumer, const TMessageTransportMeta& transportMeta) { PendingRequests[consumer].push_back(transportMeta); HasPendingRequests = true; @@ -786,7 +786,7 @@ class TS3FileQueueActor : public TActorBootstrapped { } } } - + void TryFinish(const TActorId& consumer, ui64 seqNo) { LOG_T("TS3FileQueueActor", "TryFinish from consumer " << consumer << ", " << FinishedConsumers.size() << " consumers already finished, seqNo=" << seqNo); if (FinishingConsumerToLastSeqNo.contains(consumer)) { @@ -834,7 +834,7 @@ class TS3FileQueueActor : public TActorBootstrapped { const TString Pattern; const ES3PatternVariant PatternVariant; const ES3PatternType PatternType; - + static constexpr TDuration PoisonTimeout = TDuration::Hours(3); static constexpr TDuration RoundRobinStageTimeout = TDuration::Seconds(3); }; @@ -1093,7 +1093,7 @@ class TS3ReadActor : public TActorBootstrapped, public IDqComputeA void HandleAck(TEvS3FileQueue::TEvAck::TPtr& ev) { FileQueueEvents.OnEventReceived(ev); } - + static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd, const TString path) { if (!result.Issues) { actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(result.Content), requestId, pathInd, path))); @@ -1205,7 +1205,7 @@ class TS3ReadActor : public TActorBootstrapped, public IDqComputeA auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << path << " with request id [" << requestId << "]", TIssues{result->Get()->Error}); Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } - + void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) { FileQueueEvents.Retry(); } @@ -2084,7 +2084,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { if (isCancelled) { LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION, downloaded " << QueueBufferCounter->DownloadedBytes << " bytes"); - break; + break; } } } @@ -2780,7 +2780,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public void CommitState(const NDqProto::TCheckpoint&) final {} ui64 GetInputIndex() const final { - return InputIndex; + return InputIndex; } const TDqAsyncStats& GetIngressStats() const final { @@ -3034,7 +3034,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public } } } - + void Handle(TEvS3FileQueue::TEvAck::TPtr& ev) { FileQueueEvents.OnEventReceived(ev); } @@ -3390,7 +3390,7 @@ std::pair CreateS3ReadActor( if (params.GetRowsLimitHint() != 0) { rowsLimitHint = params.GetRowsLimitHint(); } - + TActorId fileQueueActor; if (auto it = settings.find("fileQueueActor"); it != settings.cend()) { NActorsProto::TActorId protoId; @@ -3398,7 +3398,7 @@ std::pair CreateS3ReadActor( ParseFromTextFormat(inputStream, protoId); fileQueueActor = ActorIdFromProto(protoId); } - + ui64 fileQueueBatchSizeLimit = 0; if (auto it = settings.find("fileQueueBatchSizeLimit"); it != settings.cend()) { fileQueueBatchSizeLimit = FromString(it->second); @@ -3408,7 +3408,7 @@ std::pair CreateS3ReadActor( if (auto it = settings.find("fileQueueBatchObjectCountLimit"); it != settings.cend()) { fileQueueBatchObjectCountLimit = FromString(it->second); } - + ui64 fileQueueConsumersCountDelta = 0; if (readRanges.size() > 1) { fileQueueConsumersCountDelta = readRanges.size() - 1; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 94e93ee2b1f5..fe59ac3b9773 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -410,7 +410,7 @@ class TS3DqIntegration: public TDqIntegrationBase { auto fileQueueBatchObjectCountLimit = State_->Configuration->FileQueueBatchObjectCountLimit.Get().GetOrElse(1000); srcDesc.MutableSettings()->insert({"fileQueueBatchObjectCountLimit", ToString(fileQueueBatchObjectCountLimit)}); - + YQL_CLOG(DEBUG, ProviderS3) << " useRuntimeListing=" << useRuntimeListing; if (useRuntimeListing) { @@ -422,8 +422,8 @@ class TS3DqIntegration: public TDqIntegrationBase { packed.Data().Literal().Value(), FromString(packed.IsText().Literal().Value()), paths); - paths.insert(paths.end(), - std::make_move_iterator(pathsChunk.begin()), + paths.insert(paths.end(), + std::make_move_iterator(pathsChunk.begin()), std::make_move_iterator(pathsChunk.end())); } @@ -434,11 +434,11 @@ class TS3DqIntegration: public TDqIntegrationBase { builder.AddPath(f.Path, f.Size, f.IsDirectory); }); builder.Save(&range); - + TVector serialized(1); TStringOutput out(serialized.front()); range.Save(&out); - + paths.clear(); ReadPathsList(srcDesc, {}, serialized, paths); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index fb9d39560680..c6d6e09117bc 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -233,8 +233,13 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase { fileSizeLimit = it->second; } } + + ui64 userSizeLimit = std::numeric_limits::max(); if (formatName == "parquet") { fileSizeLimit = State_->Configuration->BlockFileSizeLimit; + } else if (formatName == "raw") { + const auto sizeLimitParam = dqSource.Input().Cast().SizeLimit().Maybe(); + userSizeLimit = FromString(sizeLimitParam.Cast().StringValue()); } for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) { @@ -245,13 +250,13 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase { UnpackPathsList(packed, isTextEncoded, paths); for (auto& entry : paths) { - if (entry.Size > fileSizeLimit) { + if (std::min(entry.Size, userSizeLimit) > fileSizeLimit) { ctx.AddError(TIssue(ctx.GetPosition(batch.Pos()), TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName)); hasErr = true; return false; } - totalSize += entry.Size; + totalSize += std::min(entry.Size, userSizeLimit); ++count; } } @@ -673,7 +678,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase { .RowsLimitHint(count.Literal()) .Build() .Build() - .Done(); + .Done(); } TMaybeNode PushDownLimit(TExprBase node, TExprContext& ctx) const { diff --git a/ydb/tests/fq/s3/test_size_limit.py b/ydb/tests/fq/s3/test_size_limit.py index 74357d82643a..22deb78d5f1f 100644 --- a/ydb/tests/fq/s3/test_size_limit.py +++ b/ydb/tests/fq/s3/test_size_limit.py @@ -13,9 +13,10 @@ class TestS3(TestYdsBase): @yq_all - @pytest.mark.parametrize("limit", [5, 10, 15, 20, 100, 1000]) + @pytest.mark.parametrize("kikimr_params", [{"raw": 20}, {"raw": 150}, {"raw": 1000}], indirect=True) + @pytest.mark.parametrize("limit", [5, 100, 500, 1500]) @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - def test_size_limit(self, kikimr, s3, client, limit, unique_prefix): + def test_size_limit(self, kikimr, s3, client, limit, kikimr_params, unique_prefix): resource = boto3.resource( "s3", endpoint_url=s3.s3_url, @@ -33,25 +34,37 @@ def test_size_limit(self, kikimr, s3, client, limit, unique_prefix): aws_secret_access_key="secret_key" ) - info = "Да и ты, читатель, разве ты не Ничья Рыба и одновременно разве не Рыба на Лине?" + # info = "Да и ты, читатель, разве ты не Ничья Рыба и одновременно разве не Рыба на Лине?" + info = """ + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + 01234567890123456789012345678901234567890123456789 + """ + s3_client.put_object(Body=info, Bucket='fbucket', Key='info.txt', ContentType='text/plain') kikimr.control_plane.wait_bootstrap(1) storage_connection_name = unique_prefix + "test-connection" client.create_storage_connection(storage_connection_name, "fbucket") - sql = R''' + sql = f''' SELECT data FROM - `{}`.`info.txt` + `{storage_connection_name}`.`info.txt` WITH( - read_max_bytes="{}", + read_max_bytes="{limit}", format=raw, SCHEMA (data String, )) - '''.format(storage_connection_name, limit) + ''' query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + + if kikimr_params.param['raw'] < min(len(info), limit): + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + return + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = client.get_result_data(query_id) From a66a7e5de580256fe9749dbe077b58fdad01d41b Mon Sep 17 00:00:00 2001 From: Aleksandr Chizhevskii Date: Sat, 27 Apr 2024 16:01:23 +0000 Subject: [PATCH 2/4] runtime listing --- .../providers/s3/actors/yql_s3_read_actor.cpp | 22 +++++++++++++++---- .../providers/s3/actors/yql_s3_read_actor.h | 1 + .../s3/provider/yql_s3_dq_integration.cpp | 6 +++++ .../s3/provider/yql_s3_logical_opt.cpp | 4 +++- ydb/tests/fq/s3/test_format_setting.py | 10 +++------ ydb/tests/fq/s3/test_size_limit.py | 20 ++++++++--------- 6 files changed, 41 insertions(+), 22 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 9ce5fd815ff6..7100b0644443 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -388,6 +388,7 @@ class TS3FileQueueActor : public TActorBootstrapped { TPathList paths, size_t prefetchSize, ui64 fileSizeLimit, + ui64 readLimit, bool useRuntimeListing, ui64 consumersCount, ui64 batchSizeLimit, @@ -401,6 +402,7 @@ class TS3FileQueueActor : public TActorBootstrapped { : TxId(std::move(txId)) , PrefetchSize(prefetchSize) , FileSizeLimit(fileSizeLimit) + , ReadLimit(readLimit) , MaybeIssues(Nothing()) , UseRuntimeListing(useRuntimeListing) , ConsumersCount(consumersCount) @@ -510,7 +512,7 @@ class TS3FileQueueActor : public TActorBootstrapped { // skip 'directories' continue; } - if (object.Size > FileSizeLimit) { + if (object.Size > std::min(FileSizeLimit, ReadLimit)) { auto errorMessage = TStringBuilder() << "Size of object " << object.Path << " = " << object.Size @@ -522,10 +524,10 @@ class TS3FileQueueActor : public TActorBootstrapped { LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path); TObjectPath objectPath; objectPath.SetPath(object.Path); - objectPath.SetSize(object.Size); + objectPath.SetSize(std::min(object.Size, ReadLimit)); objectPath.SetPathIndex(CurrentDirectoryPathIndex); Objects.emplace_back(std::move(objectPath)); - ObjectsTotalSize += object.Size; + ObjectsTotalSize += std::min(object.Size, ReadLimit); } return true; } @@ -810,6 +812,7 @@ class TS3FileQueueActor : public TActorBootstrapped { size_t PrefetchSize; ui64 FileSizeLimit; + ui64 ReadLimit; TMaybe MaybeLister = Nothing(); TMaybe> ListingFuture; size_t CurrentDirectoryPathIndex = 0; @@ -914,6 +917,7 @@ class TS3ReadActor : public TActorBootstrapped, public IDqComputeA std::move(Paths), ReadActorFactoryCfg.MaxInflight * 2, FileSizeLimit, + SizeLimit, false, 1, FileQueueBatchSizeLimit, @@ -2534,6 +2538,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters, ui64 fileSizeLimit, + ui64 readLimit, std::optional rowsLimitHint, IMemoryQuotaManager::TPtr memoryQuotaManager, bool useRuntimeListing, @@ -2560,6 +2565,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public , TaskCounters(std::move(taskCounters)) , FileQueueActor(fileQueueActor) , FileSizeLimit(fileSizeLimit) + , ReadLimit(readLimit) , MemoryQuotaManager(memoryQuotaManager) , UseRuntimeListing(useRuntimeListing) , FileQueueBatchSizeLimit(fileQueueBatchSizeLimit) @@ -2618,6 +2624,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public std::move(Paths), ReadActorFactoryCfg.MaxInflight * 2, FileSizeLimit, + ReadLimit, false, 1, FileQueueBatchSizeLimit, @@ -3132,6 +3139,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public std::set CoroActors; NActors::TActorId FileQueueActor; const ui64 FileSizeLimit; + const ui64 ReadLimit; bool Bootstrapped = false; IMemoryQuotaManager::TPtr MemoryQuotaManager; bool UseRuntimeListing; @@ -3291,6 +3299,7 @@ IActor* CreateS3FileQueueActor( TPathList paths, size_t prefetchSize, ui64 fileSizeLimit, + ui64 readLimit, bool useRuntimeListing, ui64 consumersCount, ui64 batchSizeLimit, @@ -3306,6 +3315,7 @@ IActor* CreateS3FileQueueActor( paths, prefetchSize, fileSizeLimit, + readLimit, useRuntimeListing, consumersCount, batchSizeLimit, @@ -3516,9 +3526,13 @@ std::pair CreateS3ReadActor( #undef SET_FLAG #undef SUPPORTED_FLAGS + ui64 sizeLimit = std::numeric_limits::max(); + if (const auto it = settings.find("sizeLimit"); settings.cend() != it) + sizeLimit = FromString(it->second); + const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy, - cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, memoryQuotaManager, + cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager, params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta); return {actor, actor}; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 2b1ca1adeff6..17ddc2ba26fc 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -18,6 +18,7 @@ NActors::IActor* CreateS3FileQueueActor( NS3Details::TPathList paths, size_t prefetchSize, ui64 fileSizeLimit, + ui64 readLimit, bool useRuntimeListing, ui64 consumersCount, ui64 batchSizeLimit, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index fe59ac3b9773..291ca045260b 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -485,12 +485,18 @@ class TS3DqIntegration: public TDqIntegrationBase { YQL_CLOG(DEBUG, ProviderS3) << " hasDirectories=" << hasDirectories << ", consumersCount=" << consumersCount; + ui64 readLimit = std::numeric_limits::max(); + if (srcDesc.MutableSettings()->find("sizeLimit") != srcDesc.MutableSettings()->cend()) { + readLimit = FromString(srcDesc.MutableSettings()->at("sizeLimit")); + } + auto fileQueueActor = NActors::TActivationContext::ActorSystem()->Register( NDq::CreateS3FileQueueActor( 0ul, std::move(paths), fileQueuePrefetchSize, fileSizeLimit, + readLimit, useRuntimeListing, consumersCount, fileQueueBatchSizeLimit, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index c6d6e09117bc..6886a70b8f6e 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -239,7 +239,9 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase { fileSizeLimit = State_->Configuration->BlockFileSizeLimit; } else if (formatName == "raw") { const auto sizeLimitParam = dqSource.Input().Cast().SizeLimit().Maybe(); - userSizeLimit = FromString(sizeLimitParam.Cast().StringValue()); + if (sizeLimitParam.IsValid()) { + userSizeLimit = FromString(sizeLimitParam.Cast().StringValue()); + } } for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) { diff --git a/ydb/tests/fq/s3/test_format_setting.py b/ydb/tests/fq/s3/test_format_setting.py index 79f3442c8b5c..1b8a77481011 100644 --- a/ydb/tests/fq/s3/test_format_setting.py +++ b/ydb/tests/fq/s3/test_format_setting.py @@ -21,6 +21,7 @@ from google.protobuf import struct_pb2 + class TestS3(TestYdsBase): def create_bucket_and_upload_file(self, filename, s3, kikimr): s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", "ydb/tests/fq/s3/test_format_settings") @@ -907,7 +908,6 @@ def test_string_not_null_multi(self, kikimr, s3, client, filename, unique_prefix assert data.result.result_set.rows[0].items[1].bytes_value == b"", str(data.result.result_set) assert data.result.result_set.rows[0].items[2].bytes_value == b"", str(data.result.result_set) - @yq_all def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix): # timestamp[ms] -> Timestamp @@ -925,7 +925,6 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) - kikimr.control_plane.wait_bootstrap(1) storage_connection_name = unique_prefix + "hcpp" client.create_storage_connection(storage_connection_name, "fbucket") @@ -973,10 +972,9 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix data = client.get_result_data(query_id, limit=50) assert len(data.result.result_set.rows) == 1, "invalid count rows" - # timestamp[s] -> Timestamp - # 2024-04-02T12:01:00.000Z + # 2024-04-02T12:01:00.000Z data = [['apple'], [1712059260]] # Define the schema for the data @@ -994,7 +992,6 @@ def test_parquet_converters_to_timestamp(self, kikimr, s3, client, unique_prefix data = client.get_result_data(query_id, limit=50) assert len(data.result.result_set.rows) == 1, "invalid count rows" - # timestamp[ns] -> Timestamp # 2024-04-02T12:01:00.000Z @@ -1430,7 +1427,6 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix) pq.write_table(table, yatest_common.work_path(filename)) s3_helpers.create_bucket_and_upload_file(filename, s3.s3_url, "fbucket", yatest_common.work_path()) - kikimr.control_plane.wait_bootstrap(1) storage_connection_name = unique_prefix + "hcpp" client.create_storage_connection(storage_connection_name, "fbucket") @@ -1482,7 +1478,7 @@ def test_parquet_converters_to_datetime(self, kikimr, s3, client, unique_prefix) # timestamp[s] -> Timestamp - # 2024-04-02T12:01:00.000Z + # 2024-04-02T12:01:00.000Z data = [['apple'], [1712059260]] # Define the schema for the data diff --git a/ydb/tests/fq/s3/test_size_limit.py b/ydb/tests/fq/s3/test_size_limit.py index 22deb78d5f1f..bd914d515285 100644 --- a/ydb/tests/fq/s3/test_size_limit.py +++ b/ydb/tests/fq/s3/test_size_limit.py @@ -13,10 +13,11 @@ class TestS3(TestYdsBase): @yq_all + @pytest.mark.parametrize("runtime_listing", ["false", "true"]) @pytest.mark.parametrize("kikimr_params", [{"raw": 20}, {"raw": 150}, {"raw": 1000}], indirect=True) - @pytest.mark.parametrize("limit", [5, 100, 500, 1500]) + @pytest.mark.parametrize("limit", [5, 100, 500]) @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) - def test_size_limit(self, kikimr, s3, client, limit, kikimr_params, unique_prefix): + def test_size_limit(self, kikimr, s3, client, limit, kikimr_params, runtime_listing, unique_prefix): resource = boto3.resource( "s3", endpoint_url=s3.s3_url, @@ -34,7 +35,6 @@ def test_size_limit(self, kikimr, s3, client, limit, kikimr_params, unique_prefi aws_secret_access_key="secret_key" ) - # info = "Да и ты, читатель, разве ты не Ничья Рыба и одновременно разве не Рыба на Лине?" info = """ 01234567890123456789012345678901234567890123456789 01234567890123456789012345678901234567890123456789 @@ -48,6 +48,7 @@ def test_size_limit(self, kikimr, s3, client, limit, kikimr_params, unique_prefi client.create_storage_connection(storage_connection_name, "fbucket") sql = f''' + pragma s3.UseRuntimeListing="{runtime_listing}"; SELECT data FROM @@ -63,11 +64,10 @@ def test_size_limit(self, kikimr, s3, client, limit, kikimr_params, unique_prefi if kikimr_params.param['raw'] < min(len(info), limit): client.wait_query_status(query_id, fq.QueryMeta.FAILED) - return + else: + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) - - data = client.get_result_data(query_id) - result_set = data.result.result_set - result = result_set.rows[0].items[0].bytes_value - assert result == info.encode()[:limit] + data = client.get_result_data(query_id) + result_set = data.result.result_set + result = result_set.rows[0].items[0].bytes_value + assert result == info.encode()[:limit] From a06426884598f0c7026f55b4ea924bd10d9e8f03 Mon Sep 17 00:00:00 2001 From: Aleksandr Chizhevskii Date: Thu, 2 May 2024 13:21:26 +0000 Subject: [PATCH 3/4] pr fix --- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 3 ++- .../yql/providers/s3/provider/yql_s3_dq_integration.cpp | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 7100b0644443..e31296f37433 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -3527,8 +3527,9 @@ std::pair CreateS3ReadActor( #undef SET_FLAG #undef SUPPORTED_FLAGS ui64 sizeLimit = std::numeric_limits::max(); - if (const auto it = settings.find("sizeLimit"); settings.cend() != it) + if (const auto it = settings.find("sizeLimit"); settings.cend() != it) { sizeLimit = FromString(it->second); + } const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 291ca045260b..56a2d92693e9 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -486,8 +486,8 @@ class TS3DqIntegration: public TDqIntegrationBase { YQL_CLOG(DEBUG, ProviderS3) << " hasDirectories=" << hasDirectories << ", consumersCount=" << consumersCount; ui64 readLimit = std::numeric_limits::max(); - if (srcDesc.MutableSettings()->find("sizeLimit") != srcDesc.MutableSettings()->cend()) { - readLimit = FromString(srcDesc.MutableSettings()->at("sizeLimit")); + if (const auto sizeLimitIter = srcDesc.MutableSettings()->find("sizeLimit"); sizeLimitIter != srcDesc.MutableSettings()->cend()) { + readLimit = FromString(sizeLimitIter->second); } auto fileQueueActor = NActors::TActivationContext::ActorSystem()->Register( From 4d97d299f098f6a834a4069c67cb30a4f135af47 Mon Sep 17 00:00:00 2001 From: Aleksandr Chizhevskii Date: Fri, 3 May 2024 16:01:46 +0000 Subject: [PATCH 4/4] removed min() --- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 8 +++++--- .../yql/providers/s3/provider/yql_s3_logical_opt.cpp | 5 +++-- ydb/tests/fq/s3/test_size_limit.py | 7 ++++--- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index e31296f37433..0082a7a814b2 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -512,7 +512,9 @@ class TS3FileQueueActor : public TActorBootstrapped { // skip 'directories' continue; } - if (object.Size > std::min(FileSizeLimit, ReadLimit)) { + + const ui64 bytesUsed = std::min(object.Size, ReadLimit); + if (bytesUsed > FileSizeLimit) { auto errorMessage = TStringBuilder() << "Size of object " << object.Path << " = " << object.Size @@ -524,10 +526,10 @@ class TS3FileQueueActor : public TActorBootstrapped { LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path); TObjectPath objectPath; objectPath.SetPath(object.Path); - objectPath.SetSize(std::min(object.Size, ReadLimit)); + objectPath.SetSize(bytesUsed); objectPath.SetPathIndex(CurrentDirectoryPathIndex); Objects.emplace_back(std::move(objectPath)); - ObjectsTotalSize += std::min(object.Size, ReadLimit); + ObjectsTotalSize += bytesUsed; } return true; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index 6886a70b8f6e..429f94293e02 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -252,13 +252,14 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase { UnpackPathsList(packed, isTextEncoded, paths); for (auto& entry : paths) { - if (std::min(entry.Size, userSizeLimit) > fileSizeLimit) { + const ui64 bytesUsed = std::min(entry.Size, userSizeLimit); + if (bytesUsed > fileSizeLimit) { ctx.AddError(TIssue(ctx.GetPosition(batch.Pos()), TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName)); hasErr = true; return false; } - totalSize += std::min(entry.Size, userSizeLimit); + totalSize += bytesUsed; ++count; } } diff --git a/ydb/tests/fq/s3/test_size_limit.py b/ydb/tests/fq/s3/test_size_limit.py index bd914d515285..c1e620285fc9 100644 --- a/ydb/tests/fq/s3/test_size_limit.py +++ b/ydb/tests/fq/s3/test_size_limit.py @@ -62,11 +62,12 @@ def test_size_limit(self, kikimr, s3, client, limit, kikimr_params, runtime_list query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + expected_status = fq.QueryMeta.COMPLETED if kikimr_params.param['raw'] < min(len(info), limit): - client.wait_query_status(query_id, fq.QueryMeta.FAILED) - else: - client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + expected_status = fq.QueryMeta.FAILED + client.wait_query_status(query_id, expected_status) + if expected_status == fq.QueryMeta.COMPLETED: data = client.get_result_data(query_id) result_set = data.result.result_set result = result_set.rows[0].items[0].bytes_value