Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIKIMR-20042 Added table with statistics to LoadActor html report #565

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 107 additions & 37 deletions ydb/core/load_test/keyvalue_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <ydb/core/control/immediate_control_board_impl.h>
#include <ydb/core/keyvalue/keyvalue_events.h>

#include <library/cpp/histogram/hdr/histogram.h>
#include <library/cpp/monlib/service/pages/templates.h>
#include <library/cpp/time_provider/time_provider.h>

Expand All @@ -17,12 +18,6 @@
namespace NKikimr {
class TKeyValueWriterLoadTestActor;

#define PARAM(NAME, VALUE) \
TABLER() { \
TABLED() { str << NAME; } \
TABLED() { str << VALUE; } \
}

class TWorker {
friend class TKeyValueWriterLoadTestActor;

Expand All @@ -39,6 +34,10 @@ class TWorker {
TString DataBuffer;
TReallyFastRng32 *Gen;
bool IsDying = false;

ui64 Errors = 0;
ui64 OutOfBoundsLatencies = 0;
NHdr::THistogram LatencyHistogram{6'000'000, 4};
public:

TWorker(const NKikimr::TEvLoadTestRequest::TKeyValueLoad::TWorkerConfig& cmd,
Expand Down Expand Up @@ -87,12 +86,23 @@ class TWorker {
return ev;
}

void OnResult(ui32 size) {
--ItemsInFlight;
BytesInFlight -= size;
void OnSuccess(ui32 size, TDuration responseTime) {
ReduceInFlight(size);
if (!LatencyHistogram.RecordValue(responseTime.MicroSeconds())) {
LOG_INFO_S(*NActors::TActivationContext::ActorSystem(), NKikimrServices::BS_LOAD_TEST, "Worker# " << Idx << " skipped recording of " << responseTime << " response time");
++OutOfBoundsLatencies;
}
}

~TWorker() {
void OnFailure(ui32 size) {
ReduceInFlight(size);
++Errors;
}

private:
void ReduceInFlight(ui32 size) {
--ItemsInFlight;
BytesInFlight -= size;
}
};

Expand Down Expand Up @@ -123,6 +133,7 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
ui64 ReqIdx = 0;
ui32 DurationSeconds;
i32 OwnerInitInProgress = 0;
TString ConfigString;

TReallyFastRng32 Rng;

Expand Down Expand Up @@ -170,6 +181,8 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
GetSubgroup("tablet", Sprintf("%09" PRIu64, TabletId));
KeyValueBytesWritten = LoadCounters->GetCounter("KeyValueBytesWritten", true);
ResponseTimes.Initialize(LoadCounters, "subsystem", "LoadActorLogWriteDuration", "Time in microseconds", percentiles);

google::protobuf::TextFormat::PrintToString(cmd, &ConfigString);
}

~TKeyValueWriterLoadTestActor() {
Expand Down Expand Up @@ -198,9 +211,9 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
for (auto& worker : Workers) {
AppData(ctx)->Icb->RegisterLocalControl(worker->MaxInFlight,
Sprintf("KeyValueWriteLoadActor_MaxInFlight_%04" PRIu64 "_%04" PRIu32, Tag, worker->Idx));
LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " last TEvKeyValueResult, "
<< " all workers is initialized, start test");
}
LOG_INFO_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " last TEvKeyValueResult, "
<< "all workers are initialized, start test");
EarlyStop = false;
Connect(ctx);
}
Expand Down Expand Up @@ -232,7 +245,10 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
}
const TString errorReason = EarlyStop ?
"Abort, stop signal received" : "OK, called StartDeathProcess";
ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, report, errorReason));
auto* finishEv = new TEvLoad::TEvLoadTestFinished(Tag, report, errorReason);
finishEv->LastHtmlPage = RenderHTML(false);

