Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-2302: read_max_bytes bypasses file_size_limit #4117

Merged
merged 4 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 37 additions & 22 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,20 @@ struct TEvS3FileQueue {

EvEnd
};
static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE),
static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE),
"expect EvEnd < EventSpaceEnd(TEvents::ES_S3_FILE_QUEUE)");

// Protobuf-backed event (record type NS3::FileQueue::TEvUpdateConsumersCount, event id
// EvUpdateConsumersCount) that carries a consumers-count delta in its record.
struct TEvUpdateConsumersCount :
public TEventPB<TEvUpdateConsumersCount, NS3::FileQueue::TEvUpdateConsumersCount, EvUpdateConsumersCount> {

// Stores consumersCountDelta (0 when the argument is omitted) in the record's
// ConsumersCountDelta field.
explicit TEvUpdateConsumersCount(ui64 consumersCountDelta = 0) {
Record.SetConsumersCountDelta(consumersCountDelta);
}
};

struct TEvAck :
public TEventPB<TEvAck, NS3::FileQueue::TEvAck, EvAck> {

TEvAck() = default;

explicit TEvAck(const TMessageTransportMeta& transportMeta) {
Expand Down Expand Up @@ -388,6 +388,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
TPathList paths,
size_t prefetchSize,
ui64 fileSizeLimit,
ui64 readLimit,
bool useRuntimeListing,
ui64 consumersCount,
ui64 batchSizeLimit,
Expand All @@ -401,6 +402,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
: TxId(std::move(txId))
, PrefetchSize(prefetchSize)
, FileSizeLimit(fileSizeLimit)
, ReadLimit(readLimit)
, MaybeIssues(Nothing())
, UseRuntimeListing(useRuntimeListing)
, ConsumersCount(consumersCount)
Expand Down Expand Up @@ -510,7 +512,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
// skip 'directories'
continue;
}
if (object.Size > FileSizeLimit) {
if (object.Size > std::min(FileSizeLimit, ReadLimit)) {
auto errorMessage = TStringBuilder()
<< "Size of object " << object.Path << " = "
<< object.Size
Expand All @@ -522,10 +524,10 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path);
TObjectPath objectPath;
objectPath.SetPath(object.Path);
objectPath.SetSize(object.Size);
objectPath.SetSize(std::min(object.Size, ReadLimit));
objectPath.SetPathIndex(CurrentDirectoryPathIndex);
Objects.emplace_back(std::move(objectPath));
ObjectsTotalSize += object.Size;
ObjectsTotalSize += std::min(object.Size, ReadLimit);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Копипаста выражения: либо положи результат в переменную, либо получи его через objectPath.GetSize().

}
return true;
}
Expand Down Expand Up @@ -595,7 +597,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
Send(ev->Sender, new TEvS3FileQueue::TEvObjectPathReadError(*MaybeIssues, ev->Get()->Record.GetTransportMeta()));
TryFinish(ev->Sender, ev->Get()->Record.GetTransportMeta().GetSeqNo());
}

void HandleUpdateConsumersCount(TEvS3FileQueue::TEvUpdateConsumersCount::TPtr& ev) {
if (!UpdatedConsumers.contains(ev->Sender)) {
LOG_D(
Expand Down Expand Up @@ -649,7 +651,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {

LOG_T("TS3FileQueueActor", "SendObjects Sending " << result.size() << " objects to consumer with id " << consumer);
Send(consumer, new TEvS3FileQueue::TEvObjectPathBatch(std::move(result), HasNoMoreItems(), transportMeta));

if (HasNoMoreItems()) {
TryFinish(consumer, transportMeta.GetSeqNo());
}
Expand All @@ -671,7 +673,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}

// Decides whether objects may be sent to `consumer`. The answer is always true when
// UseRuntimeListing is off or RoundRobinStageFinished is set; otherwise it is true only while
// fewer than ConsumersCount consumers are in StartedConsumers and `consumer` is not one of them.
// NOTE(review): the two identical `return ...` lines below look like the removed/added sides of
// a whitespace-only change in this diff rendering — confirm against the real file.
bool CanSendToConsumer(const TActorId& consumer) {
return !UseRuntimeListing || RoundRobinStageFinished ||
return !UseRuntimeListing || RoundRobinStageFinished ||
(StartedConsumers.size() < ConsumersCount && !StartedConsumers.contains(consumer));
}

Expand Down Expand Up @@ -749,7 +751,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}
});
}

void ScheduleRequest(const TActorId& consumer, const TMessageTransportMeta& transportMeta) {
PendingRequests[consumer].push_back(transportMeta);
HasPendingRequests = true;
Expand Down Expand Up @@ -786,7 +788,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}
}
}

