Skip to content

Commit

Permalink
Merge df9001f into 38a7ef2
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 14, 2024
2 parents 38a7ef2 + df9001f commit 6dc0ff9
Show file tree
Hide file tree
Showing 121 changed files with 1,279 additions and 1,978 deletions.
47 changes: 47 additions & 0 deletions ydb/core/formats/arrow/scalar/serialization.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include "serialization.h"
#include <ydb/core/formats/arrow/switch/switch_type.h>
#include <ydb/library/actors/core/log.h>

namespace NKikimr::NArrow::NScalar {

TConclusion<TString> TSerializer::SerializePayloadToString(const std::shared_ptr<arrow::Scalar>& scalar) {
TString resultString;
const bool resultFlag = NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
if constexpr (arrow::has_c_type<typename TWrap::T>()) {
using CType = typename TWrap::T::c_type;
using ScalarType = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
const ScalarType* scalarTyped = static_cast<const ScalarType*>(scalar.get());
resultString = TString(sizeof(CType), '\0');
memcpy(&resultString[0], scalarTyped->data(), sizeof(CType));
return true;
}
return false;
});
if (!resultFlag) {
return TConclusionStatus::Fail("incorrect scalar type for payload serialization: " + scalar->type->ToString());
}
return resultString;
}

TConclusion<std::shared_ptr<arrow::Scalar>> TSerializer::DeserializeFromStringWithPayload(const TString& data, const std::shared_ptr<arrow::DataType>& dataType) {
AFL_VERIFY(dataType);
std::shared_ptr<arrow::Scalar> result;
const bool resultFlag = NArrow::SwitchType(dataType->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
if constexpr (arrow::has_c_type<typename TWrap::T>()) {
using CType = typename TWrap::T::c_type;
AFL_VERIFY(data.size() == sizeof(CType));
using ScalarType = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
result = std::make_shared<ScalarType>(*(CType*)&data[0], dataType);
return true;
}
return false;
});
if (!resultFlag) {
return TConclusionStatus::Fail("incorrect scalar type for payload deserialization: " + dataType->ToString());
}
return result;
}

}
15 changes: 15 additions & 0 deletions ydb/core/formats/arrow/scalar/serialization.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once
#include <ydb/library/conclusion/result.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>

#include <util/generic/string.h>

namespace NKikimr::NArrow::NScalar {
class TSerializer {
public:
static TConclusion<TString> SerializePayloadToString(const std::shared_ptr<arrow::Scalar>& scalar);
static TConclusion<std::shared_ptr<arrow::Scalar>> DeserializeFromStringWithPayload(const TString& data, const std::shared_ptr<arrow::DataType>& dataType);
};
}
14 changes: 14 additions & 0 deletions ydb/core/formats/arrow/scalar/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
LIBRARY()

PEERDIR(
contrib/libs/apache/arrow
ydb/library/conclusion
ydb/core/formats/arrow/switch
ydb/library/actors/core
)

SRCS(
serialization.cpp
)

