Skip to content

Commit

Permalink
Merge pull request #10389 from uzhastik/24_3_merge_6
Browse files Browse the repository at this point in the history
24 3 merge 6
  • Loading branch information
maximyurchuk authored Oct 15, 2024
2 parents f60c468 + 3ab3902 commit 20fd222
Show file tree
Hide file tree
Showing 539 changed files with 1,462 additions and 1,340 deletions.
29 changes: 18 additions & 11 deletions ydb/core/client/server/msgbus_server_pq_metacache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
req->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
req->Record.MutableRequest()->SetKeepSession(false);
req->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
req->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);

req->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
req->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
Expand Down Expand Up @@ -274,9 +275,14 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc

const auto& record = ev->Get()->Record.GetRef();

Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
ui64 newVersion = rr.ListSize() == 0 ? 0 : rr.GetList(0).GetStruct(0).GetOptional().GetInt64();
Y_VERIFY(record.GetResponse().YdbResultsSize() == 1);
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));

ui64 newVersion = 0;
if (parser.RowsCount() != 0) {
parser.TryNextRow();
newVersion = *parser.ColumnParser(0).GetOptionalInt64();
}

LastVersionUpdate = ctx.Now();
if (newVersion > CurrentTopicsVersion || CurrentTopicsVersion == 0 || SkipVersionCheck) {
Expand All @@ -293,17 +299,18 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc

const auto& record = ev->Get()->Record.GetRef();

Y_ABORT_UNLESS(record.GetResponse().GetResults().size() == 1);
Y_VERIFY(record.GetResponse().YdbResultsSize() == 1);
TString path, dc;
const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
for (const auto& row : rr.GetList()) {

path = row.GetStruct(0).GetOptional().GetText();
dc = row.GetStruct(1).GetOptional().GetText();
NYdb::TResultSetParser parser(record.GetResponse().GetYdbResults(0));
const ui32 rowCount = parser.RowsCount();
while (parser.TryNextRow()) {
path = *parser.ColumnParser(0).GetOptionalUtf8();
dc = *parser.ColumnParser(1).GetOptionalUtf8();

NewTopics.emplace_back(decltype(NewTopics)::value_type{path, dc});
}
if (rr.ListSize() > 0) {

if (rowCount > 0) {
LastTopicKey = {path, dc};
return RunQuery(EQueryType::EGetTopics, ctx);
} else {
Expand Down Expand Up @@ -710,7 +717,7 @@ class TPersQueueMetaCacheActor : public TActorBootstrapped<TPersQueueMetaCacheAc
void ProcessNodesInfoWaitersQueue(bool status, const TActorContext& ctx) {
if (DynamicNodesMapping == nullptr) {
Y_ABORT_UNLESS(!status);
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
DynamicNodesMapping.reset(new THashMap<ui32, ui32>);
}
while(!NodesMappingWaiters.empty()) {
ctx.Send(NodesMappingWaiters.front(),
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/cms/cms.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,12 @@ bool TCms::CheckEvictVDisks(const TAction &action, TErrorInfo &error) const {
return false;
}

if (State->Config.SentinelConfig.EvictVDisksStatus.Empty()) {
error.Code = TStatus::ERROR;
error.Reason = "Evict vdisks is disabled in Sentinel (self heal)";
return false;
}

switch (action.GetType()) {
case TAction::RESTART_SERVICES:
case TAction::SHUTDOWN_HOST:
Expand Down
40 changes: 40 additions & 0 deletions ydb/core/cms/cms_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1951,6 +1951,46 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
env.CheckDonePermission("user", permission2.GetPermissions(0).GetId());
}

// Exercises toggling of the Sentinel EvictVDisksStatus setting while an
// evict-VDisks permission request is pending: the node is first marked FAULTY,
// switching the setting to DISABLED is expected to produce an ACTIVE update and
// make the request check return ERROR, and switching back to FAULTY is expected
// to produce a FAULTY update again.
Y_UNIT_TEST(DisabledEvictVDisks)
{
auto opts = TTestEnvOpts(8).WithSentinel();
TCmsTestEnv env(opts);
env.SetLogPriority(NKikimrServices::CMS, NLog::PRI_DEBUG);

// Make state transitions faster for test purposes
auto cmsConfig = env.GetCmsConfig();
cmsConfig.MutableSentinelConfig()->SetDefaultStateLimit(1);
env.SetCmsConfig(cmsConfig);

// Request permission with VDisks eviction
auto request = env.CheckPermissionRequest(
MakePermissionRequest(TRequestOptions("user").WithEvictVDisks(),
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(0), 600000000, "storage")
),
TStatus::DISALLOW_TEMP // ok, waiting for VDisks to be moved
);

// Check that FAULTY BSC request is sent
env.CheckBSCUpdateRequests({ env.GetNodeId(0) }, NKikimrBlobStorage::FAULTY);

// Disable VDisks eviction
cmsConfig.MutableSentinelConfig()->SetEvictVDisksStatus(NKikimrCms::TCmsConfig::TSentinelConfig::DISABLED);
env.SetCmsConfig(cmsConfig);

// Check that ACTIVE BSC request is sent
env.CheckBSCUpdateRequests({ env.GetNodeId(0) }, NKikimrBlobStorage::ACTIVE);

// Check that CMS returns ERROR when VDisks eviction is disabled
env.CheckRequest("user", request.GetRequestId(), false, TStatus::ERROR, 0);

// Enable VDisks eviction again
cmsConfig.MutableSentinelConfig()->SetEvictVDisksStatus(NKikimrCms::TCmsConfig::TSentinelConfig::FAULTY);
env.SetCmsConfig(cmsConfig);

// Check that FAULTY BSC request is sent again
env.CheckBSCUpdateRequests({ env.GetNodeId(0) }, NKikimrBlobStorage::FAULTY);
}

Y_UNIT_TEST(EmergencyDuringRollingRestart)
{
TCmsTestEnv env(8);
Expand Down
32 changes: 32 additions & 0 deletions ydb/core/cms/config.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once

#include "pdisk_state.h"
#include "pdisk_status.h"

#include <ydb/core/protos/cms.pb.h>

#include <util/datetime/base.h>
#include <util/generic/hash.h>
#include <util/generic/map.h>
#include <util/generic/maybe.h>

namespace NKikimr::NCms {

Expand All @@ -30,6 +32,8 @@ struct TCmsSentinelConfig {
ui32 RoomRatio;
ui32 RackRatio;

TMaybeFail<EPDiskStatus> EvictVDisksStatus;

void Serialize(NKikimrCms::TCmsConfig::TSentinelConfig &config) const {
config.SetEnable(Enable);
config.SetDryRun(DryRun);
Expand All @@ -45,6 +49,7 @@ struct TCmsSentinelConfig {
config.SetRackRatio(RackRatio);

SaveStateLimits(config);
SaveEvictVDisksStatus(config);
}

void Deserialize(const NKikimrCms::TCmsConfig::TSentinelConfig &config) {
Expand All @@ -63,6 +68,8 @@ struct TCmsSentinelConfig {

auto newStateLimits = LoadStateLimits(config);
StateLimits.swap(newStateLimits);

EvictVDisksStatus = LoadEvictVDisksStatus(config);
}

void SaveStateLimits(NKikimrCms::TCmsConfig::TSentinelConfig &config) const {
Expand Down Expand Up @@ -129,6 +136,31 @@ struct TCmsSentinelConfig {

return stateLimits;
}

// Translates the protobuf EvictVDisksStatus setting into the PDisk status to force
// on disks being evicted. DISABLED yields an empty value; UNKNOWN, FAULTY and any
// other value yield EPDiskStatus::FAULTY.
static TMaybeFail<EPDiskStatus> LoadEvictVDisksStatus(const NKikimrCms::TCmsConfig::TSentinelConfig &config) {
    using TProtoSentinelConfig = NKikimrCms::TCmsConfig::TSentinelConfig;

    const auto protoStatus = config.GetEvictVDisksStatus();
    if (protoStatus == TProtoSentinelConfig::DISABLED) {
        // Eviction is turned off: no status is forced.
        return Nothing();
    }

    // Every non-DISABLED value (including UNKNOWN) maps to FAULTY.
    return EPDiskStatus::FAULTY;
}

// Stores EvictVDisksStatus into the protobuf config. An empty value is written as
// DISABLED and FAULTY is written as FAULTY; any other defined value leaves the
// protobuf field untouched.
void SaveEvictVDisksStatus(NKikimrCms::TCmsConfig::TSentinelConfig &config) const {
    using TProtoSentinelConfig = NKikimrCms::TCmsConfig::TSentinelConfig;

    if (!EvictVDisksStatus.Defined()) {
        config.SetEvictVDisksStatus(TProtoSentinelConfig::DISABLED);
    } else if (*EvictVDisksStatus == EPDiskStatus::FAULTY) {
        config.SetEvictVDisksStatus(TProtoSentinelConfig::FAULTY);
    }
}
};

struct TCmsLogConfig {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/cms/sentinel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,8 @@ class TSentinel: public TActorBootstrapped<TSentinel> {
continue;
}

if (it->second.HasFaultyMarker()) {
info.SetForcedStatus(EPDiskStatus::FAULTY);
if (it->second.HasFaultyMarker() && Config.EvictVDisksStatus.Defined()) {
info.SetForcedStatus(*Config.EvictVDisksStatus);
} else {
info.ResetForcedStatus();
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2008,7 +2008,8 @@ TPersQueueL2CacheInitializer::TPersQueueL2CacheInitializer(const TKikimrRunConfi
{}

void TPersQueueL2CacheInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
static const ui64 DEFAULT_PQ_L2_MAX_SIZE_MB = 8 * 1024;
static const ui64 DEFAULT_PQ_L2_MAX_SIZE_MB =
NKikimrNodeLimits::TNodeLimitsConfig_TPersQueueNodeConfig::default_instance().GetSharedCacheSizeMb();
static const TDuration DEFAULT_PQ_L2_KEEP_TIMEOUT = TDuration::Seconds(10);

NPQ::TCacheL2Parameters params;
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
// https://protobuf.dev/reference/cpp/arenas/#swap
// Actually, this will be a copy in case of remote execution
queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults());
} else {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
}

ConvertQueryStats(kqpResponse, queryResult);
if (kqpResponse.HasTxMeta()) {
queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta());
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/grpc_services/rpc_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ class TExecuteYqlScriptRPC : public TRpcKqpRequestActor<TExecuteYqlScriptRPC, TE
auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_);

try {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
const auto& results = kqpResponse.GetYdbResults();
for (const auto& result : results) {
queryResult->add_result_sets()->CopyFrom(result);
}

} catch (const std::exception& ex) {
NYql::TIssues issues;
issues.AddIssue(NYql::ExceptionToIssue(ex));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace {
{}

NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle;
google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator;
google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator;
};

enum EStreamRpcWakeupTag : ui64 {
Expand Down Expand Up @@ -218,7 +218,7 @@ class TStreamExecuteYqlScriptRPC
auto result = response.mutable_result();

try {
NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
result->mutable_result_set()->CopyFrom(kqpResult);
} catch (std::exception ex) {
ReplyFinishStream(ex.what());
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <ydb/core/persqueue/utils.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
#include <ydb/public/api/protos/draft/persqueue_common.pb.h>

namespace NKafka {

Expand Down Expand Up @@ -262,6 +263,7 @@ THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::

for (const auto& record : batch->Records) {
NKikimrPQClient::TDataChunk proto;
proto.set_codec(NPersQueueCommon::RAW);
for(auto& h : record.Headers) {
auto res = proto.AddMessageMeta();
if (h.Key) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kafka_proxy/actors/kafka_produce_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ using namespace NKikimrClient;
// Each request can contain data for writing to several topics, and in each topic to several partitions.
// When a request to write to an unknown topic arrives, the actor changes the state to Init until it receives
// information about all the topics needed to process the request.
//
//
// Requests are processed in parallel, but it is guaranteed that the recording order will be preserved.
// The order of responses to requests is also guaranteed.
//
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
const TString& poolId = "");

TEvQueryRequest() = default;
// Default constructor: sets UsePublicResponseDataFormat to true on the request record.
TEvQueryRequest() {
Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
}

bool IsSerializable() const override {
return true;
Expand Down
11 changes: 1 addition & 10 deletions ydb/core/kqp/common/kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,9 @@

namespace NKikimr::NKqp {

void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);

TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId);
bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId);

template<typename TFrom, typename TTo>
inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
const auto& results = from.GetResults();
for (const auto& result : results) {
ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
}
}

class TKqpRequestInfo {
public:
Expand Down Expand Up @@ -80,7 +71,7 @@ class IQueryReplayBackend : public TNonCopyable {
/// Accepts query text
virtual void Collect(const TString& queryData) = 0;

virtual bool IsNull() { return false; }
virtual bool IsNull() { return false; }

virtual ~IQueryReplayBackend() {};

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetPoolId(PoolId);
}

Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/common/kqp_timeouts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ ui64 GetDefaultQueryTimeoutMs(NKikimrKqp::EQueryType queryType,
case NKikimrKqp::QUERY_TYPE_SQL_DML:
case NKikimrKqp::QUERY_TYPE_PREPARED_DML:
case NKikimrKqp::QUERY_TYPE_AST_DML:
return queryLimits.GetDataQueryTimeoutMs();
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
return queryLimits.GetDataQueryTimeoutMs();
return queryServiceConfig.GetQueryTimeoutDefaultSeconds() * 1000;

case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
return queryServiceConfig.GetScriptOperationTimeoutDefaultSeconds()
Expand Down
Loading

0 comments on commit 20fd222

Please sign in to comment.