Skip to content

Commit

Permalink
Fix for dynamic listing (#4525) (#4604)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 authored May 16, 2024
1 parent 68d0786 commit 0e3c730
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,14 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
object.SetPath(paths[i].Path);
object.SetPathIndex(paths[i].PathIndex);
if (paths[i].IsDirectory) {
LOG_T("TS3FileQueueActor", "TS3FileQueueActor adding dir: " << paths[i].Path);
object.SetSize(0);
Directories.emplace_back(std::move(object));
} else {
LOG_T("TS3FileQueueActor", "TS3FileQueueActor adding path: " << paths[i].Path << " of size " << paths[i].Size);
object.SetSize(paths[i].Size);
Objects.emplace_back(std::move(object));
ObjectsTotalSize += paths[i].Size;
}
}
}
Expand Down Expand Up @@ -496,9 +499,9 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
}

bool SaveRetrievedResults(const NS3Lister::TListResult& listingResult) {
LOG_T("TS3FileQueueActor", "SaveRetrievedResults");
if (std::holds_alternative<NS3Lister::TListError>(listingResult)) {
MaybeIssues = std::get<NS3Lister::TListError>(listingResult).Issues;
LOG_E("TS3FileQueueActor", "SaveRetrievedResults error: [" << (MaybeIssues ? MaybeIssues->ToOneLineString() : "") << "]");
return false;
}

Expand All @@ -519,7 +522,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
MaybeIssues = TIssues{TIssue{errorMessage}};
return false;
}
LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path);
LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path << " of size " << object.Size);
TObjectPath objectPath;
objectPath.SetPath(object.Path);
objectPath.SetSize(object.Size);
Expand Down Expand Up @@ -577,7 +580,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
switch (const auto etype = ev->GetTypeRewrite()) {
hFunc(TEvS3FileQueue::TEvUpdateConsumersCount, HandleUpdateConsumersCount);
hFunc(TEvS3FileQueue::TEvGetNextBatch, HandleGetNextBatchForErrorState);
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
cFunc(TEvPrivatePrivate::TEvRoundRobinStageTimeout::EventType, HandleRoundRobinStageTimeout);
cFunc(TEvents::TSystem::Poison, HandlePoison);
default:
MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
Expand All @@ -598,17 +601,18 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {

void HandleUpdateConsumersCount(TEvS3FileQueue::TEvUpdateConsumersCount::TPtr& ev) {
if (!UpdatedConsumers.contains(ev->Sender)) {
LOG_D(
"TS3FileQueueActor",
"HandleUpdateConsumersCount Reducing ConsumersCount by " << ev->Get()->Record.GetConsumersCountDelta() << ", recieved from " << ev->Sender);
UpdatedConsumers.insert(ev->Sender);
ConsumersCount -= ev->Get()->Record.GetConsumersCountDelta();
LOG_D(
"TS3FileQueueActor",
"HandleUpdateConsumersCount Reducing ConsumersCount by " << ev->Get()->Record.GetConsumersCountDelta()
<< " to " << ConsumersCount << ", received from " << ev->Sender);
}
Send(ev->Sender, new TEvS3FileQueue::TEvAck(ev->Get()->Record.GetTransportMeta()));
}

void HandleRoundRobinStageTimeout() {
LOG_T("TS3FileQueueActor","Handle start stage timeout");
LOG_D("TS3FileQueueActor","Handle start stage timeout");
if (!RoundRobinStageFinished) {
RoundRobinStageFinished = true;
AnswerPendingRequests();
Expand Down Expand Up @@ -647,7 +651,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
ObjectsTotalSize -= totalSize;
}

LOG_T("TS3FileQueueActor", "SendObjects Sending " << result.size() << " objects to consumer with id " << consumer);
LOG_D("TS3FileQueueActor", "SendObjects Sending " << result.size() << " objects to consumer with id " << consumer << ", " << ObjectsTotalSize << " bytes left");
Send(consumer, new TEvS3FileQueue::TEvObjectPathBatch(std::move(result), HasNoMoreItems(), transportMeta));

if (HasNoMoreItems()) {
Expand Down

0 comments on commit 0e3c730

Please sign in to comment.