diff --git a/ydb/core/grpc_services/rpc_forget_operation.cpp b/ydb/core/grpc_services/rpc_forget_operation.cpp index 6e9edc1d2d8d..bb22adfa2c74 100644 --- a/ydb/core/grpc_services/rpc_forget_operation.cpp +++ b/ydb/core/grpc_services/rpc_forget_operation.cpp @@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActorGetDeadline())); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId)); } public: diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index 54dab1efc61b..a4eb98ee0bc4 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -20,16 +20,13 @@ enum EFinalizationStatus : i32 { }; struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal { - explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id, TInstant deadline) + TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) : Database(database) , OperationId(id) - , Deadline(deadline) - { - } + {} - TString Database; - NOperationId::TOperationId OperationId; - TInstant Deadline; + const TString Database; + const NOperationId::TOperationId OperationId; }; struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal { diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 599cf44a050f..a71e065d2cdf 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -871,11 +871,10 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase { static constexpr i64 MAX_NUMBER_ROWS_IN_BATCH = 100000; public: - TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database, TInstant operationDeadline) + TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database) : TQueryBase(__func__, executionId) , ExecutionId(executionId) , Database(database) - , Deadline(operationDeadline) {} void OnRunQuery() override { @@ -888,14 +887,36 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase { FROM `.metadata/script_executions` WHERE database = $database AND execution_id = $execution_id; + DELETE + FROM `.metadata/script_execution_leases` + WHERE database = $database AND execution_id = $execution_id; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Database) + .Build() + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build(); + + RunDataQuery(sql, ¶ms); + SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnOperationDeleted, "Forget script execution operation"); + } + + void OnOperationDeleted() { + SendResponse(Ydb::StatusIds::SUCCESS, {}); + + TString sql = R"( + -- TForgetScriptExecutionOperationQueryActor::OnOperationDeleted + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + SELECT MAX(result_set_id) AS max_result_set_id, MAX(row_id) AS max_row_id FROM `.metadata/result_sets` WHERE database = $database AND execution_id = $execution_id AND (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL); - - DELETE - FROM `.metadata/script_execution_leases` - WHERE database = $database AND execution_id = $execution_id; )"; NYdb::TParamsBuilder params; @@ -908,7 +929,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase { .Build(); RunDataQuery(sql, ¶ms); - SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnGetResultsInfo, "Forget script execution operation"); + SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnGetResultsInfo, "Get results info"); } void OnGetResultsInfo() { @@ -939,7 +960,6 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase { } MaxRowId = *maxRowId; - ClearTimeInfo(); DeleteScriptResults(); } @@ -985,34 +1005,34 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase { return; } - if (TInstant::Now() + 2 * GetAverageTime() >= Deadline) { - Finish(Ydb::StatusIds::TIMEOUT, ForgetOperationTimeoutIssues()); - return; - } - DeleteScriptResults(); } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues))); + SendResponse(status, std::move(issues)); } - static NYql::TIssues ForgetOperationTimeoutIssues() { - return { NYql::TIssue("Forget script execution operation timeout") }; +private: + void SendResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) { + if (ResponseSent) { + return; + } + ResponseSent = true; + Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues))); } private: - TString ExecutionId; - TString Database; - TInstant Deadline; + const TString ExecutionId; + const TString Database; i64 NumberRowsInBatch = 0; i64 MaxRowId = 0; + bool ResponseSent = false; }; class TForgetScriptExecutionOperationActor : public TActorBootstrapped { -public: - using TForgetOperationRetryActor = TQueryRetryActor; + using TForgetOperationRetryActor = TQueryRetryActor; +public: explicit TForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev) : Request(std::move(ev)) {} @@ -1050,19 +1070,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrappedGet()->Deadline - TInstant::Now(); - if (maxTime <= minDelay) { - Reply(Ydb::StatusIds::TIMEOUT, TForgetScriptExecutionOperationQueryActor::ForgetOperationTimeoutIssues()); - return; - } - - Register(new TForgetOperationRetryActor( - SelfId(), - TForgetOperationRetryActor::IRetryPolicy::GetExponentialBackoffPolicy(TForgetOperationRetryActor::Retryable, minDelay, TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits::max(), maxTime), - ExecutionId, Request->Get()->Database, TInstant::Now() + maxTime - )); + Register(new TForgetOperationRetryActor(SelfId(), ExecutionId, Request->Get()->Database)); } void Handle(TEvForgetScriptExecutionOperationResponse::TPtr& ev) { @@ -1090,7 +1098,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrappedGetDriver()); - while (TInstant::Now() < forgetOperationTimeout) { - auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync(); - if (status.GetStatus() == NYdb::EStatus::SUCCESS || status.GetStatus() == NYdb::EStatus::NOT_FOUND) { - return; - } + auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync(); + UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToOneLineString()); + + const TString countResultsQuery = fmt::format(R"( + SELECT COUNT(*) + FROM `.metadata/result_sets` + WHERE execution_id = "{execution_id}" AND expire_at > CurrentUtcTimestamp(); + )", "execution_id"_a=readyOp.Metadata().ExecutionId); + + TInstant forgetChecksStart = TInstant::Now(); + while (TInstant::Now() - forgetChecksStart <= TDuration::Minutes(5)) { + NYdb::NTable::TDataQueryResult result = session.ExecuteDataQuery(countResultsQuery, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::ABORTED || status.GetStatus() == NYdb::EStatus::TIMEOUT || status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, status.GetIssues().ToString()); + auto resultSet = result.GetResultSetParser(0); + resultSet.TryNextRow(); - if (status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED) { - // Wait until last forget is not finished - Sleep(TDuration::Seconds(30)); + ui64 numberRows = resultSet.ColumnParser(0).GetUint64(); + if (!numberRows) { + return; } + + Cerr << "Rows remains: " << numberRows << ", elapsed time: " << TInstant::Now() - forgetChecksStart << "\n"; + Sleep(TDuration::Seconds(1)); } - UNIT_ASSERT_C(false, "Forget operation timeout"); + UNIT_ASSERT_C(false, "Results removing timeout"); } Y_UNIT_TEST(ExecuteScriptWithLargeStrings) { diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 21c31f74ade2..14d06038aaf5 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -115,6 +115,10 @@ class TKqpRunner::TImpl { return false; } + if (!status.Issues.Empty()) { + Cerr << CerrColors_.Red() << "Forget operation finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl; + } + return true; } diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index 68ad39e70eb0..b2ed012e5f57 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -225,7 +225,7 @@ class TYdbSetup::TImpl { NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse::TPtr ForgetScriptExecutionOperationRequest(const TString& operation) const { NKikimr::NOperationId::TOperationId operationId(operation); - auto event = MakeHolder(Settings_.DomainName, operationId, TInstant::Max()); + auto event = MakeHolder(Settings_.DomainName, operationId); return RunKqpProxyRequest(std::move(event)); }