Skip to content

Commit

Permalink
Merge f7d017f into f8dab68
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 authored Jun 3, 2024
2 parents f8dab68 + f7d017f commit 1bce8a4
Show file tree
Hide file tree
Showing 31 changed files with 239 additions and 103 deletions.
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/ut_fat/dsproxy_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4252,6 +4252,7 @@ class TBlobStorageProxyTest: public TTestBase {
vDiskConfig->GCOnlySynced = false;
vDiskConfig->HullCompLevelRateThreshold = 0.1;
vDiskConfig->SkeletonFrontQueueBackpressureCheckMsgId = false;
vDiskConfig->UseCostTracker = false;

IActor* vDisk = CreateVDisk(vDiskConfig, bsInfo, counters);
TActorSetupCmd vDiskSetup(vDisk, TMailboxType::Revolving, 0);
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,19 @@ void TNodeWarden::Bootstrap() {
icb->RegisterSharedControl(EnableSyncLogChunkCompressionSSD, "VDiskControls.EnableSyncLogChunkCompressionSSD");
icb->RegisterSharedControl(MaxSyncLogChunksInFlightHDD, "VDiskControls.MaxSyncLogChunksInFlightHDD");
icb->RegisterSharedControl(MaxSyncLogChunksInFlightSSD, "VDiskControls.MaxSyncLogChunksInFlightSSD");

icb->RegisterSharedControl(CostMetricsParametersByMedia[NPDisk::DEVICE_TYPE_ROT].BurstThresholdNs,
"VDiskControls.BurstThresholdNsHDD");
icb->RegisterSharedControl(CostMetricsParametersByMedia[NPDisk::DEVICE_TYPE_SSD].BurstThresholdNs,
"VDiskControls.BurstThresholdNsSSD");
icb->RegisterSharedControl(CostMetricsParametersByMedia[NPDisk::DEVICE_TYPE_NVME].BurstThresholdNs,
"VDiskControls.BurstThresholdNsNVME");
icb->RegisterSharedControl(CostMetricsParametersByMedia[NPDisk::DEVICE_TYPE_ROT].DiskTimeAvailableScale,
"VDiskControls.DiskTimeAvailableScaleHDD");
icb->RegisterSharedControl(CostMetricsParametersByMedia[NPDisk::DEVICE_TYPE_SSD].DiskTimeAvailableScale,
"VDiskControls.DiskTimeAvailableScaleSSD");
icb->RegisterSharedControl(CostMetricsParametersByMedia[NPDisk::DEVICE_TYPE_NVME].DiskTimeAvailableScale,
"VDiskControls.DiskTimeAvailableScaleNVME");
}

// start replication broker
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/blobstorage/nodewarden/node_warden_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ namespace NKikimr::NStorage {
TReplQuoter::TPtr ReplNodeRequestQuoter;
TReplQuoter::TPtr ReplNodeResponseQuoter;

TCostMetricsParametersByMedia CostMetricsParametersByMedia;

public:
struct TGroupRecord;

Expand All @@ -159,6 +161,11 @@ namespace NKikimr::NStorage {
, EnableSyncLogChunkCompressionSSD(0, 0, 1)
, MaxSyncLogChunksInFlightHDD(10, 1, 1024)
, MaxSyncLogChunksInFlightSSD(10, 1, 1024)
, CostMetricsParametersByMedia({
TCostMetricsParameters{200},
TCostMetricsParameters{50},
TCostMetricsParameters{32},
})
{
Y_ABORT_UNLESS(Cfg->BlobStorageConfig.GetServiceSet().AvailabilityDomainsSize() <= 1);
AvailDomainId = 1;
Expand Down
15 changes: 2 additions & 13 deletions ydb/core/blobstorage/nodewarden/node_warden_vdisk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,20 +184,9 @@ namespace NKikimr::NStorage {
vdiskConfig->MaxSyncLogChunksInFlight = MaxSyncLogChunksInFlightSSD;
}

vdiskConfig->FeatureFlags = Cfg->FeatureFlags;
vdiskConfig->CostMetricsParametersByMedia = CostMetricsParametersByMedia;

if (Cfg->BlobStorageConfig.HasCostMetricsSettings()) {
for (auto type : Cfg->BlobStorageConfig.GetCostMetricsSettings().GetVDiskTypes()) {
if (type.HasPDiskType() && deviceType == PDiskTypeToPDiskType(type.GetPDiskType())) {
if (type.HasBurstThresholdNs()) {
vdiskConfig->BurstThresholdNs = type.GetBurstThresholdNs();
}
if (type.HasDiskTimeAvailableScale()) {
vdiskConfig->DiskTimeAvailableScale = type.GetDiskTimeAvailableScale();
}
}
}
}
vdiskConfig->FeatureFlags = Cfg->FeatureFlags;

if (StorageConfig.HasBlobStorageConfig() && StorageConfig.GetBlobStorageConfig().HasVDiskPerformanceSettings()) {
for (auto &type : StorageConfig.GetBlobStorageConfig().GetVDiskPerformanceSettings().GetVDiskTypes()) {
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ struct TEvYardInitResult : public TEventLocal<TEvYardInitResult, TEvBlobStorage:
TEvYardInitResult(const NKikimrProto::EReplyStatus status, const TString &errorReason)
: Status(status)
, StatusFlags(0)
, PDiskParams(new TPDiskParams(0, 0, 0, 0, 0, 0, 0, 0, 0, 0))
, PDiskParams(new TPDiskParams(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, DEVICE_TYPE_ROT))
, ErrorReason(errorReason)
{
Y_ABORT_UNLESS(status != NKikimrProto::OK, "Single-parameter constructor is for error responses only");
Expand All @@ -183,7 +183,7 @@ struct TEvYardInitResult : public TEventLocal<TEvYardInitResult, TEvBlobStorage:
ui64 writeSpeedBps, ui64 readBlockSize, ui64 writeBlockSize,
ui64 bulkWriteBlockSize, ui32 chunkSize, ui32 appendBlockSize,
TOwner owner, TOwnerRound ownerRound, TStatusFlags statusFlags, TVector<TChunkIdx> ownedChunks,
const TString &errorReason)
EDeviceType trueMediaType, const TString &errorReason)
: Status(status)
, StatusFlags(statusFlags)
, PDiskParams(new TPDiskParams(
Expand All @@ -196,8 +196,8 @@ struct TEvYardInitResult : public TEventLocal<TEvYardInitResult, TEvBlobStorage:
writeSpeedBps,
readBlockSize,
writeBlockSize,
bulkWriteBlockSize
))
bulkWriteBlockSize,
trueMediaType))
, OwnedChunks(std::move(ownedChunks))
, ErrorReason(errorReason)
{}
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1743,7 +1743,8 @@ void TPDisk::ReplyErrorYardInitResult(TYardInit &evYardInit, const TString &str)
DriveModel.Speed(TDriveModel::OP_TYPE_WRITE), readBlockSize, writeBlockSize,
DriveModel.BulkWriteBlockSize(),
GetUserAccessibleChunkSize(), GetChunkAppendBlockSize(), OwnerSystem, 0,
GetStatusFlags(OwnerSystem, evYardInit.OwnerGroupType), TVector<TChunkIdx>(), error.Str()));
GetStatusFlags(OwnerSystem, evYardInit.OwnerGroupType), TVector<TChunkIdx>(),
Cfg->RetrieveDeviceType(), error.Str()));
Mon.YardInit.CountResponse();
}

