Skip to content

Commit

Permalink
Introduce local SyncLog data cutter (#4124)
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 authored May 22, 2024
1 parent 61d0cf5 commit a26dfc6
Show file tree
Hide file tree
Showing 32 changed files with 518 additions and 109 deletions.
11 changes: 9 additions & 2 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,15 @@ void TNodeWarden::Bootstrap() {
DsProxyPerPoolCounters = new TDsProxyPerPoolCounters(AppData()->Counters);

if (actorSystem && actorSystem->AppData<TAppData>() && actorSystem->AppData<TAppData>()->Icb) {
actorSystem->AppData<TAppData>()->Icb->RegisterLocalControl(EnablePutBatching, "BlobStorage_EnablePutBatching");
actorSystem->AppData<TAppData>()->Icb->RegisterLocalControl(EnableVPatch, "BlobStorage_EnableVPatch");
const TIntrusivePtr<NKikimr::TControlBoard>& icb = actorSystem->AppData<TAppData>()->Icb;

icb->RegisterLocalControl(EnablePutBatching, "BlobStorage_EnablePutBatching");
icb->RegisterLocalControl(EnableVPatch, "BlobStorage_EnableVPatch");
icb->RegisterSharedControl(EnableLocalSyncLogDataCutting, "VDiskControls.EnableLocalSyncLogDataCutting");
icb->RegisterSharedControl(EnableSyncLogChunkCompressionHDD, "VDiskControls.EnableSyncLogChunkCompressionHDD");
icb->RegisterSharedControl(EnableSyncLogChunkCompressionSSD, "VDiskControls.EnableSyncLogChunkCompressionSSD");
icb->RegisterSharedControl(MaxSyncLogChunksInFlightHDD, "VDiskControls.MaxSyncLogChunksInFlightHDD");
icb->RegisterSharedControl(MaxSyncLogChunksInFlightSSD, "VDiskControls.MaxSyncLogChunksInFlightSSD");
}

// start replication broker
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ namespace NKikimr::NStorage {
TControlWrapper EnablePutBatching;
TControlWrapper EnableVPatch;

TControlWrapper EnableLocalSyncLogDataCutting;
TControlWrapper EnableSyncLogChunkCompressionHDD;
TControlWrapper EnableSyncLogChunkCompressionSSD;
TControlWrapper MaxSyncLogChunksInFlightHDD;
TControlWrapper MaxSyncLogChunksInFlightSSD;

TReplQuoter::TPtr ReplNodeRequestQuoter;
TReplQuoter::TPtr ReplNodeResponseQuoter;

Expand All @@ -148,6 +154,11 @@ namespace NKikimr::NStorage {
: Cfg(cfg)
, EnablePutBatching(Cfg->FeatureFlags.GetEnablePutBatchingForBlobStorage(), false, true)
, EnableVPatch(Cfg->FeatureFlags.GetEnableVPatch(), false, true)
, EnableLocalSyncLogDataCutting(0, 0, 1)
, EnableSyncLogChunkCompressionHDD(1, 0, 1)
, EnableSyncLogChunkCompressionSSD(0, 0, 1)
, MaxSyncLogChunksInFlightHDD(10, 1, 1024)
, MaxSyncLogChunksInFlightSSD(10, 1, 1024)
{
Y_ABORT_UNLESS(Cfg->BlobStorageConfig.GetServiceSet().AvailabilityDomainsSize() <= 1);
AvailDomainId = 1;
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ namespace NKikimr::NStorage {
vdiskConfig->EnableVDiskCooldownTimeout = Cfg->EnableVDiskCooldownTimeout;
vdiskConfig->ReplPausedAtStart = Cfg->VDiskReplPausedAtStart;
vdiskConfig->EnableVPatch = EnableVPatch;

vdiskConfig->EnableLocalSyncLogDataCutting = EnableLocalSyncLogDataCutting;
if (deviceType == NPDisk::EDeviceType::DEVICE_TYPE_ROT) {
vdiskConfig->EnableSyncLogChunkCompression = EnableSyncLogChunkCompressionHDD;
vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightHDD;
} else {
vdiskConfig->EnableSyncLogChunkCompression = EnableSyncLogChunkCompressionSSD;
vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightSSD;
}

vdiskConfig->FeatureFlags = Cfg->FeatureFlags;

if (Cfg->BlobStorageConfig.HasCostMetricsSettings()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_ut_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct TActorTestContext {
appData->IoContextFactory = IoContext.get();

Runtime->SetLogBackend(IsLowVerbose ? CreateStderrBackend() : CreateNullBackend());
Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}});
Runtime->Initialize(TTestActorRuntime::TEgg{appData.Release(), nullptr, {}, {}});
Runtime->SetLogPriority(NKikimrServices::BS_PDISK, NLog::PRI_NOTICE);
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_SYSLOG, NLog::PRI_NOTICE);
Runtime->SetLogPriority(NKikimrServices::BS_PDISK_TEST, NLog::PRI_DEBUG);
Expand Down
101 changes: 101 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/sync.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,108 @@
#include <ydb/core/blobstorage/ut_blobstorage/ut_helpers.h>
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_private_events.h>
#include <util/random/random.h>

Y_UNIT_TEST_SUITE(BlobStorageSync) {

void TestCutting(TBlobStorageGroupType groupType) {
const ui32 groupSize = groupType.BlobSubgroupSize();

// for (ui32 mask = 0; mask < (1 << groupSize); ++mask) { // TIMEOUT
{
ui32 mask = RandomNumber(1ull << groupSize);
for (bool compressChunks : { true, false }) {
TEnvironmentSetup env{{
.NodeCount = groupSize,
.Erasure = groupType,
}};

env.CreateBoxAndPool(1, 1);
std::vector<ui32> groups = env.GetGroups();
UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);
ui32 groupId = groups[0];

const ui64 tabletId = 5000;
const ui32 channel = 10;
ui32 gen = 1;
ui32 step = 1;
ui64 cookie = 1;

ui64 totalSize = 0;

std::vector<TControlWrapper> cutLocalSyncLogControls;
std::vector<TControlWrapper> compressChunksControls;
std::vector<TActorId> edges;

for (ui32 nodeId = 1; nodeId <= groupSize; ++nodeId) {
cutLocalSyncLogControls.emplace_back(0, 0, 1);
compressChunksControls.emplace_back(1, 0, 1);
TAppData* appData = env.Runtime->GetNode(nodeId)->AppData.get();
appData->Icb->RegisterSharedControl(cutLocalSyncLogControls.back(), "VDiskControls.EnableLocalSyncLogDataCutting");
appData->Icb->RegisterSharedControl(compressChunksControls.back(), "VDiskControls.EnableSyncLogChunkCompressionHDD");
edges.push_back(env.Runtime->AllocateEdgeActor(nodeId));
}

for (ui32 i = 0; i < groupSize; ++i) {
env.Runtime->WrapInActorContext(edges[i], [&] {
SendToBSProxy(edges[i], groupId, new TEvBlobStorage::TEvStatus(TInstant::Max()));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvStatusResult>(edges[i], false);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
}

auto writeBlob = [&](ui32 nodeId, ui32 blobSize) {
TLogoBlobID blobId(tabletId, gen, step, channel, blobSize, ++cookie);
totalSize += blobSize;
TString data = MakeData(blobSize);

const TActorId& sender = edges[nodeId - 1];
env.Runtime->WrapInActorContext(sender, [&] () {
SendToBSProxy(sender, groupId, new TEvBlobStorage::TEvPut(blobId, std::move(data), TInstant::Max()));
});
};

env.Runtime->FilterFunction = [&](ui32/* nodeId*/, std::unique_ptr<IEventHandle>& ev) {
switch(ev->Type) {
case TEvBlobStorage::TEvPutResult::EventType:
UNIT_ASSERT_VALUES_EQUAL(ev->Get<TEvBlobStorage::TEvPutResult>()->Status, NKikimrProto::OK);
return false;
default:
return true;
}
};

while (totalSize < 16_MB) {
writeBlob(GenerateRandom(1, groupSize + 1), GenerateRandom(1, 1_MB));
}
env.Sim(TDuration::Minutes(5));

for (ui32 i = 0; i < groupSize; ++i) {
cutLocalSyncLogControls[i] = !!(mask & (1 << i));
compressChunksControls[i] = compressChunks;
}

while (totalSize < 32_MB) {
writeBlob(GenerateRandom(1, groupSize + 1), GenerateRandom(1, 1_MB));
}

env.Sim(TDuration::Minutes(5));
}
}
}

Y_UNIT_TEST(TestSyncLogCuttingMirror3dc) {
TestCutting(TBlobStorageGroupType::ErasureMirror3dc);
}

Y_UNIT_TEST(TestSyncLogCuttingMirror3of4) {
TestCutting(TBlobStorageGroupType::ErasureMirror3of4);
}

Y_UNIT_TEST(TestSyncLogCuttingBlock4Plus2) {
TestCutting(TBlobStorageGroupType::Erasure4Plus2Block);
}

Y_UNIT_TEST(SyncWhenDiskGetsDown) {
return; // re-enable when protocol issue is resolved

Expand Down
5 changes: 5 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/ut_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ namespace NKikimr {

TString MakeData(ui32 dataSize);

template<typename Int1 = ui32, typename Int2 = ui32>
inline Int1 GenerateRandom(Int1 min, Int2 max) {
return min + RandomNumber(max - min);
}

class TInflightActor : public TActorBootstrapped<TInflightActor> {
public:
struct TSettings {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ namespace NKikimr {
SyncLogAdvisedIndexedBlockSize = ui32(1) << ui32(20); // 1 MB
SyncLogMaxMemAmount = ui64(64) << ui64(20); // 64 MB

MaxSyncLogChunkSize = ui32(16) << ui32(10); // 32 Kb

ReplTimeInterval = TDuration::Seconds(60); // 60 seconds
ReplRequestTimeout = TDuration::Seconds(10); // 10 seconds
ReplPlanQuantum = TDuration::MilliSeconds(100); // 100 ms
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ namespace NKikimr {
ui32 SyncLogAdvisedIndexedBlockSize;
ui64 SyncLogMaxMemAmount;

TControlWrapper EnableLocalSyncLogDataCutting;
TControlWrapper EnableSyncLogChunkCompression;
TControlWrapper MaxSyncLogChunksInFlight;
ui32 MaxSyncLogChunkSize;

///////////// REPL SETTINGS /////////////////////////
TDuration ReplTimeInterval;
TDuration ReplRequestTimeout;
Expand Down
13 changes: 6 additions & 7 deletions ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,14 +513,11 @@ namespace NKikimr {

///////////////// SYNC //////////////////////////////////////////////////////
TLsnSeg THull::AllocateLsnForSyncDataCmd(const TString &data) {
// count number of elements
ui32 counter = 0;
auto count = [&counter] (const void *) { counter++; };
// do job - count all elements
NSyncLog::TFragmentReader(data).ForEach(count, count, count, count);
NSyncLog::TFragmentReader fragment(data);

// allocate LsnSeg; we reserve a diapason of lsns since we put multiple records
ui64 lsnAdvance = counter;
std::vector<const NSyncLog::TRecordHdr*> records = fragment.ListRecords();
ui64 lsnAdvance = records.size();
Y_ABORT_UNLESS(lsnAdvance > 0);
auto seg = Fields->LsnMngr->AllocLsnForHull(lsnAdvance);

Expand All @@ -536,7 +533,9 @@ namespace NKikimr {
curLsn++;
};
// do job - update blocks cache
NSyncLog::TFragmentReader(data).ForEach(otherHandler, blockHandler, otherHandler, blockHandlerV2);
for (const NSyncLog::TRecordHdr* rec : records) {
NSyncLog::HandleRecordHdr(rec, otherHandler, blockHandler, otherHandler, blockHandlerV2);
}
// check that all records are applied
Y_DEBUG_ABORT_UNLESS(curLsn == seg.Last + 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,9 @@ namespace NKikimr {

void ApplySyncDataByRecord(const TActorContext &ctx, ui64 recordLsn) {
// count number of records
ui64 recsNum = 0;
auto count = [&recsNum] (const void *) { recsNum++; };
NSyncLog::TFragmentReader fragment(LocalSyncDataMsg.Data);
fragment.ForEach(count, count, count, count);
std::vector<const NSyncLog::TRecordHdr*> records = fragment.ListRecords();
ui64 recsNum = records.size();

// calculate lsn
Y_DEBUG_ABORT_UNLESS(recordLsn >= recsNum, "recordLsn# %" PRIu64 " recsNum# %" PRIu64,
Expand Down Expand Up @@ -465,7 +464,9 @@ namespace NKikimr {
};

// apply local sync data
fragment.ForEach(blobHandler, blockHandler, barrierHandler, blockHandlerV2);
for (const NSyncLog::TRecordHdr* rec : records) {
NSyncLog::HandleRecordHdr(rec, blobHandler, blockHandler, barrierHandler, blockHandlerV2);
}
}

void PutLogoBlobsBatchToHull(
Expand Down
114 changes: 114 additions & 0 deletions ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "blobstorage_syncer_localwriter.h"
#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgreader.h>
#include <ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogmsgwriter.h>

namespace NKikimr {

Expand Down Expand Up @@ -186,5 +187,118 @@ namespace NKikimr {
return new TLocalSyncDataExtractorActor(vctx, skeletonId, parentId, std::move(ev));
}

///////////////////////////////////////////////////////////////////////////////////////////////
// TLocalSyncDataCutterActor -- actor extracts data from TEvLocalSyncData, cuts it into
// smaller chunks and sends in multiple messages to Skeleton
///////////////////////////////////////////////////////////////////////////////////////////////
class TLocalSyncDataCutterActor : public TActorBootstrapped<TLocalSyncDataCutterActor> {
TIntrusivePtr<TVDiskConfig> VConfig;
TIntrusivePtr<TVDiskContext> VCtx;
TActorId SkeletonId;
TActorId ParentId;
std::unique_ptr<TEvLocalSyncData> Ev;
std::vector<TString> Chunks;

ui32 ChunksInFlight = 0;
bool CompressChunks;
ui32 MaxChunksInFlight;
ui32 MaxChunksSize;

public:
void Bootstrap(const TActorContext&) {
THPTimer timer;
std::unique_ptr<NSyncLog::TNaiveFragmentWriter> fragmentWriter;

if (CompressChunks) {
fragmentWriter.reset(new NSyncLog::TLz4FragmentWriter);
} else {
fragmentWriter.reset(new NSyncLog::TNaiveFragmentWriter);
}

auto addChunk = [&]() {
if (fragmentWriter->GetSize()) {
TString chunk;
fragmentWriter->Finish(&chunk);
Chunks.emplace_back(std::move(chunk));
fragmentWriter->Clear();
}
};

NSyncLog::TFragmentReader fragmentReader(Ev->Data);
std::vector<const NSyncLog::TRecordHdr*> records = fragmentReader.ListRecords();
for (const NSyncLog::TRecordHdr* rec : records) {
if (fragmentWriter->GetSize() + rec->GetSize() > MaxChunksSize) {
addChunk();
}
fragmentWriter->Push(rec, rec->GetSize());
}
addChunk();

LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_SYNCER, VCtx->VDiskLogPrefix
<< "TLocalSyncDataCutterActor: VDiskId# " << Ev->VDiskID.ToString()
<< " dataSize# " << Ev->Data.size()
<< " duration# " << TDuration::Seconds(timer.Passed()));

Become(&TThis::StateFunc);
SendChunks();
}

void Finish(const NKikimrProto::EReplyStatus& status) {
Send(ParentId, new TEvLocalSyncDataResult(status, TAppData::TimeProvider->Now(), nullptr, nullptr));
PassAway();
}

void Handle(const TEvLocalSyncDataResult::TPtr& ev) {
if (ev->Get()->Status == NKikimrProto::OK) {
--ChunksInFlight;
if (Chunks.empty() && ChunksInFlight == 0) {
Finish(NKikimrProto::OK);
} else {
SendChunks();
}
} else {
Finish(ev->Get()->Status);
}
}

void SendChunks() {
while (ChunksInFlight < MaxChunksInFlight && !Chunks.empty()) {
Send(SkeletonId, new TEvLocalSyncData(Ev->VDiskID, Ev->SyncState, std::move(Chunks.back())));
Chunks.pop_back();
++ChunksInFlight;
}
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::VDISK_LOCALSYNCDATA_CUTTER;
}

TLocalSyncDataCutterActor(
const TIntrusivePtr<TVDiskConfig>& vconfig,
const TIntrusivePtr<TVDiskContext>& vctx,
const TActorId& skeletonId,
const TActorId& parentId,
std::unique_ptr<TEvLocalSyncData> ev)
: VCtx(vctx)
, SkeletonId(skeletonId)
, ParentId(parentId)
, Ev(std::move(ev))
, CompressChunks(vconfig->MaxSyncLogChunksInFlight)
, MaxChunksInFlight(vconfig->MaxSyncLogChunksInFlight)
, MaxChunksSize(vconfig->MaxSyncLogChunkSize)
{}

STRICT_STFUNC(StateFunc, {
hFunc(TEvLocalSyncDataResult, Handle);
})

};

IActor* CreateLocalSyncDataCutter(const TIntrusivePtr<TVDiskConfig>& vconfig, const TIntrusivePtr<TVDiskContext>& vctx,
const TActorId& skeletonId, const TActorId& parentId, std::unique_ptr<TEvLocalSyncData> ev) {
return new TLocalSyncDataCutterActor(vconfig, vctx, skeletonId, parentId, std::move(ev));
}


} // NKikimr
Loading

0 comments on commit a26dfc6

Please sign in to comment.