Skip to content

Commit

Permalink
Merge 15ae8f0 into 2490271
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 2, 2025
2 parents 2490271 + 15ae8f0 commit 49966cc
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 60 deletions.
7 changes: 5 additions & 2 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <ydb/core/protos/table_stats.pb.h>
#include <ydb/core/tx/columnshard/bg_tasks/adapter/adapter.h>
#include <ydb/core/tx/columnshard/tablet/write_queue.h>
#include <ydb/core/tx/priorities/usage/service.h>
#include <ydb/core/tx/tiering/manager.h>

Expand Down Expand Up @@ -69,8 +70,8 @@ void TColumnShard::TrySwitchToWork(const TActorContext& ctx) {
}
ProgressTxController->OnTabletInit();
{
const TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId())("process", "SwitchToWork");
const TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())(
"self_id", SelfId())("process", "SwitchToWork");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SwitchToWork");
Become(&TThis::StateWork);
SignalTabletActive(ctx);
Expand Down Expand Up @@ -250,6 +251,8 @@ void TColumnShard::Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorCon
const TMonotonic now = TMonotonic::Now();
GetProgressTxController().PingTimeouts(now);
ctx.Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup(0));
} else if (ev->Get()->Tag == 1) {
WriteTasksQueue->Drain(true, ctx);
}
}

Expand Down
53 changes: 20 additions & 33 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "transactions/operators/ev_write/secondary.h"
#include "transactions/operators/ev_write/sync.h"

#include <ydb/core/tx/columnshard/tablet/write_queue.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/data_events/events.h>

Expand Down Expand Up @@ -46,26 +47,27 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
Y_ABORT("invalid function usage");
}

AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())(
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())(
"reason", overloadReason);

ctx.Send(writeMeta.GetSource(), event.release(), 0, cookie);
}

TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 pathId) const {
if (IsAnyChannelYellowStop()) {
return EOverloadStatus::Disk;
}

TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedWait(const ui64 pathId) const {
if (InsertTable && InsertTable->IsOverloadedByCommitted(pathId)) {
return EOverloadStatus::InsertTable;
}

Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit());
if (TablesManager.GetPrimaryIndex() && TablesManager.GetPrimaryIndex()->IsOverloadedByMetadata(NOlap::IColumnEngine::GetMetadataLimit())) {
return EOverloadStatus::OverloadMetadata;
}
return EOverloadStatus::None;
}

TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedImmediate(const ui64 pathId) const {
if (IsAnyChannelYellowStop()) {
return EOverloadStatus::Disk;
}
ui64 txLimit = Settings.OverloadTxInFlight;
ui64 writesLimit = Settings.OverloadWritesInFlight;
ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight;
Expand Down Expand Up @@ -93,7 +95,8 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_WRITE)("tablet_id", TabletID())("event", "TEvWritePortionResult");
std::vector<TNoDataWrite> noDataWrites = ev->Get()->DetachNoDataWrites();
for (auto&& i : noDataWrites) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "no_data_write_finished")("writing_size", i.GetDataSize())("writing_id", i.GetWriteMeta().GetId());
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "no_data_write_finished")("writing_size", i.GetDataSize())(
"writing_id", i.GetWriteMeta().GetId());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
if (ev->Get()->GetWriteStatus() == NKikimrProto::OK) {
Expand Down Expand Up @@ -255,7 +258,10 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex

NEvWrite::TWriteData writeData(writeMeta, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(),
StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING), false);
auto overloadStatus = CheckOverloaded(pathId);
auto overloadStatus = CheckOverloadedImmediate(pathId);
if (overloadStatus == EOverloadStatus::None) {
overloadStatus = CheckOverloadedWait(pathId);
}
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(
TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
Expand Down Expand Up @@ -560,11 +566,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

auto overloadStatus = CheckOverloaded(pathId);
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, pathId, source, {}, TGUID::CreateTimebased().AsGuidString()), arrowData->GetSize(), cookie, std::move(result), ctx);
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED);
return;
}

Expand All @@ -580,25 +583,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
lockId = record.GetLockTxId();
}

if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
return;
}