Expand Down Expand Up @@ -1790,7 +1791,8 @@ bool TPDisk::YardInitForKnownVDisk(TYardInit &evYardInit, TOwner owner) {
DriveModel.SeekTimeNs() / 1000ull, DriveModel.Speed(TDriveModel::OP_TYPE_READ),
DriveModel.Speed(TDriveModel::OP_TYPE_WRITE), readBlockSize, writeBlockSize,
DriveModel.BulkWriteBlockSize(), GetUserAccessibleChunkSize(), GetChunkAppendBlockSize(), owner,
ownerRound, GetStatusFlags(OwnerSystem, evYardInit.OwnerGroupType), ownedChunks, nullptr));
ownerRound, GetStatusFlags(OwnerSystem, evYardInit.OwnerGroupType), ownedChunks,
Cfg->RetrieveDeviceType(), nullptr));
GetStartingPoints(owner, result->StartingPoints);
ownerData.VDiskId = vDiskId;
ownerData.CutLogId = evYardInit.CutLogId;
Expand Down Expand Up @@ -1941,7 +1943,7 @@ void TPDisk::YardInitFinish(TYardInit &evYardInit) {
DriveModel.Speed(TDriveModel::OP_TYPE_WRITE), readBlockSize, writeBlockSize,
DriveModel.BulkWriteBlockSize(), GetUserAccessibleChunkSize(), GetChunkAppendBlockSize(), owner, ownerRound,
GetStatusFlags(OwnerSystem, evYardInit.OwnerGroupType) | ui32(NKikimrBlobStorage::StatusNewOwner), TVector<TChunkIdx>(),
nullptr));
Cfg->RetrieveDeviceType(), nullptr));
GetStartingPoints(result->PDiskParams->Owner, result->StartingPoints);
WriteSysLogRestorePoint(new TCompletionEventSender(
this, evYardInit.Sender, result.Release(), Mon.YardInit.Results), evYardInit.ReqId, {});
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
TPDiskParams::TPDiskParams(NPDisk::TOwner owner, ui64 ownerRound, ui32 chunkSize, ui32 appendBlockSize,
ui64 seekTimeUs, ui64 readSpeedBps, ui64 writeSpeedBps, ui64 readBlockSize,
ui64 writeBlockSize, ui64 bulkWriteBlockSize)
ui64 writeBlockSize, ui64 bulkWriteBlockSize, NPDisk::EDeviceType trueMediaType)
: Owner(owner)
, OwnerRound(ownerRound)
, ChunkSize(chunkSize)
Expand All @@ -25,6 +25,7 @@ namespace NKikimr {
, BulkWriteBlockSize(bulkWriteBlockSize)
, PrefetchSizeBytes(CalculatePrefetchSizeBytes(seekTimeUs, readSpeedBps))
, GlueRequestDistanceBytes(CalculateGlueRequestDistanceBytes(seekTimeUs, readSpeedBps))
, TrueMediaType(trueMediaType)
{
Y_DEBUG_ABORT_UNLESS(AppendBlockSize <= ChunkSize);
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ namespace NKikimr {
const ui64 PrefetchSizeBytes; // Pdisk is expected to stream data of this size at 83% of max speed.
const ui64 GlueRequestDistanceBytes; // It is faster to read unneeded data of this size than to seek over it.

const NPDisk::EDeviceType TrueMediaType;

static ui32 CalculateRecommendedReadSize(ui64 seekTimeUs, ui64 readSpeedBps, ui64 appendBlockSize);
static ui64 CalculatePrefetchSizeBytes(ui64 seekTimeUs, ui64 readSpeedBps);
static ui64 CalculateGlueRequestDistanceBytes(ui64 seekTimeUs, ui64 readSpeedBps);

TPDiskParams(NPDisk::TOwner owner, ui64 ownerRound, ui32 chunkSize, ui32 appendBlockSize,
ui64 seekTimeUs, ui64 readSpeedBps, ui64 writeSpeedBps, ui64 readBlockSize,
ui64 writeBlockSize, ui64 bulkWriteBlockSize);
ui64 writeBlockSize, ui64 bulkWriteBlockSize, NPDisk::EDeviceType trueMediaType);
void OutputHtml(IOutputStream &str) const;
TString ToString() const;
};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ class TPDiskMockActor : public TActorBootstrapped<TPDiskMockActor> {
const ui64 bulkWriteBlockSize = 65536;
res = std::make_unique<NPDisk::TEvYardInitResult>(NKikimrProto::OK, seekTimeUs, readSpeedBps, writeSpeedBps,
readBlockSize, writeBlockSize, bulkWriteBlockSize, Impl.ChunkSize, Impl.AppendBlockSize, ownerId,
owner->OwnerRound, GetStatusFlags(), std::move(ownedChunks), TString());
owner->OwnerRound, GetStatusFlags(), std::move(ownedChunks), NPDisk::DEVICE_TYPE_NVME, TString());
res->StartingPoints = owner->StartingPoints;
} else {
res = std::make_unique<NPDisk::TEvYardInitResult>(NKikimrProto::INVALID_ROUND, "invalid owner round");
Expand Down
47 changes: 38 additions & 9 deletions ydb/core/blobstorage/ut_blobstorage/lib/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ struct TEnvironmentSetup {
std::set<TActorId> CommencedReplication;
std::unordered_map<ui32, TString> Cache;

using TIcbControlKey = std::pair<ui32, TString>; // { nodeId, name }

std::unordered_map<TIcbControlKey, TControlWrapper> IcbControls;

struct TSettings {
const ui32 NodeCount = 9;
const bool VDiskReplPausedAtStart = false;
Expand All @@ -39,7 +43,7 @@ struct TEnvironmentSetup {
const bool SuppressCompatibilityCheck = false;
const TFeatureFlags FeatureFlags;
const NPDisk::EDeviceType DiskType = NPDisk::EDeviceType::DEVICE_TYPE_NVME;
const ui32 BurstThresholdNs = 0;
const ui64 BurstThresholdNs = 0;
const ui32 MinHugeBlobInBytes = 0;
const float DiskTimeAvailableScale = 1;
const bool UseFakeConfigDispatcher = false;
Expand Down Expand Up @@ -374,14 +378,24 @@ struct TEnvironmentSetup {
}
config->FeatureFlags = Settings.FeatureFlags;

{
auto* type = config->BlobStorageConfig.MutableCostMetricsSettings()->AddVDiskTypes();
type->SetPDiskType(NKikimrBlobStorage::EPDiskType::ROT);
if (Settings.BurstThresholdNs) {
type->SetBurstThresholdNs(Settings.BurstThresholdNs);
}
type->SetDiskTimeAvailableScale(Settings.DiskTimeAvailableScale);
TAppData* appData = Runtime->GetNode(nodeId)->AppData.get();

#define ADD_ICB_CONTROL(controlName, defaultVal, minVal, maxVal, currentValue) { \
TControlWrapper control(defaultVal, minVal, maxVal); \
appData->Icb->RegisterSharedControl(control, controlName); \
control = currentValue; \
IcbControls.insert({{nodeId, controlName}, std::move(control)}); \
}

if (Settings.BurstThresholdNs) {
ADD_ICB_CONTROL("VDiskControls.BurstThresholdNsHDD", 200'000'000, 1, 1'000'000'000'000, Settings.BurstThresholdNs);
ADD_ICB_CONTROL("VDiskControls.BurstThresholdNsSSD", 50'000'000, 1, 1'000'000'000'000, Settings.BurstThresholdNs);
ADD_ICB_CONTROL("VDiskControls.BurstThresholdNsNVME", 32'000'000, 1, 1'000'000'000'000, Settings.BurstThresholdNs);
}
ADD_ICB_CONTROL("VDiskControls.DiskTimeAvailableScaleHDD", 1'000, 1, 1'000'000, std::round(Settings.DiskTimeAvailableScale * 1'000));
ADD_ICB_CONTROL("VDiskControls.DiskTimeAvailableScaleSSD", 1'000, 1, 1'000'000, std::round(Settings.DiskTimeAvailableScale * 1'000));
ADD_ICB_CONTROL("VDiskControls.DiskTimeAvailableScaleNVME", 1'000, 1, 1'000'000, std::round(Settings.DiskTimeAvailableScale * 1'000));
#undef ADD_ICB_CONTROL

{
auto* type = config->BlobStorageConfig.MutableVDiskPerformanceSettings()->AddVDiskTypes();
Expand Down Expand Up @@ -699,7 +713,7 @@ struct TEnvironmentSetup {
});
}

