Skip to content

Commit

Permalink
YQ-2302: read_max_bytes bypasses file_size_limit (ydb-platform#4117)
Browse files Browse the repository at this point in the history
  • Loading branch information
alchizhevsky authored and Hor911 committed May 28, 2024
1 parent 5efdadf commit e761874
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 30 deletions.
61 changes: 39 additions & 22 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,20 @@ struct TEvS3FileQueue {
EvEnd
};
static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE),
static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE),
"expect EvEnd < EventSpaceEnd(TEvents::ES_S3_FILE_QUEUE)");

struct TEvUpdateConsumersCount :
public TEventPB<TEvUpdateConsumersCount, NS3::FileQueue::TEvUpdateConsumersCount, EvUpdateConsumersCount> {

explicit TEvUpdateConsumersCount(ui64 consumersCountDelta = 0) {
Record.SetConsumersCountDelta(consumersCountDelta);
}
};

struct TEvAck :
public TEventPB<TEvAck, NS3::FileQueue::TEvAck, EvAck> {

TEvAck() = default;

explicit TEvAck(const TMessageTransportMeta& transportMeta) {
Expand Down Expand Up @@ -388,6 +388,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
TPathList paths,
size_t prefetchSize,
ui64 fileSizeLimit,
ui64 readLimit,
bool useRuntimeListing,
ui64 consumersCount,
ui64 batchSizeLimit,
Expand All @@ -401,6 +402,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
: TxId(std::move(txId))
, PrefetchSize(prefetchSize)
, FileSizeLimit(fileSizeLimit)
, ReadLimit(readLimit)
, MaybeIssues(Nothing())
, UseRuntimeListing(useRuntimeListing)
, ConsumersCount(consumersCount)
Expand Down Expand Up @@ -513,7 +515,9 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
// skip 'directories'
continue;
}
if (object.Size > FileSizeLimit) {

const ui64 bytesUsed = std::min(object.Size, ReadLimit);
if (bytesUsed > FileSizeLimit) {
auto errorMessage = TStringBuilder()
<< "Size of object " << object.Path << " = "
<< object.Size
Expand All @@ -525,10 +529,10 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path << " of size " << object.Size);
TObjectPath objectPath;
objectPath.SetPath(object.Path);
objectPath.SetSize(object.Size);
objectPath.SetSize(bytesUsed);
objectPath.SetPathIndex(CurrentDirectoryPathIndex);
Objects.emplace_back(std::move(objectPath));
ObjectsTotalSize += object.Size;
ObjectsTotalSize += bytesUsed;
}
return true;
}
Expand Down Expand Up @@ -598,7 +602,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
Send(ev->Sender, new TEvS3FileQueue::TEvObjectPathReadError(*MaybeIssues, ev->Get()->Record.GetTransportMeta()));
TryFinish(ev->Sender, ev->Get()->Record.GetTransportMeta().GetSeqNo());
}

void HandleUpdateConsumersCount(TEvS3FileQueue::TEvUpdateConsumersCount::TPtr& ev) {
if (!UpdatedConsumers.contains(ev->Sender)) {
UpdatedConsumers.insert(ev->Sender);
Expand Down Expand Up @@ -653,7 +657,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {

LOG_D("TS3FileQueueActor", "SendObjects Sending " << result.size() << " objects to consumer with id " << consumer << ", " << ObjectsTotalSize << " bytes left");
Send(consumer, new TEvS3FileQueue::TEvObjectPathBatch(std::move(result), HasNoMoreItems(), transportMeta));

if (HasNoMoreItems()) {
TryFinish(consumer, transportMeta.GetSeqNo());
}
Expand All @@ -675,7 +679,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}

bool CanSendToConsumer(const TActorId& consumer) {
return !UseRuntimeListing || RoundRobinStageFinished ||
return !UseRuntimeListing || RoundRobinStageFinished ||
(StartedConsumers.size() < ConsumersCount && !StartedConsumers.contains(consumer));
}

Expand Down Expand Up @@ -753,7 +757,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}
});
}

void ScheduleRequest(const TActorId& consumer, const TMessageTransportMeta& transportMeta) {
PendingRequests[consumer].push_back(transportMeta);
HasPendingRequests = true;
Expand Down Expand Up @@ -790,7 +794,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}
}
}