OperationsManager->RegisterLock(lockId, Generation());
auto writeOperation = OperationsManager->RegisterOperation(
pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", arrowData->GetSize())("operation_id", writeOperation->GetIdentifier())(
"in_flight", Counters.GetWritesMonitor()->GetWritesInFlight())("size_in_flight", Counters.GetWritesMonitor()->GetWritesSizeInFlight());
Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());

Y_ABORT_UNLESS(writeOperation);
writeOperation->SetBehaviour(behaviour);
NOlap::TWritingContext wContext(TabletID(), SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), writeOperation->GetActivityChecker());
arrowData->SetSeparationPoints(GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(pathId)->GetBucketPositions());
writeOperation->Start(*this, arrowData, source, wContext);
WriteTasksQueue->TryEnqueue(
this, ctx, TWriteTask(arrowData, schema, source, granuleShardingVersionId, pathId, cookie, lockId, *mType, behaviour));
WriteTasksQueue->Drain(false, ctx);
}

} // namespace NKikimr::NColumnShard
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <ydb/core/tx/columnshard/engines/scheme/schema_version.h>
#include <ydb/core/tx/columnshard/tablet/write_queue.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/priorities/usage/abstract.h>
#include <ydb/core/tx/priorities/usage/events.h>
Expand Down Expand Up @@ -77,6 +78,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, TabletCountersHolder(new TProtobufTabletCounters<ESimpleCounters_descriptor, ECumulativeCounters_descriptor,
EPercentileCounters_descriptor, ETxTypes_descriptor>())
, Counters(*TabletCountersHolder)
, WriteTasksQueue(std::make_unique<TWriteTasksQueue>(this))
, ProgressTxController(std::make_unique<TTxController>(*this))
, StoragesManager(std::make_shared<NOlap::TStoragesManager>(*this))
, DataLocksManager(std::make_shared<NOlap::NDataLocks::TManager>())
Expand Down
9 changes: 8 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class TGeneralCompactColumnEngineChanges;

namespace NKikimr::NColumnShard {

class TArrowData;
class TEvWriteCommitPrimaryTransactionOperator;
class TEvWriteCommitSecondaryTransactionOperator;
class TTxFinishAsyncTransaction;
Expand All @@ -99,6 +100,8 @@ class TOperationsManager;
class TWaitEraseTablesTxSubscriber;
class TTxBlobsWritingFinished;
class TTxBlobsWritingFailed;
class TWriteTasksQueue;
class TWriteTask;

namespace NLoading {
class TInsertTableInitializer;
Expand Down Expand Up @@ -228,6 +231,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class NLoading::TInFlightReadsInitializer;
friend class NLoading::TSpecialValuesInitializer;
friend class NLoading::TTablesManagerInitializer;
friend class TWriteTasksQueue;
friend class TWriteTask;

class TTxProgressTx;
class TTxProposeCancel;
Expand Down Expand Up @@ -373,7 +378,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
private:
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie,
std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
EOverloadStatus CheckOverloaded(const ui64 tableId) const;
EOverloadStatus CheckOverloadedImmediate(const ui64 tableId) const;
EOverloadStatus CheckOverloadedWait(const ui64 tableId) const;

protected:
STFUNC(StateInit) {
Expand Down Expand Up @@ -464,6 +470,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
private:
std::unique_ptr<TTabletCountersBase> TabletCountersHolder;
TCountersManager Counters;
std::unique_ptr<TWriteTasksQueue> WriteTasksQueue;

std::unique_ptr<TTxController> ProgressTxController;
std::unique_ptr<TOperationsManager> OperationsManager;
Expand Down
14 changes: 12 additions & 2 deletions ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#pragma once
#include "common/owner.h"
#include "initialization.h"
#include "tx_progress.h"

#include "common/owner.h"

#include <ydb/core/tx/columnshard/counters/tablet_counters.h>

#include <library/cpp/monlib/dynamic_counters/counters.h>
Expand All @@ -26,14 +27,23 @@ class TWriteCounters: public TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr VolumeWriteData;
NMonitoring::THistogramPtr HistogramBytesWriteDataCount;
NMonitoring::THistogramPtr HistogramBytesWriteDataBytes;
NMonitoring::THistogramPtr HistogramDurationQueueWait;

public:
const NMonitoring::TDynamicCounters::TCounterPtr QueueWaitSize;

void OnWritingTaskDequeue(const TDuration d){
HistogramDurationQueueWait->Collect(d.MilliSeconds());
}

TWriteCounters(TCommonCountersOwner& owner)
: TBase(owner, "activity", "writing")
, QueueWaitSize(TBase::GetValue("Write/Queue/Size"))
{
VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes");
HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100));
HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100));
HistogramDurationQueueWait = TBase::GetHistogram("Write/Queue/Waiting/DurationMs", NMonitoring::ExponentialHistogram(18, 2, 100));
}

