Skip to content

Commit

Permalink
Correct index construction (#6500)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 15, 2024
1 parent 85d4495 commit ab051d7
Show file tree
Hide file tree
Showing 128 changed files with 1,435 additions and 2,106 deletions.
47 changes: 47 additions & 0 deletions ydb/core/formats/arrow/scalar/serialization.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include "serialization.h"
#include <ydb/core/formats/arrow/switch/switch_type.h>
#include <ydb/library/actors/core/log.h>

namespace NKikimr::NArrow::NScalar {

TConclusion<TString> TSerializer::SerializePayloadToString(const std::shared_ptr<arrow::Scalar>& scalar) {
TString resultString;
const bool resultFlag = NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
if constexpr (arrow::has_c_type<typename TWrap::T>()) {
using CType = typename TWrap::T::c_type;
using ScalarType = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
const ScalarType* scalarTyped = static_cast<const ScalarType*>(scalar.get());
resultString = TString(sizeof(CType), '\0');
memcpy(&resultString[0], scalarTyped->data(), sizeof(CType));
return true;
}
return false;
});
if (!resultFlag) {
return TConclusionStatus::Fail("incorrect scalar type for payload serialization: " + scalar->type->ToString());
}
return resultString;
}

TConclusion<std::shared_ptr<arrow::Scalar>> TSerializer::DeserializeFromStringWithPayload(const TString& data, const std::shared_ptr<arrow::DataType>& dataType) {
AFL_VERIFY(dataType);
std::shared_ptr<arrow::Scalar> result;
const bool resultFlag = NArrow::SwitchType(dataType->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
if constexpr (arrow::has_c_type<typename TWrap::T>()) {
using CType = typename TWrap::T::c_type;
AFL_VERIFY(data.size() == sizeof(CType));
using ScalarType = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
result = std::make_shared<ScalarType>(*(CType*)&data[0], dataType);
return true;
}
return false;
});
if (!resultFlag) {
return TConclusionStatus::Fail("incorrect scalar type for payload deserialization: " + dataType->ToString());
}
return result;
}

}
15 changes: 15 additions & 0 deletions ydb/core/formats/arrow/scalar/serialization.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once
#include <ydb/library/conclusion/result.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>

#include <util/generic/string.h>

namespace NKikimr::NArrow::NScalar {
class TSerializer {
public:
static TConclusion<TString> SerializePayloadToString(const std::shared_ptr<arrow::Scalar>& scalar);
static TConclusion<std::shared_ptr<arrow::Scalar>> DeserializeFromStringWithPayload(const TString& data, const std::shared_ptr<arrow::DataType>& dataType);
};
}
14 changes: 14 additions & 0 deletions ydb/core/formats/arrow/scalar/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
LIBRARY()

PEERDIR(
contrib/libs/apache/arrow
ydb/library/conclusion
ydb/core/formats/arrow/switch
ydb/library/actors/core
)

SRCS(
serialization.cpp
)

