Skip to content

Commit

Permalink
YQ-3225 detached script execution results forgetting (#5716)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jun 22, 2024
1 parent 472cd54 commit df3736b
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 55 deletions.
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_forget_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
}

// Sends a TEvForgetScriptExecutionOperation built from DatabaseName and OperationId
// to the KQP proxy actor addressed by MakeKqpProxyID(SelfId().NodeId()), i.e. the
// proxy on the same node as this actor.
// NOTE(review): this listing is a flattened diff (the file header reports
// "1 addition & 1 deletion"). The first Send, which also passes
// Request->GetDeadline(), appears to be the removed line and the second Send its
// replacement -- confirm against the repository before treating both as live code.
void SendForgetScriptExecutionOperation() {
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId, Request->GetDeadline()));
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId));
}

public:
Expand Down
11 changes: 4 additions & 7 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@ enum EFinalizationStatus : i32 {
};

// Local actor event (TEventLocal) that identifies a script execution operation to
// forget by its database name and operation id.
// NOTE(review): this listing is a flattened diff (the file header reports
// "4 additions & 7 deletions"). The explicit 3-argument constructor, the
// Deadline initializer, the multi-line empty body and the non-const members
// (including TInstant Deadline) appear to be the removed side; the 2-argument
// constructor, the `{}` body and the const members appear to be the added side.
// Confirm against the repository before treating both variants as live code.
struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
// Presumably the pre-change constructor: additionally stored a deadline.
explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id, TInstant deadline)
// Presumably the post-change constructor: stores only database and operation id.
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
, OperationId(id)
, Deadline(deadline)
{
}
{}

// Presumably the pre-change (mutable) members.
TString Database;
NOperationId::TOperationId OperationId;
TInstant Deadline;
// Presumably the post-change members: const, and without a deadline.
const TString Database;
const NOperationId::TOperationId OperationId;
};

struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponse> {
Expand Down
78 changes: 43 additions & 35 deletions ydb/core/kqp/proxy_service/kqp_script_executions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,11 +871,10 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
static constexpr i64 MAX_NUMBER_ROWS_IN_BATCH = 100000;

public:
// Constructs the query actor for one script execution: forwards __func__ and
// executionId to TQueryBase and stores the execution id and database name.
// NOTE(review): this listing is a flattened diff. The signature taking
// `TInstant operationDeadline` and the `Deadline(operationDeadline)` initializer
// appear to be the removed side; the 2-argument signature appears to be the
// added side -- confirm against the repository.
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database, TInstant operationDeadline)
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database)
: TQueryBase(__func__, executionId)
, ExecutionId(executionId)
, Database(database)
, Deadline(operationDeadline)
{}

void OnRunQuery() override {
Expand All @@ -888,14 +887,36 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
FROM `.metadata/script_executions`
WHERE database = $database AND execution_id = $execution_id;
DELETE
FROM `.metadata/script_execution_leases`
WHERE database = $database AND execution_id = $execution_id;
)";

NYdb::TParamsBuilder params;
params
.AddParam("$database")
.Utf8(Database)
.Build()
.AddParam("$execution_id")
.Utf8(ExecutionId)
.Build();

RunDataQuery(sql, &params);
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnOperationDeleted, "Forget script execution operation");
}

void OnOperationDeleted() {
SendResponse(Ydb::StatusIds::SUCCESS, {});

TString sql = R"(
-- TForgetScriptExecutionOperationQueryActor::OnOperationDeleted
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
SELECT MAX(result_set_id) AS max_result_set_id, MAX(row_id) AS max_row_id
FROM `.metadata/result_sets`
WHERE database = $database AND execution_id = $execution_id AND
(expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);
DELETE
FROM `.metadata/script_execution_leases`
WHERE database = $database AND execution_id = $execution_id;
)";

NYdb::TParamsBuilder params;
Expand All @@ -908,7 +929,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
.Build();

RunDataQuery(sql, &params);
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnGetResultsInfo, "Forget script execution operation");
SetQueryResultHandler(&TForgetScriptExecutionOperationQueryActor::OnGetResultsInfo, "Get results info");
}

void OnGetResultsInfo() {
Expand Down Expand Up @@ -939,7 +960,6 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
}
MaxRowId = *maxRowId;

ClearTimeInfo();
DeleteScriptResults();
}

Expand Down Expand Up @@ -985,34 +1005,34 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
return;
}

if (TInstant::Now() + 2 * GetAverageTime() >= Deadline) {
Finish(Ydb::StatusIds::TIMEOUT, ForgetOperationTimeoutIssues());
return;
}

DeleteScriptResults();
}

// Override invoked with the final status and issues of this query actor; reports
// them to Owner as a TEvForgetScriptExecutionOperationResponse.
// NOTE(review): this listing is a flattened diff. The direct Send(Owner, ...)
// appears to be the removed line and the SendResponse(...) call its replacement
// (SendResponse skips the send when a response was already sent) -- confirm
// against the repository before treating both as live code.
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
SendResponse(status, std::move(issues));
}

static NYql::TIssues ForgetOperationTimeoutIssues() {
return { NYql::TIssue("Forget script execution operation timeout") };
private:
// Sends a TEvForgetScriptExecutionOperationResponse carrying `status` and `issues`
// to Owner, at most once per actor instance: the first call sets ResponseSent and
// sends; every later call returns without sending.
// The guard matters because OnOperationDeleted replies with SUCCESS before the
// remaining result rows are deleted, and OnFinish calls this again afterwards.
void SendResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
// A response has already been delivered; do not reply twice.
if (ResponseSent) {
return;
}
ResponseSent = true;
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
}

