Skip to content

Commit

Permalink
24-3: Allow streams on index tables, replicate index tables (#7150)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jul 29, 2024
1 parent c1bcd71 commit 312992d
Show file tree
Hide file tree
Showing 53 changed files with 1,313 additions and 593 deletions.
8 changes: 8 additions & 0 deletions ydb/core/base/path.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@ inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TStr
return path;
}

inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TVector<TString>& childPath) {
auto path = parentPath;
for (const auto& childName : childPath) {
path.push_back(childName);
}
return path;
}

}
77 changes: 69 additions & 8 deletions ydb/core/grpc_services/rpc_alter_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
break;

case EOp::Attribute:
PrepareAlterUserAttrubutes();
case EOp::AddChangefeed:
case EOp::DropChangefeed:
GetProxyServices();
break;

case EOp::AddChangefeed:
case EOp::DropIndex:
case EOp::DropChangefeed:
case EOp::RenameIndex:
AlterTable(ctx);
break;
Expand Down Expand Up @@ -197,7 +197,7 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
Navigate(msg->Services.SchemeCache, ctx);
}

void PrepareAlterUserAttrubutes() {
void GetProxyServices() {
using namespace NTxProxy;
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvGetProxyServicesRequest);
}
Expand All @@ -222,13 +222,38 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
auto ev = CreateNavigateForPath(DatabaseName);
{
auto& entry = static_cast<TEvTxProxySchemeCache::TEvNavigateKeySet*>(ev)->Request->ResultSet.emplace_back();
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
entry.Path = paths;
}

Send(schemeCache, ev);
}

void Navigate(const TTableId& pathId) {
DatabaseName = Request_->GetDatabaseName()
.GetOrElse(DatabaseFromDomain(AppData()));

auto ev = CreateNavigateForPath(DatabaseName);
{
auto& entry = static_cast<TEvTxProxySchemeCache::TEvNavigateKeySet*>(ev)->Request->ResultSet.emplace_back();
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
entry.TableId = pathId;
entry.ShowPrivatePath = true;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
}

Send(MakeSchemeCacheID(), ev);
}

static bool IsChangefeedOperation(EOp type) {
switch (type) {
case EOp::AddChangefeed:
case EOp::DropChangefeed:
return true;
default:
return false;
}
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
TXLOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult"
<< ", errors# " << ev->Get()->Request.Get()->ErrorCount);
Expand All @@ -251,13 +276,48 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

Y_ABORT_UNLESS(!resp->ResultSet.empty());
const auto& entry = resp->ResultSet.back();

switch (entry.Kind) {
case NSchemeCache::TSchemeCacheNavigate::KindTable:
case NSchemeCache::TSchemeCacheNavigate::KindColumnTable:
case NSchemeCache::TSchemeCacheNavigate::KindExternalTable:
case NSchemeCache::TSchemeCacheNavigate::KindExternalDataSource:
case NSchemeCache::TSchemeCacheNavigate::KindView:
break; // table
case NSchemeCache::TSchemeCacheNavigate::KindIndex:
if (IsChangefeedOperation(OpType)) {
break;
}
[[fallthrough]];
default:
Request_->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TStringBuilder()
<< "Unable to nagivate: " << JoinPath(entry.Path) << " status: PathNotTable"));
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

switch (OpType) {
case EOp::AddIndex:
return AlterTableAddIndexOp(resp, ctx);
case EOp::Attribute:
Y_ABORT_UNLESS(!resp->ResultSet.empty());
ResolvedPathId = resp->ResultSet.back().TableId.PathId;
return AlterTable(ctx);
case EOp::AddChangefeed:
case EOp::DropChangefeed:
if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindIndex) {
AlterTable(ctx);
} else if (auto list = entry.ListNodeEntry) {
if (list->Children.size() != 1) {
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

const auto& child = list->Children.at(0);
AlterTable(ctx, CanonizePath(ChildPath(NKikimr::SplitPath(GetProtoRequest()->path()), child.Name)));
} else {
Navigate(entry.TableId);
}
break;
default:
TXLOG_E("Got unexpected cache response");
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
Expand Down Expand Up @@ -351,13 +411,14 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
Die(ctx);
}