void OnIncomingData(const ui64 dataSize) const {
Expand Down Expand Up @@ -231,4 +241,4 @@ class TCSCounters: public TCommonCountersOwner {
TCSCounters();
};

}
} // namespace NKikimr::NColumnShard
59 changes: 59 additions & 0 deletions ydb/core/tx/columnshard/tablet/write_queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#include "write_queue.h"

#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/operations/write_data.h>
#include <ydb/core/tx/data_events/write_data.h>

namespace NKikimr::NColumnShard {

/// Tries to start this queued write on the owning shard.
///
/// @param owner shard whose overload check, operations manager, counters and storages are used.
/// @param ctx   actor context; not used by this method (name omitted to avoid an unused-parameter warning).
/// @return false when CheckOverloadedWait() reports an overload for PathId — nothing is registered and the
///         task is expected to stay in the queue; true once the write operation was registered and started.
bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& /*ctx*/) {
    const auto overloadStatus = owner->CheckOverloadedWait(PathId);
    if (overloadStatus != TColumnShard::EOverloadStatus::None) {
        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "wait_overload")("status", overloadStatus);
        return false;
    }

    // The task is leaving the queue: report how long it has existed since construction, otherwise the
    // queue-wait duration histogram is never populated.
    owner->Counters.GetCSCounters().WritingCounters->OnWritingTaskDequeue(TMonotonic::Now() - Created);

    owner->OperationsManager->RegisterLock(LockId, owner->Generation());
    auto writeOperation = owner->OperationsManager->RegisterOperation(
        PathId, LockId, Cookie, GranuleShardingVersionId, ModificationType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());
    // Verify before the first dereference (the log line below already calls GetIdentifier()).
    AFL_VERIFY(writeOperation);

    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", ArrowData->GetSize())("operation_id", writeOperation->GetIdentifier())(
        "in_flight", owner->Counters.GetWritesMonitor()->GetWritesInFlight())(
        "size_in_flight", owner->Counters.GetWritesMonitor()->GetWritesSizeInFlight());

    writeOperation->SetBehaviour(Behaviour);
    NOlap::TWritingContext wContext(owner->TabletID(), owner->SelfId(), Schema, owner->StoragesManager,
        owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(),
        writeOperation->GetActivityChecker());
    ArrowData->SetSeparationPoints(owner->GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(PathId)->GetBucketPositions());
    writeOperation->Start(*owner, ArrowData, SourceId, wContext);
    return true;
}

bool TWriteTasksQueue::Drain(const bool onWakeup, const TActorContext& ctx) {
if (onWakeup) {
WriteTasksOverloadCheckerScheduled = false;
}
while (WriteTasks.size() && WriteTasks.front().Execute(Owner, ctx)) {
WriteTasks.pop_front();
}
if (WriteTasks.size() && !WriteTasksOverloadCheckerScheduled) {
Owner->Schedule(TDuration::MilliSeconds(300), new NActors::TEvents::TEvWakeup(1));
WriteTasksOverloadCheckerScheduled = true;
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "queue_on_write")("size", WriteTasks.size());
}
Owner->Counters.GetCSCounters().WritingCounters->QueueWaitSize->Add((i64)WriteTasks.size() - PredWriteTasksSize);
PredWriteTasksSize = (i64)WriteTasks.size();
return !WriteTasks.size();
}

/// Appends the task to the back of the queue, taking ownership of it by move.
/// The task is not executed here; Drain() does that.
void TWriteTasksQueue::Enqueue(TWriteTask&& task) {
    WriteTasks.push_back(std::move(task));
}

/// Removes this queue's last published size from the owner's queue-size counter.
TWriteTasksQueue::~TWriteTasksQueue() {
    const auto& writingCounters = Owner->Counters.GetCSCounters().WritingCounters;
    writingCounters->QueueWaitSize->Sub(PredWriteTasksSize);
}

} // namespace NKikimr::NColumnShard
62 changes: 62 additions & 0 deletions ydb/core/tx/columnshard/tablet/write_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
#include <ydb/core/tx/columnshard/operations/write.h>
#include <ydb/core/tx/data_events/common/modification_type.h>

