Skip to content

Commit

Permalink
get rid of mkql results (#9811)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit committed Oct 7, 2024
1 parent e33cb20 commit 9bcdf87
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 28,108 deletions.
29 changes: 18 additions & 11 deletions ydb/core/client/server/msgbus_server_pq_metacache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
req->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
req->Record.MutableRequest()->SetKeepSession(false);
req->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
req->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);

req->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
req->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
Expand Down Expand Up @@ -274,9 +275,14 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc

const auto& record = ev->Get()->Record.GetRef();

Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
ui64 newVersion = rr.ListSize() == 0 ? 0 : rr.GetList(0).GetStruct(0).GetOptional().GetInt64();
Y_VERIFY(record.GetResponse().YdbResultsSize() == 1);
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));

ui64 newVersion = 0;
if (parser.RowsCount() != 0) {
parser.TryNextRow();
newVersion = *parser.ColumnParser(0).GetOptionalInt64();
}

LastVersionUpdate = ctx.Now();
if (newVersion > CurrentTopicsVersion || CurrentTopicsVersion == 0 || SkipVersionCheck) {
Expand All @@ -293,17 +299,18 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc

const auto& record = ev->Get()->Record.GetRef();

Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
Y_VERIFY(record.GetResponse().YdbResultsSize() == 1);
TString path, dc;
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
for (const auto& row : rr.GetList()) {

path = row.GetStruct(0).GetOptional().GetText();
dc = row.GetStruct(1).GetOptional().GetText();
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));
const ui32 rowCount = parser.RowsCount();
while (parser.TryNextRow()) {
path = *parser.ColumnParser(0).GetOptionalUtf8();
dc = *parser.ColumnParser(1).GetOptionalUtf8();

NewTopics.emplace_back(decltype(NewTopics)::value_type{path, dc});
}
if (rr.ListSize() > 0) {

if (rowCount > 0) {
LastTopicKey = {path, dc};
return RunQuery(EQueryType::EGetTopics, ctx);
} else {
Expand Down Expand Up @@ -710,7 +717,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
void ProcessNodesInfoWaitersQueue(bool status, const TActorContext& ctx) {
if (DynamicNodesMapping == nullptr) {
Y_ABORT_UNLESS(!status);
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
}
while(!NodesMappingWaiters.empty()) {
ctx.Send(NodesMappingWaiters.front(),
Expand Down
58 changes: 34 additions & 24 deletions ydb/core/persqueue/cluster_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <tuple>
#include <vector>

#include <ydb/public/sdk/cpp/client/ydb_result/result.h>

namespace NKikimr::NPQ::NClusterTracker {

inline auto& Ctx() {
Expand Down Expand Up @@ -132,6 +134,7 @@ class TClusterTracker: public TActorBootstrapped<TClusterTracker> {
req->Record.MutableRequest()->SetKeepSession(false);
req->Record.MutableRequest()->SetQuery(MakeListClustersQuery());
req->Record.MutableRequest()->SetDatabase(GetDatabase());
req->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
// useless without explicit session
// req->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
req->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
Expand All @@ -144,45 +147,52 @@ class TClusterTracker: public TActorBootstrapped<TClusterTracker> {
LOG_DEBUG_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "HandleWhileWorking TEvQueryResponse");

const auto& record = ev->Get()->Record.GetRef();
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS && record.GetResponse().GetResults(0).GetValue().GetStruct(0).ListSize()) {
LOG_DEBUG_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "HandleWhileWorking TEvQueryResponse UpdateClustersList");
UpdateClustersList(record);
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));
if (parser.RowsCount()) {
LOG_DEBUG_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "HandleWhileWorking TEvQueryResponse UpdateClustersList");
UpdateClustersList(parser);

Y_ABORT_UNLESS(ClustersList);
Y_ABORT_UNLESS(ClustersList->Clusters.size());
Y_ABORT_UNLESS(ClustersListUpdateTimestamp && *ClustersListUpdateTimestamp);
Y_ABORT_UNLESS(ClustersList);
Y_ABORT_UNLESS(ClustersList->Clusters.size());
Y_ABORT_UNLESS(ClustersListUpdateTimestamp && *ClustersListUpdateTimestamp);

BroadcastClustersUpdate();
BroadcastClustersUpdate();

Schedule(TDuration::Seconds(Cfg().GetClustersUpdateTimeoutSec()), new TEvents::TEvWakeup);
} else {
LOG_ERROR_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "failed to list clusters: " << record);
Schedule(TDuration::Seconds(Cfg().GetClustersUpdateTimeoutSec()), new TEvents::TEvWakeup);
return;
}
}

ClustersList = nullptr;
LOG_ERROR_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "failed to list clusters: " << record);

Schedule(TDuration::Seconds(Cfg().GetClustersUpdateTimeoutOnErrorSec()), new TEvents::TEvWakeup);
}
ClustersList = nullptr;
Schedule(TDuration::Seconds(Cfg().GetClustersUpdateTimeoutOnErrorSec()), new TEvents::TEvWakeup);
}