END()
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ PEERDIR(
ydb/core/formats/arrow/dictionary
ydb/core/formats/arrow/transformer
ydb/core/formats/arrow/reader
ydb/core/formats/arrow/scalar
ydb/core/formats/arrow/hash
ydb/library/actors/core
ydb/library/arrow_kernels
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "alter_sharding.h"
#include <ydb/library/actors/core/log.h>
#include <util/string/type.h>

namespace NKikimr::NKqp {
Expand Down Expand Up @@ -26,4 +27,8 @@ void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TModifyScheme&
scheme.MutableAlterColumnTable()->MutableReshardColumnTable()->SetIncrease(*Increase);
}

void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const {
AFL_VERIFY(false);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "abstract.h"
#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/constructor.h>

namespace NKikimr::NKqp {

Expand All @@ -12,9 +11,7 @@ class TAlterShardingOperation: public ITableStoreOperation {
static inline const auto Registrator = TFactory::TRegistrator<TAlterShardingOperation>(GetTypeName());
private:
std::optional<bool> Increase;
virtual void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const override {
AFL_VERIFY(false);
}
virtual void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const override;
virtual void DoSerializeScheme(NKikimrSchemeOp::TModifyScheme& scheme, const bool isStandalone) const override;

public:
Expand Down
21 changes: 0 additions & 21 deletions ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.cpp

This file was deleted.

19 changes: 0 additions & 19 deletions ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.h

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl
}
IndexName = *fValue;
}
StorageId = features.Extract("STORAGE_ID");
if (StorageId && !*StorageId) {
return TConclusionStatus::Fail("STORAGE_ID cannot be empty string");
}
TString indexType;
{
auto fValue = features.Extract("TYPE");
Expand Down Expand Up @@ -46,9 +42,6 @@ TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl

void TUpsertIndexOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
auto* indexProto = schemaData.AddUpsertIndexes();
if (StorageId) {
indexProto->SetStorageId(*StorageId);
}
indexProto->SetName(IndexName);
IndexMetaConstructor.SerializeToProto(*indexProto);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class TUpsertIndexOperation : public ITableStoreOperation {
static inline auto Registrator = TFactory::TRegistrator<TUpsertIndexOperation>(GetTypeName());
private:
TString IndexName;
std::optional<TString> StorageId;
NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexMetaConstructor;
public:
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;
Expand Down

This file was deleted.

23 changes: 0 additions & 23 deletions ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_stat.h

This file was deleted.

3 changes: 0 additions & 3 deletions ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@ SRCS(
GLOBAL drop_column.cpp
GLOBAL upsert_index.cpp
GLOBAL drop_index.cpp
GLOBAL upsert_stat.cpp
GLOBAL drop_stat.cpp
GLOBAL upsert_opt.cpp
GLOBAL alter_sharding.cpp
)

PEERDIR(
ydb/services/metadata/manager
ydb/core/formats/arrow/serializer
ydb/core/tx/columnshard/engines/scheme/statistics/abstract
ydb/core/tx/columnshard/engines/storage/optimizer/abstract
ydb/core/kqp/gateway/utils
ydb/core/protos
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/ut/olap/helpers/typed_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,16 @@ void TTypedLocalHelper::FillPKOnly(const double pkKff /*= 0*/, const ui32 numRow
TBase::SendDataViaActorSystem(TablePath, batch);
}

void TTypedLocalHelper::GetStats(std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage>& stats, const bool verbose /*= false*/) {
void TTypedLocalHelper::GetStats(std::vector<NJson::TJsonValue>& stats, const bool verbose /*= false*/) {
TString selectQuery = "SELECT * FROM `" + TablePath + "/.sys/primary_index_portion_stats` WHERE Activity = true";
auto tableClient = KikimrRunner.GetTableClient();
auto rows = ExecuteScanQuery(tableClient, selectQuery, verbose);
for (auto&& r : rows) {
for (auto&& c : r) {
if (c.first == "Stats") {
NKikimrColumnShardStatisticsProto::TPortionStorage store;
AFL_VERIFY(google::protobuf::TextFormat::ParseFromString(GetUtf8(c.second), &store));
stats.emplace_back(store);
NJson::TJsonValue jsonStore;
AFL_VERIFY(NJson::ReadJsonFastTree(GetUtf8(c.second), &jsonStore));
stats.emplace_back(jsonStore);
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions ydb/core/kqp/ut/olap/helpers/typed_local.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#pragma once
#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
#include <ydb/core/formats/arrow/simple_builder/array.h>
#include <ydb/core/formats/arrow/simple_builder/batch.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>

#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>

#include <library/cpp/json/writer/json_value.h>

namespace NKikimr::NKqp {

class TTypedLocalHelper: public Tests::NCS::THelper {
Expand All @@ -19,7 +22,7 @@ class TTypedLocalHelper: public Tests::NCS::THelper {
protected:
virtual TString GetTestTableSchema() const override;
virtual std::vector<TString> GetShardingColumns() const override {
return {"pk_int"};
return { "pk_int" };
}
public:
TTypedLocalHelper(const TString& typeName, TKikimrRunner& kikimrRunner, const TString& tableName = "olapTable", const TString& storeName = "olapStore")
Expand Down Expand Up @@ -66,7 +69,7 @@ class TTypedLocalHelper: public Tests::NCS::THelper {

void GetVolumes(ui64& rawBytes, ui64& bytes, const bool verbose = false, const std::vector<TString> columnNames = {});

void GetStats(std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage>& stats, const bool verbose = false);
void GetStats(std::vector<NJson::TJsonValue>& stats, const bool verbose = false);

void GetCount(ui64& count);

Expand Down
Loading

0 comments on commit ab051d7

Please sign in to comment.