diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index c697201f6a38..d0c29ad00ace 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -160,12 +160,12 @@ struct TEvS3FileQueue { EvEnd }; - static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE), + static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE), "expect EvEnd < EventSpaceEnd(TEvents::ES_S3_FILE_QUEUE)"); - + struct TEvUpdateConsumersCount : public TEventPB { - + explicit TEvUpdateConsumersCount(ui64 consumersCountDelta = 0) { Record.SetConsumersCountDelta(consumersCountDelta); } @@ -173,7 +173,7 @@ struct TEvS3FileQueue { struct TEvAck : public TEventPB { - + TEvAck() = default; explicit TEvAck(const TMessageTransportMeta& transportMeta) { @@ -388,6 +388,7 @@ class TS3FileQueueActor : public TActorBootstrapped { TPathList paths, size_t prefetchSize, ui64 fileSizeLimit, + ui64 readLimit, bool useRuntimeListing, ui64 consumersCount, ui64 batchSizeLimit, @@ -401,6 +402,7 @@ class TS3FileQueueActor : public TActorBootstrapped { : TxId(std::move(txId)) , PrefetchSize(prefetchSize) , FileSizeLimit(fileSizeLimit) + , ReadLimit(readLimit) , MaybeIssues(Nothing()) , UseRuntimeListing(useRuntimeListing) , ConsumersCount(consumersCount) @@ -513,7 +515,9 @@ class TS3FileQueueActor : public TActorBootstrapped { // skip 'directories' continue; } - if (object.Size > FileSizeLimit) { + + const ui64 bytesUsed = std::min(object.Size, ReadLimit); + if (bytesUsed > FileSizeLimit) { auto errorMessage = TStringBuilder() << "Size of object " << object.Path << " = " << object.Size @@ -525,10 +529,10 @@ class TS3FileQueueActor : public TActorBootstrapped { LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path << " of size " << object.Size); TObjectPath objectPath; objectPath.SetPath(object.Path); - objectPath.SetSize(object.Size); + objectPath.SetSize(bytesUsed); objectPath.SetPathIndex(CurrentDirectoryPathIndex); Objects.emplace_back(std::move(objectPath)); - ObjectsTotalSize += object.Size; + ObjectsTotalSize += bytesUsed; } return true; } @@ -598,7 +602,7 @@ class TS3FileQueueActor : public TActorBootstrapped { Send(ev->Sender, new TEvS3FileQueue::TEvObjectPathReadError(*MaybeIssues, ev->Get()->Record.GetTransportMeta())); TryFinish(ev->Sender, ev->Get()->Record.GetTransportMeta().GetSeqNo()); } - + void HandleUpdateConsumersCount(TEvS3FileQueue::TEvUpdateConsumersCount::TPtr& ev) { if (!UpdatedConsumers.contains(ev->Sender)) { UpdatedConsumers.insert(ev->Sender); @@ -653,7 +657,7 @@ class TS3FileQueueActor : public TActorBootstrapped { LOG_D("TS3FileQueueActor", "SendObjects Sending " << result.size() << " objects to consumer with id " << consumer << ", " << ObjectsTotalSize << " bytes left"); Send(consumer, new TEvS3FileQueue::TEvObjectPathBatch(std::move(result), HasNoMoreItems(), transportMeta)); - + if (HasNoMoreItems()) { TryFinish(consumer, transportMeta.GetSeqNo()); } @@ -675,7 +679,7 @@ class TS3FileQueueActor : public TActorBootstrapped { } bool CanSendToConsumer(const TActorId& consumer) { - return !UseRuntimeListing || RoundRobinStageFinished || + return !UseRuntimeListing || RoundRobinStageFinished || (StartedConsumers.size() < ConsumersCount && !StartedConsumers.contains(consumer)); } @@ -753,7 +757,7 @@ class TS3FileQueueActor : public TActorBootstrapped { } }); } - + void ScheduleRequest(const TActorId& consumer, const TMessageTransportMeta& transportMeta) { PendingRequests[consumer].push_back(transportMeta); HasPendingRequests = true; @@ -790,7 +794,7 @@ class TS3FileQueueActor : public TActorBootstrapped { } } } - + void TryFinish(const TActorId& consumer, ui64 seqNo) { LOG_T("TS3FileQueueActor", "TryFinish from consumer " << consumer << ", " << FinishedConsumers.size() << " consumers already finished, seqNo=" << seqNo); if (FinishingConsumerToLastSeqNo.contains(consumer)) { @@ -814,6 +818,7 @@ class TS3FileQueueActor : public TActorBootstrapped { size_t PrefetchSize; ui64 FileSizeLimit; + ui64 ReadLimit; TMaybe MaybeLister = Nothing(); TMaybe> ListingFuture; size_t CurrentDirectoryPathIndex = 0; @@ -838,7 +843,7 @@ class TS3FileQueueActor : public TActorBootstrapped { const TString Pattern; const ES3PatternVariant PatternVariant; const ES3PatternType PatternType; - + static constexpr TDuration PoisonTimeout = TDuration::Hours(3); static constexpr TDuration RoundRobinStageTimeout = TDuration::Seconds(3); }; @@ -918,6 +923,7 @@ class TS3ReadActor : public TActorBootstrapped, public IDqComputeA std::move(Paths), ReadActorFactoryCfg.MaxInflight * 2, FileSizeLimit, + SizeLimit, false, 1, FileQueueBatchSizeLimit, @@ -1097,7 +1103,7 @@ class TS3ReadActor : public TActorBootstrapped, public IDqComputeA void HandleAck(TEvS3FileQueue::TEvAck::TPtr& ev) { FileQueueEvents.OnEventReceived(ev); } - + static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd, const TString path) { if (!result.Issues) { actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(result.Content), requestId, pathInd, path))); @@ -1209,7 +1215,7 @@ class TS3ReadActor : public TActorBootstrapped, public IDqComputeA auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << path << " with request id [" << requestId << "]", TIssues{result->Get()->Error}); Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); } - + void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) { FileQueueEvents.Retry(); } @@ -2088,7 +2094,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl { if (isCancelled) { LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION, downloaded " << QueueBufferCounter->DownloadedBytes << " bytes"); - break; + break; } } } @@ -2538,6 +2544,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public ::NMonitoring::TDynamicCounterPtr counters, ::NMonitoring::TDynamicCounterPtr taskCounters, ui64 fileSizeLimit, + ui64 readLimit, std::optional rowsLimitHint, IMemoryQuotaManager::TPtr memoryQuotaManager, bool useRuntimeListing, @@ -2564,6 +2571,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public , TaskCounters(std::move(taskCounters)) , FileQueueActor(fileQueueActor) , FileSizeLimit(fileSizeLimit) + , ReadLimit(readLimit) , MemoryQuotaManager(memoryQuotaManager) , UseRuntimeListing(useRuntimeListing) , FileQueueBatchSizeLimit(fileQueueBatchSizeLimit) @@ -2622,6 +2630,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public std::move(Paths), ReadActorFactoryCfg.MaxInflight * 2, FileSizeLimit, + ReadLimit, false, 1, FileQueueBatchSizeLimit, @@ -2784,7 +2793,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public void CommitState(const NDqProto::TCheckpoint&) final {} ui64 GetInputIndex() const final { - return InputIndex; + return InputIndex; } const TDqAsyncStats& GetIngressStats() const final { @@ -3038,7 +3047,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public } } } - + void Handle(TEvS3FileQueue::TEvAck::TPtr& ev) { FileQueueEvents.OnEventReceived(ev); } @@ -3136,6 +3145,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public std::set CoroActors; NActors::TActorId FileQueueActor; const ui64 FileSizeLimit; + const ui64 ReadLimit; bool Bootstrapped = false; IMemoryQuotaManager::TPtr MemoryQuotaManager; bool UseRuntimeListing; @@ -3295,6 +3305,7 @@ IActor* CreateS3FileQueueActor( TPathList paths, size_t prefetchSize, ui64 fileSizeLimit, + ui64 readLimit, bool useRuntimeListing, ui64 consumersCount, ui64 batchSizeLimit, @@ -3310,6 +3321,7 @@ IActor* CreateS3FileQueueActor( paths, prefetchSize, fileSizeLimit, + readLimit, useRuntimeListing, consumersCount, batchSizeLimit, @@ -3394,7 +3406,7 @@ std::pair CreateS3ReadActor( if (params.GetRowsLimitHint() != 0) { rowsLimitHint = params.GetRowsLimitHint(); } - + TActorId fileQueueActor; if (auto it = settings.find("fileQueueActor"); it != settings.cend()) { NActorsProto::TActorId protoId; @@ -3402,7 +3414,7 @@ std::pair CreateS3ReadActor( ParseFromTextFormat(inputStream, protoId); fileQueueActor = ActorIdFromProto(protoId); } - + ui64 fileQueueBatchSizeLimit = 0; if (auto it = settings.find("fileQueueBatchSizeLimit"); it != settings.cend()) { fileQueueBatchSizeLimit = FromString(it->second); @@ -3412,7 +3424,7 @@ std::pair CreateS3ReadActor( if (auto it = settings.find("fileQueueBatchObjectCountLimit"); it != settings.cend()) { fileQueueBatchObjectCountLimit = FromString(it->second); } - + ui64 fileQueueConsumersCountDelta = 0; if (readRanges.size() > 1) { fileQueueConsumersCountDelta = readRanges.size() - 1; @@ -3520,9 +3532,14 @@ std::pair CreateS3ReadActor( #undef SET_FLAG #undef SUPPORTED_FLAGS + ui64 sizeLimit = std::numeric_limits::max(); + if (const auto it = settings.find("sizeLimit"); settings.cend() != it) { + sizeLimit = FromString(it->second); + } + const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy, - cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, memoryQuotaManager, + cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager, params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta); return {actor, actor}; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 2b1ca1adeff6..17ddc2ba26fc 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -18,6 +18,7 @@ NActors::IActor* CreateS3FileQueueActor( NS3Details::TPathList paths, size_t prefetchSize, ui64 fileSizeLimit, + ui64 readLimit, bool useRuntimeListing, ui64 consumersCount, ui64 batchSizeLimit, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 94e93ee2b1f5..56a2d92693e9 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -410,7 +410,7 @@ class TS3DqIntegration: public TDqIntegrationBase { auto fileQueueBatchObjectCountLimit = State_->Configuration->FileQueueBatchObjectCountLimit.Get().GetOrElse(1000); srcDesc.MutableSettings()->insert({"fileQueueBatchObjectCountLimit", ToString(fileQueueBatchObjectCountLimit)}); - + YQL_CLOG(DEBUG, ProviderS3) << " useRuntimeListing=" << useRuntimeListing; if (useRuntimeListing) { @@ -422,8 +422,8 @@ class TS3DqIntegration: public TDqIntegrationBase { packed.Data().Literal().Value(), FromString(packed.IsText().Literal().Value()), paths); - paths.insert(paths.end(), - std::make_move_iterator(pathsChunk.begin()), + paths.insert(paths.end(), + std::make_move_iterator(pathsChunk.begin()), std::make_move_iterator(pathsChunk.end())); } @@ -434,11 +434,11 @@ class TS3DqIntegration: public TDqIntegrationBase { builder.AddPath(f.Path, f.Size, f.IsDirectory); }); builder.Save(&range); - + TVector serialized(1); TStringOutput out(serialized.front()); range.Save(&out); - + paths.clear(); ReadPathsList(srcDesc, {}, serialized, paths); @@ -485,12 +485,18 @@ class TS3DqIntegration: public TDqIntegrationBase { YQL_CLOG(DEBUG, ProviderS3) << " hasDirectories=" << hasDirectories << ", consumersCount=" << consumersCount; + ui64 readLimit = std::numeric_limits::max(); + if (const auto sizeLimitIter = srcDesc.MutableSettings()->find("sizeLimit"); sizeLimitIter != srcDesc.MutableSettings()->cend()) { + readLimit = FromString(sizeLimitIter->second); + } + auto fileQueueActor = NActors::TActivationContext::ActorSystem()->Register( NDq::CreateS3FileQueueActor( 0ul, std::move(paths), fileQueuePrefetchSize, fileSizeLimit, + readLimit, useRuntimeListing, consumersCount, fileQueueBatchSizeLimit, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index fb9d39560680..429f94293e02 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -233,8 +233,15 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase { fileSizeLimit = it->second; } } + + ui64 userSizeLimit = std::numeric_limits::max(); if (formatName == "parquet") { fileSizeLimit = State_->Configuration->BlockFileSizeLimit; + } else if (formatName == "raw") { + const auto sizeLimitParam = dqSource.Input().Cast().SizeLimit().Maybe(); + if (sizeLimitParam.IsValid()) { + userSizeLimit = FromString(sizeLimitParam.Cast().StringValue()); + } } for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) { @@ -245,13 +252,14 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase { UnpackPathsList(packed, isTextEncoded, paths); for (auto& entry : paths) { - if (entry.Size > fileSizeLimit) { + const ui64 bytesUsed = std::min(entry.Size, userSizeLimit); + if (bytesUsed > fileSizeLimit) { ctx.AddError(TIssue(ctx.GetPosition(batch.Pos()), TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName)); hasErr = true; return false; } - totalSize += entry.Size; + totalSize += bytesUsed; ++count; } } @@ -673,7 +681,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase { .RowsLimitHint(count.Literal()) .Build() .Build() - .Done(); + .Done(); } TMaybeNode PushDownLimit(TExprBase node, TExprContext& ctx) const {