Skip to content

Commit

Permalink
Column Family for ColumnTable (#9657)
Browse files Browse the repository at this point in the history
  • Loading branch information
vlad-gogov authored Nov 21, 2024
1 parent 4f2597b commit 45513a5
Show file tree
Hide file tree
Showing 34 changed files with 2,531 additions and 299 deletions.
9 changes: 5 additions & 4 deletions ydb/core/formats/arrow/serializer/native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ NKikimr::TConclusion<std::shared_ptr<arrow::util::Codec>> TNativeSerializer::Bui
const int levelMin = codec->minimum_compression_level();
const int levelMax = codec->maximum_compression_level();
if (levelDef < levelMin || levelMax < levelDef) {
return TConclusionStatus::Fail(
TStringBuilder() << "incorrect level for codec. have to be: [" << levelMin << ":" << levelMax << "]"
);
return TConclusionStatus::Fail(TStringBuilder() << "incorrect level for codec `" << arrow::util::Codec::GetCodecAsString(cType)
<< "`. have to be: [" << levelMin << ":" << levelMax << "]");
}
std::shared_ptr<arrow::util::Codec> codecPtr = std::move(NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(cType, levelDef)));
return codecPtr;
Expand Down Expand Up @@ -182,7 +181,9 @@ NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromProto(const NKiki
void TNativeSerializer::DoSerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const {
if (Options.codec) {
proto.MutableArrowCompression()->SetCodec(NArrow::CompressionToProto(Options.codec->compression_type()));
proto.MutableArrowCompression()->SetLevel(Options.codec->compression_level());
if (arrow::util::Codec::SupportsCompressionLevel(Options.codec->compression_type())) {
proto.MutableArrowCompression()->SetLevel(Options.codec->compression_level());
}
} else {
proto.MutableArrowCompression()->SetCodec(NArrow::CompressionToProto(arrow::Compression::UNCOMPRESSED));
}
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/formats/arrow/serializer/native.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,20 @@ class TNativeSerializer: public ISerializer {
Options.use_threads = false;
Options.memory_pool = pool;
}

arrow::Compression::type GetCodecType() const {
if (Options.codec) {
return Options.codec->compression_type();
}
return arrow::Compression::type::UNCOMPRESSED;
}

std::optional<i32> GetCodecLevel() const {
if (Options.codec && arrow::util::Codec::SupportsCompressionLevel(Options.codec->compression_type())) {
return Options.codec->compression_level();
}
return {};
}
};

}
12 changes: 12 additions & 0 deletions ydb/core/formats/arrow/serializer/parsing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ std::string CompressionToString(const arrow::Compression::type compression) {
return arrow::util::Codec::GetCodecAsString(compression);
}

std::string CompressionToString(const NKikimrSchemeOp::EColumnCodec compression) {
switch (compression) {
case NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain:
return "off";
case NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD:
return "zstd";
case NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4:
return "lz4";
}
return "";
}

std::optional<arrow::Compression::type> CompressionFromString(const std::string& compressionStr) {
auto result = arrow::util::Codec::GetCompressionType(compressionStr);
if (!result.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/serializer/parsing.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
namespace NKikimr::NArrow {

std::string CompressionToString(const arrow::Compression::type compression);
std::string CompressionToString(const NKikimrSchemeOp::EColumnCodec compression);
std::optional<arrow::Compression::type> CompressionFromString(const std::string& compressionStr);

NKikimrSchemeOp::EColumnCodec CompressionToProto(const arrow::Compression::type compression);
std::optional<arrow::Compression::type> CompressionFromProto(const NKikimrSchemeOp::EColumnCodec compression);

}
29 changes: 29 additions & 0 deletions ydb/core/formats/arrow/serializer/utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "parsing.h"
#include "utils.h"

#include <ydb/library/formats/arrow/validation/validation.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>

namespace NKikimr::NArrow {
bool SupportsCompressionLevel(const arrow::Compression::type compression) {
return arrow::util::Codec::SupportsCompressionLevel(compression);
}

bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression) {
return SupportsCompressionLevel(CompressionFromProto(compression).value());
}

std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression) {
if (!SupportsCompressionLevel(compression)) {
return {};
}
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::MinimumCompressionLevel(compression));
}
std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compression) {
if (!SupportsCompressionLevel(compression)) {
return {};
}
return NArrow::TStatusValidator::GetValid(arrow::util::Codec::MaximumCompressionLevel(compression));
}
}
16 changes: 16 additions & 0 deletions ydb/core/formats/arrow/serializer/utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/util/type_fwd.h>
#include <util/system/yassert.h>

