Skip to content

Commit

Permalink
Merge 0e29fce into d0c941e
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 1, 2025
2 parents d0c941e + 0e29fce commit 0953a72
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 28 deletions.
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ void TColumnShard::Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorCon
const TMonotonic now = TMonotonic::Now();
GetProgressTxController().PingTimeouts(now);
ctx.Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(0));
} else if (ev->Get()->Tag == 1) {
WriteTasksQueue.Drain(true, ctx);
}
}

Expand Down
62 changes: 38 additions & 24 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,40 @@ class TAbortWriteTransaction: public NTabletFlatExecutor::TTransactionBase<TColu
ui64 Cookie;
};

// Tries to start the write described by this task on behalf of `owner`.
//
// Returns false when CheckOverloaded() reports EOverloadStatus::OverloadMetadata:
// nothing is started and nothing is reported, so the caller is expected to keep
// the task and call Execute() again later.
// Returns true when the task has been consumed: either it was rejected with
// STATUS_OVERLOADED (any other overload status) or the write operation was
// registered and started.
bool TColumnShard::TWriteTask::Execute(TColumnShard* owner, const TActorContext& ctx) {
    const auto overloadStatus = owner->CheckOverloaded(PathId);
    if (overloadStatus == EOverloadStatus::OverloadMetadata) {
        AFL_INFO(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus);
        return false;
    }

    // From here on the task leaves the queue, so record how long it has been waiting since creation.
    owner->Counters.GetCSCounters().WritingCounters->OnWritingTaskDequeue(TMonotonic::Now() - Created);

    if (overloadStatus != EOverloadStatus::None) {
        std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
            owner->TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
        owner->OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, PathId, SourceId, {}, TGUID::CreateTimebased().AsGuidString()),
            ArrowData->GetSize(), Cookie, std::move(result), ctx);
        return true;
    }

    owner->OperationsManager->RegisterLock(LockId, owner->Generation());
    auto writeOperation = owner->OperationsManager->RegisterOperation(
        PathId, LockId, Cookie, GranuleShardingVersionId, ModificationType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());
    // Verify before the first dereference: the log statement below already calls writeOperation->GetIdentifier().
    AFL_VERIFY(writeOperation);

    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", ArrowData->GetSize())("operation_id", writeOperation->GetIdentifier())(
        "in_flight", owner->Counters.GetWritesMonitor()->GetWritesInFlight())(
        "size_in_flight", owner->Counters.GetWritesMonitor()->GetWritesSizeInFlight());
    owner->Counters.GetWritesMonitor()->OnStartWrite(ArrowData->GetSize());

    writeOperation->SetBehaviour(Behaviour);
    NOlap::TWritingContext wContext(owner->TabletID(), owner->SelfId(), Schema, owner->StoragesManager,
        owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(),
        writeOperation->GetActivityChecker());
    ArrowData->SetSeparationPoints(owner->GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(PathId)->GetBucketPositions());
    writeOperation->Start(*owner, ArrowData, SourceId, wContext);
    return true;
}

void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx) {
TMemoryProfileGuard mpg("NEvents::TDataEvents::TEvWrite");
NActors::TLogContextGuard gLogging =
Expand Down Expand Up @@ -560,11 +594,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

auto overloadStatus = CheckOverloaded(pathId);
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString()), arrowData->GetSize(), cookie, std::move(result), ctx);
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
return;
}

Expand All @@ -580,25 +611,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
lockId = record.GetLockTxId();
}

if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
return;
}

OperationsManager->RegisterLock(lockId, Generation());
auto writeOperation = OperationsManager->RegisterOperation(
pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", arrowData->GetSize())("operation_id", writeOperation->GetIdentifier())(
"in_flight", Counters.GetWritesMonitor()->GetWritesInFlight())("size_in_flight", Counters.GetWritesMonitor()->GetWritesSizeInFlight());
Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());

Y_ABORT_UNLESS(writeOperation);
writeOperation->SetBehaviour(behaviour);
NOlap::TWritingContext wContext(TabletID(), SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), writeOperation->GetActivityChecker());
arrowData->SetSeparationPoints(GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(pathId)->GetBucketPositions());
writeOperation->Start(*this, arrowData, source, wContext);
WriteTasksQueue.Enqueue(TWriteTask(arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType, behaviour));
WriteTasksQueue.Drain(false, ctx);
}

} // namespace NKikimr::NColumnShard
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, TabletCountersHolder(new TProtobufTabletCounters<ESimpleCounters_descriptor, ECumulativeCounters_descriptor,
EPercentileCounters_descriptor, ETxTypes_descriptor>())
, Counters(*TabletCountersHolder)
, WriteTasksQueue(this)
, ProgressTxController(std::make_unique<TTxController>(*this))
, StoragesManager(std::make_shared<NOlap::TStoragesManager>(*this))
, DataLocksManager(std::make_shared<NOlap::NDataLocks::TManager>())
Expand Down
71 changes: 71 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class TGeneralCompactColumnEngineChanges;

namespace NKikimr::NColumnShard {

class TArrowData;
class TEvWriteCommitPrimaryTransactionOperator;
class TEvWriteCommitSecondaryTransactionOperator;
class TTxFinishAsyncTransaction;
Expand Down Expand Up @@ -461,9 +462,79 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
}
}

