Skip to content

Commit

Permalink
Merge 0a1d586 into de9b991
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 7, 2024
2 parents de9b991 + 0a1d586 commit 74ccc13
Show file tree
Hide file tree
Showing 39 changed files with 685 additions and 397 deletions.
89 changes: 88 additions & 1 deletion ydb/core/formats/arrow/modifier/schema.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,95 @@
#pragma once
#include <ydb/library/accessor/accessor.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/conclusion/status.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
#include <util/generic/hash.h>

namespace NKikimr::NArrow {

class TSchemaLite {
private:
YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields);

public:
TSchemaLite() = default;
TSchemaLite(const std::shared_ptr<arrow::Schema>& schema) {
AFL_VERIFY(schema);
Fields = schema->fields();
}

const std::shared_ptr<arrow::Field>& field(const ui32 index) const {
return GetFieldByIndexVerified(index);
}

bool Equals(const TSchemaLite& schema, const bool withMetadata = false) const {
if (Fields.size() != schema.Fields.size()) {
return false;
}
for (ui32 i = 0; i < Fields.size(); ++i) {
if (!Fields[i]->Equals(schema.Fields[i], withMetadata)) {
return false;
}
}
return true;
}

const std::vector<std::shared_ptr<arrow::Field>>& fields() const {
return Fields;
}

int num_fields() const {
return Fields.size();
}

std::vector<std::string> field_names() const {
std::vector<std::string> result;
result.reserve(Fields.size());
for (auto&& f : Fields) {
result.emplace_back(f->name());
}
return result;
}

TString DebugString() const {
TStringBuilder sb;
sb << "[";
for (auto&& f : Fields) {
sb << f->ToString() << ";";
}
sb << "]";

return sb;
}

TString ToString() const {
return DebugString();
}

const std::shared_ptr<arrow::Field>& GetFieldByIndexVerified(const ui32 index) const {
AFL_VERIFY(index < Fields.size());
return Fields[index];
}

const std::shared_ptr<arrow::Field>& GetFieldByIndexOptional(const ui32 index) const {
if (index < Fields.size()) {
return Fields[index];
}
return Default<std::shared_ptr<arrow::Field>>();
}

TSchemaLite(std::vector<std::shared_ptr<arrow::Field>>&& fields)
: Fields(std::move(fields)) {
}

TSchemaLite(const std::vector<std::shared_ptr<arrow::Field>>& fields)
: Fields(fields) {
}
};

} // namespace NKikimr::NArrow

namespace NKikimr::NArrow::NModifier {
class TSchema {
private:
Expand All @@ -13,6 +99,7 @@ class TSchema {
bool Finished = false;

void Initialize(const std::vector<std::shared_ptr<arrow::Field>>& fields);

public:
TSchema() = default;
TSchema(const std::shared_ptr<TSchema>& schema);
Expand Down Expand Up @@ -75,4 +162,4 @@ class TSchema {
}
};
};
}
} // namespace NKikimr::NArrow::NModifier
71 changes: 47 additions & 24 deletions ydb/core/formats/arrow/process_columns.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "process_columns.h"

#include "common/adapter.h"
#include "modifier/schema.h"
#include "modifier/subset.h"

