Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove table only for empty insert table #8822

Merged
merged 16 commits into from
Sep 8, 2024
89 changes: 88 additions & 1 deletion ydb/core/formats/arrow/modifier/schema.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,95 @@
#pragma once
#include <ydb/library/accessor/accessor.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/conclusion/status.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
#include <util/generic/hash.h>

namespace NKikimr::NArrow {

class TSchemaLite {
private:
YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields);

public:
TSchemaLite() = default;
TSchemaLite(const std::shared_ptr<arrow::Schema>& schema) {
AFL_VERIFY(schema);
Fields = schema->fields();
}

const std::shared_ptr<arrow::Field>& field(const ui32 index) const {
return GetFieldByIndexVerified(index);
}

bool Equals(const TSchemaLite& schema, const bool withMetadata = false) const {
if (Fields.size() != schema.Fields.size()) {
return false;
}
for (ui32 i = 0; i < Fields.size(); ++i) {
if (!Fields[i]->Equals(schema.Fields[i], withMetadata)) {
return false;
}
}
return true;
}

const std::vector<std::shared_ptr<arrow::Field>>& fields() const {
return Fields;
}

int num_fields() const {
return Fields.size();
}

std::vector<std::string> field_names() const {
std::vector<std::string> result;
result.reserve(Fields.size());
for (auto&& f : Fields) {
result.emplace_back(f->name());
}
return result;
}

TString DebugString() const {
TStringBuilder sb;
sb << "[";
for (auto&& f : Fields) {
sb << f->ToString() << ";";
}
sb << "]";

return sb;
}

TString ToString() const {
return DebugString();
}

const std::shared_ptr<arrow::Field>& GetFieldByIndexVerified(const ui32 index) const {
AFL_VERIFY(index < Fields.size());
return Fields[index];
}

const std::shared_ptr<arrow::Field>& GetFieldByIndexOptional(const ui32 index) const {
if (index < Fields.size()) {
return Fields[index];
}
return Default<std::shared_ptr<arrow::Field>>();
}

TSchemaLite(std::vector<std::shared_ptr<arrow::Field>>&& fields)
: Fields(std::move(fields)) {
}

TSchemaLite(const std::vector<std::shared_ptr<arrow::Field>>& fields)
: Fields(fields) {
}
};

} // namespace NKikimr::NArrow

namespace NKikimr::NArrow::NModifier {
class TSchema {
private:
Expand All @@ -13,6 +99,7 @@ class TSchema {
bool Finished = false;

void Initialize(const std::vector<std::shared_ptr<arrow::Field>>& fields);

public:
TSchema() = default;
TSchema(const std::shared_ptr<TSchema>& schema);
Expand Down Expand Up @@ -75,4 +162,4 @@ class TSchema {
}
};
};
}
} // namespace NKikimr::NArrow::NModifier
71 changes: 47 additions & 24 deletions ydb/core/formats/arrow/process_columns.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "process_columns.h"

#include "common/adapter.h"
#include "modifier/schema.h"
#include "modifier/subset.h"

