Skip to content

Commit

Permalink
batch portions on write (#13182)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 6, 2025
1 parent d591f19 commit 63b0998
Show file tree
Hide file tree
Showing 31 changed files with 862 additions and 209 deletions.
27 changes: 27 additions & 0 deletions ydb/core/formats/arrow/reader/position.h
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,33 @@ class TIntervalPositions {
public:
using const_iterator = std::vector<TIntervalPosition>::const_iterator;

void Merge(const TIntervalPositions& from) {
auto itSelf = Positions.begin();
auto itFrom = from.Positions.begin();
while (itSelf != Positions.end() && itFrom != from.Positions.end()) {
if (*itSelf < *itFrom) {
Positions.emplace_back(*itSelf);
++itSelf;
} else if (*itFrom < *itSelf) {
Positions.emplace_back(*itFrom);
++itFrom;
} else {
Positions.emplace_back(*itFrom);
++itSelf;
++itFrom;
}
}
if (itSelf == Positions.end()) {
Positions.insert(Positions.end(), itFrom, from.Positions.end());
} else {
Positions.insert(Positions.end(), itSelf, Positions.end());
}
}

ui32 GetPointsCount() const {
return Positions.size();
}

bool IsEmpty() const {
return Positions.empty();
}
Expand Down
69 changes: 61 additions & 8 deletions ydb/core/kqp/ut/olap/helpers/typed_local.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
#include "typed_local.h"
#include "query_executor.h"
#include "get_value.h"
#include "query_executor.h"
#include "typed_local.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>

#include <ydb/public/api/protos/ydb_table.pb.h>

namespace NKikimr::NKqp {

Expand Down Expand Up @@ -37,8 +43,8 @@ TString TTypedLocalHelper::GetMultiColumnTestTableSchema(ui32 reps) const {
return result;
}

void TTypedLocalHelper::CreateMultiColumnOlapTableWithStore(ui32 reps, ui32 storeShardsCount, ui32 tableShardsCount) {
CreateSchemaOlapTablesWithStore(GetMultiColumnTestTableSchema(reps), {TableName}, "olapStore", storeShardsCount, tableShardsCount);
void TTypedLocalHelper::CreateMultiColumnOlapTableWithStore(ui32 reps, ui32 storeShardsCount, ui32 tableShardsCount) {
CreateSchemaOlapTablesWithStore(GetMultiColumnTestTableSchema(reps), { TableName }, "olapStore", storeShardsCount, tableShardsCount);
}

void TTypedLocalHelper::ExecuteSchemeQuery(const TString& alterQuery, const NYdb::EStatus expectedStatus /*= EStatus::SUCCESS*/) const {
Expand Down Expand Up @@ -69,7 +75,8 @@ void TTypedLocalHelper::PrintCount() {
}

NKikimr::NKqp::TTypedLocalHelper::TDistribution TTypedLocalHelper::GetDistribution(const bool verbose /*= false*/) {
const TString selectQuery = "PRAGMA Kikimr.OptUseFinalizeByKey='true';SELECT COUNT(*) as c, field FROM `" + TablePath + "` GROUP BY field ORDER BY field";
const TString selectQuery =
"PRAGMA Kikimr.OptUseFinalizeByKey='true';SELECT COUNT(*) as c, field FROM `" + TablePath + "` GROUP BY field ORDER BY field";

auto tableClient = KikimrRunner.GetTableClient();
auto rows = ExecuteScanQuery(tableClient, selectQuery, verbose);
Expand Down Expand Up @@ -101,7 +108,8 @@ NKikimr::NKqp::TTypedLocalHelper::TDistribution TTypedLocalHelper::GetDistributi
return TDistribution(count, *minCount, *maxCount, groups.size());
}

void TTypedLocalHelper::GetVolumes(ui64& rawBytes, ui64& bytes, const bool verbose /*= false*/, const std::vector<TString> columnNames /*= {}*/) {
void TTypedLocalHelper::GetVolumes(
ui64& rawBytes, ui64& bytes, const bool verbose /*= false*/, const std::vector<TString> columnNames /*= {}*/) {
TString selectQuery = "SELECT * FROM `" + TablePath + "/.sys/primary_index_stats` WHERE Activity == 1";
if (columnNames.size()) {
selectQuery += " AND EntityName IN ('" + JoinSeq("','", columnNames) + "')";
Expand Down Expand Up @@ -160,7 +168,9 @@ void TTypedLocalHelper::GetCount(ui64& count) {

void TTypedLocalHelper::FillPKOnly(const double pkKff /*= 0*/, const ui32 numRows /*= 800000*/) const {
std::vector<NArrow::NConstruction::IArrayBuilder::TPtr> builders;
builders.emplace_back(NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::Int64Type>>::BuildNotNullable("pk_int", numRows * pkKff));
builders.emplace_back(
NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::Int64Type>>::BuildNotNullable(
"pk_int", numRows * pkKff));
NArrow::NConstruction::TRecordBatchConstructor batchBuilder(builders);
std::shared_ptr<arrow::RecordBatch> batch = batchBuilder.BuildBatch(numRows);
TBase::SendDataViaActorSystem(TablePath, batch);
Expand All @@ -181,4 +191,47 @@ void TTypedLocalHelper::GetStats(std::vector<NJson::TJsonValue>& stats, const bo
}
}

}
void TTypedLocalHelper::TWritingGuard::SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch,
const Ydb::StatusIds_StatusCode expectedStatus /*= = Ydb::StatusIds::SUCCESS*/) const {
auto* runtime = KikimrRunner.GetTestServer().GetRuntime();

UNIT_ASSERT(batch);
UNIT_ASSERT(batch->num_rows());
auto data = NArrow::SerializeBatchNoCompression(batch);
UNIT_ASSERT(!data.empty());
TString serializedSchema = NArrow::SerializeSchema(*batch->schema());
UNIT_ASSERT(serializedSchema);

Ydb::Table::BulkUpsertRequest request;
request.mutable_arrow_batch_settings()->set_schema(serializedSchema);
request.set_data(data);
request.set_table(testTable);

using TEvBulkUpsertRequest = NGRpcService::TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>;
auto future = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>(std::move(request), "", "", runtime->GetActorSystem(0));
Responses.fetch_add(1);
auto* responsesLocal = &Responses;
future.Subscribe([responsesLocal, expectedStatus](const NThreading::TFuture<Ydb::Table::BulkUpsertResponse> f) mutable {
responsesLocal->fetch_add(-1);
auto op = f.GetValueSync().operation();
if (op.status() != Ydb::StatusIds::SUCCESS) {
for (auto& issue : op.issues()) {
Cerr << issue.message() << " ";
}
Cerr << "\n";
}
UNIT_ASSERT_VALUES_EQUAL(op.status(), expectedStatus);
});
}

void TTypedLocalHelper::TWritingGuard::WaitWritings() {
auto* runtime = KikimrRunner.GetTestServer().GetRuntime();
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return Responses.load() == 0;
};

runtime->DispatchEvents(options);
}

} // namespace NKikimr::NKqp
38 changes: 38 additions & 0 deletions ydb/core/kqp/ut/olap/helpers/typed_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,44 @@ class TTypedLocalHelper: public Tests::NCS::THelper {
SetShardingMethod("HASH_FUNCTION_CONSISTENCY_64");
}

class TWritingGuard {
private:
TKikimrRunner& KikimrRunner;
const TString TablePath;
mutable std::atomic<size_t> Responses = 0;
void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch,
const Ydb::StatusIds_StatusCode expectedStatus = Ydb::StatusIds::SUCCESS) const;

void WaitWritings();

public:
TWritingGuard(TKikimrRunner& kikimrRunner, const TString& tablePath)
: KikimrRunner(kikimrRunner)
, TablePath(tablePath)
{
}

template <class TFiller>
void FillTable(const TFiller& fillPolicy, const double pkKff = 0, const ui32 numRows = 800000) const {
std::vector<NArrow::NConstruction::IArrayBuilder::TPtr> builders;
builders.emplace_back(
NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::Int64Type>>::BuildNotNullable(
"pk_int", numRows * pkKff));
builders.emplace_back(std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<TFiller>>("field", fillPolicy));
NArrow::NConstruction::TRecordBatchConstructor batchBuilder(builders);
std::shared_ptr<arrow::RecordBatch> batch = batchBuilder.BuildBatch(numRows);
SendDataViaActorSystem(TablePath, batch, Ydb::StatusIds::SUCCESS);
}

void Finalize() {
WaitWritings();
}
};

TWritingGuard StartWriting(const TString& tablePath) {
return TWritingGuard(KikimrRunner, tablePath);
}

void ExecuteSchemeQuery(const TString& alterQuery, const NYdb::EStatus expectedStatus = NYdb::EStatus::SUCCESS) const;

TString GetQueryResult(const TString& request) const;
Expand Down
34 changes: 34 additions & 0 deletions ydb/core/kqp/ut/olap/write_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,40 @@ Y_UNIT_TEST_SUITE(KqpOlapWrite) {
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("count")), 800000);
}

Y_UNIT_TEST(MultiWriteInTime) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
settings.AppConfig.MutableColumnShardConfig()->SetWritingBufferDurationMs(15000);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
helper.CreateTestOlapTable();
auto writeGuard = helper.StartWriting("/Root/olapStore/olapTable");
writeGuard.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "aaa", 1), 0, 800000);
Sleep(TDuration::Seconds(1));
writeGuard.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "bbb", 1), 0.5, 800000);
Sleep(TDuration::Seconds(1));
writeGuard.FillTable(NArrow::NConstruction::TStringPoolFiller(1, 1, "ccc", 1), 0.75, 800000);
Sleep(TDuration::Seconds(1));
writeGuard.Finalize();

auto selectQuery = TString(R"(
SELECT
field, count(*) as count,
FROM `/Root/olapStore/olapTable`
GROUP BY field
ORDER BY field
)");

auto tableClient = kikimr.GetTableClient();
auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("count")), 400000);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("field")), "aaa");
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("count")), 200000);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[1].at("field")), "bbb");
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[2].at("count")), 800000);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[2].at("field")), "ccc");
}

Y_UNIT_TEST(WriteDeleteCleanGC) {
auto csController = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NKikimr::NOlap::TWaitCompactionController>();
csController->SetSmallSizeDetector(1000000);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1789,6 +1789,7 @@ message TColumnShardConfig {
optional uint32 RestoreDataOnWriteTimeoutSeconds = 30;
optional bool UseSlicesFilter = 31 [default = true];
optional uint32 LimitForPortionsMetadataAsk = 32 [default = 1000];
optional uint64 WritingBufferVolumeMb = 33 [default = 32];
}

message TSchemeShardConfig {
Expand Down
Loading

0 comments on commit 63b0998

Please sign in to comment.