Skip to content

Commit

Permalink
Merge 0737dae into f383e34
Browse files Browse the repository at this point in the history
  • Loading branch information
StekPerepolnen authored May 8, 2024
2 parents f383e34 + 0737dae commit 91b3d33
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 54 deletions.
168 changes: 134 additions & 34 deletions ydb/core/viewer/json_autocomplete.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace NKikimr {
namespace NViewer {

using namespace NActors;
using NSchemeShard::TEvSchemeShard;
using TNavigate = NSchemeCache::TSchemeCacheNavigate;

class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
Expand All @@ -23,8 +24,9 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
TJsonSettings JsonSettings;
ui32 Timeout = 0;

TAutoPtr<TEvViewer::TEvViewerResponse> ProxyResult;
TAutoPtr<TEvViewer::TEvViewerResponse> ViewerProxyResult;
TAutoPtr<NConsole::TEvConsole::TEvListTenantsResponse> ConsoleResult;
TVector<TAutoPtr<TEvSchemeShard::TEvDescribeSchemeResult>> TxProxyResult;
TAutoPtr<TEvTxProxySchemeCache::TEvNavigateKeySetResult> CacheResult;

struct TSchemaWordData {
Expand Down Expand Up @@ -151,24 +153,10 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
return NViewer::IsPostContent(Event);
}

TAutoPtr<NSchemeCache::TSchemeCacheNavigate> MakeSchemeCacheRequest() {
TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());

for (TString& path: Paths) {
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
entry.SyncVersion = false;
entry.Path = SplitPath(path);
request->ResultSet.emplace_back(entry);
}

return request;
}

void Bootstrap() {
if (ViewerRequest) {
// handle proxied request
SendSchemeCacheRequest();
SendNavigateRequest();
} else if (!Database) {
// autocomplete database list via console request
RequestConsoleListTenants();
Expand All @@ -179,7 +167,7 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}
if (Requests == 0) {
// perform autocomplete without proxying
SendSchemeCacheRequest();
SendNavigateRequest();
}
}

Expand All @@ -191,15 +179,15 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
void Undelivered(TEvents::TEvUndelivered::TPtr &ev) {
if (!Direct && ev->Get()->SourceType == NViewer::TEvViewer::EvViewerRequest) {
Direct = true;
SendSchemeCacheRequest(); // fallback
SendNavigateRequest(); // fallback
RequestDone();
}
}

void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr &) {
if (!Direct) {
Direct = true;
SendSchemeCacheRequest(); // fallback
SendNavigateRequest(); // fallback
RequestDone();
}
}
Expand All @@ -212,15 +200,39 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}
}
if (TenantDynamicNodes.empty()) {
SendSchemeCacheRequest();
SendNavigateRequest();
} else {
SendDynamicNodeAutocompleteRequest();
}
RequestDone();
}

void SendSchemeCacheRequest() {
SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(MakeSchemeCacheRequest()));
void SendNavigateRequest() {
for (TString& path: Paths) {
THolder<TEvTxUserProxy::TEvNavigate> request = MakeHolder<TEvTxUserProxy::TEvNavigate>();
auto record = request->Record.MutableDescribePath();
record->SetPath(path);
record->MutableOptions()->SetBackupInfo(true);
record->MutableOptions()->SetShowPrivateTable(true);
record->MutableOptions()->SetReturnChildren(true);
record->MutableOptions()->SetReturnBoundaries(false);
record->MutableOptions()->SetReturnPartitionConfig(true);
record->MutableOptions()->SetReturnPartitionStats(false);
record->MutableOptions()->SetReturnPartitioningInfo(true);
request->Record.SetUserToken(Event->Get()->UserToken);
SendRequest(MakeTxProxyID(), request.Release());
}

TAutoPtr<NSchemeCache::TSchemeCacheNavigate> request(new NSchemeCache::TSchemeCacheNavigate());
for (TString& path: Paths) {
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
entry.SyncVersion = false;
entry.Path = SplitPath(path);
request->ResultSet.emplace_back(entry);
}

SendRequest(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
}

void SendDynamicNodeAutocompleteRequest() {
Expand Down Expand Up @@ -259,6 +271,7 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvBoardInfo, Handle);
hFunc(NConsole::TEvConsole::TEvListTenantsResponse, Handle);
hFunc(TEvSchemeShard::TEvDescribeSchemeResult, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
hFunc(TEvents::TEvUndelivered, Undelivered);
hFunc(TEvInterconnect::TEvNodeConnected, Connected);
Expand All @@ -268,13 +281,13 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}
}

