Skip to content

Commit

Permalink
Support KV patching in TestShard
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru committed Feb 8, 2024
1 parent 0ffd6c5 commit 7c11288
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 6 deletions.
1 change: 1 addition & 0 deletions ydb/core/protos/msgbus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ message TTestShardControlRequest {
repeated TSizeInterval Sizes = 6; // distrubution of generated value size
repeated TTimeInterval WritePeriods = 7; // time between two events
repeated TTimeInterval RestartPeriods = 8; // time between automatic restarts
optional uint32 PatchRequestsFractionPPM = 12;
}

optional uint64 TabletId = 1;
Expand Down
27 changes: 21 additions & 6 deletions ydb/core/test_tablet/load_actor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ namespace NKikimr::NTestShard {
return;
}
if (StallCounter > 500) {
if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) {
if (WritesInFlight.empty() && PatchesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() &&
TransitionInFlight.empty()) {
StallCounter = 0;
} else {
return;
Expand All @@ -70,17 +71,23 @@ namespace NKikimr::NTestShard {
barrier = Settings.GetValidateAfterBytes();
}
if (BytesProcessed > barrier) { // time to perform validation
if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) {
if (WritesInFlight.empty() && PatchesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() &&
TransitionInFlight.empty()) {
RunValidation(false);
}
} else { // resume load
const TMonotonic now = TActivationContext::Monotonic();

bool canWriteMore = false;
if (WritesInFlight.size() < Settings.GetMaxInFlight()) {
if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight()) {
if (NextWriteTimestamp <= now) {
IssueWrite();
if (WritesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) {
if (Settings.HasPatchRequestsFractionPPM() && !ConfirmedKeys.empty() &&
RandomNumber(1'000'000u) < Settings.GetPatchRequestsFractionPPM()) {
IssuePatch();
} else {
IssueWrite();
}
if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) {
NextWriteTimestamp += GenerateRandomInterval(Settings.GetWritePeriods());
canWriteMore = NextWriteTimestamp <= now;
} else {
Expand Down Expand Up @@ -177,6 +184,13 @@ namespace NKikimr::NTestShard {
}
WritesInFlight.erase(it);
}
if (auto nh = PatchesInFlight.extract(record.GetCookie())) {
const TString& key = nh.mapped();
const auto it = Keys.find(key);
Y_VERIFY_S(it != Keys.end(), "Key# " << key << " not found in Keys dict");
STLOG(PRI_WARN, TEST_SHARD, TS27, "patch failed", (TabletId, TabletId), (Key, key));
RegisterTransition(*it, ::NTestShard::TStateServer::WRITE_PENDING, ::NTestShard::TStateServer::DELETED);
}
if (const auto it = DeletesInFlight.find(record.GetCookie()); it != DeletesInFlight.end()) {
for (const TString& key : it->second.KeysInQuery) {
const auto it = Keys.find(key);
Expand Down Expand Up @@ -209,10 +223,11 @@ namespace NKikimr::NTestShard {
};
STLOG(PRI_INFO, TEST_SHARD, TS04, "TEvKeyValue::TEvResponse", (TabletId, TabletId), (Msg, makeResponse()));
ProcessWriteResult(record.GetCookie(), record.GetWriteResult());
ProcessPatchResult(record.GetCookie(), record.GetPatchResult());
ProcessDeleteResult(record.GetCookie(), record.GetDeleteRangeResult());
ProcessReadResult(record.GetCookie(), record.GetReadResult(), *ev->Get());
}
if (WritesInFlight.size() != Settings.GetMaxInFlight() && NextWriteTimestamp == TMonotonic::Max()) {
if (WritesInFlight.size() + PatchesInFlight.size() != Settings.GetMaxInFlight() && NextWriteTimestamp == TMonotonic::Max()) {
NextWriteTimestamp = TMonotonic::Now() + GenerateRandomInterval(Settings.GetWritePeriods());
}
Action();
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/test_tablet/load_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ namespace NKikimr::NTestShard {
ui64 BytesOfData = 0;

std::unordered_map<ui64, TWriteInfo> WritesInFlight; // cookie -> TWriteInfo
std::unordered_map<ui64, TString> PatchesInFlight;
ui32 KeysWritten = 0;
static constexpr TDuration WriteSpeedWindow = TDuration::Seconds(10);
static constexpr TDuration ReadSpeedWindow = TDuration::Seconds(10);
Expand All @@ -136,7 +137,9 @@ namespace NKikimr::NTestShard {

void GenerateKeyValue(TString *key, TString *value, bool *isInline);
void IssueWrite();
void IssuePatch();
void ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TWriteResult>& results);
void ProcessPatchResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TPatchResult>& results);
void TrimBytesWritten(TInstant now);
void HandleWriteOnTime();
void HandleDoSomeAction();
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/test_tablet/load_actor_mon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ namespace NKikimr::NTestShard {
TABLED() { str << self->WritesInFlight.size(); }
}

TABLER() {
TABLED() { str << "Patches in flight"; }
TABLED() { str << self->PatchesInFlight.size(); }
}

TABLER() {
TABLED() { str << "Reads in flight"; }
TABLED() { str << self->ReadsInFlight.size() << '/' << self->KeysBeingRead.size(); }
Expand Down
1 change: 1 addition & 0 deletions ydb/core/test_tablet/load_actor_read_validate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace NKikimr::NTestShard {
{
// ensure no concurrent operations are running
Y_ABORT_UNLESS(self.WritesInFlight.empty());
Y_ABORT_UNLESS(self.PatchesInFlight.empty());
Y_ABORT_UNLESS(self.DeletesInFlight.empty());
Y_ABORT_UNLESS(self.TransitionInFlight.empty());
for (auto& [key, info] : KeysBefore) {
Expand Down
74 changes: 74 additions & 0 deletions ydb/core/test_tablet/load_actor_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,67 @@ namespace NKikimr::NTestShard {
BytesProcessed += value.size();
}

void TLoadActor::IssuePatch() {
Y_ABORT_UNLESS(!ConfirmedKeys.empty());
const size_t index = RandomNumber(ConfirmedKeys.size());
const TString originalKey = ConfirmedKeys[index];

// extract length from the original key -- it may not change
ui64 len, seed, id;
StringSplitter(originalKey).Split(',').CollectInto(&len, &seed, &id);
TString originalValue = FastGenDataForLZ4(len, seed);

// generate patched key
seed = RandomNumber<ui64>();
id = RandomNumber<ui64>();
const TString patchedKey = TStringBuilder() << len << ',' << seed << ',' << id;

// generate random value for the new key
TString patchedValue = FastGenDataForLZ4(len, seed);

auto ev = CreateRequest();
auto& r = ev->Record;
auto *patch = r.AddCmdPatch();
patch->SetOriginalKey(originalKey);
patch->SetPatchedKey(patchedKey);

TRope rope(patchedValue);
ui64 offset = 0;
for (size_t chunks = 0; offset != len; ++chunks) {
// skip matching parts
while (offset + 1 < len && originalValue[offset] == patchedValue[offset]) {
++offset;
}
Y_ABORT_UNLESS(offset < len);

// add patched part
size_t pos = offset + 1;
while (pos < len && originalValue[pos] != patchedValue[pos]) {
++pos;
}
const size_t size = (chunks < 8 ? pos : len) - offset;

auto *diff = patch->AddDiffs();
diff->SetOffset(offset);
if (RandomNumber(2u)) {
diff->SetPayloadId(ev->AddPayload(TRope(rope.Position(offset), rope.Position(offset + size))));
} else {
diff->SetValue(patchedValue.substr(offset, size));
}
offset += size;
}

auto [pifIt, pifInserted] = PatchesInFlight.try_emplace(r.GetCookie(), patchedKey);
Y_ABORT_UNLESS(pifInserted);

auto [it, inserted] = Keys.try_emplace(patchedKey, len);
Y_ABORT_UNLESS(inserted);
RegisterTransition(*it, ::NTestShard::TStateServer::ABSENT, ::NTestShard::TStateServer::WRITE_PENDING, std::move(ev));

++KeysWritten;
BytesProcessed += len;
}

void TLoadActor::ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TWriteResult>& results) {
if (const auto wifIt = WritesInFlight.find(cookie); wifIt != WritesInFlight.end()) {
TWriteInfo& info = wifIt->second;
Expand All @@ -69,4 +130,17 @@ namespace NKikimr::NTestShard {
}
}

void TLoadActor::ProcessPatchResult(ui64 cookie, const google::protobuf::RepeatedPtrField<NKikimrClient::TKeyValueResponse::TPatchResult>& results) {
if (auto nh = PatchesInFlight.extract(cookie)) {
Y_ABORT_UNLESS(results.size() == 1);
const auto& res = results[0];
Y_VERIFY_S(res.GetStatus() == NKikimrProto::OK, "TabletId# " << TabletId << " CmdPatch failed Status# "
<< NKikimrProto::EReplyStatus_Name(NKikimrProto::EReplyStatus(res.GetStatus())));
const TString& key = nh.mapped();
const auto it = Keys.find(key);
Y_VERIFY_S(it != Keys.end(), "Key# " << key << " not found in Keys dict");
RegisterTransition(*it, ::NTestShard::TStateServer::WRITE_PENDING, ::NTestShard::TStateServer::CONFIRMED);
}
}

} // NKikimr::NTestShard

0 comments on commit 7c11288

Please sign in to comment.