Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buffered writing for columnshards #686

Merged
merged 9 commits on Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,8 @@ message TColumnShardConfig {
optional bool IndexationEnabled = 4 [default = true];
optional bool CompactionEnabled = 5 [default = true];
optional bool TTLEnabled = 6 [default = true];
optional bool WritingEnabled = 7 [default = true];
optional uint32 WritingBufferDurationMs = 8 [default = 0];
}

message TSchemeShardConfig {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/abstract/read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ void IBlobsReadingAction::OnReadError(const TBlobRange& range, const TErrorStatu
void IBlobsReadingAction::AddRange(const TBlobRange& range, const TString& result /*= Default<TString>()*/) {
Y_ABORT_UNLESS(!Started);
if (!result) {
Y_ABORT_UNLESS(RangesForRead[range.BlobId].emplace(range).second);
AFL_VERIFY(RangesForRead[range.BlobId].emplace(range).second)("range", range.ToString());
} else {
Y_ABORT_UNLESS(RangesForResult.emplace(range, result).second);
AFL_VERIFY(RangesForResult.emplace(range, result).second)("range", range.ToString());
}
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/counters/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ TWriteCounters::TWriteCounters(const TConsumerCounters& owner)
ReplyBytes = TBase::GetDeriviative("Replies/Bytes");
ReplyDurationBySize = TBase::GetHistogram("Replies/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1));
ReplyDurationByCount = TBase::GetHistogram("Replies/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 1));
WritesBySize = TBase::GetHistogram("Writes/Bytes", NMonitoring::ExponentialHistogram(25, 2, 1));
VolumeByChunkSize = TBase::GetHistogram("Volume/Bytes", NMonitoring::ExponentialHistogram(25, 2, 1));

FailsCount = TBase::GetDeriviative("Fails/Count");
FailBytes = TBase::GetDeriviative("Fails/Bytes");
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/counters/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class TWriteCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes;
NMonitoring::THistogramPtr ReplyDurationByCount;
NMonitoring::THistogramPtr ReplyDurationBySize;
NMonitoring::THistogramPtr WritesBySize;
NMonitoring::THistogramPtr VolumeByChunkSize;

NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
NMonitoring::TDynamicCounters::TCounterPtr FailBytes;
Expand All @@ -34,6 +36,8 @@ class TWriteCounters: public NColumnShard::TCommonCountersOwner {
ReplyBytes->Add(bytes);
ReplyDurationByCount->Collect((i64)d.MilliSeconds());
ReplyDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes);
WritesBySize->Collect((i64)bytes);
VolumeByChunkSize->Collect((i64)bytes, (i64)bytes);
}

void OnFail(const ui64 bytes, const TDuration d) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex
auto storage = Self->StoragesManager->GetInsertOperator();
BlobsAction = storage->StartDeclareRemovingAction("TX_CLEANUP");
for (auto& [abortedWriteId, abortedData] : allAborted) {
Self->InsertTable->EraseAborted(dbTable, abortedData);
Y_ABORT_UNLESS(abortedData.GetBlobRange().IsFullBlob());
BlobsAction->DeclareRemove(abortedData.GetBlobRange().GetBlobId());
Self->InsertTable->EraseAborted(dbTable, abortedData, BlobsAction);
}
BlobsAction->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true);
return true;
Expand Down
126 changes: 77 additions & 49 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
@@ -1,26 +1,27 @@
#include "tx_write.h"

namespace NKikimr::NColumnShard {
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob) {
const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta();

const auto& blobRange = blobData.GetBlobRange();
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) {
NKikimrTxColumnShard::TLogicalMetadata meta;
meta.SetNumRows(batch->GetRowsCount());
meta.SetRawBytes(batch->GetRawBytes());
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe().SerializeToString());

const auto& blobRange = batch.GetRange();
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());

// First write wins
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);

const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());

auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(PutBlobResult->Get()->GetSchemaVersion());
const auto& writeMeta = batch.GetAggregation().GetWriteData()->GetWriteMeta();
auto schemeVersion = batch.GetAggregation().GetWriteData()->GetData()->GetSchemaVersion();
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), blob);
NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
// Put new data into blob cache
Y_ABORT_UNLESS(blobRange.IsFullBlob());

Self->UpdateInsertTableCounters();
return true;
}
Expand All @@ -31,62 +32,89 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
ACFL_DEBUG("event", "start_execute");
const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));

txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
AFL_VERIFY(PutBlobResult->Get()->GetBlobData().size() == 1)("count", PutBlobResult->Get()->GetBlobData().size());
} else {
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
}
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
txc.DB.NoMoreReadsForTx();
TWriteOperation::TPtr operation;
if (writeMeta.HasLongTxId()) {
AFL_VERIFY(aggr->GetSplittedBlobs().size() == 1)("count", aggr->GetSplittedBlobs().size());
} else {
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
}