#include <util/string/join.h>
Expand All @@ -8,8 +10,8 @@ namespace NKikimr::NArrow {

namespace {
template <class TDataContainer, class TStringImpl>
std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(const std::shared_ptr<TDataContainer>& srcBatch,
const std::vector<TStringImpl>& columnNames) {
std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(
const std::shared_ptr<TDataContainer>& srcBatch, const std::vector<TStringImpl>& columnNames) {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.reserve(columnNames.size());
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
Expand All @@ -27,9 +29,9 @@ std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(const std::shared_ptr
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::move(fields), std::move(columns), srcBatch->num_rows());
}

template <class TDataContainer>
TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(const std::shared_ptr<TDataContainer>& srcBatch,
const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
template <class TDataContainer, class TSchemaImpl>
TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(
const std::shared_ptr<TDataContainer>& srcBatch, const std::shared_ptr<TSchemaImpl>& dstSchema, TSchemaSubset* subset) {
AFL_VERIFY(srcBatch);
AFL_VERIFY(dstSchema);
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
Expand All @@ -48,16 +50,16 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(const std::shared_
fields.emplace_back(field);
auto srcField = srcBatch->schema()->field(index);
if (field->Equals(srcField)) {
AFL_VERIFY(columns.back()->type()->Equals(field->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString());
AFL_VERIFY(columns.back()->type()->Equals(field->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
"column", field->name())("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString());
} else {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
("column_type", field->ToString(true))("incoming_type", srcField->ToString(true));
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
"column", field->name())("column_type", field->ToString(true))("incoming_type", srcField->ToString(true));
return TConclusionStatus::Fail("incompatible column types");
}
} else if (!subset) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name())
("column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names()));
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name())(
"column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names()));
return TConclusionStatus::Fail("not found column '" + field->name() + "'");
}
++idx;
Expand All @@ -76,7 +78,8 @@ std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProbl
auto result = ExtractColumnsValidateImpl(incoming, columnNames);
switch (policy) {
case TColumnOperator::EExtractProblemsPolicy::Verify:
AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())("required", JoinSeq(",", columnNames));
AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())(
"required", JoinSeq(",", columnNames));
break;
case TColumnOperator::EExtractProblemsPolicy::Null:
if ((ui32)result->num_columns() != columnNames.size()) {
Expand All @@ -90,7 +93,8 @@ std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProbl
}

template <class TDataContainer, class TStringType>
TConclusion<std::shared_ptr<TDataContainer>> ReorderImpl(const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringType>& columnNames) {
TConclusion<std::shared_ptr<TDataContainer>> ReorderImpl(
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringType>& columnNames) {
AFL_VERIFY(!!incoming);
AFL_VERIFY(columnNames.size());
if ((ui32)incoming->num_columns() < columnNames.size()) {
Expand All @@ -107,46 +111,65 @@ TConclusion<std::shared_ptr<TDataContainer>> ReorderImpl(const std::shared_ptr<T
return result;
}

}
} // namespace

std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

std::shared_ptr<arrow::Table> TColumnOperator::Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
std::shared_ptr<arrow::Table> TColumnOperator::Extract(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

// Table overload taking TString column names: forwards to ExtractImpl together
// with the operator's configured AbsentColumnPolicy.
std::shared_ptr<arrow::Table> TColumnOperator::Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
return AdaptColumnsImpl(incoming, dstSchema, subset);
}

// Table + arrow::Schema overload: forwards all arguments unchanged to the
// templated AdaptColumnsImpl and returns its conclusion.
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
return AdaptColumnsImpl(incoming, dstSchema, subset);
}

// RecordBatch + TSchemaLite overload: same as the arrow::Schema variant, but
// the destination schema is passed as NArrow::TSchemaLite; forwards to the
// templated AdaptColumnsImpl and returns its conclusion.
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset) {
return AdaptColumnsImpl(incoming, dstSchema, subset);
}

NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset) {
return AdaptColumnsImpl(incoming, dstSchema, subset);
}

NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
return ReorderImpl(incoming, columnNames);
}

NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
return ReorderImpl(incoming, columnNames);
}

NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
return ReorderImpl(incoming, columnNames);
}

NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
return ReorderImpl(incoming, columnNames);
}

}
} // namespace NKikimr::NArrow
28 changes: 20 additions & 8 deletions ydb/core/formats/arrow/process_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace NKikimr::NArrow {

class TSchemaSubset;
class TSchemaLite;

class TColumnOperator {
public:
Expand All @@ -14,6 +15,7 @@ class TColumnOperator {
Verify,
Skip
};

private:
EExtractProblemsPolicy AbsentColumnPolicy = EExtractProblemsPolicy::Verify;

Expand All @@ -33,18 +35,28 @@ class TColumnOperator {
return *this;
}

std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::RecordBatch> Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::Table>> Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
TConclusion<std::shared_ptr<arrow::Table>> Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::Table>> Adapt(
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming,
const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::Table>> Adapt(
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset = nullptr);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
TConclusion<std::shared_ptr<arrow::Table>> Reorder(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
TConclusion<std::shared_ptr<arrow::Table>> Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);
};

}
} // namespace NKikimr::NArrow
15 changes: 10 additions & 5 deletions ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
auto internalOp = Self->GetProgressTxController().GetTxOperatorOptional(txId);
if (!internalOp) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "removed tx operator");
return;
}
NActors::TLogContextGuard lGuardTx = NActors::TLogContextBuilder::Build()("int_op_tx", internalOp->GetTxInfo().DebugString());
NActors::TLogContextGuard lGuardTx =
NActors::TLogContextBuilder::Build()("int_op_tx", internalOp->GetTxInfo().DebugString())("int_this", (ui64)internalOp.get());
if (!internalOp->CheckTxInfoForReply(*TxInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "deprecated tx operator");
return;
Expand Down Expand Up @@ -143,11 +145,14 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "No primary index for TTL");
}

auto schema = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema()->GetSchema();
auto ttlColumn = schema->GetFieldByName(columnName);
if (!ttlColumn) {
return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong TTL column '" + columnName + "'");
auto schemaSnapshot = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema();
auto schema = schemaSnapshot->GetSchema();
auto index = schemaSnapshot->GetColumnIdOptional(columnName);
if (!index) {
return TTxController::TProposeResult(
NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong TTL column '" + columnName + "'");
}
auto ttlColumn = schemaSnapshot->GetFieldByColumnIdVerified(*index);

const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
for (ui64 pathId : ttlBody.GetPathIds()) {
Expand Down
Loading

0 comments on commit 74ccc13

Please sign in to comment.