namespace NKikimr::NColumnShard {
class TColumnShard;
class TArrowData;
// A single pending write request: holds everything Execute() needs to register and start
// the write operation on the owning TColumnShard.
class TWriteTask: TMoveOnly {
private:
// Data to be written; Execute() sets its separation points and passes it to the write operation.
std::shared_ptr<TArrowData> ArrowData;
// Schema handed to the writing context built in Execute().
NOlap::ISnapshotSchema::TPtr Schema;
// Actor id passed to the write operation on Start().
const NActors::TActorId SourceId;
const std::optional<ui32> GranuleShardingVersionId;
// Identifier used for the overload check and the granule lookup in Execute().
const ui64 PathId;
const ui64 Cookie;
const ui64 LockId;
const NEvWrite::EModificationType ModificationType;
const EOperationBehaviour Behaviour;
// Monotonic timestamp captured when the task object is constructed.
const TMonotonic Created = TMonotonic::Now();

public:
// Stores all arguments as-is; Created is initialized from the default member initializer above.
TWriteTask(const std::shared_ptr<TArrowData>& arrowData, const NOlap::ISnapshotSchema::TPtr& schema, const NActors::TActorId sourceId,
const std::optional<ui32>& granuleShardingVersionId, const ui64 pathId, const ui64 cookie, const ui64 lockId,
const NEvWrite::EModificationType modificationType, const EOperationBehaviour behaviour)
: ArrowData(arrowData)
, Schema(schema)
, SourceId(sourceId)
, GranuleShardingVersionId(granuleShardingVersionId)
, PathId(pathId)
, Cookie(cookie)
, LockId(lockId)
, ModificationType(modificationType)
, Behaviour(behaviour) {
}

// Returns the construction timestamp of this task.
const TMonotonic& GetCreatedMonotonic() const {
return Created;
}

// Attempts to start the write on `owner`; returns false if the owner reports an overload
// and the write was not started, true once the write operation has been started.
bool Execute(TColumnShard* owner, const TActorContext& ctx);
};

/// FIFO queue of pending TWriteTask objects belonging to one TColumnShard.
///
/// Drain() executes tasks from the front while they succeed and, when tasks remain, schedules a
/// delayed wakeup on the owner. The queue also mirrors its length into the owner's queue-size
/// counter; the destructor subtracts the last published value, which is why copying is disabled.
class TWriteTasksQueue {
private:
    // True while a retry wakeup scheduled by Drain() has not yet been delivered.
    bool WriteTasksOverloadCheckerScheduled = false;
    std::deque<TWriteTask> WriteTasks;
    // Queue length last published to the counter; used to publish deltas only.
    i64 PredWriteTasksSize = 0;
    // Non-owning pointer to the shard that owns this queue.
    TColumnShard* Owner;

public:
    // explicit: a raw shard pointer must not convert to a queue implicitly.
    explicit TWriteTasksQueue(TColumnShard* owner)
        : Owner(owner) {
    }

    // A copy would subtract the same counter value twice on destruction.
    TWriteTasksQueue(const TWriteTasksQueue&) = delete;
    TWriteTasksQueue& operator=(const TWriteTasksQueue&) = delete;

    ~TWriteTasksQueue();

    /// Appends a task to the back of the queue.
    void Enqueue(TWriteTask&& task);
    /// Runs queued tasks in order; returns true when the queue is empty afterwards.
    /// @param onWakeup true when invoked from the scheduled retry wakeup.
    bool Drain(const bool onWakeup, const TActorContext& ctx);
};

} // namespace NKikimr::NColumnShard
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/tablet/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Library target for this directory; it compiles write_queue.cpp.
LIBRARY()

SRCS(
write_queue.cpp
)

# NOTE(review): write_queue.cpp/.h include headers from columnshard, operations, engines/scheme and
# data_events — confirm whether additional PEERDIR entries are required.
PEERDIR(
ydb/core/tx/columnshard/hooks/abstract
)

END()
Loading

0 comments on commit 49966cc

Please sign in to comment.