private:
TString ExecutionId;
TString Database;
TInstant Deadline;
const TString ExecutionId;
const TString Database;
i64 NumberRowsInBatch = 0;
i64 MaxRowId = 0;
bool ResponseSent = false;
};

class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetScriptExecutionOperationActor> {
public:
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString, TInstant>;
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString>;

public:
explicit TForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev)
: Request(std::move(ev))
{}
Expand Down Expand Up @@ -1050,19 +1070,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
}

KQP_PROXY_LOG_D("[TForgetScriptExecutionOperationActor] ExecutionId: " << ExecutionId << ", lease check success. Start TForgetOperationRetryActor");

TDuration minDelay = TDuration::MilliSeconds(10);
TDuration maxTime = Request->Get()->Deadline - TInstant::Now();
if (maxTime <= minDelay) {
Reply(Ydb::StatusIds::TIMEOUT, TForgetScriptExecutionOperationQueryActor::ForgetOperationTimeoutIssues());
return;
}

Register(new TForgetOperationRetryActor(
SelfId(),
TForgetOperationRetryActor::IRetryPolicy::GetExponentialBackoffPolicy(TForgetOperationRetryActor::Retryable, minDelay, TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), maxTime),
ExecutionId, Request->Get()->Database, TInstant::Now() + maxTime
));
Register(new TForgetOperationRetryActor(SelfId(), ExecutionId, Request->Get()->Database));
}

void Handle(TEvForgetScriptExecutionOperationResponse::TPtr& ev) {
Expand Down Expand Up @@ -1090,7 +1098,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
}

private:
TEvForgetScriptExecutionOperation::TPtr Request;
const TEvForgetScriptExecutionOperation::TPtr Request;
TString ExecutionId;
bool ExecutionEntryExists = true;
};
Expand Down
33 changes: 22 additions & 11 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1740,22 +1740,33 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL(rowsFetched, numberRows);

// Test forget operation
TInstant forgetOperationTimeout = TInstant::Now() + NSan::PlainOrUnderSanitizer(TDuration::Minutes(5), TDuration::Minutes(20));
NYdb::NOperation::TOperationClient operationClient(kikimr->GetDriver());
while (TInstant::Now() < forgetOperationTimeout) {
auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
if (status.GetStatus() == NYdb::EStatus::SUCCESS || status.GetStatus() == NYdb::EStatus::NOT_FOUND) {
return;
}
auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToOneLineString());

const TString countResultsQuery = fmt::format(R"(
SELECT COUNT(*)
FROM `.metadata/result_sets`
WHERE execution_id = "{execution_id}" AND expire_at > CurrentUtcTimestamp();
)", "execution_id"_a=readyOp.Metadata().ExecutionId);

TInstant forgetChecksStart = TInstant::Now();
while (TInstant::Now() - forgetChecksStart <= TDuration::Minutes(5)) {
NYdb::NTable::TDataQueryResult result = session.ExecuteDataQuery(countResultsQuery, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::ABORTED || status.GetStatus() == NYdb::EStatus::TIMEOUT || status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, status.GetIssues().ToString());
auto resultSet = result.GetResultSetParser(0);
resultSet.TryNextRow();

if (status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED) {
// Wait until last forget is not finished
Sleep(TDuration::Seconds(30));
ui64 numberRows = resultSet.ColumnParser(0).GetUint64();
if (!numberRows) {
return;
}

Cerr << "Rows remains: " << numberRows << ", elapsed time: " << TInstant::Now() - forgetChecksStart << "\n";
Sleep(TDuration::Seconds(1));
}
UNIT_ASSERT_C(false, "Forget operation timeout");
UNIT_ASSERT_C(false, "Results removing timeout");
}

Y_UNIT_TEST(ExecuteScriptWithLargeStrings) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/tools/kqprun/src/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ class TKqpRunner::TImpl {
return false;
}

if (!status.Issues.Empty()) {
Cerr << CerrColors_.Red() << "Forget operation finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl;
}

return true;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class TYdbSetup::TImpl {

// Builds a TEvForgetScriptExecutionOperation for the operation id parsed from
// `operation` and the configured Settings_.DomainName, passes it to
// RunKqpProxyRequest and returns the resulting
// TEvForgetScriptExecutionOperationResponse event pointer.
// NOTE(review): this listing is a flattened diff (the file header reports
// "1 addition & 1 deletion"). The `auto event` line passing TInstant::Max()
// appears to be the removed line and the 2-argument one its replacement --
// confirm against the repository before treating both as live code.
NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse::TPtr ForgetScriptExecutionOperationRequest(const TString& operation) const {
NKikimr::NOperationId::TOperationId operationId(operation);
auto event = MakeHolder<NKikimr::NKqp::TEvForgetScriptExecutionOperation>(Settings_.DomainName, operationId, TInstant::Max());
auto event = MakeHolder<NKikimr::NKqp::TEvForgetScriptExecutionOperation>(Settings_.DomainName, operationId);

return RunKqpProxyRequest<NKikimr::NKqp::TEvForgetScriptExecutionOperation, NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse>(std::move(event));
}
Expand Down

0 comments on commit df3736b

Please sign in to comment.