From afd6be1f882655279145523ca15a5b60a9f1f731 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sat, 25 May 2024 13:49:46 +0000 Subject: [PATCH 1/2] Added zero rows limit --- ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp index 11c6bcdc51cb..5a9613a12623 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp @@ -130,6 +130,7 @@ class TYdbConnectorActor : public NActors::TActorBootstrappedGet()->FetchToken); + settings.RowsLimit(0); QueryClient ->FetchScriptResults(ev->Get()->OperationId, ev->Get()->ResultSetId, settings) .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie, database = ComputeConnection.database()](auto future) { From ff972899f454f6a899665020dd5ae2123848b391 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 28 May 2024 10:48:37 +0000 Subject: [PATCH 2/2] Passed rows limit from TResultSetWriterActor --- ydb/core/fq/libs/compute/ydb/events/events.h | 4 +++- ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp | 2 +- ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index 80b7c55a2fd3..b45928243a1e 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -147,15 +147,17 @@ struct TEvYdbCompute { }; struct TEvFetchScriptResultRequest : public NActors::TEventLocal { - TEvFetchScriptResultRequest(const NKikimr::NOperationId::TOperationId& operationId, int64_t resultSetId, const TString& fetchToken) + TEvFetchScriptResultRequest(const NKikimr::NOperationId::TOperationId& operationId, int64_t resultSetId, const TString& fetchToken, uint64_t rowsLimit) : OperationId(operationId) , ResultSetId(resultSetId) , FetchToken(fetchToken) + , RowsLimit(rowsLimit) {} NKikimr::NOperationId::TOperationId OperationId; int64_t ResultSetId = 0; TString FetchToken; + uint64_t RowsLimit = 0; }; struct TEvFetchScriptResultResponse : public NActors::TEventLocal { diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp index 70f977735ec0..d1e4eda46d52 100644 --- a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp @@ -190,7 +190,7 @@ class TResultSetWriterActor : public TBaseComputeActor { void SendFetchScriptResultRequest() { LastProcessedToken = FetchToken; - Register(new TRetryActor(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, OperationId, ResultSetId, FetchToken)); + Register(new TRetryActor(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, OperationId, ResultSetId, FetchToken, 0)); } void SendReplyAndPassAway() { diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp index 5a9613a12623..97a9987bb95c 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp @@ -130,7 +130,7 @@ class TYdbConnectorActor : public NActors::TActorBootstrappedGet()->FetchToken); - settings.RowsLimit(0); + settings.RowsLimit(ev->Get()->RowsLimit); QueryClient ->FetchScriptResults(ev->Get()->OperationId, ev->Get()->ResultSetId, settings) .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie, database = ComputeConnection.database()](auto future) {