END()
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ PEERDIR(
ydb/core/formats/arrow/dictionary
ydb/core/formats/arrow/transformer
ydb/core/formats/arrow/reader
ydb/core/formats/arrow/scalar
ydb/core/formats/arrow/hash
ydb/library/actors/core
ydb/library/arrow_kernels
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "alter_sharding.h"
#include <ydb/library/actors/core/log.h>
#include <util/string/type.h>

namespace NKikimr::NKqp {
Expand Down Expand Up @@ -26,4 +27,8 @@ void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TModifyScheme&
scheme.MutableAlterColumnTable()->MutableReshardColumnTable()->SetIncrease(*Increase);
}

void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const {
AFL_VERIFY(false);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "abstract.h"
#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/constructor.h>

namespace NKikimr::NKqp {

Expand All @@ -12,9 +11,7 @@ class TAlterShardingOperation: public ITableStoreOperation {
static inline const auto Registrator = TFactory::TRegistrator<TAlterShardingOperation>(GetTypeName());
private:
std::optional<bool> Increase;
virtual void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const override {
AFL_VERIFY(false);
}
virtual void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const override;
virtual void DoSerializeScheme(NKikimrSchemeOp::TModifyScheme& scheme, const bool isStandalone) const override;

public:
Expand Down
21 changes: 0 additions & 21 deletions ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.cpp

This file was deleted.

19 changes: 0 additions & 19 deletions ydb/core/kqp/gateway/behaviour/tablestore/operations/drop_stat.h

This file was deleted.

This file was deleted.

23 changes: 0 additions & 23 deletions ydb/core/kqp/gateway/behaviour/tablestore/operations/upsert_stat.h

This file was deleted.

3 changes: 0 additions & 3 deletions ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@ SRCS(
GLOBAL drop_column.cpp
GLOBAL upsert_index.cpp
GLOBAL drop_index.cpp
GLOBAL upsert_stat.cpp
GLOBAL drop_stat.cpp
GLOBAL upsert_opt.cpp
GLOBAL alter_sharding.cpp
)

PEERDIR(
ydb/services/metadata/manager
ydb/core/formats/arrow/serializer
ydb/core/tx/columnshard/engines/scheme/statistics/abstract
ydb/core/tx/columnshard/engines/storage/optimizer/abstract
ydb/core/kqp/gateway/utils
ydb/core/protos
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/ut/olap/helpers/typed_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,16 @@ void TTypedLocalHelper::FillPKOnly(const double pkKff /*= 0*/, const ui32 numRow
TBase::SendDataViaActorSystem(TablePath, batch);
}

void TTypedLocalHelper::GetStats(std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage>& stats, const bool verbose /*= false*/) {
void TTypedLocalHelper::GetStats(std::vector<NJson::TJsonValue>& stats, const bool verbose /*= false*/) {
TString selectQuery = "SELECT * FROM `" + TablePath + "/.sys/primary_index_portion_stats` WHERE Activity = true";
auto tableClient = KikimrRunner.GetTableClient();
auto rows = ExecuteScanQuery(tableClient, selectQuery, verbose);
for (auto&& r : rows) {
for (auto&& c : r) {
if (c.first == "Stats") {
NKikimrColumnShardStatisticsProto::TPortionStorage store;
AFL_VERIFY(google::protobuf::TextFormat::ParseFromString(GetUtf8(c.second), &store));
stats.emplace_back(store);
NJson::TJsonValue jsonStore;
AFL_VERIFY(NJson::ReadJsonFastTree(GetUtf8(c.second), &jsonStore));
stats.emplace_back(jsonStore);
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions ydb/core/kqp/ut/olap/helpers/typed_local.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#pragma once
#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
#include <ydb/core/formats/arrow/simple_builder/array.h>
#include <ydb/core/formats/arrow/simple_builder/batch.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>

#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>

#include <library/cpp/json/writer/json_value.h>

namespace NKikimr::NKqp {

class TTypedLocalHelper: public Tests::NCS::THelper {
Expand All @@ -19,7 +22,7 @@ class TTypedLocalHelper: public Tests::NCS::THelper {
protected:
virtual TString GetTestTableSchema() const override;
virtual std::vector<TString> GetShardingColumns() const override {
return {"pk_int"};
return { "pk_int" };
}
public:
TTypedLocalHelper(const TString& typeName, TKikimrRunner& kikimrRunner, const TString& tableName = "olapTable", const TString& storeName = "olapStore")
Expand Down Expand Up @@ -66,7 +69,7 @@ class TTypedLocalHelper: public Tests::NCS::THelper {

void GetVolumes(ui64& rawBytes, ui64& bytes, const bool verbose = false, const std::vector<TString> columnNames = {});

void GetStats(std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage>& stats, const bool verbose = false);
void GetStats(std::vector<NJson::TJsonValue>& stats, const bool verbose = false);

void GetCount(ui64& count);

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,11 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cout << result << Endl;
Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << Endl;
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("result", result);
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("skip", csController->GetIndexesSkippingOnSelect().Val())("check", csController->GetIndexesApprovedOnSelect().Val());
CompareYson(result, R"([[0u;]])");
AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val() * 0.3);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val());
}
ui32 requestsCount = 100;
for (ui32 i = 0; i < requestsCount; ++i) {
Expand All @@ -310,7 +310,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
CompareYson(result, R"([[1u;]])");
}

AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < 0.20 * csController->GetIndexesSkippingOnSelect().Val())
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val())
("approved", csController->GetIndexesApprovedOnSelect().Val())("skipped", csController->GetIndexesSkippingOnSelect().Val());

}
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/ut/olap/statistics_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,25 @@ Y_UNIT_TEST_SUITE(KqpOlapStatistics) {
helper.CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, TYPE=max, NAME=max_pk_int, FEATURES=`{\"column_name\": \"pk_int\"}`);";
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"pk_int\"}`))";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, TYPE=max, NAME=max_field, FEATURES=`{\"column_name\": \"field\"}`);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"field\"}`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, TYPE=max, NAME=max_pk_int, FEATURES=`{\"column_name\": \"pk_int\"}`);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=max_pk_int, TYPE=MAX, FEATURES=`{\"column_name\": \"pk_int\"}`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_STAT, NAME=max_pk_int);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_INDEX, NAME=max_pk_int);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
Expand All @@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStatistics) {
helper.CreateTestOlapTable();
auto tableClient = kikimr.GetTableClient();
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, TYPE=max, NAME=max_ts, FEATURES=`{\"column_name\": \"ts\"}`);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, TYPE=MAX, NAME=max_ts, FEATURES=`{\"column_name\": \"ts\"}`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
Expand All @@ -62,7 +62,7 @@ Y_UNIT_TEST_SUITE(KqpOlapStatistics) {
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_STAT, NAME=max_ts);";
auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_INDEX, NAME=max_ts);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
Expand Down
Loading

0 comments on commit 6dc0ff9

Please sign in to comment.