TVector<TWriteId> writeIds;
for (auto blobData : PutBlobResult->Get()->GetBlobData()) {
auto writeId = TWriteId(writeMeta.GetWriteId());
if (operation) {
writeId = Self->BuildNextWriteId(txc);
} else {
if (!operation) {
NIceDb::TNiceDb db(txc.DB);
writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId());
aggr->AddWriteId(writeId);
}

if (!InsertOneBlob(txc, blobData, writeId, PutBlobResult->Get()->GetBlobVerified(blobData.GetBlobRange()))) {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
for (auto&& i : aggr->GetSplittedBlobs()) {
if (operation) {
writeId = Self->BuildNextWriteId(txc);
aggr->AddWriteId(writeId);
}

if (!InsertOneBlob(txc, i, writeId)) {
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
}
}
writeIds.push_back(writeId);
}

TBlobManagerDb blobManagerDb(txc.DB);
AFL_VERIFY(PutBlobResult->Get()->GetActions().size() == 1);
AFL_VERIFY(PutBlobResult->Get()->GetActions().front()->GetBlobsCount() == PutBlobResult->Get()->GetBlobData().size());
for (auto&& i : PutBlobResult->Get()->GetActions()) {
AFL_VERIFY(buffer.GetAddActions().size() == 1);
for (auto&& i : buffer.GetAddActions()) {
i->OnExecuteTxAfterWrite(*Self, blobManagerDb, true);
}

if (operation) {
operation->OnWriteFinish(txc, writeIds);
auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
Y_UNUSED(txInfo);
NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId());
Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo);
} else {
Y_ABORT_UNLESS(writeIds.size() == 1);
Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)writeIds.front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
for (auto&& i : buffer.GetRemoveActions()) {
i->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true);
}
for (auto&& aggr : buffer.GetAggregations()) {
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
std::unique_ptr<TEvColumnShard::TEvWriteResult> result;
TWriteOperation::TPtr operation;
if (!writeMeta.HasLongTxId()) {
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation);
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
}
if (operation) {
operation->OnWriteFinish(txc, aggr->GetWriteIds());
auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
Y_UNUSED(txInfo);
NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId());
Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo));
} else {
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
Results.emplace_back(std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS));
}
}
return true;
}

void TTxWrite::Complete(const TActorContext& ctx) {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
AFL_VERIFY(Result);
Self->CSCounters.OnWriteTxComplete((TMonotonic::Now() - PutBlobResult->Get()->GetWriteMeta().GetWriteStartInstant()).MilliSeconds());
Self->CSCounters.OnSuccessWriteResponse();
ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release());
const auto now = TMonotonic::Now();
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
for (auto&& i : buffer.GetAddActions()) {
i->OnCompleteTxAfterWrite(*Self);
}
for (auto&& i : buffer.GetRemoveActions()) {
i->OnCompleteTxAfterRemoving(*Self);
}
AFL_VERIFY(buffer.GetAggregations().size() == Results.size());
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
ctx.Send(writeMeta.GetSource(), Results[i].release());
Self->CSCounters.OnWriteTxComplete((now - writeMeta.GetWriteStartInstant()).MilliSeconds());
Self->CSCounters.OnSuccessWriteResponse();
}

}

}
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>

namespace NKikimr::NColumnShard {

Expand All @@ -15,12 +16,12 @@ class TTxWrite : public TTransactionBase<TColumnShard> {
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_WRITE; }

bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob);
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);

private:
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
const ui32 TabletTxNo;
std::unique_ptr<NActors::IEventBase> Result;
std::vector<std::unique_ptr<NActors::IEventBase>> Results;

TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
Self->EnqueueBackgroundActivities(false, TriggerActivity);
}

Self->UpdateResourceMetrics(ctx, Ev->Get()->PutResult->GetResourceUsage());
changes->MutableBlobsAction().OnCompleteTxAfterAction(*Self);
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexComplete(Self->TabletID(), changes->TypeString());
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "blobs_reader/actor.h"
#include "hooks/abstract/abstract.h"
#include "resource_subscriber/actor.h"
#include "engines/writer/buffer/actor.h"

namespace NKikimr {

Expand All @@ -15,6 +16,8 @@ namespace NKikimr::NColumnShard {

void TColumnShard::CleanupActors(const TActorContext& ctx) {
ctx.Send(ResourceSubscribeActor, new TEvents::TEvPoisonPill);
ctx.Send(BufferizationWriteActorId, new TEvents::TEvPoisonPill);

StoragesManager->Stop();
if (Tiers) {
Tiers->Stop();
Expand Down Expand Up @@ -69,6 +72,7 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
CompactionLimits.RegisterControls(icb);
Settings.RegisterControls(icb);
ResourceSubscribeActor = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletID(), SelfId()));
BufferizationWriteActorId = ctx.Register(new NColumnShard::NWriting::TActor(TabletID(), SelfId()));
Execute(CreateTxInitSchema(), ctx);
}

Expand Down
Loading
Loading