Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sequences backup #5018

Merged
merged 7 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,145 @@ Y_UNIT_TEST_SUITE(KqpPg) {
}
}

Y_UNIT_TEST(CopyTableSerialColumns) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableSequences(true);
auto setting = NKikimrKqp::TKqpSetting();
TKikimrRunner kikimr(
TKikimrSettings()
.SetAppConfig(appConfig)
.SetKqpSettings({setting})
.SetWithSampleTables(false)
.SetEnableNotNullDataColumns(true)
);
auto client = kikimr.GetTableClient();
auto session = client.CreateSession().GetValueSync().GetSession();
{
const auto query = Q_(R"(
--!syntax_pg
CREATE TABLE PgSerial (
key serial PRIMARY KEY,
value int2
))");

auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO PgSerial (value) values (1);
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto result = session.CopyTable("/Root/PgSerial", "/Root/copy").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto desc = session.DescribeTable("/Root/copy").ExtractValueSync();
UNIT_ASSERT_C(desc.IsSuccess(), desc.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM copy;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO copy (value) values (1);
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM copy;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM PgSerial;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
INSERT INTO PgSerial (value) values (1);
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM copy;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}

{
const auto query = Q_(R"(
--!syntax_pg
SELECT * FROM PgSerial;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty");
CompareYson(R"(
[["1";"1"];["2";"1"]]
)", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(CreateIndex) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/counters_schemeshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ enum ESimpleCounters {
COUNTER_IN_FLIGHT_OPS_TxDropView = 157 [(CounterOpts) = {Name: "InFlightOps/DropView"}];

COUNTER_GRAPHSHARD_COUNT = 158 [(CounterOpts) = {Name: "GraphShards"}];

COUNTER_IN_FLIGHT_OPS_TxCopySequence = 159 [(CounterOpts) = {Name: "InFlightOps/CopySequence"}];
}

enum ECumulativeCounters {
Expand Down Expand Up @@ -315,6 +317,7 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxCreateView = 95 [(CounterOpts) = {Name: "FinishedOps/CreateView"}];
COUNTER_FINISHED_OPS_TxDropView = 96 [(CounterOpts) = {Name: "FinishedOps/DropView"}];
COUNTER_FINISHED_OPS_TxAlterView = 97 [(CounterOpts) = {Name: "FinishedOps/AlterView"}];
COUNTER_FINISHED_OPS_TxCopySequence = 98 [(CounterOpts) = {Name: "FinishedOps/TxCopySequence"}];
}

enum EPercentileCounters {
Expand Down Expand Up @@ -536,4 +539,6 @@ enum ETxTypes {
TXTYPE_PERSQUEUE_PROPOSE_RESULT = 83 [(TxTypeOpts) = {Name: "TxPersQueueProposeResult"}];
TXTYPE_PERSQUEUE_PROPOSE_ATTACH_RESULT = 84 [(TxTypeOpts) = {Name: "TxProposeTransactionAttachResult"}];
TXTYPE_UPDATE_DOMAIN_REPLY = 85 [(TxTypeOpts) = {Name: "TxUpdateDomainReply"}];

TXTYPE_SEQUENCESHARD_GET_SEQUENCE_RESULT = 86 [(TxTypeOpts) = {Name: "TxSequenceShardGetSequenceResult"}];
}
1 change: 1 addition & 0 deletions ydb/core/protos/counters_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ enum ETxTypes {
TXTYPE_FREEZE_SEQUENCE = 7 [(TxTypeOpts) = {Name: "TxFreezeSequence"}];
TXTYPE_RESTORE_SEQUENCE = 8 [(TxTypeOpts) = {Name: "TxRestoreSequence"}];
TXTYPE_REDIRECT_SEQUENCE = 9 [(TxTypeOpts) = {Name: "TxRedirectSequence"}];
TXTYPE_GET_SEQUENCE = 10 [(TxTypeOpts) = {Name: "TxGetSequence"}];
}
12 changes: 12 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,10 @@ message TMoveIndex {
}

message TSequenceDescription {
message TSetVal {
optional sint64 NextValue = 1;
optional bool NextUsed = 2;
}
optional string Name = 1; // mandatory
optional NKikimrProto.TPathID PathId = 2; // sequence path id, assigned by schemeshard
optional uint64 Version = 3; // incremented every time sequence is altered
Expand All @@ -1286,6 +1290,7 @@ message TSequenceDescription {
optional uint64 Cache = 8; // number of items to cache, defaults to 1
optional sint64 Increment = 9; // increment at each call, defaults to 1
optional bool Cycle = 10; // true when cycle on overflow is allowed
optional TSetVal SetVal = 11; // SetVal(NextValue, NextUsed) is executed atomically when creating
}

message TSequenceSharding {
Expand Down Expand Up @@ -1571,6 +1576,12 @@ message TModifyScheme {
optional TViewDescription CreateView = 64;

optional NActorsProto.TActorId TempTableOwnerActorId = 65;

optional TCopySequence CopySequence = 66;
}

message TCopySequence {
optional string CopyFrom = 1;
}

// "Script", used by client to parse text files with multiple DDL commands
Expand All @@ -1588,6 +1599,7 @@ message TDescribeOptions {
optional bool ShowPrivateTable = 7 [default = false];
optional bool ReturnChannelsBinding = 8 [default = false];
optional bool ReturnRangeKey = 9 [default = true];
optional bool ReturnSetVal = 10 [default = false];
}

// Request to read scheme for a specific path
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/out/out_sequenceshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRestoreSequenceResult::EStatus,
Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus, stream, value) {
stream << NKikimrTxSequenceShard::TEvRedirectSequenceResult::EStatus_Name(value);
}

Y_DECLARE_OUT_SPEC(, NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus, stream, value) {
stream << NKikimrTxSequenceShard::TEvGetSequenceResult::EStatus_Name(value);
}
37 changes: 37 additions & 0 deletions ydb/core/protos/tx_sequenceshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ message TEvMarkSchemeShardPipe {
}

message TEvCreateSequence {
message TSetVal {
sint64 NextValue = 1;
bool NextUsed = 2;
}

NKikimrProto.TPathID PathId = 1;
uint64 TxId = 2;
uint64 TxPartId = 3;
Expand All @@ -39,6 +44,8 @@ message TEvCreateSequence {
oneof OptionalCycle {
bool Cycle = 9;
}
bool Frozen = 10; // defaults to false
TSetVal SetVal = 11;
}

message TEvCreateSequenceResult {
Expand Down Expand Up @@ -224,3 +231,33 @@ message TEvRedirectSequenceResult {
uint64 TxId = 3;
uint64 TxPartId = 4;
}

message TEvGetSequence {
NKikimrProto.TPathID PathId = 1;
uint64 TxId = 2;
uint64 TxPartId = 3;
}

message TEvGetSequenceResult {
enum EStatus {
UNKNOWN = 0;
SUCCESS = 1;
PIPE_OUTDATED = 2;
SEQUENCE_NOT_FOUND = 3;
SEQUENCE_MOVED = 4;
}

EStatus Status = 1;
uint64 Origin = 2;
uint64 TxId = 3;
uint64 TxPartId = 4;
uint64 MovedTo = 5; // moved to a different sequence shard
sint64 MinValue = 6;
sint64 MaxValue = 7;
sint64 StartValue = 8;
sint64 NextValue = 9;
bool NextUsed = 10;
uint64 Cache = 11;
sint64 Increment = 12;
bool Cycle = 13;
}
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/export_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ TMaybe<Ydb::Table::CreateTableRequest> GenYdbScheme(
FillPartitioningSettings(scheme, tableDesc);
FillKeyBloomFilter(scheme, tableDesc);
FillReadReplicasSettings(scheme, tableDesc);
FillSequenceDescription(scheme, tableDesc);

return scheme;
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,8 @@ ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::
Y_ABORT("TODO: implement");
case TTxState::ETxType::TxDropSequence:
return CreateDropSequence(NextPartId(), txState);
case TTxState::ETxType::TxCopySequence:
return CreateCopySequence(NextPartId(), txState);

case TTxState::ETxType::TxFillIndex:
Y_ABORT("deprecated");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ class TBackupRestoreOperationBase: public TSubOperation {
.NotAsyncReplicaTable()
.NotUnderOperation()
.IsCommonSensePath() //forbid alter impl index tables
.NotChildren(); //forbid backup table with indexes
.CanBackupTable(); //forbid backup table with indexes

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard__operation_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ void UpdatePartitioningForCopyTable(TOperationId operationId, TTxState& txState,
class TProposedWaitParts: public TSubOperationState {
private:
TOperationId OperationId;
const TTxState::ETxState NextState;

TString DebugHint() const override {
return TStringBuilder()
Expand All @@ -51,8 +52,9 @@ class TProposedWaitParts: public TSubOperationState {
}

public:
TProposedWaitParts(TOperationId id)
TProposedWaitParts(TOperationId id, TTxState::ETxState nextState = TTxState::Done)
: OperationId(id)
, NextState(nextState)
{
IgnoreMessages(DebugHint(),
{ TEvHive::TEvCreateTabletReply::EventType
Expand Down Expand Up @@ -124,7 +126,7 @@ class TProposedWaitParts: public TSubOperationState {
// Got notifications from all datashards?
if (txState->ShardsInProgress.empty()) {
NTableState::AckAllSchemaChanges(OperationId, *txState, context);
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
context.SS->ChangeTxState(db, OperationId, NextState);
return true;
}

Expand Down Expand Up @@ -968,7 +970,7 @@ class TPropose: public TSubOperationState {
TTxState* txState = context.SS->FindTx(OperationId);
Y_ABORT_UNLESS(txState);
Y_ABORT_UNLESS(txState->TxType == TTxState::TxCreatePQGroup || txState->TxType == TTxState::TxAlterPQGroup || txState->TxType == TTxState::TxAllocatePQ);

TPathId pathId = txState->TargetPathId;
TPathElement::TPtr path = context.SS->PathsById.at(pathId);

Expand Down
Loading
Loading