diff --git a/ydb/core/grpc_services/local_grpc/local_grpc.h b/ydb/core/grpc_services/local_grpc/local_grpc.h new file mode 100644 index 000000000000..c122a7280b62 --- /dev/null +++ b/ydb/core/grpc_services/local_grpc/local_grpc.h @@ -0,0 +1,106 @@ +#pragma once + +#include +#include + +namespace NKikimr::NGRpcService::NLocalGrpc { + +class TContextBase : public NYdbGrpc::IRequestContextBase { +public: + TContextBase(std::shared_ptr baseRequest) + : BaseRequest_{std::move(baseRequest)} + , AuthState_{/*needAuth*/true} + {} + + virtual void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) = 0; + + NYdbGrpc::TAuthState& GetAuthState() override { + return AuthState_; + } + + void ReplyUnauthenticated(const TString& in) override { + ReplyError(grpc::UNAUTHENTICATED, in); + } + + void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details = "") override { + NYql::TIssue issue{TStringBuilder() << "grpc code: " << code << ", msg: " << msg << " (" << details << ")"}; + issue.SetCode(code, NYql::ESeverity::TSeverityIds_ESeverityId_S_ERROR); + RaiseIssue(issue); + ReplyWithYdbStatus(Ydb::StatusIds::GENERIC_ERROR); + } + + TInstant Deadline() const override { + return BaseRequest_->GetDeadline(); + } + + TSet GetPeerMetaKeys() const override { + Y_ABORT("TLocalGrpcContext::GetPeerMetaKeys unimplemented"); + return {}; + } + + TVector GetPeerMetaValues(TStringBuf key) const override { + auto value = BaseRequest_->GetPeerMetaValues(TString{key}); + if (value) { + return {std::move(*value)}; + } + return {}; + } + + TVector FindClientCert() const override { + return BaseRequest_->FindClientCert(); + } + + grpc_compression_level GetCompressionLevel() const override { + return GRPC_COMPRESS_LEVEL_NONE; + } + + google::protobuf::Arena* GetArena() override { + return &Arena_; + } + + void AddTrailingMetadata(const TString& key, const TString& value) override { + Y_UNUSED(key, value); + } + + void UseDatabase(const TString& database) override { + Y_UNUSED(database); + } + + // Streaming part + + void SetNextReplyCallback(TOnNextReply&& cb) override { + Y_UNUSED(cb); + } + void FinishStreamingOk() override {} + TAsyncFinishResult GetFinishFuture() override { return {}; } + TString GetPeer() const override { return "localhost"; } + bool SslServer() const override { return false; } + bool IsClientLost() const override { return false; } + +public: + NYql::TIssues GetIssues() { + return IssueManager_.GetIssues(); + } + +protected: + const IRequestCtx& GetBaseRequest() const noexcept { + return *BaseRequest_; + } + + IRequestCtx& GetBaseRequest() noexcept { + return *BaseRequest_; + } + + void RaiseIssue(const NYql::TIssue& issue) { + IssueManager_.RaiseIssue(issue); + } + +private: + std::shared_ptr BaseRequest_; + NYdbGrpc::TAuthState AuthState_; + + NYql::TIssueManager IssueManager_; + google::protobuf::Arena Arena_; +}; + +} // namespace NKikimr::NGRpcService::NLocalGrpc diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.cpp b/ydb/core/grpc_services/local_rpc/local_rpc.cpp new file mode 100644 index 000000000000..d2aeb613fca1 --- /dev/null +++ b/ydb/core/grpc_services/local_rpc/local_rpc.cpp @@ -0,0 +1,48 @@ +#include "local_rpc.h" + + +namespace NKikimr::NRpcService { + +namespace { + +class TFacilityProviderCommon : public NGRpcService::IFacilityProvider { +public: + explicit TFacilityProviderCommon(ui64 channelBufferSize) + : ChannelBufferSize(channelBufferSize) + {} + + ui64 GetChannelBufferSize() const override { + return ChannelBufferSize; + } + + virtual ~TFacilityProviderCommon() { + } + +private: + const ui64 ChannelBufferSize; +}; + +class TFacilityProviderSameMailbox : public TFacilityProviderCommon { + using TBase = TFacilityProviderCommon; + +public: + TFacilityProviderSameMailbox(TActorContext actorContext, ui64 channelBufferSize) + : TBase(channelBufferSize) + , ActorContext(actorContext) + {} + + TActorId RegisterActor(IActor* actor) const override { + return ActorContext.RegisterWithSameMailbox(actor); + } + +private: + const TActorContext ActorContext; +}; + +} // anonymous namespace + +TFacilityProviderPtr CreateFacilityProviderSameMailbox(TActorContext actorContext, ui64 channelBufferSize) { + return std::make_shared(actorContext, channelBufferSize); +} + +} // namespace NKikimr::NRpcService diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index 362ae2be87b3..8c1710c03eb2 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include @@ -26,17 +27,112 @@ class TPromiseWrapper { }; template -class TLocalRpcCtx : public NGRpcService::IRequestOpCtx { +class TLocalRpcCtxImplData { +protected: + TCbWrapper CbWrapper; + NYql::TIssueManager IssueManager; + std::unique_ptr CostInfo; + + template + TLocalRpcCtxImplData(TCb&& cb) + : CbWrapper(std::forward(cb)) + {} +}; + +template +class TLocalRpcCtxImpl; + +template +class TLocalRpcCtxImpl : public NGRpcService::IRequestNoOpCtx, public TLocalRpcCtxImplData { +protected: + using TBase = TLocalRpcCtxImplData; + + template + TLocalRpcCtxImpl(TCb&& cb) + : TBase(std::forward(cb)) + {} +}; + +template +class TLocalRpcCtxImpl : public NGRpcService::IRequestOpCtx, public TLocalRpcCtxImplData { +protected: + using TBase = TLocalRpcCtxImplData; + + template + TLocalRpcCtxImpl(TCb&& cb) + : TBase(std::forward(cb)) + {} + +public: + using TResp = typename TRpc::TResponse; + + void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override { + TResp resp; + auto deferred = resp.mutable_operation(); + deferred->set_ready(true); + deferred->set_status(status); + if (TBase::CostInfo) { + deferred->mutable_cost_info()->CopyFrom(*TBase::CostInfo); + } + NYql::IssuesToMessage(TBase::IssueManager.GetIssues(), deferred->mutable_issues()); + auto data = deferred->mutable_result(); + data->PackFrom(result); + TBase::CbWrapper(resp); + } + + void SendResult(const google::protobuf::Message& result, + Ydb::StatusIds::StatusCode status, + const google::protobuf::RepeatedPtrField& message) override + { + TResp resp; + auto deferred = resp.mutable_operation(); + deferred->set_ready(true); + deferred->set_status(status); + deferred->mutable_issues()->MergeFrom(message); + if (TBase::CostInfo) { + deferred->mutable_cost_info()->CopyFrom(*TBase::CostInfo); + } + auto data = deferred->mutable_result(); + data->PackFrom(result); + TBase::CbWrapper(resp); + } + + void SendResult(Ydb::StatusIds::StatusCode status, + const google::protobuf::RepeatedPtrField& message) override + { + TResp resp; + auto deferred = resp.mutable_operation(); + deferred->set_ready(true); + deferred->set_status(status); + deferred->mutable_issues()->MergeFrom(message); + if (TBase::CostInfo) { + deferred->mutable_cost_info()->CopyFrom(*TBase::CostInfo); + } + TBase::CbWrapper(resp); + } + + void SendOperation(const Ydb::Operations::Operation& operation) override { + TResp resp; + resp.mutable_operation()->CopyFrom(operation); + TBase::CbWrapper(resp); + } +}; + +template +class TLocalRpcCtx : public TLocalRpcCtxImpl { public: + using TBase = TLocalRpcCtxImpl; using TResp = typename TRpc::TResponse; + using EStreamCtrl = NYdbGrpc::IRequestContextBase::EStreamCtrl; + template TLocalRpcCtx(TProto&& req, TCb&& cb, const TString& databaseName, const TMaybe& token, const TMaybe& requestType, bool internalCall) - : Request(std::forward(req)) - , CbWrapper(std::forward(cb)) + : TBase(std::forward(cb)) + , Request(std::forward(req)) , DatabaseName(databaseName) , RequestType(requestType) , InternalCall(internalCall) @@ -67,8 +163,10 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx { return EmptySerializedTokenMessage_; } - const TMaybe GetPeerMetaValues(const TString&) const override { - Y_ABORT("Unimplemented"); + const TMaybe GetPeerMetaValues(const TString& key) const override { + if (key == NYdb::YDB_DATABASE_HEADER) { + return GetDatabaseName(); + } return TMaybe{}; } @@ -79,8 +177,8 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx { void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override { TResp resp; - NGRpcService::TCommonResponseFiller::Fill(resp, IssueManager.GetIssues(), CostInfo.get(), status); - CbWrapper(resp); + NGRpcService::TCommonResponseFiller::Fill(resp, TBase::IssueManager.GetIssues(), TBase::CostInfo.get(), status); + TBase::CbWrapper(resp); } TString GetPeerName() const override { @@ -91,63 +189,12 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx { return TRpc::TRequest::descriptor()->name(); } - void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override { - TResp resp; - auto deferred = resp.mutable_operation(); - deferred->set_ready(true); - deferred->set_status(status); - if (CostInfo) { - deferred->mutable_cost_info()->CopyFrom(*CostInfo); - } - NYql::IssuesToMessage(IssueManager.GetIssues(), deferred->mutable_issues()); - auto data = deferred->mutable_result(); - data->PackFrom(result); - CbWrapper(resp); - } - - void SendResult(const google::protobuf::Message& result, - Ydb::StatusIds::StatusCode status, - const google::protobuf::RepeatedPtrField& message) override - { - TResp resp; - auto deferred = resp.mutable_operation(); - deferred->set_ready(true); - deferred->set_status(status); - deferred->mutable_issues()->MergeFrom(message); - if (CostInfo) { - deferred->mutable_cost_info()->CopyFrom(*CostInfo); - } - auto data = deferred->mutable_result(); - data->PackFrom(result); - CbWrapper(resp); - } - - void SendResult(Ydb::StatusIds::StatusCode status, - const google::protobuf::RepeatedPtrField& message) override - { - TResp resp; - auto deferred = resp.mutable_operation(); - deferred->set_ready(true); - deferred->set_status(status); - deferred->mutable_issues()->MergeFrom(message); - if (CostInfo) { - deferred->mutable_cost_info()->CopyFrom(*CostInfo); - } - CbWrapper(resp); - } - - void SendOperation(const Ydb::Operations::Operation& operation) override { - TResp resp; - resp.mutable_operation()->CopyFrom(operation); - CbWrapper(resp); - } - void RaiseIssue(const NYql::TIssue& issue) override { - IssueManager.RaiseIssue(issue); + TBase::IssueManager.RaiseIssue(issue); } void RaiseIssues(const NYql::TIssues& issues) override { - IssueManager.RaiseIssues(issues); + TBase::IssueManager.RaiseIssues(issues); } google::protobuf::Arena* GetArena() override { @@ -204,8 +251,8 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx { } void SetCostInfo(float consumed_units) override { - CostInfo = std::make_unique(); - CostInfo->set_consumed_units(consumed_units); + TBase::CostInfo = std::make_unique(); + TBase::CostInfo->set_consumed_units(consumed_units); } void SetDiskQuotaExceeded(bool disk) override { @@ -242,21 +289,17 @@ class TLocalRpcCtx : public NGRpcService::IRequestOpCtx { void Reply(NProtoBuf::Message *r, ui32) override { TResp* resp = dynamic_cast(r); Y_ABORT_UNLESS(resp); - CbWrapper(*resp); + TBase::CbWrapper(*resp); } private: typename TRpc::TRequest Request; - TCbWrapper CbWrapper; const TString DatabaseName; const TMaybe RequestType; const bool InternalCall; TIntrusiveConstPtr InternalToken; const TString EmptySerializedTokenMessage_; - - NYql::TIssueManager IssueManager; google::protobuf::Arena Arena; - std::unique_ptr CostInfo; std::unique_ptr QuotaExceeded; }; @@ -314,5 +357,159 @@ TActorId DoLocalRpcSameMailbox(typename TRpc::TRequest&& proto, std::function(std::move(proto), std::move(cb), database, token, Nothing(), ctx, internalCall); } +//// Streaming part + +template +class TStreamReadProcessor : public NGRpcService::NLocalGrpc::TContextBase { + using TBase = NGRpcService::NLocalGrpc::TContextBase; + using TOnResponseCallback = std::function; + +public: + TStreamReadProcessor(std::shared_ptr baseRequest) + : TBase(std::move(baseRequest)) + {} + + void Read(TOnResponseCallback callback) { + if (!ResponseQueue.empty()) { + callback(DoPopResponse()); + return; + } + + Y_ABORT_UNLESS(!Finished, "Try to read from finished stream"); + Y_ABORT_UNLESS(!OnResponseCallback, "Can not multiply read from stream"); + OnResponseCallback = callback; + } + + void Cancel() { + FinishPromise.SetValue(EFinishStatus::CANCEL); + Finished = true; + } + + bool IsFinished() const { + return Finished; + } + + bool HasData() const { + return !Finished || !ResponseQueue.empty(); + } + +protected: + const NProtoBuf::Message* GetRequest() const override { + return GetBaseRequest().GetRequest(); + } + + NProtoBuf::Message* GetRequestMut() override { + return GetBaseRequest().GetRequestMut(); + } + + TAsyncFinishResult GetFinishFuture() override { + return FinishPromise.GetFuture(); + } + + bool IsClientLost() const override { + return FinishPromise.HasValue(); + } + + void SetNextReplyCallback(TOnNextReply&& callback) override { + NextReplyCallback = callback; + } + + void FinishStreamingOk() override { + ReplyWithYdbStatus(Ydb::StatusIds::SUCCESS); + } + + void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override { + TResponsePart response; + NGRpcService::TCommonResponseFiller::Fill(response, GetIssues(), nullptr, status); + DoPushResponse(std::move(response), EStreamCtrl::FINISH); + + if (status != Ydb::StatusIds::SUCCESS) { + FinishPromise.SetValue(EFinishStatus::ERROR); + } + } + + void Reply(NProtoBuf::Message* proto, ui32 status = 0) override { + Y_UNUSED(proto, status); + Y_ABORT("Expected TLocalGrpcContext::Reply only for stream"); + } + + void Reply(grpc::ByteBuffer* bytes, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT) override { + Y_UNUSED(status); + + grpc::Slice slice; + if (auto status = bytes->TrySingleSlice(&slice); !status.ok()) { + ReplyError(status.error_code(), status.error_message(), status.error_details()); + return; + } + + TResponsePart response; + if (!response.ParseFromArray(slice.begin(), slice.size())) { + RaiseIssue(NYql::TIssue("Response part is corrupted")); + ReplyWithYdbStatus(Ydb::StatusIds::INTERNAL_ERROR); + return; + } + DoPushResponse(std::move(response), ctrl); + } + +private: + TResponsePart DoPopResponse() { + Y_ABORT_UNLESS(!ResponseQueue.empty(), "Try to pop response from empty queue"); + + auto response = std::move(ResponseQueue.front()); + ResponseQueue.pop(); + if (NextReplyCallback && !Finished) { + NextReplyCallback(ResponseQueue.size()); + } + return response; + } + + void DoPushResponse(TResponsePart&& response, EStreamCtrl ctrl) { + if (Finished) { + return; + } + Finished = ctrl == EStreamCtrl::FINISH; + + ResponseQueue.emplace(std::move(response)); + if (OnResponseCallback) { + OnResponseCallback(DoPopResponse()); + OnResponseCallback = nullptr; + } + } + +private: + bool Finished = false; + std::queue ResponseQueue; + NThreading::TPromise FinishPromise = NThreading::NewPromise(); + + TOnNextReply NextReplyCallback; + TOnResponseCallback OnResponseCallback; +}; + +template +using TStreamReadProcessorPtr = TIntrusivePtr>; + +using TFacilityProviderPtr = std::shared_ptr; +TFacilityProviderPtr CreateFacilityProviderSameMailbox(TActorContext actorContext, ui64 channelBufferSize); + +using TRpcActorCreator = std::function p, const NGRpcService::IFacilityProvider& f))>; + +template +TStreamReadProcessorPtr DoLocalRpcStreamSameMailbox(typename TRpc::TRequest&& proto, const TString& database, const TMaybe& token, const TMaybe& requestType, TFacilityProviderPtr facilityProvider, TRpcActorCreator actorCreator, bool internalCall = false) { + using TCbWrapper = std::function; + using TLocalRpcStreamCtx = TStreamReadProcessor; + + auto localRpcCtx = std::make_shared>(std::move(proto), [](const typename TRpc::TResponse&) {}, database, token, requestType, internalCall); + auto localRpcStreamCtx = MakeIntrusive(std::move(localRpcCtx)); + auto localRpcRequest = std::make_unique(localRpcStreamCtx.Get(), [](std::unique_ptr, const NGRpcService::IFacilityProvider&) {}); + actorCreator(std::move(localRpcRequest), *facilityProvider); + + return localRpcStreamCtx; +} + +template +TStreamReadProcessorPtr DoLocalRpcStreamSameMailbox(typename TRpc::TRequest&& proto, const TString& database, const TMaybe& token, TFacilityProviderPtr facilityProvider, TRpcActorCreator actorCreator, bool internalCall = false) { + return DoLocalRpcStreamSameMailbox(std::move(proto), database, token, Nothing(), std::move(facilityProvider), std::move(actorCreator), internalCall); +} + } // namespace NRpcService } // namespace NKikimr diff --git a/ydb/core/grpc_services/local_rpc/ya.make b/ydb/core/grpc_services/local_rpc/ya.make index 75abdcefa0f1..30e335c35dae 100644 --- a/ydb/core/grpc_services/local_rpc/ya.make +++ b/ydb/core/grpc_services/local_rpc/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + local_rpc.cpp local_rpc.h ) diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index a71e065d2cdf..4e017e50965c 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -1984,9 +1984,6 @@ std::optional> GetTtlFromSerializedMeta(const TS } class TGetScriptExecutionResultQueryActor : public TQueryBase { - static constexpr i64 MAX_NUMBER_ROWS_IN_BATCH = 100000; - static constexpr i64 MAX_BATCH_SIZE = 20_MB; - public: TGetScriptExecutionResultQueryActor(const TString& database, const TString& executionId, i32 resultSetIndex, i64 offset, i64 rowsLimit, i64 sizeLimit, TInstant deadline) : TQueryBase(__func__, executionId) @@ -1994,40 +1991,22 @@ class TGetScriptExecutionResultQueryActor : public TQueryBase { , ExecutionId(executionId) , ResultSetIndex(resultSetIndex) , Offset(offset) - , RowsLimit(rowsLimit ? rowsLimit : std::numeric_limits::max()) - , SizeLimit(sizeLimit ? sizeLimit : std::numeric_limits::max()) + , RowsLimit(rowsLimit) + , SizeLimit(sizeLimit) , Deadline(rowsLimit ? TInstant::Max() : deadline) {} void OnRunQuery() override { TString sql = R"( - -- TGetScriptExecutionResultQuery::OnRunQuery + -- TGetScriptExecutionResultQueryActor::OnRunQuery DECLARE $database AS Text; DECLARE $execution_id AS Text; - DECLARE $result_set_id AS Int32; - DECLARE $offset AS Int64; SELECT result_set_metas, operation_status, issues, end_ts, meta FROM `.metadata/script_executions` WHERE database = $database AND execution_id = $execution_id AND (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL); - - $result_set_table = ( - SELECT row_id, accumulated_size - FROM `.metadata/result_sets` - WHERE database = $database - AND execution_id = $execution_id - AND result_set_id = $result_set_id - ); - - SELECT MAX(row_id) AS max_row_id - FROM $result_set_table - WHERE row_id >= $offset; - - SELECT MAX(accumulated_size) AS start_accumulated_size - FROM $result_set_table - WHERE row_id < $offset; )"; NYdb::TParamsBuilder params; @@ -2037,12 +2016,6 @@ class TGetScriptExecutionResultQueryActor : public TQueryBase { .Build() .AddParam("$execution_id") .Utf8(ExecutionId) - .Build() - .AddParam("$result_set_id") - .Int32(ResultSetIndex) - .Build() - .AddParam("$offset") - .Int64(Offset) .Build(); RunDataQuery(sql, ¶ms); @@ -2050,121 +2023,81 @@ class TGetScriptExecutionResultQueryActor : public TQueryBase { } void OnGetResultsInfo() { - if (ResultSets.size() != 3) { + if (ResultSets.size() != 1) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); return; } - { // columns meta - NYdb::TResultSetParser result(ResultSets[0]); - - if (!result.TryNextRow()) { - Finish(Ydb::StatusIds::NOT_FOUND, "Script execution not found"); - return; - } - - const TMaybe operationStatus = result.ColumnParser("operation_status").GetOptionalInt32(); - if (!operationStatus) { - Finish(Ydb::StatusIds::BAD_REQUEST, "Results are not ready"); - return; - } - - const auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument(); - if (!serializedMeta) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation"); - return; - } - - const auto endTs = result.ColumnParser("end_ts").GetOptionalTimestamp(); - if (!endTs) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation end timestamp"); - return; - } - - const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); - if (!ttl) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); - return; - } - const auto [_, resultsTtl] = *ttl; - if (resultsTtl && (*endTs + resultsTtl) < TInstant::Now()){ - Finish(Ydb::StatusIds::NOT_FOUND, "Results are expired"); - return; - } - - Ydb::StatusIds::StatusCode operationStatusCode = static_cast(*operationStatus); - if (operationStatusCode != Ydb::StatusIds::SUCCESS) { - const TMaybe issuesSerialized = result.ColumnParser("issues").GetOptionalJsonDocument(); - if (issuesSerialized) { - Finish(operationStatusCode, DeserializeIssues(*issuesSerialized)); - } else { - Finish(operationStatusCode, "Invalid operation"); - } - return; - } - - const TMaybe serializedMetas = result.ColumnParser("result_set_metas").GetOptionalJsonDocument(); - if (!serializedMetas) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result meta is empty"); - return; - } + NYdb::TResultSetParser result(ResultSets[0]); + if (!result.TryNextRow()) { + Finish(Ydb::StatusIds::NOT_FOUND, "Script execution not found"); + return; + } - NJson::TJsonValue value; - if (!NJson::ReadJsonTree(*serializedMetas, &value) || value.GetType() != NJson::JSON_ARRAY) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result meta is corrupted"); - return; - } + const TMaybe operationStatus = result.ColumnParser("operation_status").GetOptionalInt32(); + if (!operationStatus) { + Finish(Ydb::StatusIds::BAD_REQUEST, "Results are not ready"); + return; + } - const NJson::TJsonValue* metaValue; - if (!value.GetValuePointer(ResultSetIndex, &metaValue)) { - Finish(Ydb::StatusIds::BAD_REQUEST, "Result set index is invalid"); - return; - } + const auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument(); + if (!serializedMeta) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation"); + return; + } - Ydb::Query::Internal::ResultSetMeta meta; - NProtobufJson::Json2Proto(*metaValue, meta); + const auto endTs = result.ColumnParser("end_ts").GetOptionalTimestamp(); + if (!endTs) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation end timestamp"); + return; + } - *ResultSet.mutable_columns() = meta.columns(); - ResultSet.set_truncated(meta.truncated()); + const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); + if (!ttl) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); + return; + } + const auto [_, resultsTtl] = *ttl; + if (resultsTtl && (*endTs + resultsTtl) < TInstant::Now()){ + Finish(Ydb::StatusIds::NOT_FOUND, "Results are expired"); + return; + } - if (SizeLimit) { - const i64 resultSetSize = ResultSet.ByteSizeLong(); - if (resultSetSize > SizeLimit) { - Finish(Ydb::StatusIds::BAD_REQUEST, "Result set meta is larger than fetch size limit"); - return; - } - SizeLimit -= resultSetSize; + Ydb::StatusIds::StatusCode operationStatusCode = static_cast(*operationStatus); + if (operationStatusCode != Ydb::StatusIds::SUCCESS) { + const TMaybe issuesSerialized = result.ColumnParser("issues").GetOptionalJsonDocument(); + if (issuesSerialized) { + Finish(operationStatusCode, DeserializeIssues(*issuesSerialized)); + } else { + Finish(operationStatusCode, "Invalid operation"); } + return; } - { // max row id - NYdb::TResultSetParser result(ResultSets[1]); - if (result.RowsCount() != 1) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); - return; - } + const TMaybe serializedMetas = result.ColumnParser("result_set_metas").GetOptionalJsonDocument(); + if (!serializedMetas) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result meta is empty"); + return; + } - result.TryNextRow(); + NJson::TJsonValue value; + if (!NJson::ReadJsonTree(*serializedMetas, &value) || value.GetType() != NJson::JSON_ARRAY) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result meta is corrupted"); + return; + } - const TMaybe maxRowId = result.ColumnParser("max_row_id").GetOptionalInt64(); - if (!maxRowId) { - HasMoreResults = false; - Finish(); - return; - } - MaxRowId = *maxRowId; + const NJson::TJsonValue* metaValue; + if (!value.GetValuePointer(ResultSetIndex, &metaValue)) { + Finish(Ydb::StatusIds::BAD_REQUEST, "Result set index is invalid"); + return; } - { // start accumulated size - NYdb::TResultSetParser result(ResultSets[2]); - if (result.RowsCount() != 1) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); - return; - } + Ydb::Query::Internal::ResultSetMeta meta; + NProtobufJson::Json2Proto(*metaValue, meta); - result.TryNextRow(); - StartAccumulatedSize = result.ColumnParser("start_accumulated_size").GetOptionalInt64().GetOrElse(0); - } + *ResultSet.mutable_columns() = meta.columns(); + ResultSet.set_truncated(meta.truncated()); + ResultSetSize = ResultSet.ByteSizeLong(); ClearTimeInfo(); FetchScriptResults(); @@ -2178,7 +2111,6 @@ class TGetScriptExecutionResultQueryActor : public TQueryBase { DECLARE $result_set_id AS Int32; DECLARE $offset AS Int64; DECLARE $limit AS Uint64; - DECLARE $max_accumulated_size AS int64; SELECT database, execution_id, result_set_id, row_id, result_set FROM `.metadata/result_sets` @@ -2186,7 +2118,6 @@ class TGetScriptExecutionResultQueryActor : public TQueryBase { AND execution_id = $execution_id AND result_set_id = $result_set_id AND row_id >= $offset - AND (accumulated_size IS NULL OR accumulated_size <= $max_accumulated_size) ORDER BY database, execution_id, result_set_id, row_id LIMIT $limit; )"; @@ -2206,44 +2137,17 @@ class TGetScriptExecutionResultQueryActor : public TQueryBase { .Int64(Offset) .Build() .AddParam("$limit") - .Uint64(std::min(RowsLimit, MAX_NUMBER_ROWS_IN_BATCH)) - .Build() - .AddParam("$max_accumulated_size") - .Int64(StartAccumulatedSize + std::min(SizeLimit, MAX_BATCH_SIZE)) + .Uint64(RowsLimit ? RowsLimit + 1 : std::numeric_limits::max()) .Build(); - RunDataQuery(sql, ¶ms); - SetQueryResultHandler(&TGetScriptExecutionResultQueryActor::OnResultsFetched, TStringBuilder() << "Fetch results for offset " << Offset); + RunStreamQuery(sql, ¶ms, SizeLimit ? SizeLimit : 60_MB); + SetQueryResultHandler(&TGetScriptExecutionResultQueryActor::OnQueryResult, TStringBuilder() << "Fetch results for offset " << Offset); } - void OnResultsFetched() { - if (ResultSets.size() != 1) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); - return; - } - - NYdb::TResultSetParser result(ResultSets[0]); - - if (result.RowsCount() == 0) { - if (ResultSet.rows_size() > 0) { - Finish(); - } else { - Finish(Ydb::StatusIds::BAD_REQUEST, "Failed to fetch script result due to size limit"); - } - return; - } - - i64 lastRowId = 0; + void OnStreamResult(NYdb::TResultSet&& resultSet) override { + NYdb::TResultSetParser result(resultSet); while (result.TryNextRow()) { - const TMaybe rowId = result.ColumnParser("row_id").GetOptionalInt64(); - if (!rowId) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row id is null"); - return; - } - lastRowId = *rowId; - const TMaybe serializedRow = result.ColumnParser("result_set").GetOptionalString(); - if (!serializedRow) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is null"); return; @@ -2253,30 +2157,32 @@ class TGetScriptExecutionResultQueryActor : public TQueryBase { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is empty"); return; } + + i64 rowSize = serializedRow->size(); + if (SizeLimit && ResultSet.rows_size() && ResultSetSize + rowSize > SizeLimit) { + CancelFetchQuery(); + return; + } + + if (RowsLimit && ResultSet.rows_size() >= RowsLimit) { + CancelFetchQuery(); + return; + } - StartAccumulatedSize += serializedRow->size(); - SizeLimit -= serializedRow->size(); + ResultSetSize += rowSize; if (!ResultSet.add_rows()->ParseFromString(*serializedRow)) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is corrupted"); return; } } - if (lastRowId >= MaxRowId) { - HasMoreResults = false; - Finish(); - return; - } - - Offset += result.RowsCount(); - RowsLimit -= result.RowsCount(); - - if (RowsLimit <= 0 || SizeLimit <= 0 || TInstant::Now() + TDuration::Seconds(5) + GetAverageTime() >= Deadline) { - Finish(); - return; + if (TInstant::Now() + TDuration::Seconds(5) + GetAverageTime() >= Deadline) { + CancelFetchQuery(); } + } - FetchScriptResults(); + void OnQueryResult() override { + Finish(); } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { @@ -2287,20 +2193,24 @@ class TGetScriptExecutionResultQueryActor : public TQueryBase { } } +private: + void CancelFetchQuery() { + HasMoreResults = true; + CancelStreamQuery(); + } + private: const TString Database; const TString ExecutionId; const i32 ResultSetIndex; - i64 Offset; - i64 RowsLimit; - i64 SizeLimit; + const i64 Offset; + const i64 RowsLimit; + const i64 SizeLimit; const TInstant Deadline; - i64 MaxRowId = 0; - i64 StartAccumulatedSize = 0; - + i64 ResultSetSize = 0; Ydb::ResultSet ResultSet; - bool HasMoreResults = true; + bool HasMoreResults = false; }; class TGetScriptExecutionResultActor : public TActorBootstrapped { diff --git a/ydb/library/query_actor/query_actor.cpp b/ydb/library/query_actor/query_actor.cpp index 979ff1f6d065..1c04b05ae46a 100644 --- a/ydb/library/query_actor/query_actor.cpp +++ b/ydb/library/query_actor/query_actor.cpp @@ -2,15 +2,12 @@ #include #include -#include -#include -#include -#include +#include #include #include +#include -#include #define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, LogComponent, LogPrefix() << stream) #define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, LogComponent, LogPrefix() << stream) @@ -20,121 +17,104 @@ #define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, LogComponent, LogPrefix() << stream) #define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, LogComponent, LogPrefix() << stream) + namespace NKikimr { +using namespace NGRpcService; +using namespace NRpcService; +using namespace NYql; +using namespace Ydb; + +namespace { + +template +TIssues IssuesFromProtoMessage(const TProto& message) { + TIssues issues; + IssuesFromMessage(message.issues(), issues); + return issues; +} + +} // anonymous namespace + +//// TTxControl + TQueryBase::TTxControl TQueryBase::TTxControl::CommitTx() { - TTxControl control; - control.Commit = true; - return control; + return TTxControl().Commit(true); } TQueryBase::TTxControl TQueryBase::TTxControl::BeginTx() { - TTxControl control; - control.Begin = true; - return control; + return TTxControl().Begin(true); } TQueryBase::TTxControl TQueryBase::TTxControl::BeginAndCommitTx() { - TTxControl control; - control.Begin = true; - control.Commit = true; - return control; + return BeginTx().Commit(true); } TQueryBase::TTxControl TQueryBase::TTxControl::ContinueTx() { - TTxControl control; - control.Continue = true; - return control; + return TTxControl().Continue(true); } TQueryBase::TTxControl TQueryBase::TTxControl::ContinueAndCommitTx() { - TTxControl control; - control.Continue = true; - control.Commit = true; - return control; -} - -NYql::TIssues TQueryBase::TEvQueryBasePrivate::IssuesFromOperation(const Ydb::Operations::Operation& operation) { - NYql::TIssues issues; - NYql::IssuesFromMessage(operation.issues(), issues); - return issues; -} - -TQueryBase::TEvQueryBasePrivate::TEvDataQueryResult::TEvDataQueryResult(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) - : Status(status) - , Issues(std::move(issues)) -{ -} - -TQueryBase::TEvQueryBasePrivate::TEvDataQueryResult::TEvDataQueryResult(const Ydb::Table::ExecuteDataQueryResponse& resp) - : TEvDataQueryResult(resp.operation().status(), IssuesFromOperation(resp.operation())) -{ - resp.operation().result().UnpackTo(&Result); + return ContinueTx().Commit(true); } -TQueryBase::TEvQueryBasePrivate::TEvCreateSessionResult::TEvCreateSessionResult(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) - : Status(status) - , Issues(std::move(issues)) -{ -} +//// Private events -TQueryBase::TEvQueryBasePrivate::TEvCreateSessionResult::TEvCreateSessionResult(const Ydb::Table::CreateSessionResponse& resp) - : TEvCreateSessionResult(resp.operation().status(), IssuesFromOperation(resp.operation())) +TQueryBase::TEvQueryBasePrivate::TEvDataQueryResult::TEvDataQueryResult(Table::ExecuteDataQueryResponse&& response) + : Status(response.operation().status()) + , Issues(IssuesFromProtoMessage(response.operation())) { - Ydb::Table::CreateSessionResult result; - resp.operation().result().UnpackTo(&result); - SessionId = result.session_id(); + response.operation().result().UnpackTo(&Result); } -TQueryBase::TEvQueryBasePrivate::TEvDeleteSessionResult::TEvDeleteSessionResult(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) - : Status(status) - , Issues(std::move(issues)) -{ -} +TQueryBase::TEvQueryBasePrivate::TEvStreamQueryResultPart::TEvStreamQueryResultPart(Table::ExecuteScanQueryPartialResponse&& response) + : Status(response.status()) + , Issues(IssuesFromProtoMessage(response)) + , ResultSet(std::move(*response.mutable_result()->mutable_result_set())) +{} -TQueryBase::TEvQueryBasePrivate::TEvDeleteSessionResult::TEvDeleteSessionResult(const Ydb::Table::DeleteSessionResponse& resp) - : TEvDeleteSessionResult(resp.operation().status(), IssuesFromOperation(resp.operation())) +TQueryBase::TEvQueryBasePrivate::TEvCreateSessionResult::TEvCreateSessionResult(Table::CreateSessionResponse&& response) + : Status(response.operation().status()) + , Issues(IssuesFromProtoMessage(response.operation())) { + Table::CreateSessionResult result; + response.operation().result().UnpackTo(&result); + SessionId = std::move(*result.mutable_session_id()); } -TQueryBase::TEvQueryBasePrivate::TEvRollbackTransactionResponse::TEvRollbackTransactionResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) - : Status(status) - , Issues(std::move(issues)) -{ -} +TQueryBase::TEvQueryBasePrivate::TEvDeleteSessionResponse::TEvDeleteSessionResponse(Table::DeleteSessionResponse&& response) + : Status(response.operation().status()) + , Issues(IssuesFromProtoMessage(response.operation())) +{} -TQueryBase::TEvQueryBasePrivate::TEvRollbackTransactionResponse::TEvRollbackTransactionResponse(const Ydb::Table::RollbackTransactionResponse& resp) - : TEvRollbackTransactionResponse(resp.operation().status(), IssuesFromOperation(resp.operation())) -{ -} +TQueryBase::TEvQueryBasePrivate::TEvRollbackTransactionResponse::TEvRollbackTransactionResponse(Table::RollbackTransactionResponse&& response) + : Status(response.operation().status()) + , Issues(IssuesFromProtoMessage(response.operation())) +{} -TQueryBase::TEvQueryBasePrivate::TEvCommitTransactionResponse::TEvCommitTransactionResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) - : Status(status) - , Issues(std::move(issues)) -{ -} +TQueryBase::TEvQueryBasePrivate::TEvCommitTransactionResponse::TEvCommitTransactionResponse(Table::CommitTransactionResponse&& response) + : Status(response.operation().status()) + , Issues(IssuesFromProtoMessage(response.operation())) +{} -TQueryBase::TEvQueryBasePrivate::TEvCommitTransactionResponse::TEvCommitTransactionResponse(const Ydb::Table::CommitTransactionResponse& resp) - : TEvCommitTransactionResponse(resp.operation().status(), IssuesFromOperation(resp.operation())) -{ -} +//// TQueryBase TQueryBase::TQueryBase(ui64 logComponent, TString sessionId, TString database) : LogComponent(logComponent) , Database(std::move(database)) , SessionId(std::move(sessionId)) -{ -} +{} void TQueryBase::Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) { - NActors::TActorBootstrapped::Registered(sys, owner); + TBase::Registered(sys, owner); Owner = owner; } STRICT_STFUNC(TQueryBase::StateFunc, hFunc(TEvQueryBasePrivate::TEvDataQueryResult, Handle); + hFunc(TEvQueryBasePrivate::TEvStreamQueryResultPart, Handle); hFunc(TEvQueryBasePrivate::TEvCreateSessionResult, Handle); - hFunc(TEvQueryBasePrivate::TEvDeleteSessionResult, Handle); + hFunc(TEvQueryBasePrivate::TEvDeleteSessionResponse, Handle); hFunc(TEvQueryBasePrivate::TEvRollbackTransactionResponse, Handle); hFunc(TEvQueryBasePrivate::TEvCommitTransactionResponse, Handle); ); @@ -159,8 +139,17 @@ TString TQueryBase::GetDefaultDatabase() { return CanonizePath(AppData()->TenantName); } +//// TQueryBase session operations + +void TQueryBase::RunCreateSession() const { + using TCreateSessionRequest = TGrpcRequestOperationCall; + + Table::CreateSessionRequest request; + Subscribe(DoLocalRpc(std::move(request), Database, Nothing(), TActivationContext::ActorSystem(), true)); +} + void TQueryBase::Handle(TEvQueryBasePrivate::TEvCreateSessionResult::TPtr& ev) { - if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + if (ev->Get()->Status == StatusIds::SUCCESS) { SessionId = ev->Get()->SessionId; DeleteSession = true; RunQuery(); @@ -171,21 +160,74 @@ void TQueryBase::Handle(TEvQueryBasePrivate::TEvCreateSessionResult::TPtr& ev) { } } -void TQueryBase::Handle(TEvQueryBasePrivate::TEvDeleteSessionResult::TPtr& ev) { - if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { +void TQueryBase::RunDeleteSession() const { + using TDeleteSessionRequest = TGrpcRequestOperationCall; + + Y_ABORT_UNLESS(SessionId); + + Table::DeleteSessionRequest request; + request.set_session_id(SessionId); + Subscribe(DoLocalRpc(std::move(request), Database, Nothing(), TActivationContext::ActorSystem(), true)); +} + +void TQueryBase::Handle(TEvQueryBasePrivate::TEvDeleteSessionResponse::TPtr& ev) { + if (ev->Get()->Status != StatusIds::SUCCESS) { LOG_W("Failed to delete session: " << ev->Get()->Status << ". Issues: " << ev->Get()->Issues.ToOneLineString()); } PassAway(); } +//// TQueryBase data query operations + +void TQueryBase::RunQuery() { + try { + OnRunQuery(); + } catch (const std::exception& ex) { + Finish(StatusIds::INTERNAL_ERROR, ex.what()); + } +} + +void TQueryBase::RunDataQuery(const TString& sql, NYdb::TParamsBuilder* params, TTxControl txControl) { + using TExecuteDataQueryRequest = TGrpcRequestOperationCall; + + Y_ABORT_UNLESS(!RunningQuery); + RequestStartTime = TInstant::Now(); + RunningQuery = true; + LOG_D("RunDataQuery: " << sql); + + Table::ExecuteDataQueryRequest request; + request.set_session_id(SessionId); + request.mutable_query()->set_yql_text(sql); + request.mutable_query_cache_policy()->set_keep_in_cache(true); + + if (params) { + *request.mutable_parameters() = NYdb::TProtoAccessor::GetProtoMap(params->Build()); + } + + auto txControlProto = request.mutable_tx_control(); + if (txControl.Begin_) { + txControlProto->mutable_begin_tx()->mutable_serializable_read_write(); + } else if (txControl.Continue_) { + Y_ABORT_UNLESS(TxId); + txControlProto->set_tx_id(TxId); + } + if (txControl.Commit_) { + CommitRequested = true; + txControlProto->set_commit_tx(true); + } + + Subscribe(DoLocalRpc(std::move(request), Database, Nothing(), TActivationContext::ActorSystem(), true)); +} + void TQueryBase::Handle(TEvQueryBasePrivate::TEvDataQueryResult::TPtr& ev) { Y_ABORT_UNLESS(RunningQuery); NumberRequests++; AmountRequestsTime += TInstant::Now() - RequestStartTime; RunningQuery = false; TxId = ev->Get()->Result.tx_meta().id(); - LOG_D("TEvDataQueryResult " << ev->Get()->Status << ", Issues: \"" << ev->Get()->Issues.ToOneLineString() << "\", SessionId: " << SessionId << ", TxId: " << TxId); - if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + LOG_D("TEvDataQueryResult " << ev->Get()->Status << ", Issues: " << ev->Get()->Issues.ToOneLineString() << ", SessionId: " << SessionId << ", TxId: " << TxId); + + if (ev->Get()->Status == StatusIds::SUCCESS) { ResultSets.clear(); ResultSets.reserve(ev->Get()->Result.result_sets_size()); for (auto& resultSet : *ev->Get()->Result.mutable_result_sets()) { @@ -194,7 +236,7 @@ void TQueryBase::Handle(TEvQueryBasePrivate::TEvDataQueryResult::TPtr& ev) { try { (this->*QueryResultHandler)(); } catch (const std::exception& ex) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, ex.what()); + Finish(StatusIds::INTERNAL_ERROR, ex.what()); } Y_ABORT_UNLESS(Finished || RunningQuery || RunningCommit); } else { @@ -202,31 +244,106 @@ void TQueryBase::Handle(TEvQueryBasePrivate::TEvDataQueryResult::TPtr& ev) { } } -void TQueryBase::Handle(TEvQueryBasePrivate::TEvRollbackTransactionResponse::TPtr& ev) { - LOG_D("RollbackTransactionResult: " << ev->Get()->Status << ". Issues: " << ev->Get()->Issues.ToOneLineString()); +void TQueryBase::CallOnQueryResult() { + OnQueryResult(); +} - // Continue finish - if (DeleteSession) { - RunDeleteSession(); - } else { - PassAway(); +//// TQueryBase stream query operations + +void TQueryBase::RunStreamQuery(const TString& sql, NYdb::TParamsBuilder* params, ui64 channelBufferSize) { + using TExecuteStreamQueryRequest = TGrpcRequestNoOperationCall; + + Y_ABORT_UNLESS(!RunningQuery); + LOG_D("RunStreamQuery: " << sql); + + Table::ExecuteScanQueryRequest request; + request.set_mode(Table::ExecuteScanQueryRequest::MODE_EXEC); + request.mutable_query()->set_yql_text(sql); + + if (params) { + *request.mutable_parameters() = NYdb::TProtoAccessor::GetProtoMap(params->Build()); } + + auto facilityProvider = CreateFacilityProviderSameMailbox(ActorContext(), channelBufferSize); + StreamQueryProcessor = DoLocalRpcStreamSameMailbox(std::move(request), Database, Nothing(), facilityProvider, &DoExecuteScanQueryRequest, true); + ReadNextStreamPart(); } -void TQueryBase::Handle(TEvQueryBasePrivate::TEvCommitTransactionResponse::TPtr& ev) { - LOG_D("CommitTransactionResult: " << ev->Get()->Status << ". Issues: " << ev->Get()->Issues.ToOneLineString()); +void TQueryBase::ReadNextStreamPart() { + Y_ABORT_UNLESS(!RunningQuery); + Y_ABORT_UNLESS(StreamQueryProcessor && StreamQueryProcessor->HasData()); + RequestStartTime = TInstant::Now(); + RunningQuery = true; + LOG_D("Start read next stream part"); - OnFinish(ev->Get()->Status, std::move(ev->Get()->Issues)); + StreamQueryProcessor->Read(GetOperationCallback()); +} - if (DeleteSession) { - RunDeleteSession(); - } else { - PassAway(); +void TQueryBase::Handle(TEvQueryBasePrivate::TEvStreamQueryResultPart::TPtr& ev) { + Y_ABORT_UNLESS(RunningQuery); + Y_ABORT_UNLESS(StreamQueryProcessor); + NumberRequests++; + AmountRequestsTime += TInstant::Now() - RequestStartTime; + RunningQuery = false; + LOG_D("TEvStreamQueryResultPart " << ev->Get()->Status << ", Issues: " << ev->Get()->Issues.ToOneLineString()); + + if (ev->Get()->Status != StatusIds::SUCCESS) { + Finish(ev->Get()->Status, std::move(ev->Get()->Issues)); + return; + } + + if (ev->Get()->ResultSet.rows_size()) { + try { + (this->*StreamResultHandler)(std::move(ev->Get()->ResultSet)); + } catch (const std::exception& ex) { + Finish(StatusIds::INTERNAL_ERROR, ex.what()); + return; + } + } + + if (StreamQueryProcessor) { + if (StreamQueryProcessor->HasData()) { + ReadNextStreamPart(); + } else { + FinishStreamRequest(); + } + } +} + +void TQueryBase::CallOnStreamResult(NYdb::TResultSet&& resultSet) { + OnStreamResult(std::move(resultSet)); +} + +void TQueryBase::CancelStreamQuery() { + LOG_D("Cancel stream request"); + Y_ABORT_UNLESS(StreamQueryProcessor); + + if (!StreamQueryProcessor->IsFinished()) { + StreamQueryProcessor->Cancel(); + } + FinishStreamRequest(); +} + +void TQueryBase::FinishStreamRequest() { + Y_ABORT_UNLESS(StreamQueryProcessor && StreamQueryProcessor->IsFinished()); + + StreamQueryProcessor = nullptr; + try { + (this->*QueryResultHandler)(); + } catch (const std::exception& ex) { + Finish(StatusIds::INTERNAL_ERROR, ex.what()); } + Y_ABORT_UNLESS(Finished || RunningQuery || RunningCommit); +} + +//// TQueryBase finish operations + +void TQueryBase::Finish() { + Finish(StatusIds::SUCCESS, TIssues()); } -void TQueryBase::Finish(Ydb::StatusIds::StatusCode status, const TString& message, bool rollbackOnError) { - NYql::TIssues issues; +void TQueryBase::Finish(StatusIds::StatusCode status, const TString& message, bool rollbackOnError) { + TIssues issues; issues.AddIssue(message); Finish(status, std::move(issues), rollbackOnError); } @@ -237,9 +354,18 @@ void TQueryBase::Finish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issue } else { LOG_W("Finish with " << status << ", Issues: " << issues.ToOneLineString() << ", SessionId: " << SessionId << ", TxId: " << TxId); } + Finished = true; OnFinish(status, std::move(issues)); - if (rollbackOnError && !CommitRequested && TxId && status != Ydb::StatusIds::SUCCESS) { + + if (StreamQueryProcessor) { + if (!StreamQueryProcessor->IsFinished()) { + StreamQueryProcessor->Cancel(); + } + StreamQueryProcessor = nullptr; + } + + if (rollbackOnError && !CommitRequested && TxId && status != StatusIds::SUCCESS) { RollbackTransaction(); } else if (DeleteSession) { RunDeleteSession(); @@ -248,89 +374,57 @@ void TQueryBase::Finish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issue } } -void TQueryBase::Finish() { - Finish(Ydb::StatusIds::SUCCESS, NYql::TIssues()); -} +//// TQueryBase transactions operations -void TQueryBase::RunQuery() { - try { - OnRunQuery(); - } catch (const std::exception& ex) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, ex.what()); - } -} +void TQueryBase::CommitTransaction() { + using TCommitTransactionRequest = TGrpcRequestOperationCall; -void TQueryBase::RunCreateSession() { - using TEvCreateSessionRequest = NGRpcService::TGrpcRequestOperationCall; - Ydb::Table::CreateSessionRequest req; - Subscribe(NRpcService::DoLocalRpc(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true)); -} + Y_ABORT_UNLESS(SessionId); + Y_ABORT_UNLESS(TxId); + RunningCommit = true; + LOG_D("Commit transaction: " << TxId); -void TQueryBase::RunDeleteSession() { - using TEvDeleteSessionRequest = NGRpcService::TGrpcRequestOperationCall; - Ydb::Table::DeleteSessionRequest req; - req.set_session_id(SessionId); - Subscribe(NRpcService::DoLocalRpc(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true)); + Table::CommitTransactionRequest request; + request.set_session_id(SessionId); + request.set_tx_id(TxId); + Subscribe(DoLocalRpc(std::move(request), Database, Nothing(), TActivationContext::ActorSystem(), true)); } -void TQueryBase::RunDataQuery(const TString& sql, NYdb::TParamsBuilder* params, TTxControl txControl) { - Y_ABORT_UNLESS(!RunningQuery); - RequestStartTime = TInstant::Now(); - RunningQuery = true; - LOG_D("RunDataQuery: " << sql); - using TEvExecuteDataQueryRequest = NGRpcService::TGrpcRequestOperationCall; - Ydb::Table::ExecuteDataQueryRequest req; - req.set_session_id(SessionId); - auto* txControlProto = req.mutable_tx_control(); - if (txControl.Begin) { - txControlProto->mutable_begin_tx()->mutable_serializable_read_write(); - } else if (txControl.Continue) { - Y_ABORT_UNLESS(TxId); - txControlProto->set_tx_id(TxId); - } - if (txControl.Commit) { - CommitRequested = true; - txControlProto->set_commit_tx(true); - } - req.mutable_query()->set_yql_text(sql); - req.mutable_query_cache_policy()->set_keep_in_cache(true); - if (params) { - auto p = params->Build(); - *req.mutable_parameters() = NYdb::TProtoAccessor::GetProtoMap(p); +void TQueryBase::Handle(TEvQueryBasePrivate::TEvCommitTransactionResponse::TPtr& ev) { + LOG_D("CommitTransactionResult: " << ev->Get()->Status << ". Issues: " << ev->Get()->Issues.ToOneLineString()); + + OnFinish(ev->Get()->Status, std::move(ev->Get()->Issues)); + + // Continue finish + if (DeleteSession) { + RunDeleteSession(); + } else { + PassAway(); } - Subscribe(NRpcService::DoLocalRpc(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true)); } -void TQueryBase::RollbackTransaction() { +void TQueryBase::RollbackTransaction() const { + using TRollbackTransactionRequest = TGrpcRequestOperationCall; + Y_ABORT_UNLESS(SessionId); Y_ABORT_UNLESS(TxId); LOG_D("Rollback transaction: " << TxId); - using TEvRollbackTransactionRequest = NGRpcService::TGrpcRequestOperationCall; - Ydb::Table::RollbackTransactionRequest req; - req.set_session_id(SessionId); - req.set_tx_id(TxId); - Subscribe(NRpcService::DoLocalRpc(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true)); -} -void TQueryBase::CommitTransaction() { - RunningCommit = true; - Y_ABORT_UNLESS(SessionId); - Y_ABORT_UNLESS(TxId); - LOG_D("Commit transaction: " << TxId); - using TEvCommitTransactionRequest = NGRpcService::TGrpcRequestOperationCall; - Ydb::Table::CommitTransactionRequest req; - req.set_session_id(SessionId); - req.set_tx_id(TxId); - Subscribe(NRpcService::DoLocalRpc(std::move(req), Database, Nothing(), TActivationContext::ActorSystem(), true)); + Table::RollbackTransactionRequest request; + request.set_session_id(SessionId); + request.set_tx_id(TxId); + Subscribe(DoLocalRpc(std::move(request), Database, Nothing(), TActivationContext::ActorSystem(), true)); } -void TQueryBase::CallOnQueryResult() { - OnQueryResult(); +void TQueryBase::Handle(TEvQueryBasePrivate::TEvRollbackTransactionResponse::TPtr& ev) { + LOG_D("RollbackTransactionResult: " << ev->Get()->Status << ". Issues: " << ev->Get()->Issues.ToOneLineString()); + + // Continue finish + if (DeleteSession) { + RunDeleteSession(); + } else { + PassAway(); + } } void TQueryBase::SetLogInfo(const TString& operationName, const TString& traceId) { @@ -357,7 +451,7 @@ void TQueryBase::ClearTimeInfo() { NumberRequests = 0; } -TDuration TQueryBase::GetAverageTime() { +TDuration TQueryBase::GetAverageTime() const { Y_ABORT_UNLESS(NumberRequests); return AmountRequestsTime / NumberRequests; } diff --git a/ydb/library/query_actor/query_actor.h b/ydb/library/query_actor/query_actor.h index 7cb2d044829b..4f141b7f9f9d 100644 --- a/ydb/library/query_actor/query_actor.h +++ b/ydb/library/query_actor/query_actor.h @@ -1,10 +1,11 @@ #pragma once -#include -#include -#include -#include -#include -#include + +#include +#include + +#include + +#include #include #include @@ -12,35 +13,49 @@ #include #include #include -#include -#include -#include +#include + +#include +#include +#include +#include +#include +#include + namespace NKikimr { class TQueryBase : public NActors::TActorBootstrapped { + using TBase = NActors::TActorBootstrapped; + protected: struct TTxControl { + using TSelf = TTxControl; + static TTxControl CommitTx(); static TTxControl BeginTx(); static TTxControl BeginAndCommitTx(); static TTxControl ContinueTx(); static TTxControl ContinueAndCommitTx(); - bool Begin = false; - bool Commit = false; - bool Continue = false; + FLUENT_SETTING_DEFAULT(bool, Begin, false); + FLUENT_SETTING_DEFAULT(bool, Commit, false); + FLUENT_SETTING_DEFAULT(bool, Continue, false); }; using TQueryResultHandler = void (TQueryBase::*)(); + using TStreamResultHandler = void (TQueryBase::*)(NYdb::TResultSet&&); private: struct TEvQueryBasePrivate { // Event ids enum EEv : ui32 { EvDataQueryResult = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvStreamQueryResultPart, + EvCreateSessionResult, - EvDeleteSessionResult, + EvDeleteSessionResponse, + EvRollbackTransactionResponse, EvCommitTransactionResponse, @@ -50,47 +65,48 @@ class TQueryBase : public NActors::TActorBootstrapped { static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); // Events - static NYql::TIssues IssuesFromOperation(const Ydb::Operations::Operation& operation); - struct TEvDataQueryResult : public NActors::TEventLocal { - TEvDataQueryResult(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); - TEvDataQueryResult(const Ydb::Table::ExecuteDataQueryResponse& resp); + TEvDataQueryResult(Ydb::Table::ExecuteDataQueryResponse&& response); - Ydb::StatusIds::StatusCode Status; + const Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; Ydb::Table::ExecuteQueryResult Result; }; + struct TEvStreamQueryResultPart : public NActors::TEventLocal { + TEvStreamQueryResultPart(Ydb::Table::ExecuteScanQueryPartialResponse&& response); + + const Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + Ydb::ResultSet ResultSet; + }; + struct TEvCreateSessionResult : public NActors::TEventLocal { - TEvCreateSessionResult(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); - TEvCreateSessionResult(const Ydb::Table::CreateSessionResponse& resp); + TEvCreateSessionResult(Ydb::Table::CreateSessionResponse&& response); - Ydb::StatusIds::StatusCode Status; + const Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; TString SessionId; }; - struct TEvDeleteSessionResult : public NActors::TEventLocal { - TEvDeleteSessionResult(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); - TEvDeleteSessionResult(const Ydb::Table::DeleteSessionResponse& resp); + struct TEvDeleteSessionResponse : public NActors::TEventLocal { + TEvDeleteSessionResponse(Ydb::Table::DeleteSessionResponse&& response); - Ydb::StatusIds::StatusCode Status; + const Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; }; struct TEvRollbackTransactionResponse : public NActors::TEventLocal { - TEvRollbackTransactionResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); - TEvRollbackTransactionResponse(const Ydb::Table::RollbackTransactionResponse& resp); + TEvRollbackTransactionResponse(Ydb::Table::RollbackTransactionResponse&& response); - Ydb::StatusIds::StatusCode Status; + const Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; }; struct TEvCommitTransactionResponse : public NActors::TEventLocal { - TEvCommitTransactionResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); - TEvCommitTransactionResponse(const Ydb::Table::CommitTransactionResponse& resp); + TEvCommitTransactionResponse(Ydb::Table::CommitTransactionResponse&& response); - Ydb::StatusIds::StatusCode Status; + const Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; }; }; @@ -111,11 +127,13 @@ class TQueryBase : public NActors::TActorBootstrapped { void Finish(); void RunDataQuery(const TString& sql, NYdb::TParamsBuilder* params = nullptr, TTxControl txControl = TTxControl::BeginAndCommitTx()); + void RunStreamQuery(const TString& sql, NYdb::TParamsBuilder* params = nullptr, ui64 channelBufferSize = 60_MB); + void CancelStreamQuery(); void CommitTransaction(); void SetLogInfo(const TString& operationName, const TString& traceId); void ClearTimeInfo(); - TDuration GetAverageTime(); + TDuration GetAverageTime() const; template void SetQueryResultHandler(THandlerFunc handler, const TString& stateDescrption = "") { @@ -123,39 +141,57 @@ class TQueryBase : public NActors::TActorBootstrapped { StateDescription = stateDescrption; } + template + void SetStreamResultHandler(THandlerFunc handler) { + StreamResultHandler = static_cast(handler); + } + private: // Methods for implementing in derived classes. virtual void OnRunQuery() = 0; virtual void OnQueryResult() {} // Must either run next query or finish + virtual void OnStreamResult(NYdb::TResultSet&&) {} virtual void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) = 0; private: void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override; template - void Subscribe(NThreading::TFuture&& f) { - f.Subscribe( - [as = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const NThreading::TFuture& res) - { - as->Send(selfId, new TEvent(res.GetValue())); - } - ); + void Subscribe(NThreading::TFuture&& f) const { + f.Subscribe([callback = GetOperationCallback()](NThreading::TFuture f) { + callback(f.ExtractValue()); + }); + } + + template + std::function GetOperationCallback() const { + return [actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](TProto&& result) { + actorSystem->Send(selfId, new TEvent(std::move(result))); + }; } STFUNC(StateFunc); + + void RunCreateSession() const; void Handle(TEvQueryBasePrivate::TEvCreateSessionResult::TPtr& ev); - void Handle(TEvQueryBasePrivate::TEvDeleteSessionResult::TPtr& ev); - void Handle(TEvQueryBasePrivate::TEvDataQueryResult::TPtr& ev); + + void RunDeleteSession() const; + void Handle(TEvQueryBasePrivate::TEvDeleteSessionResponse::TPtr& ev); + + void RollbackTransaction() const; void Handle(TEvQueryBasePrivate::TEvRollbackTransactionResponse::TPtr& ev); void Handle(TEvQueryBasePrivate::TEvCommitTransactionResponse::TPtr& ev); - void RunQuery(); - void RunCreateSession(); - void RunDeleteSession(); - void RollbackTransaction(); + void Handle(TEvQueryBasePrivate::TEvDataQueryResult::TPtr& ev); + void Handle(TEvQueryBasePrivate::TEvStreamQueryResultPart::TPtr& ev); + void RunQuery(); void CallOnQueryResult(); + void ReadNextStreamPart(); + void FinishStreamRequest(); + void CallOnStreamResult(NYdb::TResultSet&& resultSet); + TString LogPrefix() const; protected: @@ -169,10 +205,7 @@ class TQueryBase : public NActors::TActorBootstrapped { bool Finished = false; bool CommitRequested = false; - TQueryResultHandler QueryResultHandler = &TQueryBase::CallOnQueryResult; - NActors::TActorId Owner; - std::vector ResultSets; TString OperationName; @@ -182,6 +215,11 @@ class TQueryBase : public NActors::TActorBootstrapped { TInstant RequestStartTime; TDuration AmountRequestsTime; ui32 NumberRequests = 0; + +private: + TQueryResultHandler QueryResultHandler = &TQueryBase::CallOnQueryResult; + TStreamResultHandler StreamResultHandler = &TQueryBase::CallOnStreamResult; + NRpcService::TStreamReadProcessorPtr StreamQueryProcessor; }; template diff --git a/ydb/library/query_actor/query_actor_ut.cpp b/ydb/library/query_actor/query_actor_ut.cpp index 25b1631ef199..bcad54634e6f 100644 --- a/ydb/library/query_actor/query_actor_ut.cpp +++ b/ydb/library/query_actor/query_actor_ut.cpp @@ -55,6 +55,8 @@ struct TTestServer { KeyColumnNames: ["Key"] )" ); + + Server->GetRuntime()->SetLogPriority(NKikimrServices::EServiceKikimr::KQP_PROXY, NActors::NLog::EPriority::PRI_DEBUG); } template @@ -191,6 +193,87 @@ Y_UNIT_TEST_SUITE(QueryActorTest) { auto result = server.RunQueryActor(); UNIT_ASSERT_VALUES_EQUAL(result.StatusCode, Ydb::StatusIds::SUCCESS); } + + Y_UNIT_TEST(StreamQuery) { + TTestServer server; + + struct TSelectStreamQuery : public TTestQueryActorBase { + TSelectStreamQuery(const TString& value, ui64 tableSize, ui64 rowsToRead) + : Value(value) + , TableSize(tableSize) + , RowsToRead(rowsToRead) + {} + + void OnRunQuery() override { + TString sql = R"( + DECLARE $value AS Text; + DECLARE $table_size AS Uint64; + + SELECT x FROM AS_TABLE( + ()->(Yql::ToStream(ListReplicate(<|x:$value|>, $table_size))) + ); + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$value") + .Utf8(Value) + .Build() + .AddParam("$table_size") + .Uint64(TableSize) + .Build(); + + RunStreamQuery(sql, ¶ms, Value.Size() * 10); + } + + void OnStreamResult(NYdb::TResultSet&& resultSet) override { + UNIT_ASSERT_C(ResultExpected, "Query was cancelled, results are not expected"); + + NYdb::TResultSetParser result(resultSet); + UNIT_ASSERT_VALUES_EQUAL_C(result.ColumnsCount(), 1, "Invalid number of columns"); + + while (result.TryNextRow()) { + const TString& row = result.ColumnParser(0).GetUtf8(); + UNIT_ASSERT_VALUES_EQUAL_C(row, Value, "Ivalid row value"); + + if (ReadedRows >= RowsToRead) { + CancelStreamQuery(); + ResultExpected = false; + return; + } + + ReadedRows++; + *FinalResult.add_rows()->mutable_text_value() = row; + } + } + + void OnQueryResult() override { + ResultSets.clear(); + ResultSets.emplace_back(std::move(FinalResult)); + Finish(); + } + + const TString Value; + const ui64 TableSize; + const ui64 RowsToRead; + + ui64 ReadedRows = 0; + bool ResultExpected = true; + Ydb::ResultSet FinalResult; + }; + + { // Read part of table + auto result = server.RunQueryActor("0123456789ABCDEF", 4000000000, 1000); + UNIT_ASSERT_VALUES_EQUAL_C(result.StatusCode, Ydb::StatusIds::SUCCESS, result.Issues.ToOneLineString()); + UNIT_ASSERT_VALUES_EQUAL(result.ResultSets[0].RowsCount(), 1000); + } + + { // Read all table + auto result = server.RunQueryActor("0123456789ABCDEF", 1000, 2000); + UNIT_ASSERT_VALUES_EQUAL_C(result.StatusCode, Ydb::StatusIds::SUCCESS, result.Issues.ToOneLineString()); + UNIT_ASSERT_VALUES_EQUAL(result.ResultSets[0].RowsCount(), 1000); + } + } } } // namespace NKikimr diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 48f73e4cabfc..264eb6abcba6 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -67,7 +67,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrappedSender, response.Release()); } - + void Handle(NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { Promise_.SetValue(std::move(ev)); PassAway(); diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 14d06038aaf5..1975d86bd368 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -123,12 +123,14 @@ class TKqpRunner::TImpl { } void PrintScriptResults() const { - Cout << CoutColors_.Cyan() << "Writing script query results" << CoutColors_.Default() << Endl; - for (size_t i = 0; i < ResultSets_.size(); ++i) { - if (ResultSets_.size() > 1) { - *Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl; + if (Options_.ResultOutput) { + Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Writing script query results..." << CoutColors_.Default() << Endl; + for (size_t i = 0; i < ResultSets_.size(); ++i) { + if (ResultSets_.size() > 1) { + *Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl; + } + PrintScriptResult(ResultSets_[i]); } - PrintScriptResult(ResultSets_[i]); } }