// A single pending write request captured with everything TWriteTask::Execute()
// needs to start it later: the data, the schema, the sender and the operation
// parameters.
// NOTE(review): TMoveOnly presumably makes the type non-copyable; confirm against its definition.
class TWriteTask: TMoveOnly {
private:
// Payload of the write; Execute() reads its size and hands it to the write operation.
std::shared_ptr<TArrowData> ArrowData;
// Schema passed to NOlap::TWritingContext when the write is started.
NOlap::ISnapshotSchema::TPtr Schema;
// Actor that sent the write; used for the write meta and when starting the operation.
const NActors::TActorId SourceId;
const std::optional<ui32> GranuleShardingVersionId;
const ui64 PathId;
const ui64 Cookie;
const ui64 LockId;
const NEvWrite::EModificationType ModificationType;
const EOperationBehaviour Behaviour;
// Construction time of the task; Execute() uses it to measure the time spent waiting.
const TMonotonic Created = TMonotonic::Now();
public:
// Stores all arguments as-is; Created is set to the current monotonic time.
TWriteTask(const std::shared_ptr<TArrowData>& arrowData, const NOlap::ISnapshotSchema::TPtr& schema, const NActors::TActorId sourceId,
const std::optional<ui32>& granuleShardingVersionId, const ui64 pathId, const ui64 cookie, const ui64 lockId,
const NEvWrite::EModificationType modificationType, const EOperationBehaviour behaviour)
: ArrowData(arrowData)
, Schema(schema)
, SourceId(sourceId)
, GranuleShardingVersionId(granuleShardingVersionId)
, PathId(pathId)
, Cookie(cookie)
, LockId(lockId)
, ModificationType(modificationType)
, Behaviour(behaviour) {
}

// Returns the monotonic timestamp taken when the task was constructed.
const TMonotonic& GetCreatedMonotonic() const {
return Created;
}

// Tries to start the write on `owner`; declared here and defined out of line.
// Returns a bool that TWriteTasksQueue::Drain() uses to decide whether the task
// can be removed from the queue (true) or must stay at the front (false).
bool Execute(TColumnShard* owner, const TActorContext& ctx);
};

class TWriteTasksQueue {
private:
bool WriteTasksOverloadCheckerScheduled = false;
std::deque<TWriteTask> WriteTasks;
i64 PredWriteTasksSize = 0;
TColumnShard* Owner;
public:
TWriteTasksQueue(TColumnShard* owner)
: Owner(owner) {
}

~TWriteTasksQueue() {
Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Sub(PredWriteTasksSize);
}

void Enqueue(TWriteTask&& task) {
WriteTasks.emplace_back(std::move(task));
}

void Drain(const bool onWakeup, const TActorContext& ctx) {
if (onWakeup) {
WriteTasksOverloadCheckerScheduled = false;
}
while (WriteTasks.size() && WriteTasks.front().Execute(Owner, ctx)) {
WriteTasks.pop_front();
}
if (!WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) {
Owner->Schedule(TDuration::MilliSeconds(100), new NActors::TEvents::TEvWakeup(1));
WriteTasksOverloadCheckerScheduled = true;
}
Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Add((i64)WriteTasks.size() - PredWriteTasksSize);
PredWriteTasksSize = (i64)WriteTasks.size();
}
};

private:
std::unique_ptr<TTabletCountersBase> TabletCountersHolder;
TCountersManager Counters;
TWriteTasksQueue WriteTasksQueue;

std::unique_ptr<TTxController> ProgressTxController;
std::unique_ptr<TOperationsManager> OperationsManager;
Expand Down
17 changes: 13 additions & 4 deletions ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#pragma once
#include "common/owner.h"
#include "initialization.h"
#include "tx_progress.h"

#include "common/owner.h"

#include <ydb/core/tx/columnshard/counters/tablet_counters.h>

#include <library/cpp/monlib/dynamic_counters/counters.h>
Expand All @@ -26,14 +27,22 @@ class TWriteCounters: public TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr VolumeWriteData;
NMonitoring::THistogramPtr HistogramBytesWriteDataCount;
NMonitoring::THistogramPtr HistogramBytesWriteDataBytes;
NMonitoring::THistogramPtr HistogramDurationQueueWait;

public:
const NMonitoring::TDynamicCounters::TCounterPtr QueueWaitSize;

void OnWritingTaskDequeue(const TDuration d){
HistogramDurationQueueWait->Collect(d.MilliSeconds());
}

TWriteCounters(TCommonCountersOwner& owner)
: TBase(owner, "activity", "writing")
{
: TBase(owner, "activity", "writing") {
VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes");
HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100));
HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100));
HistogramDurationQueueWait = TBase::GetHistogram("Write/Queue/Waiting/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 100));
QueueWaitSize = TBase::GetValue("Write/Queue/Size");
}

void OnIncomingData(const ui64 dataSize) const {
Expand Down Expand Up @@ -231,4 +240,4 @@ class TCSCounters: public TCommonCountersOwner {
TCSCounters();
};

}
} // namespace NKikimr::NColumnShard

0 comments on commit 0953a72

Please sign in to comment.