void TryFinish(const TActorId& consumer, ui64 seqNo) {
LOG_T("TS3FileQueueActor", "TryFinish from consumer " << consumer << ", " << FinishedConsumers.size() << " consumers already finished, seqNo=" << seqNo);
if (FinishingConsumerToLastSeqNo.contains(consumer)) {
Expand All @@ -810,6 +812,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {

size_t PrefetchSize;
ui64 FileSizeLimit;
ui64 ReadLimit;
TMaybe<NS3Lister::IS3Lister::TPtr> MaybeLister = Nothing();
TMaybe<NThreading::TFuture<NS3Lister::TListResult>> ListingFuture;
size_t CurrentDirectoryPathIndex = 0;
Expand All @@ -834,7 +837,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
const TString Pattern;
const ES3PatternVariant PatternVariant;
const ES3PatternType PatternType;

static constexpr TDuration PoisonTimeout = TDuration::Hours(3);
static constexpr TDuration RoundRobinStageTimeout = TDuration::Seconds(3);
};
Expand Down Expand Up @@ -914,6 +917,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
std::move(Paths),
ReadActorFactoryCfg.MaxInflight * 2,
FileSizeLimit,
SizeLimit,
false,
1,
FileQueueBatchSizeLimit,
Expand Down Expand Up @@ -1093,7 +1097,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
// Handler for TEvS3FileQueue::TEvAck: passes the received event to
// FileQueueEvents.OnEventReceived and does nothing else.
void HandleAck(TEvS3FileQueue::TEvAck::TPtr& ev) {
FileQueueEvents.OnEventReceived(ev);
}

static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd, const TString path) {
if (!result.Issues) {
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(result.Content), requestId, pathInd, path)));
Expand Down Expand Up @@ -1205,7 +1209,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << path << " with request id [" << requestId << "]", TIssues{result->Get()->Error});
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
}

// Handler for TEvRetryQueuePrivate::TEvRetry: the event itself is not inspected (the parameter
// is unnamed); it only calls FileQueueEvents.Retry().
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
FileQueueEvents.Retry();
}
Expand Down Expand Up @@ -2084,7 +2088,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
if (isCancelled) {
LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION, downloaded " <<
QueueBufferCounter->DownloadedBytes << " bytes");
break;
break;
}
}
}
Expand Down Expand Up @@ -2534,6 +2538,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
ui64 fileSizeLimit,
ui64 readLimit,
std::optional<ui64> rowsLimitHint,
IMemoryQuotaManager::TPtr memoryQuotaManager,
bool useRuntimeListing,
Expand All @@ -2560,6 +2565,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
, TaskCounters(std::move(taskCounters))
, FileQueueActor(fileQueueActor)
, FileSizeLimit(fileSizeLimit)
, ReadLimit(readLimit)
, MemoryQuotaManager(memoryQuotaManager)
, UseRuntimeListing(useRuntimeListing)
, FileQueueBatchSizeLimit(fileQueueBatchSizeLimit)
Expand Down Expand Up @@ -2618,6 +2624,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
std::move(Paths),
ReadActorFactoryCfg.MaxInflight * 2,
FileSizeLimit,
ReadLimit,
false,
1,
FileQueueBatchSizeLimit,
Expand Down Expand Up @@ -2780,7 +2787,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
// No-op override: the checkpoint argument is ignored and the body is empty.
void CommitState(const NDqProto::TCheckpoint&) final {}

// Returns the stored InputIndex member.
// NOTE(review): the two identical `return` lines look like the removed/added sides of a
// whitespace-only change in this diff rendering — confirm against the real file.
ui64 GetInputIndex() const final {
return InputIndex;
return InputIndex;
}

const TDqAsyncStats& GetIngressStats() const final {
Expand Down Expand Up @@ -3034,7 +3041,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
}
}
}