void AlterTable(const TActorContext &ctx) {
void AlterTable(const TActorContext &ctx, const TMaybe<TString>& overridePath = {}) {
const auto req = GetProtoRequest();
std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
auto modifyScheme = proposeRequest->Record.MutableTransaction()->MutableModifyScheme();
modifyScheme->SetAllowAccessToPrivatePaths(overridePath.Defined());
Ydb::StatusIds::StatusCode code;
TString error;
if (!BuildAlterTableModifyScheme(req, modifyScheme, Profiles, ResolvedPathId, code, error)) {
if (!BuildAlterTableModifyScheme(overridePath.GetOrElse(req->path()), req, modifyScheme, Profiles, ResolvedPathId, code, error)) {
NYql::TIssues issues;
issues.AddIssue(NYql::TIssue(error));
return Reply(code, issues, ctx);
Expand Down
83 changes: 70 additions & 13 deletions ydb/core/grpc_services/rpc_describe_table.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#include "service_table.h"
#include <ydb/core/grpc_services/base/base.h>

#include "rpc_calls.h"
#include "rpc_scheme_base.h"

#include "service_table.h"
#include "rpc_common/rpc_common.h"

#include <ydb/core/base/path.h>
#include <ydb/core/base/table_index.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/rpc_common/rpc_common.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/ydb_convert/table_description.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
Expand All @@ -22,25 +22,84 @@ using TEvDescribeTableRequest = TGrpcRequestOperationCall<Ydb::Table::DescribeTa
class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDescribeTableRequest> {
using TBase = TRpcSchemeRequestActor<TDescribeTableRPC, TEvDescribeTableRequest>;

TString OverrideName;

static bool ShowPrivatePath(const TString& path) {
if (AppData()->AllowPrivateTableDescribeForTest) {
return true;
}

if (path.EndsWith("/indexImplTable")) {
return true;
}

return false;
}

public:
TDescribeTableRPC(IRequestOpCtx* msg)
: TBase(msg) {}

void Bootstrap(const TActorContext &ctx) {
TBase::Bootstrap(ctx);

SendProposeRequest(ctx);
const auto& path = GetProtoRequest()->path();
const auto paths = NKikimr::SplitPath(path);
if (paths.empty()) {
Request_->RaiseIssue(NYql::TIssue("Invalid path"));
return Reply(Ydb::StatusIds::BAD_REQUEST, ctx);
}

auto navigate = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
navigate->DatabaseName = CanonizePath(Request_->GetDatabaseName().GetOrElse(""));
auto& entry = navigate->ResultSet.emplace_back();
entry.Path = paths;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
entry.SyncVersion = true;
entry.ShowPrivatePath = ShowPrivatePath(path);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate));
Become(&TDescribeTableRPC::StateWork);
}

private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
default: TBase::StateWork(ev);
}
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
auto* navigate = ev->Get()->Request.Get();

Y_ABORT_UNLESS(navigate->ResultSet.size() == 1);
const auto& entry = navigate->ResultSet.front();

if (navigate->ErrorCount > 0) {
switch (entry.Status) {
case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown:
case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown:
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
default:
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);
}
}

if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindIndex) {
auto list = entry.ListNodeEntry;
if (!list || list->Children.size() != 1) {
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

OverrideName = entry.Path.back();
SendProposeRequest(CanonizePath(ChildPath(entry.Path, list->Children.at(0).Name)), ctx);
} else {
SendProposeRequest(GetProtoRequest()->path(), ctx);
}
}

void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->GetRecord();
const auto status = record.GetStatus();
Expand All @@ -53,9 +112,10 @@ class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDe
case NKikimrScheme::StatusSuccess: {
const auto& pathDescription = record.GetPathDescription();
Ydb::Scheme::Entry* selfEntry = describeTableResult.mutable_self();
selfEntry->set_name(pathDescription.GetSelf().GetName());
selfEntry->set_type(static_cast<Ydb::Scheme::Entry::Type>(pathDescription.GetSelf().GetPathType()));
ConvertDirectoryEntry(pathDescription.GetSelf(), selfEntry, true);
if (OverrideName) {
selfEntry->set_name(OverrideName);
}

if (pathDescription.HasColumnTableDescription()) {
const auto& tableDescription = pathDescription.GetColumnTableDescription();
Expand Down Expand Up @@ -136,9 +196,8 @@ class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDe
}
}