template<typename TProtoRecord>
void UpdateClustersList(const TProtoRecord& record) {
void UpdateClustersList(TProtoRecord& parser) {
auto clustersList = MakeIntrusive<TClustersList>();
auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
clustersList->Clusters.resize(t.ListSize());
clustersList->Clusters.resize(parser.RowsCount());

for (size_t i = 0; i < t.ListSize(); ++i) {
bool firstRow = parser.TryNextRow();
YQL_ENSURE(firstRow);
clustersList->Version = *parser.ColumnParser(5).GetOptionalInt64();
size_t i = 0;

do {
auto& cluster = clustersList->Clusters[i];

cluster.Name = t.GetList(i).GetStruct(0).GetOptional().GetText();
cluster.Name = *parser.ColumnParser(0).GetOptionalUtf8();
cluster.Datacenter = cluster.Name;
cluster.Balancer = t.GetList(i).GetStruct(1).GetOptional().GetText();
cluster.Balancer = *parser.ColumnParser(1).GetOptionalUtf8();

cluster.IsLocal = t.GetList(i).GetStruct(2).GetOptional().GetBool();
cluster.IsEnabled = t.GetList(i).GetStruct(3).GetOptional().GetBool();
cluster.Weight = t.GetList(i).GetStruct(4).GetOptional().GetUint64();
}
cluster.IsLocal = *parser.ColumnParser(2).GetOptionalBool();
cluster.IsEnabled = *parser.ColumnParser(3).GetOptionalBool();
cluster.Weight = *parser.ColumnParser(4).GetOptionalUint64();

clustersList->Version = t.GetList(0).GetStruct(5).GetOptional().GetInt64();
++i;
} while (parser.TryNextRow());

ClustersList = std::move(clustersList);
ClustersListUpdateTimestamp = Ctx().Now();
Expand Down
26 changes: 15 additions & 11 deletions ydb/core/persqueue/writer/partition_chooser_impl__table_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/services/metadata/service.h>

#include <ydb/public/sdk/cpp/client/ydb_result/result.h>


namespace NKikimr::NPQ::NPartitionChooser {

Expand Down Expand Up @@ -137,6 +139,7 @@ class TTableHelper {
ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(false);
ev->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
ev->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
// keep compiled query in cache.
ev->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);

Expand Down Expand Up @@ -166,21 +169,20 @@ class TTableHelper {
return false;
}

auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0);

NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));
TxId = record.GetResponse().GetTxMeta().id();
Y_ABORT_UNLESS(!TxId.empty());

if (t.ListSize() != 0) {
auto& list = t.GetList(0);
auto& tt = list.GetStruct(0);
if (tt.HasOptional() && tt.GetOptional().HasUint32()) { //already got partition
auto accessTime = list.GetStruct(2).GetOptional().GetUint64();
while(parser.TryNextRow()) {
auto tt = parser.ColumnParser(0).GetOptionalUint32();

if (tt.Defined()) { //already got partition
auto accessTime = parser.ColumnParser(2).GetOptionalUint64().GetOrElse(0);
if (accessTime > AccessTime) { // AccessTime
PartitionId_ = tt.GetOptional().GetUint32();
CreateTime = list.GetStruct(1).GetOptional().GetUint64();
PartitionId_ = *tt;
CreateTime = parser.ColumnParser(1).GetOptionalUint64().GetOrElse(0);
AccessTime = accessTime;
SeqNo_ = list.GetStruct(3).GetOptional().GetUint64();
SeqNo_ = parser.ColumnParser(3).GetOptionalUint64().GetOrElse(0);
}
}
}
Expand All @@ -206,6 +208,8 @@ class TTableHelper {
ev->Record.MutableRequest()->SetDatabase(GetDatabaseName(ctx));
// fill tx settings: set commit tx flag& begin new serializable tx.
ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
ev->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);

if (KqpSessionId) {
ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
}
Expand Down Expand Up @@ -254,7 +258,7 @@ class TTableHelper {
const TString TopicHashName;

NPQ::NSourceIdEncoding::TEncodedSourceId EncodedSourceId;

NPQ::ESourceIdTableGeneration TableGeneration;
TString SelectQuery;
TString UpdateQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/persqueue/pq_database.h>
#include <ydb/library/mkql_proto/protos/minikql.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>

namespace NKikimr {
namespace NGRpcProxy {
Expand Down Expand Up @@ -45,15 +46,16 @@ void TClustersUpdater::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TA
auto& record = ev->Get()->Record.GetRef();

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
auto& t = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
bool local = false;
TVector<TString> clusters;
for (size_t i = 0; i < t.ListSize(); ++i) {
TString dc = t.GetList(i).GetStruct(0).GetOptional().GetText();
local = t.GetList(i).GetStruct(1).GetOptional().GetBool();
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));

while(parser.TryNextRow()) {
TString dc = *parser.ColumnParser(0).GetOptionalUtf8();
local = *parser.ColumnParser(1).GetOptionalBool();
clusters.push_back(dc);
if (local) {
bool enabled = t.GetList(i).GetStruct(2).GetOptional().GetBool();
bool enabled = *parser.ColumnParser(2).GetOptionalBool();
Y_ABORT_UNLESS(LocalCluster.empty() || LocalCluster == dc);
bool changed = LocalCluster != dc || Enabled != enabled;
if (changed) {
Expand Down
Loading

0 comments on commit 9bcdf87

Please sign in to comment.