Skip to content

Commit

Permalink
buffered writing for columnshards (#686)
Browse files Browse the repository at this point in the history
* buffered writing for columnshards

* buffered writing for columnshards

* Revert "Merge branch 'write-buffer' of https://github.com/ivanmorozov333/ydb into write-buffer"

This reverts commit b5aa131, reversing
changes made to a831473.

* fix build

* fix build

* fix build

* fix

* one more fix
  • Loading branch information
ivanmorozov333 authored Dec 26, 2023
1 parent 9c84445 commit ad45285
Show file tree
Hide file tree
Showing 47 changed files with 788 additions and 433 deletions.
2 changes: 2 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,8 @@ message TColumnShardConfig {
optional bool IndexationEnabled = 4 [default = true];
optional bool CompactionEnabled = 5 [default = true];
optional bool TTLEnabled = 6 [default = true];
optional bool WritingEnabled = 7 [default = true];
optional uint32 WritingBufferDurationMs = 8 [default = 0];
}

message TSchemeShardConfig {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/abstract/read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ void IBlobsReadingAction::OnReadError(const TBlobRange& range, const TErrorStatu
void IBlobsReadingAction::AddRange(const TBlobRange& range, const TString& result /*= Default<TString>()*/) {
Y_ABORT_UNLESS(!Started);
if (!result) {
Y_ABORT_UNLESS(RangesForRead[range.BlobId].emplace(range).second);
AFL_VERIFY(RangesForRead[range.BlobId].emplace(range).second)("range", range.ToString());
} else {
Y_ABORT_UNLESS(RangesForResult.emplace(range, result).second);
AFL_VERIFY(RangesForResult.emplace(range, result).second)("range", range.ToString());
}
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/counters/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ TWriteCounters::TWriteCounters(const TConsumerCounters& owner)
ReplyBytes = TBase::GetDeriviative("Replies/Bytes");
ReplyDurationBySize = TBase::GetHistogram("Replies/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1));
ReplyDurationByCount = TBase::GetHistogram("Replies/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 1));
WritesBySize = TBase::GetHistogram("Writes/Bytes", NMonitoring::ExponentialHistogram(25, 2, 1));
VolumeByChunkSize = TBase::GetHistogram("Volume/Bytes", NMonitoring::ExponentialHistogram(25, 2, 1));

FailsCount = TBase::GetDeriviative("Fails/Count");
FailBytes = TBase::GetDeriviative("Fails/Bytes");
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/counters/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class TWriteCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes;
NMonitoring::THistogramPtr ReplyDurationByCount;
NMonitoring::THistogramPtr ReplyDurationBySize;
NMonitoring::THistogramPtr WritesBySize;
NMonitoring::THistogramPtr VolumeByChunkSize;

NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
NMonitoring::TDynamicCounters::TCounterPtr FailBytes;
Expand All @@ -34,6 +36,8 @@ class TWriteCounters: public NColumnShard::TCommonCountersOwner {
ReplyBytes->Add(bytes);
ReplyDurationByCount->Collect((i64)d.MilliSeconds());
ReplyDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes);
WritesBySize->Collect((i64)bytes);
VolumeByChunkSize->Collect((i64)bytes, (i64)bytes);
}

void OnFail(const ui64 bytes, const TDuration d) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex
auto storage = Self->StoragesManager->GetInsertOperator();
BlobsAction = storage->StartDeclareRemovingAction("TX_CLEANUP");
for (auto& [abortedWriteId, abortedData] : allAborted) {
Self->InsertTable->EraseAborted(dbTable, abortedData);
Y_ABORT_UNLESS(abortedData.GetBlobRange().IsFullBlob());
BlobsAction->DeclareRemove(abortedData.GetBlobRange().GetBlobId());
Self->InsertTable->EraseAborted(dbTable, abortedData, BlobsAction);
}
BlobsAction->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true);
return true;
Expand Down
126 changes: 77 additions & 49 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
#include "tx_write.h"

namespace NKikimr::NColumnShard {
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob) {
const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta();

const auto& blobRange = blobData.GetBlobRange();
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) {
NKikimrTxColumnShard::TLogicalMetadata meta;
meta.SetNumRows(batch->GetRowsCount());
meta.SetRawBytes(batch->GetRawBytes());
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe().SerializeToString());

const auto& blobRange = batch.GetRange();
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());

// First write wins
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);

const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());

auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(PutBlobResult->Get()->GetSchemaVersion());
const auto& writeMeta = batch.GetAggregation().GetWriteData()->GetWriteMeta();
auto schemeVersion = batch.GetAggregation().GetWriteData()->GetData()->GetSchemaVersion();
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), blob);
NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
// Put new data into blob cache
Y_ABORT_UNLESS(blobRange.IsFullBlob());

Self->UpdateInsertTableCounters();
return true;
}
Expand All @@ -31,62 +32,89 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));

txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
AFL_VERIFY(PutBlobResult->Get()->GetBlobData().size() == 1)("count", PutBlobResult->Get()->GetBlobData().size());
} else {
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
}
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
AFL_VERIFY(aggr->GetSplittedBlobs().size() == 1)("count", aggr->GetSplittedBlobs().size());
} else {
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
}

