Skip to content

Commit

Permalink
Diff schemas (ydb-platform#10958)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 1, 2025
1 parent b1f6580 commit abefcc9
Show file tree
Hide file tree
Showing 37 changed files with 604 additions and 305 deletions.
2 changes: 0 additions & 2 deletions ydb/core/formats/arrow/splitter/scheme_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ class ISchemaDetailInfo {
NAccessor::TColumnSaver GetColumnSaver(const ui32 columnId) const;
virtual std::shared_ptr<arrow::Field> GetField(const ui32 columnId) const = 0;
virtual std::optional<TColumnSerializationStat> GetColumnSerializationStats(const ui32 columnId) const = 0;
virtual bool NeedMinMaxForColumn(const ui32 columnId) const = 0;
virtual bool IsSortedColumn(const ui32 columnId) const = 0;
virtual std::optional<TBatchSerializationStat> GetBatchSerializationStats(const std::shared_ptr<arrow::RecordBatch>& rb) const = 0;
};
} // namespace NKikimr::NArrow::NSplitter
1 change: 0 additions & 1 deletion ydb/core/grpc_services/rpc_create_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ class TCreateTableRPC : public TRpcSchemeRequestActor<TCreateTableRPC, TEvCreate
tableDesc->SetName(tableName);

auto schema = tableDesc->MutableSchema();
schema->SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES);

TString error;
if (!FillColumnDescription(*tableDesc, req.columns(), code, error)) {
Expand Down
6 changes: 0 additions & 6 deletions ydb/core/grpc_services/rpc_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,13 @@ bool ConvertSchemaFromPublicToInternal(const Ydb::LogStore::Schema& from, NKikim
}
}

to.SetEngine(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
status = {};
return true;
}

bool ConvertSchemaFromInternalToPublic(const NKikimrSchemeOp::TColumnTableSchema& from, Ydb::LogStore::Schema& to,
Ydb::StatusIds::StatusCode& status, TString& error)
{
if (from.GetEngine() != NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES) {
status = Ydb::StatusIds::INTERNAL_ERROR;
error = TStringBuilder() << "Unexpected table engine: " << NKikimrSchemeOp::EColumnTableEngine_Name(from.GetEngine());
return false;
}
to.mutable_primary_key()->CopyFrom(from.GetKeyColumnNames());
for (const auto& column : from.GetColumns()) {
auto* col = to.add_columns();
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,6 @@ void FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T&
for (const auto& keyColumn : metadata.KeyColumnNames) {
schema.AddKeyColumnNames(keyColumn);
}

schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES);
}

bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata,
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/kqp/ut/olap/helpers/typed_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ TString TTypedLocalHelper::GetTestTableSchema() const {
Columns { Name: "pk_int" Type: "Int64" NotNull: true }
Columns { Name: "ts" Type: "Timestamp" }
KeyColumnNames: "pk_int"
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
)";
return result;
}
Expand All @@ -34,7 +33,6 @@ TString TTypedLocalHelper::GetMultiColumnTestTableSchema(ui32 reps) const {
}
result += R"(
KeyColumnNames: "pk_int"
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
)";
return result;
}
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Columns { Name: "Datetime_column" Type: "Datetime" }
#Columns { Name: "Interval_column" Type: "Interval" }
KeyColumnNames: "key"
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
}
}
)");
Expand Down
18 changes: 14 additions & 4 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,6 @@ message TColumnTableSchema {
repeated TOlapColumnDescription Columns = 1;
repeated string KeyColumnNames = 2;

// A type of engine used by the table
optional EColumnTableEngine Engine = 3;

// Internal fields
optional uint32 NextColumnId = 4;

Expand All @@ -569,11 +566,24 @@ message TColumnTableSchema {
//optional int32 DefaultCompressionLevel = 7; // deprecated, not used before replace
optional TCompressionOptions DefaultCompression = 8;

optional bool CompositeMarksDeprecated = 9 [ default = false ];
repeated TOlapIndexDescription Indexes = 10;
optional TColumnTableSchemeOptions Options = 12;
}

// Incremental description of a column table schema. It can be stored in
// TSchemaPresetVersionInfo next to (or instead of) the full TColumnTableSchema,
// and is applied on top of a previously known schema to obtain the next one.
message TColumnTableSchemaDiff {
// NOTE(review): presumably the schema version this diff leads to — confirm against the producer.
optional uint64 Version = 1;

// Column descriptions carried by the diff. The "Upsert" naming suggests
// add-or-replace semantics — TODO confirm with the code that applies the diff.
repeated TOlapColumnDescription UpsertColumns = 2;
// Columns to remove. NOTE(review): values look like column ids (not names) — confirm.
repeated uint32 DropColumns = 3;

// Same message type as TColumnTableSchema.DefaultCompression.
// NOTE(review): presumably set only when the default compression changes — confirm.
optional TCompressionOptions DefaultCompression = 4;

// Index descriptions carried by the diff (same message type as TColumnTableSchema.Indexes).
repeated TOlapIndexDescription UpsertIndexes = 5;
// Indexes to remove. NOTE(review): values look like index ids — confirm.
repeated uint32 DropIndexes = 6;

// Same message type as TColumnTableSchema.Options.
optional TColumnTableSchemeOptions Options = 7;
}

message TColumnTableRequestedOptions {
optional bool SchemeNeedActualization = 1 [default = false];
optional bool ExternalGuaranteeExclusivePK = 2;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/tx_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ message TSchemaPresetVersionInfo {
optional uint64 SinceStep = 2;
optional uint64 SinceTxId = 3;
optional NKikimrSchemeOp.TColumnTableSchema Schema = 4;
optional NKikimrSchemeOp.TColumnTableSchemaDiff Diff = 5;
}

message TTtlSettingsPresetVersionInfo {
Expand Down
1 change: 0 additions & 1 deletion ydb/core/testlib/cs_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ TString THelper::GetTestTableSchema() const {
sb << R"(
KeyColumnNames: "timestamp"
KeyColumnNames: "uid"
Engine : COLUMN_ENGINE_REPLACING_TIMESERIES
)";
return sb;
}
Expand Down
1 change: 0 additions & 1 deletion ydb/core/testlib/cs_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ class THelper: public THelperSchemaless {
Columns { Name: "level" Type: "Int32" DataAccessorConstructor{ ClassName: "SPARSED" }}
Columns { Name: "message" Type: "Utf8" }
KeyColumnNames: "timestamp"
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
)";

void WithSomeNulls() {
Expand Down
30 changes: 29 additions & 1 deletion ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,34 @@ class IColumnEngine {
virtual void DoRegisterTable(const ui64 pathId) = 0;

public:
// Input for registering a schema version: holds a full schema proto, a schema
// diff proto, or both. Each part is optional and exposed through a read-only getter.
class TSchemaInitializationData {
private:
    YDB_READONLY_DEF(std::optional<NKikimrSchemeOp::TColumnTableSchema>, Schema);
    YDB_READONLY_DEF(std::optional<NKikimrSchemeOp::TColumnTableSchemaDiff>, Diff);

public:
    // Builds the object from explicitly supplied parts; verifies that at least
    // one of them is present.
    TSchemaInitializationData(
        const std::optional<NKikimrSchemeOp::TColumnTableSchema>& schema, const std::optional<NKikimrSchemeOp::TColumnTableSchemaDiff>& diff)
        : Schema(schema)
        , Diff(diff)
    {
        AFL_VERIFY(Schema || Diff);
    }

    // Builds the object from a stored version record, copying whichever of
    // Schema / Diff the record carries. Unlike the constructor above, no check
    // is made here that at least one of them is present.
    TSchemaInitializationData(const NKikimrTxColumnShard::TSchemaPresetVersionInfo& info) {
        if (info.HasDiff()) {
            Diff.emplace(info.GetDiff());
        }
        if (info.HasSchema()) {
            Schema.emplace(info.GetSchema());
        }
    }

    // Returns the full schema; verifies that it is present first.
    const NKikimrSchemeOp::TColumnTableSchema& GetSchemaVerified() const {
        AFL_VERIFY(Schema);
        const NKikimrSchemeOp::TColumnTableSchema& fullSchema = *Schema;
        return fullSchema;
    }
};

static ui64 GetMetadataLimit();

virtual ~IColumnEngine() = default;
Expand Down Expand Up @@ -293,7 +321,7 @@ class IColumnEngine {
virtual bool ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema) = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0;
virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0;
virtual const TColumnEngineStats& GetTotalStats() = 0;
virtual ui64 MemoryUsage() const {
Expand Down
18 changes: 12 additions & 6 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

namespace NKikimr::NOlap {

TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager,
const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema)
TColumnEngineForLogs::TColumnEngineForLogs(
ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, storagesManager))
, StoragesManager(storagesManager)
, TabletId(tabletId)
Expand Down Expand Up @@ -162,11 +162,17 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
}
}

void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema) {
std::optional<NOlap::TIndexInfo> indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(schema, StoragesManager, SchemaObjectsCache);
void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
std::optional<NOlap::TIndexInfo> indexInfoOptional;
if (schema.GetDiff()) {
AFL_VERIFY(!VersionedIndex.IsEmpty());
indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(
*schema.GetDiff(), VersionedIndex.GetLastSchema()->GetIndexInfo(), StoragesManager, SchemaObjectsCache);
} else {
indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(schema.GetSchemaVerified(), StoragesManager, SchemaObjectsCache);
}
AFL_VERIFY(indexInfoOptional);
NOlap::TIndexInfo indexInfo = std::move(*indexInfoOptional);
RegisterSchemaVersion(snapshot, std::move(indexInfo));
RegisterSchemaVersion(snapshot, std::move(*indexInfoOptional));
}

bool TColumnEngineForLogs::Load(IDbWrapper& db) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TColumnEngineForLogs: public IColumnEngine {
};

TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot,
const NKikimrSchemeOp::TColumnTableSchema& schema);
const TSchemaInitializationData& schema);
TColumnEngineForLogs(
ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema);

Expand Down Expand Up @@ -131,7 +131,7 @@ class TColumnEngineForLogs: public IColumnEngine {
IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override;

void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override;
void RegisterSchemaVersion(const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema) override;
void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override;

std::shared_ptr<TSelectInfo> Select(
ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class IIndexInfo {
static constexpr const char* SPEC_COL_TX_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_TX_ID;
static constexpr const char* SPEC_COL_WRITE_ID = NOlap::NPortion::TSpecialColumns::SPEC_COL_WRITE_ID;
static constexpr const char* SPEC_COL_DELETE_FLAG = NOlap::NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG;
static constexpr ui32 SpecialColumnsCount = 4;

static const inline std::shared_ptr<arrow::Field> PlanStepField = arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64());
static const inline std::shared_ptr<arrow::Field> TxIdField = arrow::field(SPEC_COL_TX_ID, arrow::uint64());
Expand Down
Loading

0 comments on commit abefcc9

Please sign in to comment.