void PutBlob(const ui32 groupId, const TLogoBlobID& blobId, const TString& part) {
void PutBlob(const ui32 groupId, const TLogoBlobID& blobId, const TString& part) {
TActorId edge = Runtime->AllocateEdgeActor(Settings.ControllerNodeId);
Runtime->WrapInActorContext(edge, [&] {
SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvPut(blobId, part, TInstant::Max(),
Expand Down Expand Up @@ -924,4 +938,19 @@ struct TEnvironmentSetup {
}
return ctr;
};

void SetIcbControl(ui32 nodeId, TString controlName, ui64 value) {
if (nodeId == 0) {
for (nodeId = 1; nodeId <= Settings.NodeCount; ++nodeId) {
auto it = IcbControls.find({nodeId, controlName});
Y_ABORT_UNLESS(it != IcbControls.end());
it->second = value;
}
} else {
auto it = IcbControls.find({nodeId, controlName});
Y_ABORT_UNLESS(it != IcbControls.end());
it->second = value;
}
}

};
29 changes: 15 additions & 14 deletions ydb/core/blobstorage/ut_blobstorage/monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,21 +254,22 @@ Y_UNIT_TEST_SUITE(BurstDetection) {
}

void TestDiskTimeAvailableScaling() {
auto measure = [](float scale) {
TBlobStorageGroupInfo::TTopology topology(TBlobStorageGroupType::ErasureNone, 1, 1, 1, true);
std::unique_ptr<TEnvironmentSetup> env;
ui32 groupSize;
TBlobStorageGroupType groupType;
ui32 groupId;
std::vector<ui32> pdiskLayout;
SetupEnv(topology, env, groupSize, groupType, groupId, pdiskLayout, 0, scale);

return env->AggregateVDiskCounters(env->StoragePoolName, groupSize, groupSize, groupId, pdiskLayout,
"advancedCost", "DiskTimeAvailable");
};
TBlobStorageGroupInfo::TTopology topology(TBlobStorageGroupType::ErasureNone, 1, 1, 1, true);
std::unique_ptr<TEnvironmentSetup> env;
ui32 groupSize;
TBlobStorageGroupType groupType;
ui32 groupId;
std::vector<ui32> pdiskLayout;
SetupEnv(topology, env, groupSize, groupType, groupId, pdiskLayout, 0, 1);

i64 test1 = env->AggregateVDiskCounters(env->StoragePoolName, groupSize, groupSize, groupId, pdiskLayout,
"advancedCost", "DiskTimeAvailable");

env->SetIcbControl(0, "VDiskControls.DiskTimeAvailableScaleNVME", 2'000);
env->Sim(TDuration::Minutes(5));

i64 test1 = measure(1);
i64 test2 = measure(2);
i64 test2 = env->AggregateVDiskCounters(env->StoragePoolName, groupSize, groupSize, groupId, pdiskLayout,
"advancedCost", "DiskTimeAvailable");

i64 delta = test1 * 2 - test2;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_group/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ class TTestEnv {
TString());
auto vdiskConfig = AllVDiskKinds->MakeVDiskConfig(baseInfo);
vdiskConfig->EnableVDiskCooldownTimeout = true;
vdiskConfig->UseCostTracker = false;
auto counters = Counters->GetSubgroup("node", ToString(disk.NodeId))->GetSubgroup("vdisk", disk.VDiskId.ToString());
const TActorId& actorId = runtime.Register(CreateVDisk(vdiskConfig, Info, counters), TActorId(), 0, std::nullopt, disk.NodeId);
runtime.RegisterService(disk.VDiskActorId, actorId);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_mirror3of4/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ class TTestEnv {
""
);
const auto vcfg = MakeIntrusive<TVDiskConfig>(baseInfo);
vcfg->UseCostTracker = false;
auto vdiskCounters = Counters->GetSubgroup("vdisk", ToString(i));
runtime.RegisterService(info.GetActorId(i), runtime.Register(CreateVDisk(vcfg, Info, vdiskCounters),
TActorId(), 0, std::nullopt, pdisk.NodeId));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_vdisk/lib/prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ bool TDefaultVDiskSetup::SetUp(TAllVDisks::TVDiskInstance &vdisk, TAllPDisks *pd
modifier(vdisk.Cfg.Get());
}
vdisk.Cfg->RunRepl = runRepl;
vdisk.Cfg->UseCostTracker = false;

return true;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_vdisk2/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ namespace NKikimr {
NPDisk::DEVICE_TYPE_SSD, VSlotId, NKikimrBlobStorage::TVDiskKind::Default, 1,
"static");
VDiskConfig = AllVDiskKinds->MakeVDiskConfig(baseInfo);
VDiskConfig->UseCostTracker = false;

// create and register actor
std::unique_ptr<IActor> vdisk(NKikimr::CreateVDisk(VDiskConfig, Info, Counters->GetSubgroup("subsystem", "vdisk")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ class TBsCostModelMirror3of4 : public TBsCostModelBase {
};

TBsCostTracker::TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::EDeviceType diskType,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, ui64 burstThresholdNs,
float diskTimeAvailableScale)
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
const TCostMetricsParameters& costMetricsParameters)
: GroupType(groupType)
, CostCounters(counters->GetSubgroup("subsystem", "advancedCost"))
, MonGroup(std::make_shared<NMonGroup::TCostTrackerGroup>(CostCounters))
, BucketCapacity(burstThresholdNs * diskTimeAvailableScale)
, Bucket(&DiskTimeAvailable, &BucketCapacity, nullptr, nullptr, nullptr, nullptr, true)
, DiskTimeAvailableScale(diskTimeAvailableScale)
, BurstThresholdNs(costMetricsParameters.BurstThresholdNs)
, DiskTimeAvailableScale(costMetricsParameters.DiskTimeAvailableScale)
{
AtomicSet(BucketCapacity, GetDiskTimeAvailableScale() * BurstThresholdNs);
BurstDetector.Initialize(CostCounters, "BurstDetector");
switch (GroupType.GetErasure()) {
case TBlobStorageGroupType::ErasureMirror3dc:
Expand Down
Loading

0 comments on commit 1bce8a4

Please sign in to comment.