Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CS improvements #7013

Merged
merged 26 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
14a86a0
fix signal value (#6314)
ivanmorozov333 Jul 5, 2024
d90075c
fix test in asan (nullptr possible in original data) (#6369)
ivanmorozov333 Jul 7, 2024
a566ec4
serialize special keys in separated thread (#6324)
ivanmorozov333 Jul 7, 2024
2b59a44
prevent overlimit for ui32 in genstep (#6365)
ivanmorozov333 Jul 7, 2024
856abe8
reduce logging (#6343)
ivanmorozov333 Jul 7, 2024
cf97903
fix huge blobs volume for start mode withno barrier moving (#6366)
ivanmorozov333 Jul 7, 2024
4fe73dd
corrections for gc tasks construction (#6376)
ivanmorozov333 Jul 7, 2024
f56e028
remove hard freeing from tablet (#6377)
ivanmorozov333 Jul 8, 2024
e3e040f
correct insert table cleaning (#6379)
ivanmorozov333 Jul 8, 2024
5015970
small transfer meta processing correction (#6386)
ivanmorozov333 Jul 8, 2024
bb0fd52
fix resharding tests implementation (#6388)
ivanmorozov333 Jul 8, 2024
eee37d2
remove portions constructor hard freeing from tablet (#6389)
ivanmorozov333 Jul 8, 2024
7c8988f
reduce logging volume (#6418)
ivanmorozov333 Jul 9, 2024
3db3754
fix disk writing limiter (#6452)
ivanmorozov333 Jul 9, 2024
5d5b545
conveyor logging and remove ownerId - internal processes only (#6442)
ivanmorozov333 Jul 9, 2024
40b8f3f
fix splitting to prevent empty batches (#6632)
ivanmorozov333 Jul 13, 2024
5ab12a0
fix throw flag usage (#6633)
ivanmorozov333 Jul 13, 2024
b373aff
fix resharding tests on race gc with sharing (#6654)
ivanmorozov333 Jul 14, 2024
f2e678c
Correct index construction (#6500)
ivanmorozov333 Jul 15, 2024
9727cbb
Limit requested memory (#6698)
ivanmorozov333 Jul 16, 2024
d96d33d
rename ExecuteOnProgress -> ProgressOnExecute (#6739)
ivanmorozov333 Jul 16, 2024
d8e0085
add test for GC usage on insert/delete (#6726)
ivanmorozov333 Jul 17, 2024
9be9dea
improve scan read for aggregations (use commands in step before group…
ivanmorozov333 Jul 17, 2024
1b8c280
fix index actualization (#6793)
ivanmorozov333 Jul 17, 2024
dd9a405
fix unused (#7046)
zverevgeny Jul 24, 2024
538df5b
mute KqpOlapWrite.WriteDeleteCleanGC
zverevgeny Jul 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ ydb/core/external_sources *
ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
ydb/core/keyvalue/ut_trace TKeyValueTracingTest.*
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.*
ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap
ydb/core/kqp/ut/olap KqpOlapStatistics.StatsUsageWithTTL
Expand Down
39 changes: 24 additions & 15 deletions ydb/core/formats/arrow/arrow_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -892,24 +892,33 @@ std::shared_ptr<arrow::RecordBatch> MergeColumns(const std::vector<std::shared_p
}

std::vector<std::shared_ptr<arrow::RecordBatch>> SliceToRecordBatches(const std::shared_ptr<arrow::Table>& t) {
std::set<ui32> splitPositions;
const ui32 numRows = t->num_rows();
for (auto&& i : t->columns()) {
ui32 pos = 0;
for (auto&& arr : i->chunks()) {
splitPositions.emplace(pos);
pos += arr->length();
if (!t->num_rows()) {
return {};
}
std::vector<ui32> positions;
{
for (auto&& i : t->columns()) {
ui32 pos = 0;
for (auto&& arr : i->chunks()) {
positions.emplace_back(pos);
pos += arr->length();
}
AFL_VERIFY(pos == t->num_rows());
}
AFL_VERIFY(pos == t->num_rows());
positions.emplace_back(t->num_rows());
}
std::sort(positions.begin(), positions.end());
positions.erase(std::unique(positions.begin(), positions.end()), positions.end());

std::vector<std::vector<std::shared_ptr<arrow::Array>>> slicedData;
slicedData.resize(splitPositions.size());
std::vector<ui32> positions(splitPositions.begin(), splitPositions.end());
for (auto&& i : t->columns()) {
for (ui32 idx = 0; idx < positions.size(); ++idx) {
auto slice = i->Slice(positions[idx], ((idx + 1 == positions.size()) ? numRows : positions[idx + 1]) - positions[idx]);
AFL_VERIFY(slice->num_chunks() == 1);
slicedData[idx].emplace_back(slice->chunks().front());
slicedData.resize(positions.size() - 1);
{
for (auto&& i : t->columns()) {
for (ui32 idx = 0; idx + 1 < positions.size(); ++idx) {
auto slice = i->Slice(positions[idx], positions[idx + 1] - positions[idx]);
AFL_VERIFY(slice->num_chunks() == 1);
slicedData[idx].emplace_back(slice->chunks().front());
}
}
}
std::vector<std::shared_ptr<arrow::RecordBatch>> result;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/formats/arrow/program.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,18 +946,18 @@ arrow::Result<std::shared_ptr<NArrow::TColumnFilter>> TProgramStep::BuildFilter(
}

const std::set<ui32>& TProgramStep::GetFilterOriginalColumnIds() const {
AFL_VERIFY(IsFilterOnly());
// AFL_VERIFY(IsFilterOnly());
return FilterOriginalColumnIds;
}

std::set<std::string> TProgram::GetEarlyFilterColumns() const {
std::set<std::string> result;
for (ui32 i = 0; i < Steps.size(); ++i) {
auto stepFields = Steps[i]->GetColumnsInUsage(true);
result.insert(stepFields.begin(), stepFields.end());
if (!Steps[i]->IsFilterOnly()) {
break;
}
auto stepFields = Steps[i]->GetColumnsInUsage();
result.insert(stepFields.begin(), stepFields.end());
}
return result;
}
Expand Down
47 changes: 47 additions & 0 deletions ydb/core/formats/arrow/scalar/serialization.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include "serialization.h"
#include <ydb/core/formats/arrow/switch/switch_type.h>
#include <ydb/library/actors/core/log.h>

namespace NKikimr::NArrow::NScalar {

TConclusion<TString> TSerializer::SerializePayloadToString(const std::shared_ptr<arrow::Scalar>& scalar) {
TString resultString;
const bool resultFlag = NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
if constexpr (arrow::has_c_type<typename TWrap::T>()) {
using CType = typename TWrap::T::c_type;
using ScalarType = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
const ScalarType* scalarTyped = static_cast<const ScalarType*>(scalar.get());
resultString = TString(sizeof(CType), '\0');
memcpy(&resultString[0], scalarTyped->data(), sizeof(CType));
return true;
}
return false;
});
if (!resultFlag) {
return TConclusionStatus::Fail("incorrect scalar type for payload serialization: " + scalar->type->ToString());
}
return resultString;
}

TConclusion<std::shared_ptr<arrow::Scalar>> TSerializer::DeserializeFromStringWithPayload(const TString& data, const std::shared_ptr<arrow::DataType>& dataType) {
AFL_VERIFY(dataType);
std::shared_ptr<arrow::Scalar> result;
const bool resultFlag = NArrow::SwitchType(dataType->id(), [&](const auto& type) {
using TWrap = std::decay_t<decltype(type)>;
if constexpr (arrow::has_c_type<typename TWrap::T>()) {
using CType = typename TWrap::T::c_type;
AFL_VERIFY(data.size() == sizeof(CType));
using ScalarType = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
result = std::make_shared<ScalarType>(*(CType*)&data[0], dataType);
return true;
}
return false;
});
if (!resultFlag) {
return TConclusionStatus::Fail("incorrect scalar type for payload deserialization: " + dataType->ToString());
}
return result;
}

}
15 changes: 15 additions & 0 deletions ydb/core/formats/arrow/scalar/serialization.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once
#include <ydb/library/conclusion/result.h>

#include <contrib/libs/apache/arrow/cpp/src/arrow/scalar.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>

#include <util/generic/string.h>

namespace NKikimr::NArrow::NScalar {
class TSerializer {
public:
static TConclusion<TString> SerializePayloadToString(const std::shared_ptr<arrow::Scalar>& scalar);
static TConclusion<std::shared_ptr<arrow::Scalar>> DeserializeFromStringWithPayload(const TString& data, const std::shared_ptr<arrow::DataType>& dataType);
};
}
14 changes: 14 additions & 0 deletions ydb/core/formats/arrow/scalar/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
LIBRARY()

PEERDIR(
contrib/libs/apache/arrow
ydb/library/conclusion
ydb/core/formats/arrow/switch
ydb/library/actors/core
)

SRCS(
serialization.cpp
)

END()
7 changes: 4 additions & 3 deletions ydb/core/formats/arrow/size_calcer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,12 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
}

NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context) {
std::optional<TFirstLastSpecialKeys> specialKeys;
std::optional<TString> specialKeys;
if (context.GetFieldsForSpecialKeys().size()) {
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys());
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys()).SerializeToString();
}
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch), specialKeys);
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
NArrow::GetBatchDataSize(batch), specialKeys);
}

TConclusionStatus TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/formats/arrow/size_calcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ class TSerializedBatch {
YDB_READONLY_DEF(TString, Data);
YDB_READONLY(ui32, RowsCount, 0);
YDB_READONLY(ui32, RawBytes, 0);
std::optional<TFirstLastSpecialKeys> SpecialKeys;
std::optional<TString> SpecialKeys;
public:
size_t GetSize() const {
return Data.size();
}

const TFirstLastSpecialKeys& GetSpecialKeysSafe() const {
const TString& GetSpecialKeysSafe() const {
AFL_VERIFY(SpecialKeys);
return *SpecialKeys;
}
Expand All @@ -95,7 +95,7 @@ class TSerializedBatch {
static TConclusionStatus BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR);
static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context);

TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TFirstLastSpecialKeys>& specialKeys)
TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TString>& specialKeys)
: SchemaData(schemaData)
, Data(data)
, RowsCount(rowsCount)
Expand Down
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ PEERDIR(
ydb/core/formats/arrow/dictionary
ydb/core/formats/arrow/transformer
ydb/core/formats/arrow/reader
ydb/core/formats/arrow/scalar
ydb/core/formats/arrow/hash
ydb/library/actors/core
ydb/library/arrow_kernels
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "alter_sharding.h"
#include <ydb/library/actors/core/log.h>
#include <util/string/type.h>

namespace NKikimr::NKqp {
Expand Down Expand Up @@ -26,4 +27,8 @@ void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TModifyScheme&
scheme.MutableAlterColumnTable()->MutableReshardColumnTable()->SetIncrease(*Increase);
}

void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const {
AFL_VERIFY(false);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "abstract.h"
#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/constructor.h>

namespace NKikimr::NKqp {

Expand All @@ -12,9 +11,7 @@ class TAlterShardingOperation: public ITableStoreOperation {
static inline const auto Registrator = TFactory::TRegistrator<TAlterShardingOperation>(GetTypeName());
private:
std::optional<bool> Increase;
virtual void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const override {
AFL_VERIFY(false);
}
virtual void DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& /*scheme*/) const override;
virtual void DoSerializeScheme(NKikimrSchemeOp::TModifyScheme& scheme, const bool isStandalone) const override;

public:
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl
}
IndexName = *fValue;
}
StorageId = features.Extract("STORAGE_ID");
if (StorageId && !*StorageId) {
return TConclusionStatus::Fail("STORAGE_ID cannot be empty string");
}
TString indexType;
{
auto fValue = features.Extract("TYPE");
Expand Down Expand Up @@ -46,9 +42,6 @@ TConclusionStatus TUpsertIndexOperation::DoDeserialize(NYql::TObjectSettingsImpl

void TUpsertIndexOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const {
auto* indexProto = schemaData.AddUpsertIndexes();
if (StorageId) {
indexProto->SetStorageId(*StorageId);
}
indexProto->SetName(IndexName);
IndexMetaConstructor.SerializeToProto(*indexProto);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class TUpsertIndexOperation : public ITableStoreOperation {
static inline auto Registrator = TFactory::TRegistrator<TUpsertIndexOperation>(GetTypeName());
private:
TString IndexName;
std::optional<TString> StorageId;
NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMetaConstructor> IndexMetaConstructor;
public:
TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override;
Expand Down

This file was deleted.

This file was deleted.

Loading
Loading