void TryFinish(const TActorId& consumer, ui64 seqNo) {
LOG_T("TS3FileQueueActor", "TryFinish from consumer " << consumer << ", " << FinishedConsumers.size() << " consumers already finished, seqNo=" << seqNo);
if (FinishingConsumerToLastSeqNo.contains(consumer)) {
Expand All @@ -814,6 +818,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {

size_t PrefetchSize;
ui64 FileSizeLimit;
ui64 ReadLimit;
TMaybe<NS3Lister::IS3Lister::TPtr> MaybeLister = Nothing();
TMaybe<NThreading::TFuture<NS3Lister::TListResult>> ListingFuture;
size_t CurrentDirectoryPathIndex = 0;
Expand All @@ -838,7 +843,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
const TString Pattern;
const ES3PatternVariant PatternVariant;
const ES3PatternType PatternType;

static constexpr TDuration PoisonTimeout = TDuration::Hours(3);
static constexpr TDuration RoundRobinStageTimeout = TDuration::Seconds(3);
};
Expand Down Expand Up @@ -918,6 +923,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
std::move(Paths),
ReadActorFactoryCfg.MaxInflight * 2,
FileSizeLimit,
SizeLimit,
false,
1,
FileQueueBatchSizeLimit,
Expand Down Expand Up @@ -1097,7 +1103,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
void HandleAck(TEvS3FileQueue::TEvAck::TPtr& ev) {
FileQueueEvents.OnEventReceived(ev);
}

static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd, const TString path) {
if (!result.Issues) {
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(result.Content), requestId, pathInd, path)));
Expand Down Expand Up @@ -1209,7 +1215,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << path << " with request id [" << requestId << "]", TIssues{result->Get()->Error});
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
}

void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
FileQueueEvents.Retry();
}
Expand Down Expand Up @@ -2088,7 +2094,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
if (isCancelled) {
LOG_CORO_D("RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION, downloaded " <<
QueueBufferCounter->DownloadedBytes << " bytes");
break;
break;
}
}
}
Expand Down Expand Up @@ -2538,6 +2544,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
::NMonitoring::TDynamicCounterPtr counters,
::NMonitoring::TDynamicCounterPtr taskCounters,
ui64 fileSizeLimit,
ui64 readLimit,
std::optional<ui64> rowsLimitHint,
IMemoryQuotaManager::TPtr memoryQuotaManager,
bool useRuntimeListing,
Expand All @@ -2564,6 +2571,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
, TaskCounters(std::move(taskCounters))
, FileQueueActor(fileQueueActor)
, FileSizeLimit(fileSizeLimit)
, ReadLimit(readLimit)
, MemoryQuotaManager(memoryQuotaManager)
, UseRuntimeListing(useRuntimeListing)
, FileQueueBatchSizeLimit(fileQueueBatchSizeLimit)
Expand Down Expand Up @@ -2622,6 +2630,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
std::move(Paths),
ReadActorFactoryCfg.MaxInflight * 2,
FileSizeLimit,
ReadLimit,
false,
1,
FileQueueBatchSizeLimit,
Expand Down Expand Up @@ -2784,7 +2793,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
void CommitState(const NDqProto::TCheckpoint&) final {}

ui64 GetInputIndex() const final {
return InputIndex;
return InputIndex;
}

const TDqAsyncStats& GetIngressStats() const final {
Expand Down Expand Up @@ -3038,7 +3047,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
}
}
}

