Skip to content

Commit

Permalink
YQ-3756 Shared reading: check buffer size in read_actor (#10418)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Oct 15, 2024
1 parent b3d9c13 commit c1278b6
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 22 deletions.
61 changes: 44 additions & 17 deletions ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
const TTxId& txId,
const NActors::TActorId selfId,
TActorId rowDispatcherActorId,
ui64 partitionId,
ui64 eventQueueId)
: RowDispatcherActorId(rowDispatcherActorId) {
: RowDispatcherActorId(rowDispatcherActorId)
, PartitionId(partitionId) {
EventsQueue.Init(txId, selfId, selfId, eventQueueId, /* KeepAlive */ true);
EventsQueue.OnNewRecipientId(rowDispatcherActorId);
}
Expand All @@ -160,12 +162,15 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
ui64 NextOffset = 0;
bool IsWaitingRowDispatcherResponse = false;
NYql::NDq::TRetryEventsQueue EventsQueue;
bool NewDataArrived = false;
bool HasPendingData = false;
TActorId RowDispatcherActorId;
ui64 PartitionId;
};

TMap<ui64, SessionInfo> Sessions;
const THolderFactory& HolderFactory;
const i64 MaxBufferSize;
i64 ReadyBufferSizeBytes = 0;

public:
TDqPqRdReadActor(
Expand All @@ -179,7 +184,8 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
const NActors::TActorId& computeActorId,
const NActors::TActorId& localRowDispatcherActorId,
const TString& token,
const ::NMonitoring::TDynamicCounterPtr& counters);
const ::NMonitoring::TDynamicCounterPtr& counters,
i64 bufferSize);

void Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev);
void Handle(NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr& ev);
Expand Down Expand Up @@ -233,6 +239,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
void StopSessions();
void ReInit(const TString& reason);
void PrintInternalState();
void TrySendGetNextBatch(SessionInfo& sessionInfo);
};

TDqPqRdReadActor::TDqPqRdReadActor(
Expand All @@ -246,13 +253,15 @@ TDqPqRdReadActor::TDqPqRdReadActor(
const NActors::TActorId& computeActorId,
const NActors::TActorId& localRowDispatcherActorId,
const TString& token,
const ::NMonitoring::TDynamicCounterPtr& counters)
const ::NMonitoring::TDynamicCounterPtr& counters,
i64 bufferSize)
: TActor<TDqPqRdReadActor>(&TDqPqRdReadActor::StateFunc)
, TDqPqReadActorBase(inputIndex, taskId, this->SelfId(), txId, std::move(sourceParams), std::move(readParams), computeActorId)
, Token(token)
, LocalRowDispatcherActorId(localRowDispatcherActorId)
, Metrics(txId, taskId, counters)
, HolderFactory(holderFactory)
, MaxBufferSize(bufferSize)
{
MetadataFields.reserve(SourceParams.MetadataFieldsSize());
TPqMetaExtractor fieldsExtractor;
Expand Down Expand Up @@ -375,23 +384,28 @@ i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& b
buffer.clear();
do {
auto& readyBatch = ReadyBuffer.front();
SRC_LOG_T("Return " << readyBatch.Data.size() << " items");

for (const auto& message : readyBatch.Data) {
auto [item, size] = CreateItem(message);
buffer.push_back(std::move(item));
}
usedSpace += readyBatch.UsedSpace;
freeSpace -= readyBatch.UsedSpace;
SRC_LOG_T("usedSpace " << usedSpace);
SRC_LOG_T("freeSpace " << freeSpace);

TPartitionKey partitionKey{TString{}, readyBatch.PartitionId};
PartitionToOffset[partitionKey] = readyBatch.NextOffset;
SRC_LOG_T("NextOffset " << readyBatch.NextOffset);
ReadyBuffer.pop();
} while (freeSpace > 0 && !ReadyBuffer.empty());

ReadyBufferSizeBytes -= usedSpace;
SRC_LOG_T("Return " << buffer.RowCount() << " rows, buffer size " << ReadyBufferSizeBytes << ", free space " << freeSpace << ", result size " << usedSpace);

if (!ReadyBuffer.empty()) {
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
for (auto& [partitionId, sessionInfo] : Sessions) {
TrySendGetNextBatch(sessionInfo);
}
ProcessState();
return usedSpace;
}
Expand Down Expand Up @@ -490,11 +504,8 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
SRC_LOG_W("Wrong seq num ignore message, seqNo " << meta.GetSeqNo());
return;
}
sessionInfo.NewDataArrived = true;
Metrics.InFlyGetNextBatch->Inc();
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
event->Record.SetPartitionId(partitionId);
sessionInfo.EventsQueue.Send(event.release());
sessionInfo.HasPendingData = true;
TrySendGetNextBatch(sessionInfo);
}