#include <optional>

namespace NKikimr::NArrow {
bool SupportsCompressionLevel(const arrow::Compression::type compression);
bool SupportsCompressionLevel(const NKikimrSchemeOp::EColumnCodec compression);

std::optional<int> MinimumCompressionLevel(const arrow::Compression::type compression);
std::optional<int> MaximumCompressionLevel(const arrow::Compression::type compression);
}
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/serializer/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ SRCS(
GLOBAL native.cpp
stream.cpp
parsing.cpp
utils.cpp
)

END()
98 changes: 94 additions & 4 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ bool ConvertCreateTableSettingsToProto(NYql::TKikimrTableMetadataPtr metadata, Y
familyProto->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_NONE);
} else if (to_lower(family.Compression.GetRef()) == "lz4") {
familyProto->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_LZ4);
} else if (to_lower(family.Compression.GetRef()) == "zstd") {
familyProto->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_ZSTD);
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Unknown compression '" << family.Compression.GetRef() << "' for a column family";
Expand Down Expand Up @@ -383,9 +385,59 @@ bool FillCreateTableDesc(NYql::TKikimrTableMetadataPtr metadata, NKikimrSchemeOp
}

template <typename T>
void FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& metadata)
{
bool FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T& metadata, Ydb::StatusIds::StatusCode& code, TString& error) {
Y_ENSURE(metadata.ColumnOrder.size() == metadata.Columns.size());

THashMap<TString, ui32> columnFamiliesByName;
ui32 columnFamilyId = 1;
for (const auto& family : metadata.ColumnFamilies) {
if (family.Data.Defined()) {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Field `DATA` is not supported for OLAP tables in column family '" << family.Name << "'";
return false;
}
auto columnFamilyIt = columnFamiliesByName.find(family.Name);
if (!columnFamilyIt.IsEnd()) {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Duplicate column family `" << family.Name << '`';
return false;
}
auto familyDescription = schema.AddColumnFamilies();
familyDescription->SetName(family.Name);
if (familyDescription->GetName() == "default") {
familyDescription->SetId(0);
} else {
familyDescription->SetId(columnFamilyId++);
}
Y_ENSURE(columnFamiliesByName.emplace(familyDescription->GetName(), familyDescription->GetId()).second);
if (family.Compression.Defined()) {
NKikimrSchemeOp::EColumnCodec codec;
auto codecName = to_lower(family.Compression.GetRef());
if (codecName == "off") {
codec = NKikimrSchemeOp::EColumnCodec::ColumnCodecPlain;
} else if (codecName == "zstd") {
codec = NKikimrSchemeOp::EColumnCodec::ColumnCodecZSTD;
} else if (codecName == "lz4") {
codec = NKikimrSchemeOp::EColumnCodec::ColumnCodecLZ4;
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Unknown compression '" << family.Compression.GetRef() << "' for a column family";
return false;
}
familyDescription->SetColumnCodec(codec);
} else {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Compression is not set for column family'" << family.Name << "'";
return false;
}

if (family.CompressionLevel.Defined()) {
familyDescription->SetColumnCodecLevel(family.CompressionLevel.GetRef());
}
}

schema.SetNextColumnFamilyId(columnFamilyId);

for (const auto& name : metadata.ColumnOrder) {
auto columnIt = metadata.Columns.find(name);
Y_ENSURE(columnIt != metadata.Columns.end());
Expand All @@ -399,11 +451,29 @@ void FillColumnTableSchema(NKikimrSchemeOp::TColumnTableSchema& schema, const T&
if (columnType.TypeInfo) {
*columnDesc.MutableTypeInfo() = *columnType.TypeInfo;
}

if (!columnFamiliesByName.empty()) {
TString columnFamilyName = "default";
ui32 columnFamilyId = 0;
if (columnIt->second.Families.size()) {
columnFamilyName = *columnIt->second.Families.begin();
auto columnFamilyIdIt = columnFamiliesByName.find(columnFamilyName);
if (columnFamilyIdIt.IsEnd()) {
code = Ydb::StatusIds::BAD_REQUEST;
error = TStringBuilder() << "Unknown column family `" << columnFamilyName << "` for column `" << columnDesc.GetName() << "`";
return false;
}
columnFamilyId = columnFamilyIdIt->second;
}
columnDesc.SetColumnFamilyName(columnFamilyName);
columnDesc.SetColumnFamilyId(columnFamilyId);
}
}

for (const auto& keyColumn : metadata.KeyColumnNames) {
schema.AddKeyColumnNames(keyColumn);
}
return true;
}

bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata,
Expand Down Expand Up @@ -1705,7 +1775,12 @@ class TKqpGatewayProxy : public IKikimrGateway {
NKikimrSchemeOp::TColumnTableDescription* tableDesc = schemeTx.MutableCreateColumnTable();

tableDesc->SetName(pathPair.second);
FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata);
if (!FillColumnTableSchema(*tableDesc->MutableSchema(), *metadata, code, error)) {
IKqpGateway::TGenericResult errResult;
errResult.AddIssue(NYql::TIssue(error));
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
return MakeFuture(std::move(errResult));
}

if (!FillCreateColumnTableDesc(metadata, *tableDesc, code, error)) {
IKqpGateway::TGenericResult errResult;
Expand Down Expand Up @@ -2016,7 +2091,22 @@ class TKqpGatewayProxy : public IKikimrGateway {

NKikimrSchemeOp::TColumnTableSchemaPreset* schemaPreset = storeDesc->AddSchemaPresets();
schemaPreset->SetName("default");
FillColumnTableSchema(*schemaPreset->MutableSchema(), settings);

if (!settings.ColumnFamilies.empty()) {
IKqpGateway::TGenericResult errResult;
errResult.AddIssue(NYql::TIssue("TableStore does not support column families"));
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(Ydb::StatusIds::BAD_REQUEST));
return MakeFuture(std::move(errResult));
}

Ydb::StatusIds::StatusCode code;
TString error;
if (!FillColumnTableSchema(*schemaPreset->MutableSchema(), settings, code, error)) {
IKqpGateway::TGenericResult errResult;
errResult.AddIssue(NYql::TIssue(error));
errResult.SetStatus(NYql::YqlStatusFromYdbStatus(code));
return MakeFuture(std::move(errResult));
}

if (IsPrepare()) {
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
Expand Down
17 changes: 14 additions & 3 deletions ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ namespace {
return dropGroupSettings;
}

TCreateTableStoreSettings ParseCreateTableStoreSettings(TKiCreateTable create, const TTableSettings& settings) {
TCreateTableStoreSettings ParseCreateTableStoreSettings(
TKiCreateTable create, const TTableSettings& settings, const TVector<TColumnFamily>& columnFamilies) {
TCreateTableStoreSettings out;
out.TableStore = TString(create.Table());
out.ShardsCount = settings.MinPartitions ? *settings.MinPartitions : 0;
Expand Down Expand Up @@ -215,6 +216,13 @@ namespace {
columnMeta.NotNull = notNull;
}

if (columnTuple.Size() > 3) {
auto families = columnTuple.Item(3).Cast<TCoAtomList>();
for (auto family : families) {
columnMeta.Families.push_back(TString(family.Value()));
}
}

out.ColumnOrder.push_back(columnName);
out.Columns.insert(std::make_pair(columnName, columnMeta));
}
Expand All @@ -224,6 +232,7 @@ namespace {
out.Indexes.push_back(indexDesc);
}
#endif
out.ColumnFamilies = columnFamilies;
return out;
}

Expand Down Expand Up @@ -1250,8 +1259,8 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
TStringBuilder() << "TABLESTORE with not COLUMN store"));
return SyncError();
}
future = Gateway->CreateTableStore(cluster,
ParseCreateTableStoreSettings(maybeCreate.Cast(), table.Metadata->TableSettings), existingOk);
future = Gateway->CreateTableStore(cluster, ParseCreateTableStoreSettings(maybeCreate.Cast(), table.Metadata->TableSettings,
table.Metadata->ColumnFamilies), existingOk);
break;
}
case ETableType::Table:
Expand Down Expand Up @@ -1569,6 +1578,8 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
f->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_NONE);
} else if (to_lower(comp) == "lz4") {
f->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_LZ4);
} else if (to_lower(comp) == "zstd") {
f->set_compression(Ydb::Table::ColumnFamily::COMPRESSION_ZSTD);
} else {
auto errText = TStringBuilder() << "Unknown compression '" << comp
<< "' for a column family";
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ struct TCreateTableStoreSettings {
TVector<TString> KeyColumnNames;
TVector<TString> ColumnOrder;
TVector<TIndexDescription> Indexes;
TVector<TColumnFamily> ColumnFamilies;
};

struct TAlterTableStoreSettings {
Expand Down
Loading

0 comments on commit 45513a5

Please sign in to comment.