Skip default columns on indexation (#6899)
ivanmorozov333 authored Jul 20, 2024
1 parent 1c15550 commit 5e95651
Showing 36 changed files with 424 additions and 173 deletions.
15 changes: 14 additions & 1 deletion ydb/core/formats/arrow/arrow_helpers.cpp
@@ -548,7 +548,7 @@ std::shared_ptr<arrow::Scalar> DefaultScalar(const std::shared_ptr<arrow::DataTy
}
return true;
});
Y_ABORT_UNLESS(out);
AFL_VERIFY(out)("type", type->ToString());
return out;
}

@@ -634,6 +634,19 @@ int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr
return ScalarCompare(*x, *y);
}

int ScalarCompareNullable(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y) {
if (!x && !!y) {
return -1;
}
if (!!x && !y) {
return 1;
}
if (!x && !y) {
return 0;
}
return ScalarCompare(*x, *y);
}

std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique);
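A minimal sketch of the ordering the new ScalarCompareNullable implements: a missing (null) scalar sorts before any present scalar, two missing scalars compare equal, and two present scalars fall back to ScalarCompare. The variables below are illustrative only and are not part of the commit.

    #include <arrow/scalar.h>
    // assumes: using namespace NKikimr::NArrow;

    std::shared_ptr<arrow::Scalar> none;                                    // absent value
    std::shared_ptr<arrow::Scalar> one = std::make_shared<arrow::Int32Scalar>(1);
    std::shared_ptr<arrow::Scalar> two = std::make_shared<arrow::Int32Scalar>(2);

    // Expected results, given the implementation above:
    //   ScalarCompareNullable(none, one)  == -1   (null < value)
    //   ScalarCompareNullable(one,  none) ==  1   (value > null)
    //   ScalarCompareNullable(none, none) ==  0   (null == null)
    //   ScalarCompareNullable(one,  two)  == ScalarCompare(*one, *two)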
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/arrow_helpers.h
@@ -98,6 +98,7 @@ std::shared_ptr<arrow::Scalar> GetScalar(const std::shared_ptr<arrow::Array>& ar
bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y);
int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
int ScalarCompareNullable(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow);
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
51 changes: 51 additions & 0 deletions ydb/core/formats/arrow/modifier/subset.cpp
@@ -0,0 +1,51 @@
#include "subset.h"
#include <ydb/library/actors/core/log.h>

namespace NKikimr::NArrow {

TSchemaSubset::TSchemaSubset(const std::set<ui32>& fieldsIdx, const ui32 fieldsCount) {
AFL_VERIFY(fieldsIdx.size() <= fieldsCount);
AFL_VERIFY(fieldsIdx.size());
if (fieldsCount == fieldsIdx.size()) {
return;
}
Exclude = (fieldsCount - fieldsIdx.size()) < fieldsIdx.size();
if (!Exclude) {
FieldIdx = std::vector<ui32>(fieldsIdx.begin(), fieldsIdx.end());
} else {
auto it = fieldsIdx.begin();
for (ui32 i = 0; i < fieldsCount; ++i) {
if (it == fieldsIdx.end() || i < *it) {
FieldIdx.emplace_back(i);
} else if (*it == i) {
++it;
} else {
AFL_VERIFY(false);
}
}
}
}

NKikimrArrowSchema::TSchemaSubset TSchemaSubset::SerializeToProto() const {
NKikimrArrowSchema::TSchemaSubset result;
result.MutableList()->SetExclude(Exclude);
for (auto&& i : FieldIdx) {
result.MutableList()->AddFieldsIdx(i);
}
return result;
}

TConclusionStatus TSchemaSubset::DeserializeFromProto(const NKikimrArrowSchema::TSchemaSubset& proto) {
if (!proto.HasList()) {
return TConclusionStatus::Fail("no schema subset data");
}
Exclude = proto.GetList().GetExclude();
std::vector<ui32> fieldIdx;
for (auto&& i : proto.GetList().GetFieldsIdx()) {
fieldIdx.emplace_back(i);
}
std::swap(fieldIdx, FieldIdx);
return TConclusionStatus::Success();
}

}
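A minimal round-trip sketch for the serialization above; the variable names and field counts are illustrative, not part of the commit.

    using namespace NKikimr::NArrow;

    // Keep fields 0 and 3 out of a 5-field schema; the complement is larger,
    // so Exclude stays false and FieldIdx stores {0, 3} directly.
    TSchemaSubset original(std::set<ui32>{0, 3}, 5);
    NKikimrArrowSchema::TSchemaSubset proto = original.SerializeToProto();

    TSchemaSubset restored;
    const auto status = restored.DeserializeFromProto(proto);
    // Expected to succeed here; a proto without the List branch would instead
    // return the "no schema subset data" failure.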
48 changes: 48 additions & 0 deletions ydb/core/formats/arrow/modifier/subset.h
@@ -0,0 +1,48 @@
#pragma once
#include <ydb/core/formats/arrow/protos/fields.pb.h>
#include <ydb/library/conclusion/result.h>
#include <ydb/library/actors/core/log.h>

namespace NKikimr::NArrow {

class TSchemaSubset {
private:
std::vector<ui32> FieldIdx;
TString FieldBits;
bool Exclude = false;
public:
TSchemaSubset() = default;
TSchemaSubset(const std::set<ui32>& fieldsIdx, const ui32 fieldsCount);

template <class T>
std::vector<T> Apply(const std::vector<T>& fullSchema) const {
if (FieldIdx.empty()) {
return fullSchema;
}
std::vector<T> fields;
if (!Exclude) {
for (auto&& i : FieldIdx) {
AFL_VERIFY(i < fullSchema.size());
fields.emplace_back(fullSchema[i]);
}
} else {
auto it = FieldIdx.begin();
for (ui32 i = 0; i < fullSchema.size(); ++i) {
if (it == FieldIdx.end() || i < *it) {
AFL_VERIFY(i < fullSchema.size());
fields.emplace_back(fullSchema[i]);
} else if (i == *it) {
++it;
} else {
AFL_VERIFY(false);
}
}
}
return fields;
}

NKikimrArrowSchema::TSchemaSubset SerializeToProto() const;
[[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrArrowSchema::TSchemaSubset& proto);
};

}
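A short sketch of Apply under the constructor's contract (field values are illustrative; includes omitted): a proper subset picks the listed positions, while a subset covering the whole schema leaves FieldIdx empty, so the input vector is returned unchanged.

    using namespace NKikimr::NArrow;

    const std::vector<TString> full = {"a", "b", "c", "d", "e"};

    TSchemaSubset keepTwo(std::set<ui32>{0, 3}, full.size());
    const auto picked = keepTwo.Apply(full);      // expected: {"a", "d"}

    TSchemaSubset keepAll(std::set<ui32>{0, 1, 2, 3, 4}, full.size());
    const auto unchanged = keepAll.Apply(full);   // expected: the full vector as-is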
2 changes: 2 additions & 0 deletions ydb/core/formats/arrow/modifier/ya.make
@@ -4,11 +4,13 @@ PEERDIR(
contrib/libs/apache/arrow
ydb/library/conclusion
ydb/core/formats/arrow/switch
ydb/core/formats/arrow/protos
ydb/library/actors/core
)

SRCS(
schema.cpp
subset.cpp
)

END()
29 changes: 20 additions & 9 deletions ydb/core/formats/arrow/process_columns.cpp
@@ -1,5 +1,6 @@
#include "process_columns.h"
#include "common/adapter.h"
#include "modifier/subset.h"

#include <util/string/join.h>

@@ -28,16 +29,23 @@ std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(const std::shared_ptr

template <class TDataContainer>
TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(const std::shared_ptr<TDataContainer>& srcBatch,
const std::shared_ptr<arrow::Schema>& dstSchema) {
const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
AFL_VERIFY(srcBatch);
AFL_VERIFY(dstSchema);
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
columns.reserve(dstSchema->num_fields());

std::vector<std::shared_ptr<arrow::Field>> fields;
fields.reserve(dstSchema->num_fields());
std::set<ui32> fieldIdx;
ui32 idx = 0;
for (auto& field : dstSchema->fields()) {
const int index = srcBatch->schema()->GetFieldIndex(field->name());
if (index > -1) {
if (subset) {
fieldIdx.emplace(idx);
}
columns.push_back(srcBatch->column(index));
fields.emplace_back(field);
auto srcField = srcBatch->schema()->field(index);
if (field->Equals(srcField)) {
AFL_VERIFY(columns.back()->type()->Equals(field->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
@@ -47,14 +55,17 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(const std::shared_
("column_type", field->ToString(true))("incoming_type", srcField->ToString(true));
return TConclusionStatus::Fail("incompatible column types");
}
} else {
} else if (!subset) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name())
("column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names()));
return TConclusionStatus::Fail("not found column '" + field->name() + "'");
}
++idx;
}

return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(dstSchema, std::move(columns), srcBatch->num_rows());
if (subset) {
*subset = TSchemaSubset(fieldIdx, dstSchema->num_fields());
}
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), srcBatch->num_rows());
}

template <class TDataContainer, class TStringType>
@@ -114,12 +125,12 @@ std::shared_ptr<arrow::Table> TColumnOperator::Extract(const std::shared_ptr<arr
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema) {
return AdaptColumnsImpl(incoming, dstSchema);
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
return AdaptColumnsImpl(incoming, dstSchema, subset);
}

NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema) {
return AdaptColumnsImpl(incoming, dstSchema);
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
return AdaptColumnsImpl(incoming, dstSchema, subset);
}

NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
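A hedged usage sketch of the extended Adapt overload; `incoming` and `dstSchema` are assumed to exist already and are not defined by this commit.

    using namespace NKikimr::NArrow;

    // incoming:  std::shared_ptr<arrow::RecordBatch> carrying only some of dstSchema's columns
    // dstSchema: std::shared_ptr<arrow::Schema> of the destination table
    TSchemaSubset subset;
    auto adapted = TColumnOperator().Adapt(incoming, dstSchema, &subset);
    // With a non-null subset pointer, columns missing from `incoming` are no longer an error:
    // the result keeps only the columns that were present, and `subset` records their positions
    // in dstSchema (later persisted via TLogicalMetadata.SchemaSubset).
    // Calling Adapt(incoming, dstSchema) with the default nullptr keeps the old strict behaviour.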
6 changes: 4 additions & 2 deletions ydb/core/formats/arrow/process_columns.h
@@ -5,6 +5,8 @@

namespace NKikimr::NArrow {

class TSchemaSubset;

class TColumnOperator {
public:
enum class EExtractProblemsPolicy {
@@ -36,8 +38,8 @@ class TColumnOperator {
std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema);
TConclusion<std::shared_ptr<arrow::Table>> Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema);
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::Table>> Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
TConclusion<std::shared_ptr<arrow::Table>> Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
15 changes: 15 additions & 0 deletions ydb/core/formats/arrow/protos/fields.proto
@@ -0,0 +1,15 @@
package NKikimrArrowSchema;

message TSchemaSubset {

message TFieldsList {
optional bool Exclude = 1;
repeated uint32 FieldsIdx = 2;
}

oneof Implementation {
TFieldsList List = 1;
string Bits = 2;

}
}
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/protos/ya.make
@@ -2,6 +2,7 @@ PROTO_LIBRARY()

SRCS(
ssa.proto
fields.proto
)

PEERDIR(
7 changes: 7 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -3087,6 +3087,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
}

{
auto it = client.ExecuteQuery(R"(
UPSERT INTO `/Root/DataShard` (Col3) VALUES ('null');
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT(!it.IsSuccess());
}

{
auto it = client.StreamExecuteQuery(R"(
SELECT * FROM `/Root/DataShard` ORDER BY Col1;
2 changes: 2 additions & 0 deletions ydb/core/protos/tx_columnshard.proto
@@ -4,6 +4,7 @@ import "ydb/core/protos/long_tx_service.proto";
import "ydb/core/protos/statistics.proto";
import "ydb/core/protos/subdomains.proto";
import "ydb/core/protos/tx.proto";
import "ydb/core/formats/arrow/protos/fields.proto";

package NKikimrTxColumnShard;
option java_package = "ru.yandex.kikimr.proto";
@@ -88,6 +89,7 @@ message TLogicalMetadata {
optional uint64 DirtyWriteTimeSeconds = 5;
optional string SpecialKeysRawData = 6;
optional TEvWrite.EModificationType ModificationType = 7;
optional NKikimrArrowSchema.TSchemaSubset SchemaSubset = 8;
}

message TEvWriteResult {
@@ -18,10 +18,13 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali

const auto& writeMeta = batch.GetAggregation().GetWriteMeta();
meta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::SerializeToProto(writeMeta.GetModificationType()));
*meta.MutableSchemaSubset() = batch.GetAggregation().GetSchemaSubset().SerializeToProto();
auto schemeVersion = batch.GetAggregation().GetSchemaVersion();
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange,
meta, tableSchema->GetVersion(),
batch->GetData());
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
Self->UpdateInsertTableCounters();
41 changes: 34 additions & 7 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -30,14 +30,23 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches(
resultSchema->GetIndexInfo().GetReplaceKey(), resultDataSchema, false, IIndexInfo::GetSnapshotColumnNames());

THashSet<ui64> portionsInUsage;
std::set<ui32> columnIds;
for (auto&& i : portions) {
if (columnIds.size() != resultSchema->GetColumnsCount()) {
for (auto id : i.GetPortionInfo().GetColumnIds()) {
if (resultSchema->GetFieldIndex(id) > 0) {
columnIds.emplace(id);
}
}
}
AFL_VERIFY(portionsInUsage.emplace(i.GetPortionInfo().GetPortionId()).second);
}
AFL_VERIFY(columnIds.size() <= resultSchema->GetColumnsCount());

for (auto&& i : portions) {
auto dataSchema = i.GetPortionInfo().GetSchema(context.SchemaVersions);
auto batch = i.RestoreBatch(dataSchema, *resultSchema);
batch = resultSchema->NormalizeBatch(*dataSchema, batch).DetachResult();
batch = resultSchema->NormalizeBatch(*dataSchema, batch, columnIds).DetachResult();
IIndexInfo::NormalizeDeletionColumn(*batch);
auto filter = BuildPortionFilter(shardingActual, batch, i.GetPortionInfo(), portionsInUsage, resultSchema);
mergeStream.AddSource(batch, filter);
@@ -175,13 +184,32 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
}

std::shared_ptr<TSerializationStats> stats = std::make_shared<TSerializationStats>();
for (auto&& i : SwitchedPortions) {
stats->Merge(i.GetSerializationStat(*resultSchema));
std::set<ui32> columnIds;
{
{
THashMap<ui64, ISnapshotSchema::TPtr> schemas;
for (auto& portion : SwitchedPortions) {
auto dataSchema = portion.GetSchema(context.SchemaVersions);
schemas.emplace(dataSchema->GetVersion(), dataSchema);
}
columnIds = ISnapshotSchema::GetColumnsWithDifferentDefaults(schemas, resultSchema);
}
for (auto&& i : SwitchedPortions) {
stats->Merge(i.GetSerializationStat(*resultSchema));
if (columnIds.size() != resultSchema->GetColumnsCount()) {
for (auto id : i.GetColumnIds()) {
if (resultSchema->HasColumnId(id)) {
columnIds.emplace(id);
}
}
}
}
AFL_VERIFY(columnIds.size() <= resultSchema->GetColumnsCount());
}

std::vector<std::map<ui32, std::vector<TColumnPortionResult>>> chunkGroups;
chunkGroups.resize(batchResults.size());
for (auto&& columnId : resultSchema->GetIndexInfo().GetColumnIds()) {
for (auto&& columnId : columnIds) {
NActors::TLogContextGuard logGuard(
NActors::TLogContextBuilder::Build()("field_name", resultSchema->GetIndexInfo().GetColumnName(columnId)));
auto columnInfo = stats->GetColumnInfo(columnId);
@@ -196,11 +224,10 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
if (!p.ExtractColumnChunks(columnId, records, chunks)) {
if (!loader) {
loader = resultSchema->GetColumnLoaderVerified(columnId);
} else {
AFL_VERIFY(dataSchema->IsSpecialColumnId(columnId));
}
auto f = resultSchema->GetFieldByColumnIdVerified(columnId);
chunks.emplace_back(std::make_shared<NChunks::TDefaultChunkPreparation>(columnId, p.GetPortionInfo().GetRecordsCount(),
p.GetPortionInfo().GetColumnRawBytes({ columnId }), resultField, resultSchema->GetDefaultValueVerified(columnId),
resultField, resultSchema->GetExternalDefaultValueVerified(columnId),
resultSchema->GetColumnSaver(columnId)));
records = { nullptr };
}