void Handle(TEvS3FileQueue::TEvAck::TPtr& ev) {
FileQueueEvents.OnEventReceived(ev);
}
Expand Down Expand Up @@ -3136,6 +3145,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
std::set<NActors::TActorId> CoroActors;
NActors::TActorId FileQueueActor;
const ui64 FileSizeLimit;
const ui64 ReadLimit;
bool Bootstrapped = false;
IMemoryQuotaManager::TPtr MemoryQuotaManager;
bool UseRuntimeListing;
Expand Down Expand Up @@ -3295,6 +3305,7 @@ IActor* CreateS3FileQueueActor(
TPathList paths,
size_t prefetchSize,
ui64 fileSizeLimit,
ui64 readLimit,
bool useRuntimeListing,
ui64 consumersCount,
ui64 batchSizeLimit,
Expand All @@ -3310,6 +3321,7 @@ IActor* CreateS3FileQueueActor(
paths,
prefetchSize,
fileSizeLimit,
readLimit,
useRuntimeListing,
consumersCount,
batchSizeLimit,
Expand Down Expand Up @@ -3394,15 +3406,15 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (params.GetRowsLimitHint() != 0) {
rowsLimitHint = params.GetRowsLimitHint();
}

TActorId fileQueueActor;
if (auto it = settings.find("fileQueueActor"); it != settings.cend()) {
NActorsProto::TActorId protoId;
TMemoryInput inputStream(it->second);
ParseFromTextFormat(inputStream, protoId);
fileQueueActor = ActorIdFromProto(protoId);
}

ui64 fileQueueBatchSizeLimit = 0;
if (auto it = settings.find("fileQueueBatchSizeLimit"); it != settings.cend()) {
fileQueueBatchSizeLimit = FromString<ui64>(it->second);
Expand All @@ -3412,7 +3424,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (auto it = settings.find("fileQueueBatchObjectCountLimit"); it != settings.cend()) {
fileQueueBatchObjectCountLimit = FromString<ui64>(it->second);
}

ui64 fileQueueConsumersCountDelta = 0;
if (readRanges.size() > 1) {
fileQueueConsumersCountDelta = readRanges.size() - 1;
Expand Down Expand Up @@ -3520,9 +3532,14 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(

#undef SET_FLAG
#undef SUPPORTED_FLAGS
ui64 sizeLimit = std::numeric_limits<ui64>::max();
if (const auto it = settings.find("sizeLimit"); settings.cend() != it) {
sizeLimit = FromString<ui64>(it->second);
}

const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, memoryQuotaManager,
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);

return {actor, actor};
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ NActors::IActor* CreateS3FileQueueActor(
NS3Details::TPathList paths,
size_t prefetchSize,
ui64 fileSizeLimit,
ui64 readLimit,
bool useRuntimeListing,
ui64 consumersCount,
ui64 batchSizeLimit,
Expand Down
16 changes: 11 additions & 5 deletions ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ class TS3DqIntegration: public TDqIntegrationBase {

auto fileQueueBatchObjectCountLimit = State_->Configuration->FileQueueBatchObjectCountLimit.Get().GetOrElse(1000);
srcDesc.MutableSettings()->insert({"fileQueueBatchObjectCountLimit", ToString(fileQueueBatchObjectCountLimit)});

YQL_CLOG(DEBUG, ProviderS3) << " useRuntimeListing=" << useRuntimeListing;

if (useRuntimeListing) {
Expand All @@ -422,8 +422,8 @@ class TS3DqIntegration: public TDqIntegrationBase {
packed.Data().Literal().Value(),
FromString<bool>(packed.IsText().Literal().Value()),
paths);
paths.insert(paths.end(),
std::make_move_iterator(pathsChunk.begin()),
paths.insert(paths.end(),
std::make_move_iterator(pathsChunk.begin()),
std::make_move_iterator(pathsChunk.end()));
}

Expand All @@ -434,11 +434,11 @@ class TS3DqIntegration: public TDqIntegrationBase {
builder.AddPath(f.Path, f.Size, f.IsDirectory);
});
builder.Save(&range);

TVector<TString> serialized(1);
TStringOutput out(serialized.front());
range.Save(&out);

paths.clear();
ReadPathsList(srcDesc, {}, serialized, paths);

Expand Down Expand Up @@ -485,12 +485,18 @@ class TS3DqIntegration: public TDqIntegrationBase {

YQL_CLOG(DEBUG, ProviderS3) << " hasDirectories=" << hasDirectories << ", consumersCount=" << consumersCount;

ui64 readLimit = std::numeric_limits<ui64>::max();
if (const auto sizeLimitIter = srcDesc.MutableSettings()->find("sizeLimit"); sizeLimitIter != srcDesc.MutableSettings()->cend()) {
readLimit = FromString<ui64>(sizeLimitIter->second);
}

auto fileQueueActor = NActors::TActivationContext::ActorSystem()->Register(
NDq::CreateS3FileQueueActor(
0ul,
std::move(paths),
fileQueuePrefetchSize,
fileSizeLimit,
readLimit,
useRuntimeListing,
consumersCount,
fileQueueBatchSizeLimit,
Expand Down
14 changes: 11 additions & 3 deletions ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,15 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
fileSizeLimit = it->second;
}
}

ui64 userSizeLimit = std::numeric_limits<ui64>::max();
if (formatName == "parquet") {
fileSizeLimit = State_->Configuration->BlockFileSizeLimit;
} else if (formatName == "raw") {
const auto sizeLimitParam = dqSource.Input().Cast<TS3SourceSettings>().SizeLimit().Maybe<TCoAtom>();
if (sizeLimitParam.IsValid()) {
userSizeLimit = FromString<ui64>(sizeLimitParam.Cast().StringValue());
}
}

for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) {
Expand All @@ -245,13 +252,14 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
UnpackPathsList(packed, isTextEncoded, paths);

for (auto& entry : paths) {
if (entry.Size > fileSizeLimit) {
const ui64 bytesUsed = std::min(entry.Size, userSizeLimit);
if (bytesUsed > fileSizeLimit) {
ctx.AddError(TIssue(ctx.GetPosition(batch.Pos()),
TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName));
hasErr = true;
return false;
}
totalSize += entry.Size;
totalSize += bytesUsed;
++count;
}
}
Expand Down Expand Up @@ -673,7 +681,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
.RowsLimitHint(count.Literal())
.Build()
.Build()
.Done();
.Done();
}

TMaybeNode<TExprBase> PushDownLimit(TExprBase node, TExprContext& ctx) const {
Expand Down

0 comments on commit e761874

Please sign in to comment.