Skip to content

Commit

Permalink
Merge branch 'write-buffer' of https://github.com/ivanmorozov333/ydb
Browse files Browse the repository at this point in the history
…into write-buffer
  • Loading branch information
ivanmorozov333 committed Dec 25, 2023
2 parents a831473 + 1b20c81 commit b5aa131
Show file tree
Hide file tree
Showing 27 changed files with 192 additions and 90 deletions.
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/abstract/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
gc.cpp
common.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/bs/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
gc.cpp
gc_actor.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/counters/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
read.cpp
storage.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/tier/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
adapter.cpp
gc.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/transaction/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
tx_draft.cpp
tx_write.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
blob_manager_db.cpp
memory.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/abstract/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
abstract.cpp
compaction_info.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/compaction/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
merge_context.cpp
column_cursor.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/counters/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
general.cpp
)
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
compaction.cpp
ttl.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/insert_table/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
insert_table.cpp
rt_insertion.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
portion_info.cpp
column_record.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/predicate/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
container.cpp
range.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/reader/plain_reader/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
scanner.cpp
source.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/reader/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
conveyor_task.cpp
description.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
abstract_scheme.cpp
snapshot_scheme.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
optimizer.cpp
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
optimizer.cpp
blob_size.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
optimizer.cpp
counters.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
optimizer.cpp
counters.cpp
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/storage/optimizer/ut/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
UNITTEST_FOR(ydb/core/tx/columnshard/engines/storage/optimizer)

OWNER(
g:kikimr
)

SIZE(SMALL)

PEERDIR(
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/storage/optimizer/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

PEERDIR(
ydb/core/tx/columnshard/engines/storage/optimizer/abstract
ydb/core/tx/columnshard/engines/storage/optimizer/intervals
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/storage/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
LIBRARY()

OWNER(
g:kikimr
)

SRCS(
granule.cpp
storage.cpp
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/engines/ut/ya.make
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
UNITTEST_FOR(ydb/core/tx/columnshard/engines)

OWNER(
chertus
g:kikimr
)

FORK_SUBTESTS()

SPLIT_FACTOR(60)
Expand Down
102 changes: 51 additions & 51 deletions ydb/core/tx/columnshard/engines/writer/buffer/actor.cpp
Original file line number Diff line number Diff line change
@@ -1,51 +1,51 @@
#include "actor.h"
#include <ydb/core/tx/columnshard/columnshard_impl.h>

namespace NKikimr::NColumnShard::NWriting {

TActor::TActor(ui64 tabletId, const TActorId& parent)
: TabletId(tabletId)
, ParentActorId(parent)
{

}

void TActor::Flush() {
if (Aggregations.size()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "flush_writing")("size", SumSize)("count", Aggregations.size());
auto action = Aggregations.front()->GetWriteData()->GetBlobsAction();
auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ParentActorId, action, std::move(Aggregations));
if (action->NeedDraftTransaction()) {
TActorContext::AsActorContext().Send(ParentActorId, std::make_unique<NColumnShard::TEvPrivate::TEvWriteDraft>(writeController));
} else {
TActorContext::AsActorContext().Register(NColumnShard::CreateWriteActor(TabletId, writeController, TInstant::Max()));
}
Aggregations.clear();
SumSize = 0;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_flush_writing");
}
}

void TActor::Handle(TEvFlushBuffer::TPtr& /*ev*/) {
FlushDuration = TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetWritingBufferDurationMs());
Flush();
if (!FlushDuration) {
Schedule(TDuration::MilliSeconds(500), new TEvFlushBuffer);
} else {
Schedule(FlushDuration, new TEvFlushBuffer);
}
}

void TActor::Handle(TEvAddInsertedDataToBuffer::TPtr& ev) {
auto* evBase = ev->Get();
AFL_VERIFY(evBase->GetWriteData()->GetBlobsAction()->GetStorageId() == NOlap::IStoragesManager::DefaultStorageId);
SumSize += evBase->GetWriteData()->GetSize();
Aggregations.emplace_back(std::make_shared<NOlap::TWriteAggregation>(evBase->GetWriteData(), std::move(evBase->MutableBlobsToWrite())));
if (SumSize > 4 * 1024 * 1024 || Aggregations.size() > 750 || !FlushDuration) {
Flush();
}
}


}
#include "actor.h"
#include <ydb/core/tx/columnshard/columnshard_impl.h>

namespace NKikimr::NColumnShard::NWriting {

TActor::TActor(ui64 tabletId, const TActorId& parent)
: TabletId(tabletId)
, ParentActorId(parent)
{

}

void TActor::Flush() {
if (Aggregations.size()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "flush_writing")("size", SumSize)("count", Aggregations.size());
auto action = Aggregations.front()->GetWriteData()->GetBlobsAction();
auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ParentActorId, action, std::move(Aggregations));
if (action->NeedDraftTransaction()) {
TActorContext::AsActorContext().Send(ParentActorId, std::make_unique<NColumnShard::TEvPrivate::TEvWriteDraft>(writeController));
} else {
TActorContext::AsActorContext().Register(NColumnShard::CreateWriteActor(TabletId, writeController, TInstant::Max()));
}
Aggregations.clear();
SumSize = 0;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_flush_writing");
}
}

void TActor::Handle(TEvFlushBuffer::TPtr& /*ev*/) {
FlushDuration = TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetWritingBufferDurationMs());
Flush();
if (!FlushDuration) {
Schedule(TDuration::MilliSeconds(500), new TEvFlushBuffer);
} else {
Schedule(FlushDuration, new TEvFlushBuffer);
}
}

void TActor::Handle(TEvAddInsertedDataToBuffer::TPtr& ev) {
auto* evBase = ev->Get();
AFL_VERIFY(evBase->GetWriteData()->GetBlobsAction()->GetStorageId() == NOlap::IStoragesManager::DefaultStorageId);
SumSize += evBase->GetWriteData()->GetSize();
Aggregations.emplace_back(std::make_shared<NOlap::TWriteAggregation>(evBase->GetWriteData(), std::move(evBase->MutableBlobsToWrite())));
if (SumSize > 4 * 1024 * 1024 || Aggregations.size() > 750 || !FlushDuration) {
Flush();
}
}


}
Loading

0 comments on commit b5aa131

Please sign in to comment.