diff --git a/ydb/core/protos/msgbus.proto b/ydb/core/protos/msgbus.proto index e55ea8973dde..af96f144ff09 100644 --- a/ydb/core/protos/msgbus.proto +++ b/ydb/core/protos/msgbus.proto @@ -773,6 +773,7 @@ message TTestShardControlRequest { repeated TSizeInterval Sizes = 6; // distrubution of generated value size repeated TTimeInterval WritePeriods = 7; // time between two events repeated TTimeInterval RestartPeriods = 8; // time between automatic restarts + optional uint32 PatchRequestsFractionPPM = 12; } optional uint64 TabletId = 1; diff --git a/ydb/core/test_tablet/load_actor_impl.cpp b/ydb/core/test_tablet/load_actor_impl.cpp index 70c3a701912c..866d62a5771b 100644 --- a/ydb/core/test_tablet/load_actor_impl.cpp +++ b/ydb/core/test_tablet/load_actor_impl.cpp @@ -59,7 +59,8 @@ namespace NKikimr::NTestShard { return; } if (StallCounter > 500) { - if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) { + if (WritesInFlight.empty() && PatchesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && + TransitionInFlight.empty()) { StallCounter = 0; } else { return; @@ -70,17 +71,23 @@ namespace NKikimr::NTestShard { barrier = Settings.GetValidateAfterBytes(); } if (BytesProcessed > barrier) { // time to perform validation - if (WritesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && TransitionInFlight.empty()) { + if (WritesInFlight.empty() && PatchesInFlight.empty() && DeletesInFlight.empty() && ReadsInFlight.empty() && + TransitionInFlight.empty()) { RunValidation(false); } } else { // resume load const TMonotonic now = TActivationContext::Monotonic(); bool canWriteMore = false; - if (WritesInFlight.size() < Settings.GetMaxInFlight()) { + if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight()) { if (NextWriteTimestamp <= now) { - IssueWrite(); - if (WritesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) { + if (Settings.HasPatchRequestsFractionPPM() && !ConfirmedKeys.empty() && + RandomNumber(1'000'000u) < Settings.GetPatchRequestsFractionPPM()) { + IssuePatch(); + } else { + IssueWrite(); + } + if (WritesInFlight.size() + PatchesInFlight.size() < Settings.GetMaxInFlight() || !Settings.GetResetWritePeriodOnFull()) { NextWriteTimestamp += GenerateRandomInterval(Settings.GetWritePeriods()); canWriteMore = NextWriteTimestamp <= now; } else { @@ -177,6 +184,13 @@ namespace NKikimr::NTestShard { } WritesInFlight.erase(it); } + if (auto nh = PatchesInFlight.extract(record.GetCookie())) { + const TString& key = nh.mapped(); + const auto it = Keys.find(key); + Y_VERIFY_S(it != Keys.end(), "Key# " << key << " not found in Keys dict"); + STLOG(PRI_WARN, TEST_SHARD, TS27, "patch failed", (TabletId, TabletId), (Key, key)); + RegisterTransition(*it, ::NTestShard::TStateServer::WRITE_PENDING, ::NTestShard::TStateServer::DELETED); + } if (const auto it = DeletesInFlight.find(record.GetCookie()); it != DeletesInFlight.end()) { for (const TString& key : it->second.KeysInQuery) { const auto it = Keys.find(key); @@ -209,10 +223,11 @@ namespace NKikimr::NTestShard { }; STLOG(PRI_INFO, TEST_SHARD, TS04, "TEvKeyValue::TEvResponse", (TabletId, TabletId), (Msg, makeResponse())); ProcessWriteResult(record.GetCookie(), record.GetWriteResult()); + ProcessPatchResult(record.GetCookie(), record.GetPatchResult()); ProcessDeleteResult(record.GetCookie(), record.GetDeleteRangeResult()); ProcessReadResult(record.GetCookie(), record.GetReadResult(), *ev->Get()); } - if (WritesInFlight.size() != Settings.GetMaxInFlight() && NextWriteTimestamp == TMonotonic::Max()) { + if (WritesInFlight.size() + PatchesInFlight.size() != Settings.GetMaxInFlight() && NextWriteTimestamp == TMonotonic::Max()) { NextWriteTimestamp = TMonotonic::Now() + GenerateRandomInterval(Settings.GetWritePeriods()); } Action(); diff --git a/ydb/core/test_tablet/load_actor_impl.h b/ydb/core/test_tablet/load_actor_impl.h index 32186991ac42..2a84684a975a 100644 --- a/ydb/core/test_tablet/load_actor_impl.h +++ b/ydb/core/test_tablet/load_actor_impl.h @@ -119,6 +119,7 @@ namespace NKikimr::NTestShard { ui64 BytesOfData = 0; std::unordered_map WritesInFlight; // cookie -> TWriteInfo + std::unordered_map PatchesInFlight; ui32 KeysWritten = 0; static constexpr TDuration WriteSpeedWindow = TDuration::Seconds(10); static constexpr TDuration ReadSpeedWindow = TDuration::Seconds(10); @@ -136,7 +137,9 @@ namespace NKikimr::NTestShard { void GenerateKeyValue(TString *key, TString *value, bool *isInline); void IssueWrite(); + void IssuePatch(); void ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField& results); + void ProcessPatchResult(ui64 cookie, const google::protobuf::RepeatedPtrField& results); void TrimBytesWritten(TInstant now); void HandleWriteOnTime(); void HandleDoSomeAction(); diff --git a/ydb/core/test_tablet/load_actor_mon.cpp b/ydb/core/test_tablet/load_actor_mon.cpp index 84c1bcce1365..43d115965732 100644 --- a/ydb/core/test_tablet/load_actor_mon.cpp +++ b/ydb/core/test_tablet/load_actor_mon.cpp @@ -155,6 +155,11 @@ namespace NKikimr::NTestShard { TABLED() { str << self->WritesInFlight.size(); } } + TABLER() { + TABLED() { str << "Patches in flight"; } + TABLED() { str << self->PatchesInFlight.size(); } + } + TABLER() { TABLED() { str << "Reads in flight"; } TABLED() { str << self->ReadsInFlight.size() << '/' << self->KeysBeingRead.size(); } diff --git a/ydb/core/test_tablet/load_actor_read_validate.cpp b/ydb/core/test_tablet/load_actor_read_validate.cpp index d9d548be5f9c..e2674f4198c8 100644 --- a/ydb/core/test_tablet/load_actor_read_validate.cpp +++ b/ydb/core/test_tablet/load_actor_read_validate.cpp @@ -53,6 +53,7 @@ namespace NKikimr::NTestShard { { // ensure no concurrent operations are running Y_ABORT_UNLESS(self.WritesInFlight.empty()); + Y_ABORT_UNLESS(self.PatchesInFlight.empty()); Y_ABORT_UNLESS(self.DeletesInFlight.empty()); Y_ABORT_UNLESS(self.TransitionInFlight.empty()); for (auto& [key, info] : KeysBefore) { diff --git a/ydb/core/test_tablet/load_actor_write.cpp b/ydb/core/test_tablet/load_actor_write.cpp index 4198993cb5fb..0624b5210ece 100644 --- a/ydb/core/test_tablet/load_actor_write.cpp +++ b/ydb/core/test_tablet/load_actor_write.cpp @@ -45,6 +45,67 @@ namespace NKikimr::NTestShard { BytesProcessed += value.size(); } + void TLoadActor::IssuePatch() { + Y_ABORT_UNLESS(!ConfirmedKeys.empty()); + const size_t index = RandomNumber(ConfirmedKeys.size()); + const TString originalKey = ConfirmedKeys[index]; + + // extract length from the original key -- it may not change + ui64 len, seed, id; + StringSplitter(originalKey).Split(',').CollectInto(&len, &seed, &id); + TString originalValue = FastGenDataForLZ4(len, seed); + + // generate patched key + seed = RandomNumber(); + id = RandomNumber(); + const TString patchedKey = TStringBuilder() << len << ',' << seed << ',' << id; + + // generate random value for the new key + TString patchedValue = FastGenDataForLZ4(len, seed); + + auto ev = CreateRequest(); + auto& r = ev->Record; + auto *patch = r.AddCmdPatch(); + patch->SetOriginalKey(originalKey); + patch->SetPatchedKey(patchedKey); + + TRope rope(patchedValue); + ui64 offset = 0; + for (size_t chunks = 0; offset != len; ++chunks) { + // skip matching parts + while (offset + 1 < len && originalValue[offset] == patchedValue[offset]) { + ++offset; + } + Y_ABORT_UNLESS(offset < len); + + // add patched part + size_t pos = offset + 1; + while (pos < len && originalValue[pos] != patchedValue[pos]) { + ++pos; + } + const size_t size = (chunks < 8 ? pos : len) - offset; + + auto *diff = patch->AddDiffs(); + diff->SetOffset(offset); + if (RandomNumber(2u)) { + diff->SetPayloadId(ev->AddPayload(TRope(rope.Position(offset), rope.Position(offset + size)))); + } else { + diff->SetValue(patchedValue.substr(offset, size)); + } + offset += size; + } + + auto [pifIt, pifInserted] = PatchesInFlight.try_emplace(r.GetCookie(), patchedKey); + Y_ABORT_UNLESS(pifInserted); + + auto [it, inserted] = Keys.try_emplace(patchedKey, len); + Y_ABORT_UNLESS(inserted); + RegisterTransition(*it, ::NTestShard::TStateServer::ABSENT, ::NTestShard::TStateServer::WRITE_PENDING, std::move(ev)); + + ++KeysWritten; + BytesProcessed += len; + } + void TLoadActor::ProcessWriteResult(ui64 cookie, const google::protobuf::RepeatedPtrField& results) { if (const auto wifIt = WritesInFlight.find(cookie); wifIt != WritesInFlight.end()) { TWriteInfo& info = wifIt->second; @@ -69,4 +130,17 @@ namespace NKikimr::NTestShard { } } + void TLoadActor::ProcessPatchResult(ui64 cookie, const google::protobuf::RepeatedPtrField& results) { + if (auto nh = PatchesInFlight.extract(cookie)) { + Y_ABORT_UNLESS(results.size() == 1); + const auto& res = results[0]; + Y_VERIFY_S(res.GetStatus() == NKikimrProto::OK, "TabletId# " << TabletId << " CmdPatch failed Status# " + << NKikimrProto::EReplyStatus_Name(NKikimrProto::EReplyStatus(res.GetStatus()))); + const TString& key = nh.mapped(); + const auto it = Keys.find(key); + Y_VERIFY_S(it != Keys.end(), "Key# " << key << " not found in Keys dict"); + RegisterTransition(*it, ::NTestShard::TStateServer::WRITE_PENDING, ::NTestShard::TStateServer::CONFIRMED); + } + } + } // NKikimr::NTestShard