void SendProposeRequest(const TActorContext &ctx) {
void SendProposeRequest(const TString& path, const TActorContext& ctx) {
const auto req = GetProtoRequest();
const TString path = req->path();

std::unique_ptr<TEvTxUserProxy::TEvNavigate> navigateRequest(new TEvTxUserProxy::TEvNavigate());
SetAuthToken(navigateRequest, *Request_);
Expand All @@ -153,9 +212,7 @@ class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDe
record->MutableOptions()->SetReturnPartitionStats(true);
}

if (AppData(ctx)->AllowPrivateTableDescribeForTest || path.EndsWith("/indexImplTable")) {
record->MutableOptions()->SetShowPrivateTable(true);
}
record->MutableOptions()->SetShowPrivateTable(ShowPrivatePath(path));

ctx.Send(MakeTxProxyID(), navigateRequest.release());
}
Expand Down
64 changes: 64 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4075,6 +4075,70 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}

Y_UNIT_TEST(ChangefeedOnIndexTable) {
TKikimrRunner kikimr(TKikimrSettings()
.SetPQConfig(DefaultPQConfig())
.SetEnableChangefeedsOnIndexTables(true));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{
auto query = R"(
--!syntax_v1
CREATE TABLE `/Root/table` (
Key Uint64,
Value String,
PRIMARY KEY (Key),
INDEX SyncIndex GLOBAL SYNC ON (`Value`),
INDEX AsyncIndex GLOBAL ASYNC ON (`Value`)
);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

const auto changefeed = TChangefeedDescription("feed", EChangefeedMode::KeysOnly, EChangefeedFormat::Json);
{
auto result = session.AlterTable("/Root/table/AsyncIndex", TAlterTableSettings()
.AppendAddChangefeeds(changefeed)
).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
}
{
auto result = session.AlterTable("/Root/table/SyncIndex", TAlterTableSettings()
.AppendAddChangefeeds(changefeed)
).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
}

Y_UNIT_TEST(DescribeIndexTable) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{
auto query = R"(
--!syntax_v1
CREATE TABLE `/Root/table` (
Key Uint64,
Value String,
PRIMARY KEY (Key),
INDEX SyncIndex GLOBAL SYNC ON (`Value`)
);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
{
auto desc = session.DescribeTable("/Root/table/SyncIndex").ExtractValueSync();
UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(desc.GetEntry().Name, "SyncIndex");
}
}

Y_UNIT_TEST(CreatedAt) {
TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
auto scheme = NYdb::NScheme::TSchemeClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,5 @@ message TFeatureFlags {
optional bool EnableColumnStatistics = 130 [default = false];
optional bool EnableSingleCompositeActionGroup = 131 [default = false];
optional bool EnableResourcePoolsOnServerless = 132 [default = false];
optional bool EnableChangefeedsOnIndexTables = 134 [default = false];
}
5 changes: 0 additions & 5 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -919,10 +919,6 @@ message TCreateCdcStream {
optional TCdcStreamDescription StreamDescription = 2;
optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default
optional uint32 TopicPartitions = 4;
oneof IndexMode {
google.protobuf.Empty AllIndexes = 5; // Create topic per each index
string IndexName = 6;
}
}

message TAlterCdcStream {
Expand Down Expand Up @@ -1637,7 +1633,6 @@ message TIndexBuildControl {

message TLockConfig {
optional string Name = 1;
optional bool AllowIndexImplLock = 2;
}

message TLockGuard {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/feature_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class TTestFeatureFlagsHolder {
FEATURE_FLAG_SETTER(EnableCMSRequestPriorities)
FEATURE_FLAG_SETTER(EnableTableDatetime64)
FEATURE_FLAG_SETTER(EnableResourcePools)
FEATURE_FLAG_SETTER(EnableChangefeedsOnIndexTables)

#undef FEATURE_FLAG_SETTER
};
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/dst_alterer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TDstAlterer: public TActorBootstrapped<TDstAlterer> {

switch (Kind) {
case TReplication::ETargetKind::Table:
case TReplication::ETargetKind::IndexTable:
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
PathIdFromPathId(DstPathId, tx.MutableAlterTable()->MutablePathId());
tx.MutableAlterTable()->MutableReplicationConfig()->SetMode(
Expand Down
Loading

0 comments on commit 312992d

Please sign in to comment.