ctx.Send(Parent, finishEv);
NTabletPipe::CloseClient(SelfId(), Pipe);
Die(ctx);
}
Expand All @@ -258,8 +274,8 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
void SendWriteRequests(const TActorContext& ctx) {
ui64 sent = 0;
for (auto& worker : Workers) {
auto now = TAppData::TimeProvider->Now();
while (std::unique_ptr<TEvKeyValue::TEvRequest> ev = worker->TrySend()) {
auto now = TAppData::TimeProvider->Now();
ui64 size = ev->Record.GetCmdWrite(0).GetValue().size();
*KeyValueBytesWritten += size;
ev->Record.SetCookie(ReqIdx);
Expand All @@ -275,24 +291,21 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
auto msg = ev->Get();
auto record = msg->Record;
if (record.GetStatus() != NMsgBusProxy::MSTATUS_OK) {
TStringStream str;
str << " TEvKeyValue::TEvResponse is not OK, msg.ToString()# " << msg->ToString();
LOG_ERROR_S(ctx, NKikimrServices::BS_LOAD_TEST, str.Str());
ctx.Send(Parent, new TEvLoad::TEvLoadTestFinished(Tag, nullptr, str.Str()));
NTabletPipe::CloseClient(SelfId(), Pipe);
Die(ctx);
return;
}

auto now = TAppData::TimeProvider->Now();
auto it = InFlightWrites.find(record.GetCookie());
Y_ABORT_UNLESS(it != InFlightWrites.end());
const auto& stats = it->second;
ResponseTimes.Increment((now - stats.SentTime).MicroSeconds());
auto responseTime = TAppData::TimeProvider->Now() - stats.SentTime;
ResponseTimes.Increment(responseTime.MicroSeconds());
auto& worker = Workers[stats.WorkerIdx];

worker->OnResult(stats.Size);
if (record.GetStatus() == NMsgBusProxy::MSTATUS_OK) {
worker->OnSuccess(stats.Size, responseTime);
} else {
LOG_WARN_S(ctx, NKikimrServices::BS_LOAD_TEST, " TEvKeyValue::TEvResponse is not OK, msg.ToString()# " << msg->ToString());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the prio, maybe it should be ERROR as before


worker->OnFailure(stats.Size);
}
WrittenBytes = WrittenBytes + stats.Size;
LOG_TRACE_S(ctx, NKikimrServices::BS_LOAD_TEST, "Tag# " << Tag << " EvResult, "
<< " WrittenBytes# " << WrittenBytes);
Expand All @@ -301,29 +314,86 @@ class TKeyValueWriterLoadTestActor : public TActorBootstrapped<TKeyValueWriterLo
SendWriteRequests(ctx);
}

void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
TString RenderHTML(bool showPassedTime) {
TStringStream str;
HTML(str) {
TABLE() {
if (showPassedTime) {
PARA() {
str << "Time passed: " << (TAppData::TimeProvider->Now() - TestStartTime).Seconds() << "s / "
<< DurationSeconds << "s";
}
}
TABLE_CLASS("table table-condenced") {
TABLEHEAD() {
TABLER() {
TABLEH() { str << "Parameter"; }
TABLEH() { str << "Value"; }
TABLEH() {
str << "Worker#";
}
TABLEH() {
str << "Writes";
}
TABLEH() {
str << "Errors";
}
TABLEH() {
str << "OOB Latencies";
}
TABLEH() {
str << "p50(ms)";
}
TABLEH() {
str << "p95(ms)";
}
TABLEH() {
str << "p99(ms)";
}
TABLEH() {
str << "pMax(ms)";
}
}
}
TABLEBODY() {

PARAM("Elapsed time / Duration", (TAppData::TimeProvider->Now() - TestStartTime).Seconds() << "s / "
<< DurationSeconds << "s");
for (auto& worker : Workers) {
PARAM("Worker idx", worker->Idx);
PARAM("Worker next OperationIdx", worker->OperationIdx);
for (auto& worker: Workers) {
TABLER() {
TABLED() {
str << worker->Idx;
}
TABLED() {
str << worker->LatencyHistogram.GetTotalCount();
}
TABLED() {
str << worker->Errors;
}
TABLED() {
str << worker->OutOfBoundsLatencies;
}
TABLED() {
str << worker->LatencyHistogram.GetValueAtPercentile(50.0) / 1000.0;
}
TABLED() {
str << worker->LatencyHistogram.GetValueAtPercentile(95.0) / 1000.0;
}
TABLED() {
str << worker->LatencyHistogram.GetValueAtPercentile(99.0) / 1000.0;
}
TABLED() {
str << worker->LatencyHistogram.GetMax() / 1000.0;
}
}
}
}
}
COLLAPSED_BUTTON_CONTENT(Sprintf("configProtobuf%" PRIu64, Tag), "Config") {
PRE() {
str << ConfigString;
}
}
}
return str.Str();
}

ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(str.Str(), ev->Get()->SubRequestId));
void Handle(NMon::TEvHttpInfo::TPtr& ev, const TActorContext& ctx) {
ctx.Send(ev->Sender, new NMon::TEvHttpInfoRes(RenderHTML(true), ev->Get()->SubRequestId));
}

void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev, const TActorContext& ctx) {
Expand Down
Loading