void ParseProxyResult() {
if (ProxyResult == nullptr) {
Result.add_error("Failed to collect information from ProxyResult");
void ParseViewerProxyResult() {
if (ViewerProxyResult == nullptr) {
Result.add_error("Failed to collect information from ViewerProxyResult");
return;
}
if (ProxyResult->Record.HasAutocompleteResponse()) {
Result = ProxyResult->Record.GetAutocompleteResponse();
if (ViewerProxyResult->Record.HasAutocompleteResponse()) {
Result = ViewerProxyResult->Record.GetAutocompleteResponse();
} else {
Result.add_error("Proxying return empty response");
}
Expand All @@ -294,8 +307,8 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}
}

NKikimrViewer::EAutocompleteType ConvertType(TNavigate::EKind navigate) {
switch (navigate) {
NKikimrViewer::EAutocompleteType ConvertType(TNavigate::EKind type) {
switch (type) {
case TNavigate::KindSubdomain:
return NKikimrViewer::sub_domain;
case TNavigate::KindPath:
Expand Down Expand Up @@ -339,6 +352,53 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}
}

NKikimrViewer::EAutocompleteType ConvertType(NKikimrSchemeOp::EPathType type) {
switch (type) {
case NKikimrSchemeOp::EPathType::EPathTypeInvalid:
return NKikimrViewer::unknown;
case NKikimrSchemeOp::EPathType::EPathTypeDir:
return NKikimrViewer::dir;
case NKikimrSchemeOp::EPathType::EPathTypeTable:
return NKikimrViewer::table;
case NKikimrSchemeOp::EPathType::EPathTypePersQueueGroup:
return NKikimrViewer::pers_queue_group;
case NKikimrSchemeOp::EPathType::EPathTypeSubDomain:
return NKikimrViewer::sub_domain;
case NKikimrSchemeOp::EPathType::EPathTypeRtmrVolume:
return NKikimrViewer::rtmr_volume;
case NKikimrSchemeOp::EPathType::EPathTypeBlockStoreVolume:
return NKikimrViewer::block_store_volume;
case NKikimrSchemeOp::EPathType::EPathTypeKesus:
return NKikimrViewer::kesus;
case NKikimrSchemeOp::EPathType::EPathTypeSolomonVolume :
return NKikimrViewer::solomon_volume;
case NKikimrSchemeOp::EPathType::EPathTypeTableIndex:
return NKikimrViewer::index;
case NKikimrSchemeOp::EPathType::EPathTypeExtSubDomain:
return NKikimrViewer::ext_sub_domain;
case NKikimrSchemeOp::EPathType::EPathTypeFileStore:
return NKikimrViewer::file_store;
case NKikimrSchemeOp::EPathType::EPathTypeColumnStore:
return NKikimrViewer::column_store;
case NKikimrSchemeOp::EPathType::EPathTypeColumnTable:
return NKikimrViewer::column_table;
case NKikimrSchemeOp::EPathType::EPathTypeCdcStream:
return NKikimrViewer::cdc_stream;
case NKikimrSchemeOp::EPathType::EPathTypeSequence:
return NKikimrViewer::sequence;
case NKikimrSchemeOp::EPathType::EPathTypeReplication:
return NKikimrViewer::replication;
case NKikimrSchemeOp::EPathType::EPathTypeBlobDepot:
return NKikimrViewer::blob_depot;
case NKikimrSchemeOp::EPathType::EPathTypeExternalTable:
return NKikimrViewer::external_table;
case NKikimrSchemeOp::EPathType::EPathTypeExternalDataSource:
return NKikimrViewer::external_data_source;
case NKikimrSchemeOp::EPathType::EPathTypeView:
return NKikimrViewer::view;
}
}

void ParseCacheResult() {
if (CacheResult == nullptr) {
Result.add_error("Failed to collect information from CacheResult");
Expand Down Expand Up @@ -372,6 +432,44 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}
}

bool TryParseTxProxyResult() {
if (TxProxyResult.size() != Paths.size()) {
return false;
}
for (auto& result: TxProxyResult) {
if (result->GetRecord().GetStatus() != NKikimrScheme::EStatus::StatusSuccess) {
return false;
}
}
for (auto& result: TxProxyResult) {
auto& record = result->GetRecord();
auto& entry = record.GetPathDescription();

for (auto& child: entry.GetChildren()) {
Dictionary[child.GetName()] = TSchemaWordData(child.GetName(), ConvertType(child.GetPathType()), record.GetPath());
}
if (entry.HasTable()) {
auto& description = entry.GetTable();
for (const auto& column : description.GetColumns()) {
Dictionary[column.GetName()] = TSchemaWordData(column.GetName(), NKikimrViewer::column, record.GetPath());
}
for (const auto& index : description.GetTableIndexes()) {
Dictionary[index.GetName()] = TSchemaWordData(index.GetName(), NKikimrViewer::index, record.GetPath());
}
for (const auto& cdcStream : description.GetCdcStreams()) {
Dictionary[cdcStream.GetName()] = TSchemaWordData(cdcStream.GetName(), NKikimrViewer::cdc_stream, record.GetPath());
}
}
}

return true;
}

void Handle(TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev) {
TxProxyResult.emplace_back(ev->Release());
RequestDone();
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr &ev) {
CacheResult = ev->Release();
RequestDone();
Expand All @@ -395,15 +493,17 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}

void ReplyAndPassAway() {
if (ProxyResult) {
ParseProxyResult();
if (ViewerProxyResult) {
ParseViewerProxyResult();
} else if (Database) {
ParseCacheResult();
if (!TryParseTxProxyResult()) {
ParseCacheResult();
}
} else {
ParseConsoleResult();
}

if (!ProxyResult) {
if (!ViewerProxyResult) {
Result.set_success(Result.error_size() == 0);
if (Result.error_size() == 0) {
auto fuzzy = FuzzySearcher<TSchemaWordData>(Dictionary);
Expand All @@ -425,7 +525,7 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}

void Handle(TEvViewer::TEvViewerResponse::TPtr& ev) {
ProxyResult = ev.Release()->Release();
ViewerProxyResult = ev.Release()->Release();
RequestDone();
}

Expand Down
42 changes: 22 additions & 20 deletions ydb/core/viewer/viewer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1158,26 +1158,28 @@ Y_UNIT_TEST_SUITE(Viewer) {
(*x)->Get()->Record.MutableResponse()->mutable_operation()->mutable_result()->PackFrom(listTenantsResult);
break;
}
case TEvTxProxySchemeCache::EvNavigateKeySetResult: {
auto *x = reinterpret_cast<TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr*>(&ev);
(*x)->Get()->Request->ErrorCount = 0;
for (auto& entry: (*x)->Get()->Request->ResultSet) {
if (entry.Path.size() <= 2) {
const TPathId pathId(1, 1);
auto listNodeEntry = MakeIntrusive<TNavigate::TListNodeEntry>();
listNodeEntry->Children.reserve(3);
listNodeEntry->Children.emplace_back("orders", pathId, TNavigate::KindTable);
listNodeEntry->Children.emplace_back("clients", pathId, TNavigate::KindTable);
listNodeEntry->Children.emplace_back("products", pathId, TNavigate::KindTable);
entry.ListNodeEntry = listNodeEntry;
entry.Kind = TSchemeCacheNavigate::EKind::KindExtSubdomain;
} else {
entry.Columns[1].Name = "id";
entry.Columns[2].Name = "name";
entry.Columns[3].Name = "description";
entry.Kind = TSchemeCacheNavigate::EKind::KindTable;
}
entry.Status = TSchemeCacheNavigate::EStatus::Ok;
case TEvSchemeShard::EvDescribeSchemeResult: {
auto *x = reinterpret_cast<TEvSchemeShard::TEvDescribeSchemeResult::TPtr*>(&ev);
auto record = (*x)->Get()->MutableRecord();
record->SetStatus(NKikimrScheme::EStatus::StatusSuccess);
auto pathDescription = record->MutablePathDescription();
if (tables.size() == 0) {
auto child = pathDescription->AddChildren();
child->SetName("orders");
child->SetPathType(NKikimrSchemeOp::EPathType::EPathTypeTable);
child = pathDescription->AddChildren();
child->SetName("clients");
child->SetPathType(NKikimrSchemeOp::EPathType::EPathTypeTable);
child = pathDescription->AddChildren();
child->SetName("products");
child->SetPathType(NKikimrSchemeOp::EPathType::EPathTypeTable);
} else {
auto column = pathDescription->MutableTable()->AddColumns();
column->SetName("id");
column = pathDescription->MutableTable()->AddColumns();
column->SetName("name");
column = pathDescription->MutableTable()->AddColumns();
column->SetName("description");
}
break;
}
Expand Down

0 comments on commit 91b3d33

Please sign in to comment.