Skip to content

Commit

Permalink
KIKIMR-20042 Added table with statistics to LoadActor html report
Browse files Browse the repository at this point in the history
  • Loading branch information
domwst committed Dec 19, 2023
1 parent c3ff64d commit 9056863
Showing 1 changed file with 107 additions and 37 deletions.
144 changes: 107 additions & 37 deletions ydb/core/load_test/keyvalue_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <ydb/core/control/immediate_control_board_impl.h>
#include <ydb/core/keyvalue/keyvalue_events.h>

#include <library/cpp/histogram/hdr/histogram.h>
#include <library/cpp/monlib/service/pages/templates.h>
#include <library/cpp/time_provider/time_provider.h>

Expand All @@ -17,12 +18,6 @@
namespace NKikimr {
class TKeyValueWriterLoadTestActor;

#define PARAM(NAME, VALUE) \
TABLER() { \
TABLED() { str << NAME; } \
TABLED() { str << VALUE; } \
}

class TWorker {
friend class TKeyValueWriterLoadTestActor;

Expand All @@ -39,6 +34,10 @@ class TWorker {
TString DataBuffer;
TReallyFastRng32 *Gen;
bool IsDying = false;

ui64 Errors = 0;
ui64 OutOfBoundsLatencies = 0;
NHdr::THistogram LatencyHistogram{6'000'000, 4};
public:

TWorker(const NKikimr::TEvLoadTestRequest::TKeyValueLoad::TWorkerConfig& cmd,
Expand Down Expand Up @@ -87,12 +86,23 @@ class TWorker {
return ev;
}

void OnResult(ui32 size) {
--ItemsInFlight;
BytesInFlight -= size;
void OnSuccess(ui32 size, TDuration responseTime) {
ReduceInFlight(size);
if (!LatencyHistogram.RecordValue(responseTime.MicroSeconds())) {
LOG_INFO_S(*NActors::TActivationContext::ActorSystem(), NKikimrServices::BS_LOAD_TEST, "Worker# " << Idx << " skipped recording of " << responseTime << " response time");
++OutOfBoundsLatencies;
}
}

~TWorker() {
void OnFailure(ui32 size) {
ReduceInFlight(size);
++Errors;
}

private:
void ReduceInFlight(ui32 size) {
--ItemsInFlight;
BytesInFlight -= size;
}
};

Expand Down Expand Up @@ -123,6 +133,7 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
ui64 ReqIdx = 0;
ui32 DurationSeconds;
i32 OwnerInitInProgress = 0;
TString ConfigString;

TReallyFastRng32 Rng;

Expand Down Expand Up @@ -170,6 +181,8 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
GetSubgroup("tablet", Sprintf("%09" PRIu64, TabletId));
KeyValueBytesWritten = LoadCounters->GetCounter("KeyValueBytesWritten", true);
ResponseTimes.Initialize(LoadCounters, "subsystem", "LoadActorLogWriteDuration", "Time in microseconds", percentiles);

google::protobuf::TextFormat::PrintToString(cmd, &ConfigString);
}

~TKeyValueWriterLoadTestActor() {
Expand Down Expand Up @@ -198,9 +211,9 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
for (auto& worker : Workers) {
AppData(ctx)->Icb->RegisterLocalControl(worker->MaxInFlight,
Sprintf("KeyValueWriteLoadActor_MaxInFlight_%04" PRIu64 "_%04" PRIu32, Tag, worker->Idx));
LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " last TEvKeyValueResult, "
<< " all workers is initialized, start test");
}
LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " last TEvKeyValueResult, "
<< "all workers are initialized, start test");
EarlyStop = false;
Connect(ctx);
}
Expand Down Expand Up @@ -232,7 +245,10 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
}
const TString errorReason = EarlyStop ?
"Abort, stop signal received" : "OK, called StartDeathProcess";
ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, errorReason));
auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, report, errorReason);
finishEv->LastHtmlPage = RenderHTML(false);