void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
Expand Down Expand Up @@ -578,7 +589,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorResult::TPtr&
Sessions.emplace(
std::piecewise_construct,
std::forward_as_tuple(partitionId),
std::forward_as_tuple(TxId, SelfId(), rowDispatcherActorId, partitionId));
std::forward_as_tuple(TxId, SelfId(), rowDispatcherActorId, partitionId, partitionId));
}
}
}
Expand Down Expand Up @@ -637,11 +648,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
for (const auto& message : ev->Get()->Record.GetMessages()) {
SRC_LOG_T("Json: " << message.GetJson());
activeBatch.Data.emplace_back(message.GetJson());
activeBatch.UsedSpace += message.GetJson().size();
sessionInfo.NextOffset = message.GetOffset() + 1;
bytes += message.GetJson().size();
SRC_LOG_T("TEvMessageBatch NextOffset " << sessionInfo.NextOffset);
}
activeBatch.UsedSpace = bytes;
ReadyBufferSizeBytes += bytes;
IngressStats.Bytes += bytes;
IngressStats.Chunks++;
activeBatch.NextOffset = ev->Get()->Record.GetNextMessageOffset();
Expand Down Expand Up @@ -697,6 +709,20 @@ void TDqPqRdReadActor::Handle(TEvPrivate::TEvProcessState::TPtr&) {
ProcessState();
}

void TDqPqRdReadActor::TrySendGetNextBatch(SessionInfo& sessionInfo) {
if (!sessionInfo.HasPendingData) {
return;
}
if (ReadyBufferSizeBytes > MaxBufferSize) {
return;
}
Metrics.InFlyGetNextBatch->Inc();
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
sessionInfo.HasPendingData = false;
event->Record.SetPartitionId(sessionInfo.PartitionId);
sessionInfo.EventsQueue.Send(event.release());
}

std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
NPq::NProto::TDqPqTopicSource&& settings,
ui64 inputIndex,
Expand All @@ -709,7 +735,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
const NActors::TActorId& localRowDispatcherActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
i64 /*bufferSize*/) // TODO
i64 bufferSize)
{
auto taskParamsIt = taskParams.find("pq");
YQL_ENSURE(taskParamsIt != taskParams.end(), "Failed to get pq task params");
Expand All @@ -731,7 +757,8 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
computeActorId,
localRowDispatcherActorId,
token,
counters
counters,
bufferSize
);

return {actor, actor};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace NYql::NDq {
class TDqAsyncIoFactory;

const i64 PQRdReadDefaultFreeSpace = 16_MB;
const i64 PQRdReadDefaultFreeSpace = 256_MB;

std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
NPq::NProto::TDqPqTopicSource&& settings,
Expand Down
34 changes: 30 additions & 4 deletions ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
#include <ydb/library/actors/testlib/test_runtime.h>
#include <library/cpp/testing/unittest/gtest.h>

#include <thread>

Expand Down Expand Up @@ -51,7 +53,8 @@ struct TFixture : public TPqIoTestFixture {
LocalRowDispatcherId,
actor.GetHolderFactory(),
MakeIntrusive<NMonitoring::TDynamicCounters>(),
freeSpace);
freeSpace
);

actor.InitAsyncInput(dqSource, dqSourceAsActor);
});
Expand Down Expand Up @@ -191,9 +194,8 @@ struct TFixture : public TPqIoTestFixture {
});
}


void StartSession(NYql::NPq::NProto::TDqPqTopicSource& settings) {
InitRdSource(settings);
void StartSession(NYql::NPq::NProto::TDqPqTopicSource& settings, i64 freeSpace = 1_MB) {
InitRdSource(settings, freeSpace);
SourceRead<TString>(UVParser);
ExpectCoordinatorChangesSubscribe();

Expand Down Expand Up @@ -345,6 +347,30 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
ProcessSomeJsons(3, {Json4}, RowDispatcher2);
}

Y_UNIT_TEST_F(Backpressure, TFixture) {
StartSession(Source1, 2_KB);

TString json(900, 'c');
ProcessSomeJsons(0, {json}, RowDispatcher1);

MockNewDataArrived(RowDispatcher1);
ExpectGetNextBatch(RowDispatcher1);
MockMessageBatch(0, {json, json, json}, RowDispatcher1);

MockNewDataArrived(RowDispatcher1);
ASSERT_THROW(
CaSetup->Runtime->GrabEdgeEvent<NFq::TEvRowDispatcher::TEvGetNextBatch>(RowDispatcher1, TDuration::Seconds(0)),
NActors::TEmptyEventQueueException);

auto result = SourceReadDataUntil<TString>(UVParser, 3);
AssertDataWithWatermarks(result, {json, json, json}, {});
ExpectGetNextBatch(RowDispatcher1);

MockMessageBatch(3, {Json1}, RowDispatcher1);
result = SourceReadDataUntil<TString>(UVParser, 1);
AssertDataWithWatermarks(result, {Json1}, {});
}

Y_UNIT_TEST_F(RowDispatcherIsRestarted, TFixture) {
StartSession(Source1);
ProcessSomeJsons(0, {Json1, Json2}, RowDispatcher1);
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/fq/pq_async_io/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ PEERDIR(
YQL_LAST_ABI_VERSION()

END()

RECURSE_FOR_TESTS(
ut
)

0 comments on commit c1278b6

Please sign in to comment.