#include <util/string/join.h>
Expand All @@ -8,8 +10,8 @@ namespace NKikimr::NArrow {

namespace {
template <class TDataContainer, class TStringImpl>
std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(const std::shared_ptr<TDataContainer>& srcBatch,
const std::vector<TStringImpl>& columnNames) {
std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(
const std::shared_ptr<TDataContainer>& srcBatch, const std::vector<TStringImpl>& columnNames) {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.reserve(columnNames.size());
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
Expand All @@ -27,9 +29,9 @@ std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(const std::shared_ptr
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::move(fields), std::move(columns), srcBatch->num_rows());
}

template <class TDataContainer>
TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(const std::shared_ptr<TDataContainer>& srcBatch,
const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
template <class TDataContainer, class TSchemaImpl>
TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(
const std::shared_ptr<TDataContainer>& srcBatch, const std::shared_ptr<TSchemaImpl>& dstSchema, TSchemaSubset* subset) {
AFL_VERIFY(srcBatch);
AFL_VERIFY(dstSchema);
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
Expand All @@ -48,16 +50,16 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(const std::shared_
fields.emplace_back(field);
auto srcField = srcBatch->schema()->field(index);
if (field->Equals(srcField)) {
AFL_VERIFY(columns.back()->type()->Equals(field->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString());
AFL_VERIFY(columns.back()->type()->Equals(field->type()))("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
"column", field->name())("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString());
} else {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
("column_type", field->ToString(true))("incoming_type", srcField->ToString(true));
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
"column", field->name())("column_type", field->ToString(true))("incoming_type", srcField->ToString(true));
return TConclusionStatus::Fail("incompatible column types");
}
} else if (!subset) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name())
("column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names()));
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "not_found_column")("column", field->name())(
"column_type", field->type()->ToString())("columns", JoinSeq(",", srcBatch->schema()->field_names()));
return TConclusionStatus::Fail("not found column '" + field->name() + "'");
}
++idx;
Expand All @@ -76,7 +78,8 @@ std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProbl
auto result = ExtractColumnsValidateImpl(incoming, columnNames);
switch (policy) {
case TColumnOperator::EExtractProblemsPolicy::Verify:
AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())("required", JoinSeq(",", columnNames));
AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())(
"required", JoinSeq(",", columnNames));
break;
case TColumnOperator::EExtractProblemsPolicy::Null:
if ((ui32)result->num_columns() != columnNames.size()) {
Expand All @@ -90,7 +93,8 @@ std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProbl
}

template <class TDataContainer, class TStringType>
TConclusion<std::shared_ptr<TDataContainer>> ReorderImpl(const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringType>& columnNames) {
TConclusion<std::shared_ptr<TDataContainer>> ReorderImpl(
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringType>& columnNames) {
AFL_VERIFY(!!incoming);
AFL_VERIFY(columnNames.size());
if ((ui32)incoming->num_columns() < columnNames.size()) {
Expand All @@ -107,46 +111,65 @@ TConclusion<std::shared_ptr<TDataContainer>> ReorderImpl(const std::shared_ptr<T
return result;
}

}
} // namespace

std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

std::shared_ptr<arrow::Table> TColumnOperator::Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
std::shared_ptr<arrow::Table> TColumnOperator::Extract(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

std::shared_ptr<arrow::Table> TColumnOperator::Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
return AdaptColumnsImpl(incoming, dstSchema, subset);
}

NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
return AdaptColumnsImpl(incoming, dstSchema, subset);
}

NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Adapt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset) {
return AdaptColumnsImpl(incoming, dstSchema, subset);
}

NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset) {
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Adapt(
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset) {
return AdaptColumnsImpl(incoming, dstSchema, subset);
}

NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames) {
return ReorderImpl(incoming, columnNames);
}

NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames) {
return ReorderImpl(incoming, columnNames);
}

NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
NKikimr::TConclusion<std::shared_ptr<arrow::RecordBatch>> TColumnOperator::Reorder(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
return ReorderImpl(incoming, columnNames);
}

NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
return ReorderImpl(incoming, columnNames);
}

}
} // namespace NKikimr::NArrow
28 changes: 20 additions & 8 deletions ydb/core/formats/arrow/process_columns.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace NKikimr::NArrow {

class TSchemaSubset;
class TSchemaLite;

class TColumnOperator {
public:
Expand All @@ -14,6 +15,7 @@ class TColumnOperator {
Verify,
Skip
};

private:
EExtractProblemsPolicy AbsentColumnPolicy = EExtractProblemsPolicy::Verify;

Expand All @@ -33,18 +35,28 @@ class TColumnOperator {
return *this;
}

std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::RecordBatch> Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::Table>> Adapt(const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
TConclusion<std::shared_ptr<arrow::Table>> Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::Table>> Adapt(
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(const std::shared_ptr<arrow::RecordBatch>& incoming,
const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::Table>> Adapt(
const std::shared_ptr<arrow::Table>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema, TSchemaSubset* subset = nullptr);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
TConclusion<std::shared_ptr<arrow::Table>> Reorder(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
TConclusion<std::shared_ptr<arrow::RecordBatch>> Reorder(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
TConclusion<std::shared_ptr<arrow::Table>> Reorder(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);
};

}
} // namespace NKikimr::NArrow
15 changes: 10 additions & 5 deletions ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
auto internalOp = Self->GetProgressTxController().GetTxOperatorOptional(txId);
if (!internalOp) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "removed tx operator");
return;
}
NActors::TLogContextGuard lGuardTx = NActors::TLogContextBuilder::Build()("int_op_tx", internalOp->GetTxInfo().DebugString());
NActors::TLogContextGuard lGuardTx =
NActors::TLogContextBuilder::Build()("int_op_tx", internalOp->GetTxInfo().DebugString())("int_this", (ui64)internalOp.get());
if (!internalOp->CheckTxInfoForReply(*TxInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "deprecated tx operator");
return;
Expand Down Expand Up @@ -143,11 +145,14 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBase<TColum
return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "No primary index for TTL");
}

auto schema = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema()->GetSchema();
auto ttlColumn = schema->GetFieldByName(columnName);
if (!ttlColumn) {
return TTxController::TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong TTL column '" + columnName + "'");
auto schemaSnapshot = Self->TablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema();
auto schema = schemaSnapshot->GetSchema();
auto index = schemaSnapshot->GetColumnIdOptional(columnName);
if (!index) {
return TTxController::TProposeResult(
NKikimrTxColumnShard::EResultStatus::SCHEMA_ERROR, "TTL tx wrong TTL column '" + columnName + "'");
}
auto ttlColumn = schemaSnapshot->GetFieldByColumnIdVerified(*index);

const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
for (ui64 pathId : ttlBody.GetPathIds()) {
Expand Down
Loading
Loading