TVector<TWriteId> writeIds;
for (auto blobData : PutBlobResult->Get()->GetBlobData()) {
auto writeId = TWriteId(writeMeta.GetWriteId());
if (operation) {
writeId = Self->BuildNextWriteId(txc);
} else {
if (!operation) {
NIceDb::TNiceDb db(txc.DB);
writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId());
aggr->AddWriteId(writeId);
}

if (!InsertOneBlob(txc, blobData, writeId, PutBlobResult->Get()->GetBlobVerified(blobData.GetBlobRange()))) {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
for (auto&& i : aggr->GetSplittedBlobs()) {
if (operation) {
writeId = Self->BuildNextWriteId(txc);
aggr->AddWriteId(writeId);
}

if (!InsertOneBlob(txc, i, writeId)) {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
}
}
writeIds.push_back(writeId);
}

TBlobManagerDb blobManagerDb(txc.DB);
AFL_VERIFY(PutBlobResult->Get()->GetActions().size() == 1);
AFL_VERIFY(PutBlobResult->Get()->GetActions().front()->GetBlobsCount() == PutBlobResult->Get()->GetBlobData().size());
for (auto&& i : PutBlobResult->Get()->GetActions()) {
AFL_VERIFY(buffer.GetAddActions().size() == 1);
for (auto&& i : buffer.GetAddActions()) {
i->OnExecuteTxAfterWrite(*Self, blobManagerDb, true);
}

if (operation) {
operation->OnWriteFinish(txc, writeIds);
auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
Y_UNUSED(txInfo);
NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId());
Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo);
} else {
Y_ABORT_UNLESS(writeIds.size() == 1);
Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)writeIds.front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
for (auto&& i : buffer.GetRemoveActions()) {
i->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true);
}
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
std::unique_ptr<TEvColumnShard::TEvWriteResult> result;
TWriteOperation::TPtr operation;
if (!writeMeta.HasLongTxId()) {
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
}
if (operation) {
operation->OnWriteFinish(txc, aggr->GetWriteIds());
auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
Y_UNUSED(txInfo);
NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId());
Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo));
} else {
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
Results.emplace_back(std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS));
}
}
return true;
}

void TTxWrite::Complete(const TActorContext& ctx) {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
AFL_VERIFY(Result);
Self->CSCounters.OnWriteTxComplete((TMonotonic::Now() - PutBlobResult->Get()->GetWriteMeta().GetWriteStartInstant()).MilliSeconds());
Self->CSCounters.OnSuccessWriteResponse();
ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release());
const auto now = TMonotonic::Now();
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& i : buffer.GetAddActions()) {
i->OnCompleteTxAfterWrite(*Self);
}
for (auto&& i : buffer.GetRemoveActions()) {
i->OnCompleteTxAfterRemoving(*Self);
}
AFL_VERIFY(buffer.GetAggregations().size() == Results.size());
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
ctx.Send(writeMeta.GetSource(), Results[i].release());
Self->CSCounters.OnWriteTxComplete((now - writeMeta.GetWriteStartInstant()).MilliSeconds());
Self->CSCounters.OnSuccessWriteResponse();
}

}

}
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>

namespace NKikimr::NColumnShard {

Expand All @@ -15,12 +16,12 @@ class TTxWrite : public TTransactionBase<TColumnShard> {
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_WRITE; }

bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob);
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);

private:
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
const ui32 TabletTxNo;
std::unique_ptr<NActors::IEventBase> Result;
std::vector<std::unique_ptr<NActors::IEventBase>> Results;

TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
Self->EnqueueBackgroundActivities(false, TriggerActivity);
}

Self->UpdateResourceMetrics(ctx, Ev->Get()->PutResult->GetResourceUsage());
changes->MutableBlobsAction().OnCompleteTxAfterAction(*Self);
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexComplete(Self->TabletID(), changes->TypeString());
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "blobs_reader/actor.h"
#include "hooks/abstract/abstract.h"
#include "resource_subscriber/actor.h"
#include "engines/writer/buffer/actor.h"

namespace NKikimr {

Expand All @@ -15,6 +16,8 @@ namespace NKikimr::NColumnShard {

void TColumnShard::CleanupActors(const TActorContext& ctx) {
ctx.Send(ResourceSubscribeActor, new TEvents::TEvPoisonPill);
ctx.Send(BufferizationWriteActorId, new TEvents::TEvPoisonPill);

StoragesManager->Stop();
if (Tiers) {
Tiers->Stop();
Expand Down Expand Up @@ -69,6 +72,7 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
CompactionLimits.RegisterControls(icb);
Settings.RegisterControls(icb);
ResourceSubscribeActor = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletID(), SelfId()));
BufferizationWriteActorId = ctx.Register(new NColumnShard::NWriting::TActor(TabletID(), SelfId()));
Execute(CreateTxInitSchema(), ctx);
}

Expand Down
Loading

0 comments on commit ad45285

Please sign in to comment.