// Handler for TEvS3FileQueue::TEvAck: passes the received event to
// FileQueueEvents.OnEventReceived and does nothing else.
void Handle(TEvS3FileQueue::TEvAck::TPtr& ev) {
FileQueueEvents.OnEventReceived(ev);
}
Expand Down Expand Up @@ -3132,6 +3139,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
std::set<NActors::TActorId> CoroActors;
NActors::TActorId FileQueueActor;
const ui64 FileSizeLimit;
const ui64 ReadLimit;
bool Bootstrapped = false;
IMemoryQuotaManager::TPtr MemoryQuotaManager;
bool UseRuntimeListing;
Expand Down Expand Up @@ -3291,6 +3299,7 @@ IActor* CreateS3FileQueueActor(
TPathList paths,
size_t prefetchSize,
ui64 fileSizeLimit,
ui64 readLimit,
bool useRuntimeListing,
ui64 consumersCount,
ui64 batchSizeLimit,
Expand All @@ -3306,6 +3315,7 @@ IActor* CreateS3FileQueueActor(
paths,
prefetchSize,
fileSizeLimit,
readLimit,
useRuntimeListing,
consumersCount,
batchSizeLimit,
Expand Down Expand Up @@ -3390,15 +3400,15 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (params.GetRowsLimitHint() != 0) {
rowsLimitHint = params.GetRowsLimitHint();
}

TActorId fileQueueActor;
if (auto it = settings.find("fileQueueActor"); it != settings.cend()) {
NActorsProto::TActorId protoId;
TMemoryInput inputStream(it->second);
ParseFromTextFormat(inputStream, protoId);
fileQueueActor = ActorIdFromProto(protoId);
}

ui64 fileQueueBatchSizeLimit = 0;
if (auto it = settings.find("fileQueueBatchSizeLimit"); it != settings.cend()) {
fileQueueBatchSizeLimit = FromString<ui64>(it->second);
Expand All @@ -3408,7 +3418,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (auto it = settings.find("fileQueueBatchObjectCountLimit"); it != settings.cend()) {
fileQueueBatchObjectCountLimit = FromString<ui64>(it->second);
}

ui64 fileQueueConsumersCountDelta = 0;
if (readRanges.size() > 1) {
fileQueueConsumersCountDelta = readRanges.size() - 1;
Expand Down Expand Up @@ -3516,9 +3526,14 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(

#undef SET_FLAG
#undef SUPPORTED_FLAGS
ui64 sizeLimit = std::numeric_limits<ui64>::max();
if (const auto it = settings.find("sizeLimit"); settings.cend() != it) {
sizeLimit = FromString<ui64>(it->second);
}

const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, memoryQuotaManager,
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);

return {actor, actor};
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ NActors::IActor* CreateS3FileQueueActor(
NS3Details::TPathList paths,
size_t prefetchSize,
ui64 fileSizeLimit,
ui64 readLimit,
bool useRuntimeListing,
ui64 consumersCount,
ui64 batchSizeLimit,
Expand Down
16 changes: 11 additions & 5 deletions ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ class TS3DqIntegration: public TDqIntegrationBase {

auto fileQueueBatchObjectCountLimit = State_->Configuration->FileQueueBatchObjectCountLimit.Get().GetOrElse(1000);
srcDesc.MutableSettings()->insert({"fileQueueBatchObjectCountLimit", ToString(fileQueueBatchObjectCountLimit)});

YQL_CLOG(DEBUG, ProviderS3) << " useRuntimeListing=" << useRuntimeListing;

if (useRuntimeListing) {
Expand All @@ -422,8 +422,8 @@ class TS3DqIntegration: public TDqIntegrationBase {
packed.Data().Literal().Value(),
FromString<bool>(packed.IsText().Literal().Value()),
paths);
paths.insert(paths.end(),
std::make_move_iterator(pathsChunk.begin()),
paths.insert(paths.end(),
std::make_move_iterator(pathsChunk.begin()),
std::make_move_iterator(pathsChunk.end()));
}

Expand All @@ -434,11 +434,11 @@ class TS3DqIntegration: public TDqIntegrationBase {
builder.AddPath(f.Path, f.Size, f.IsDirectory);
});
builder.Save(&range);

TVector<TString> serialized(1);
TStringOutput out(serialized.front());
range.Save(&out);

paths.clear();
ReadPathsList(srcDesc, {}, serialized, paths);

Expand Down Expand Up @@ -485,12 +485,18 @@ class TS3DqIntegration: public TDqIntegrationBase {

YQL_CLOG(DEBUG, ProviderS3) << " hasDirectories=" << hasDirectories << ", consumersCount=" << consumersCount;

ui64 readLimit = std::numeric_limits<ui64>::max();
if (const auto sizeLimitIter = srcDesc.MutableSettings()->find("sizeLimit"); sizeLimitIter != srcDesc.MutableSettings()->cend()) {
readLimit = FromString<ui64>(sizeLimitIter->second);
}

auto fileQueueActor = NActors::TActivationContext::ActorSystem()->Register(
NDq::CreateS3FileQueueActor(
0ul,
std::move(paths),
fileQueuePrefetchSize,
fileSizeLimit,
readLimit,
useRuntimeListing,
consumersCount,
fileQueueBatchSizeLimit,
Expand Down
13 changes: 10 additions & 3 deletions ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,15 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
fileSizeLimit = it->second;
}
}

ui64 userSizeLimit = std::numeric_limits<ui64>::max();
if (formatName == "parquet") {
fileSizeLimit = State_->Configuration->BlockFileSizeLimit;
} else if (formatName == "raw") {
const auto sizeLimitParam = dqSource.Input().Cast<TS3SourceSettings>().SizeLimit().Maybe<TCoAtom>();
if (sizeLimitParam.IsValid()) {
userSizeLimit = FromString<ui64>(sizeLimitParam.Cast().StringValue());
}
}

for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) {
Expand All @@ -245,13 +252,13 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
UnpackPathsList(packed, isTextEncoded, paths);

for (auto& entry : paths) {
if (entry.Size > fileSizeLimit) {
if (std::min(entry.Size, userSizeLimit) > fileSizeLimit) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Постарайся избавиться от дублирования выражения с std::min.

ctx.AddError(TIssue(ctx.GetPosition(batch.Pos()),
TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName));
hasErr = true;
return false;
}
totalSize += entry.Size;
totalSize += std::min(entry.Size, userSizeLimit);
++count;
}
}
Expand Down Expand Up @@ -673,7 +680,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
.RowsLimitHint(count.Literal())
.Build()
.Build()
.Done();
.Done();
}

TMaybeNode<TExprBase> PushDownLimit(TExprBase node, TExprContext& ctx) const {
Expand Down
Loading
Loading