ctx.Send(Parent, finishEv);
NTabletPipe::CloseClient(SelfId(), Pipe);
Die(ctx);
}
Expand All @@ -258,8 +274,8 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
void SendWriteRequests(const TActorContext& ctx) {
ui64 sent = 0;
for (auto& worker : Workers) {
auto now = TAppData::TimeProvider->Now();
while (std::unique_ptr<TEvKeyValue::TEvRequest> ev = worker->TrySend()) {
auto now = TAppData::TimeProvider->Now();
ui64 size = ev->Record.GetCmdWrite(0).GetValue().size();
*KeyValueBytesWritten += size;
ev->Record.SetCookie(ReqIdx);
Expand All @@ -275,24 +291,21 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
auto msg = ev->Get();
auto record = msg->Record;
if (record.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
TStringStream str;
str << " TEvKeyValue::TEvResponse is not OK, msg.ToString()# " << msg->ToString();
LOG_ERROR_S(ctx, NKikimrServices::BS_LOAD_TEST, str.Str());
ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, str.Str()));
NTabletPipe::CloseClient(SelfId(), Pipe);
Die(ctx);
return;
}

auto now = TAppData::TimeProvider->Now();
auto it = InFlightWrites.find(record.GetCookie());
Y_ABORT_UNLESS(it != InFlightWrites.end());
const auto& stats = it->second;
ResponseTimes.Increment((now - stats.SentTime).MicroSeconds());
auto responseTime = TAppData::TimeProvider->Now() - stats.SentTime;
ResponseTimes.Increment(responseTime.MicroSeconds());
auto& worker = Workers[stats.WorkerIdx];

worker->OnResult(stats.Size);
if (record.GetStatus() == NMsgBusProxy::MSTATUS_OK) {
worker->OnSuccess(stats.Size, responseTime);
} else {
LOG_WARN_S(ctx, NKikimrServices::BS_LOAD_TEST, " TEvKeyValue::TEvResponse is not OK, msg.ToString()# " << msg->ToString());

worker->OnFailure(stats.Size);
}
WrittenBytes = WrittenBytes + stats.Size;
LOG_TRACE_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " EvResult, "
<< " WrittenBytes# " << WrittenBytes);
Expand All @@ -301,29 +314,86 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
SendWriteRequests(ctx);
}

void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
TString RenderHTML(bool showPassedTime) {
TStringStream str;
HTML(str) {
TABLE() {
if (showPassedTime) {
PARA() {
str << "Time passed: " << (TAppData::TimeProvider->Now() - TestStartTime).Seconds() << "s / "
<< DurationSeconds << "s";
}
}
TABLE_CLASS("table table-condenced") {
TABLEHEAD() {
TABLER() {
TABLEH() { str << "Parameter"; }
TABLEH() { str << "Value"; }
TABLEH() {
str << "Worker#";
}
TABLEH() {
str << "Writes";
}
TABLEH() {
str << "Errors";
}
TABLEH() {
str << "OOB Latencies";
}
TABLEH() {
str << "p50(ms)";
}
TABLEH() {
str << "p95(ms)";
}
TABLEH() {
str << "p99(ms)";
}
TABLEH() {
str << "pMax(ms)";
}
}
}
TABLEBODY() {

PARAM("Elapsed time / Duration", (TAppData::TimeProvider->Now() - TestStartTime).Seconds() << "s / "
<< DurationSeconds << "s");
for (auto& worker : Workers) {
PARAM("Worker idx", worker->Idx);
PARAM("Worker next OperationIdx", worker->OperationIdx);
for (auto& worker: Workers) {
TABLER() {
TABLED() {
str << worker->Idx;
}
TABLED() {
str << worker->LatencyHistogram.GetTotalCount();
}
TABLED() {
str << worker->Errors;
}
TABLED() {
str << worker->OutOfBoundsLatencies;
}
TABLED() {
str << worker->LatencyHistogram.GetValueAtPercentile(50.0) / 1000.0;
}
TABLED() {
str << worker->LatencyHistogram.GetValueAtPercentile(95.0) / 1000.0;
}
TABLED() {
str << worker->LatencyHistogram.GetValueAtPercentile(99.0) / 1000.0;
}
TABLED() {
str << worker->LatencyHistogram.GetMax() / 1000.0;
}
}
}
}
}
COLLAPSED_BUTTON_CONTENT(Sprintf("configProtobuf%" PRIu64, Tag), "Config") {
PRE() {
str << ConfigString;
}
}
}
return str.Str();
}

ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId));
void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(RenderHTML(true), ev->Get()->SubRequestId));
}

void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
Expand Down

0 comments on commit 9056863

Please sign in to comment.