From 29980d67486b5b3003b251bdd4affdc54b725efe Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 11 Jun 2024 11:45:51 +0300 Subject: [PATCH] CREATE TABLE AS with inconsistent write (#4989) --- ydb/core/kqp/common/kqp_yql.cpp | 9 ++ ydb/core/kqp/common/kqp_yql.h | 3 + ydb/core/kqp/expr_nodes/kqp_expr_nodes.json | 3 +- ydb/core/kqp/host/kqp_statement_rewrite.cpp | 26 ++-- ydb/core/kqp/host/kqp_type_ann.cpp | 2 +- ydb/core/kqp/opt/kqp_opt_effects.cpp | 13 +- ydb/core/kqp/opt/kqp_opt_kql.cpp | 8 ++ ydb/core/kqp/provider/yql_kikimr_type_ann.cpp | 24 ++-- .../kqp/query_compiler/kqp_query_compiler.cpp | 18 ++- ydb/core/kqp/runtime/kqp_write_actor.cpp | 133 ++++++++++++++---- ydb/core/kqp/runtime/kqp_write_table.cpp | 81 +++++++---- ydb/core/kqp/runtime/kqp_write_table.h | 2 + ydb/core/kqp/session_actor/kqp_query_state.h | 18 ++- ydb/core/kqp/ut/query/kqp_query_ut.cpp | 1 + ydb/library/yql/core/yql_opt_utils.cpp | 11 ++ ydb/library/yql/core/yql_opt_utils.h | 1 + 16 files changed, 267 insertions(+), 86 deletions(-) diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index 14e4dbd53257..988f6f900dd9 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -282,6 +282,9 @@ TKqpUpsertRowsSettings TKqpUpsertRowsSettings::Parse(const TCoNameValueTupleList } else if (name == TKqpUpsertRowsSettings::IsUpdateSettingName) { YQL_ENSURE(tuple.Ref().ChildrenSize() == 1); settings.IsUpdate = true; + } else if (name == TKqpUpsertRowsSettings::AllowInconsistentWritesSettingName) { + YQL_ENSURE(tuple.Ref().ChildrenSize() == 1); + settings.AllowInconsistentWrites = true; } else { YQL_ENSURE(false, "Unknown KqpUpsertRows setting name '" << name << "'"); } @@ -310,6 +313,12 @@ NNodes::TCoNameValueTupleList TKqpUpsertRowsSettings::BuildNode(TExprContext& ct .Name().Build(IsUpdateSettingName) .Done()); } + if (AllowInconsistentWrites) { + settings.emplace_back( + Build(ctx, pos) + .Name().Build(AllowInconsistentWritesSettingName) + .Done()); + } return Build(ctx, pos) .Add(settings) diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index efd99a91dbfc..7fca43a5cd63 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -81,12 +81,15 @@ struct TKqpReadTableSettings { struct TKqpUpsertRowsSettings { static constexpr TStringBuf InplaceSettingName = "Inplace"; static constexpr TStringBuf IsUpdateSettingName = "IsUpdate"; + static constexpr TStringBuf AllowInconsistentWritesSettingName = "AllowInconsistentWrites"; bool Inplace = false; bool IsUpdate = false; + bool AllowInconsistentWrites = false; void SetInplace() { Inplace = true; } void SetIsUpdate() { IsUpdate = true; } + void SetAllowInconsistentWrites() { AllowInconsistentWrites = true; } static TKqpUpsertRowsSettings Parse(const NNodes::TCoNameValueTupleList& settingsList); static TKqpUpsertRowsSettings Parse(const NNodes::TKqpUpsertRows& node); diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 496d65422a38..7f471c794cdd 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -538,7 +538,8 @@ "Children": [ {"Index": 0, "Name": "Table", "Type": "TKqpTable"}, {"Index": 1, "Name": "Columns", "Type": "TCoAtomList"}, - {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + {"Index": 2, "Name": "InconsistentWrite", "Type": "TCoAtom"}, + {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} ] }, { diff --git a/ydb/core/kqp/host/kqp_statement_rewrite.cpp b/ydb/core/kqp/host/kqp_statement_rewrite.cpp index f33180bc210a..11202b97a76f 100644 --- a/ydb/core/kqp/host/kqp_statement_rewrite.cpp +++ b/ydb/core/kqp/host/kqp_statement_rewrite.cpp @@ -19,7 +19,7 @@ namespace { NYql::TExprNode::TPtr ReplaceInto = nullptr; }; - bool IsColumnTable(const NYql::NNodes::TMaybeNode& tableSettings) { + bool IsOlap(const NYql::NNodes::TMaybeNode& tableSettings) { if (!tableSettings) { return false; } @@ -85,6 +85,8 @@ namespace { return std::nullopt; } + const bool isOlap = IsOlap(settings.TableSettings); + const auto& insertData = writeArgs.Get(3); if (insertData.Ptr()->Content() == "Void") { return std::nullopt; @@ -136,7 +138,7 @@ namespace { const auto name = item->GetName(); auto currentType = item->GetItemType(); - const bool notNull = primariKeyColumns.contains(name) && IsColumnTable(settings.TableSettings); + const bool notNull = primariKeyColumns.contains(name) && isOlap; if (notNull && currentType->GetKind() == NYql::ETypeAnnotationKind::Optional) { currentType = currentType->Cast()->GetItemType(); @@ -169,6 +171,19 @@ namespace { const auto topLevelRead = NYql::FindTopLevelRead(insertData.Ptr()); + NYql::TExprNode::TListType insertSettings; + insertSettings.push_back( + exprCtx.NewList(pos, { + exprCtx.NewAtom(pos, "mode"), + exprCtx.NewAtom(pos, "replace"), + })); + if (!isOlap) { + insertSettings.push_back( + exprCtx.NewList(pos, { + exprCtx.NewAtom(pos, "AllowInconsistentWrites"), + })); + } + const auto insert = exprCtx.NewCallable(pos, "Write!", { topLevelRead == nullptr ? exprCtx.NewWorld(pos) : exprCtx.NewCallable(pos, "Left!", {topLevelRead.Get()}), exprCtx.NewCallable(pos, "DataSink", { @@ -184,12 +199,7 @@ namespace { }), }), insertData.Ptr(), - exprCtx.NewList(pos, { - exprCtx.NewList(pos, { - exprCtx.NewAtom(pos, "mode"), - exprCtx.NewAtom(pos, "replace"), - }), - }), + exprCtx.NewList(pos, std::move(insertSettings)), }); TCreateTableAsResult result; diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 1b7bc6311c36..1ea01b629491 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -1840,7 +1840,7 @@ TStatus AnnotateKqpSinkEffect(const TExprNode::TPtr& node, TExprContext& ctx) { } TStatus AnnotateTableSinkSettings(const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureMinMaxArgsCount(*input, 2, 3, ctx)) { + if (!EnsureMinMaxArgsCount(*input, 2, 4, ctx)) { return TStatus::Error; } input->SetTypeAnn(ctx.MakeType()); diff --git a/ydb/core/kqp/opt/kqp_opt_effects.cpp b/ydb/core/kqp/opt/kqp_opt_effects.cpp index c40ba344160c..5a765c573f5b 100644 --- a/ydb/core/kqp/opt/kqp_opt_effects.cpp +++ b/ydb/core/kqp/opt/kqp_opt_effects.cpp @@ -217,7 +217,8 @@ bool IsMapWrite(const TKikimrTableDescription& table, TExprBase input, TExprCont #undef DBG } -TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const TCoAtomList& columns, TExprContext& ctx) { +TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const TCoAtomList& columns, + const TKqpUpsertRowsSettings& settings, TExprContext& ctx) { Y_DEBUG_ABORT_UNLESS(IsDqPureExpr(expr)); return Build(ctx, expr.Pos()) @@ -239,6 +240,9 @@ TDqStage RebuildPureStageWithSink(TExprBase expr, const TKqpTable& table, const .Settings() .Table(table) .Columns(columns) + .InconsistentWrite(settings.AllowInconsistentWrites + ? ctx.NewAtom(expr.Pos(), "true") + : ctx.NewAtom(expr.Pos(), "false")) .Settings() .Build() .Build() @@ -292,7 +296,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const } if (IsDqPureExpr(node.Input())) { if (sinkEffect) { - stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), node.Columns(), ctx); + stageInput = RebuildPureStageWithSink(node.Input(), node.Table(), node.Columns(), settings, ctx); effect = Build(ctx, node.Pos()) .Stage(stageInput.Cast().Ptr()) .SinkIndex().Build("0") @@ -330,6 +334,9 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const .Settings() .Table(node.Table()) .Columns(node.Columns()) + .InconsistentWrite(settings.AllowInconsistentWrites + ? ctx.NewAtom(node.Pos(), "true") + : ctx.NewAtom(node.Pos(), "false")) .Settings() .Build() .Build() @@ -339,7 +346,7 @@ bool BuildUpsertRowsEffect(const TKqlUpsertRows& node, TExprContext& ctx, const .Name("row") .Done(); - if (table.Metadata->Kind == EKikimrTableKind::Olap) { + if (table.Metadata->Kind == EKikimrTableKind::Olap || settings.AllowInconsistentWrites) { // OLAP is expected to write into all shards (hash partitioning), // so we use serveral sinks for this without union all. // (TODO: shuffle by shard instead of DqCnMap) diff --git a/ydb/core/kqp/opt/kqp_opt_kql.cpp b/ydb/core/kqp/opt/kqp_opt_kql.cpp index ec833baf9365..8114c0e0177e 100644 --- a/ydb/core/kqp/opt/kqp_opt_kql.cpp +++ b/ydb/core/kqp/opt/kqp_opt_kql.cpp @@ -227,6 +227,7 @@ TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputC YQL_ENSURE(generateColumnsIfInsertNode); TCoAtomList generateColumnsIfInsert = TCoNameValueTuple(generateColumnsIfInsertNode).Value().Cast(); + auto settings = FilterSettings(write.Settings().Ref(), {"AllowInconsistentWrites"}, ctx); const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); if (generateColumnsIfInsert.Ref().ChildrenSize() > 0) { return Build(ctx, write.Pos()) @@ -243,6 +244,7 @@ TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputC .Input(input.Ptr()) .Columns(columns.Ptr()) .ReturningColumns(write.ReturningColumns()) + .Settings(settings) .Done(); return effect; @@ -252,6 +254,7 @@ TExprBase BuildUpsertTableWithIndex(const TKiWriteTable& write, const TCoAtomLis const TCoAtomList& autoincrement, const TKikimrTableDescription& table, TExprContext& ctx) { + auto settings = FilterSettings(write.Settings().Ref(), {"AllowInconsistentWrites"}, ctx); const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); auto generateColumnsIfInsertNode = GetSetting(write.Settings().Ref(), "generate_columns_if_insert"); YQL_ENSURE(generateColumnsIfInsertNode); @@ -263,6 +266,7 @@ TExprBase BuildUpsertTableWithIndex(const TKiWriteTable& write, const TCoAtomLis .Columns(columns.Ptr()) .ReturningColumns(write.ReturningColumns()) .GenerateColumnsIfInsert(generateColumnsIfInsert) + .Settings(settings) .Done(); return effect; @@ -272,12 +276,14 @@ TExprBase BuildReplaceTable(const TKiWriteTable& write, const TCoAtomList& input const TCoAtomList& autoincrement, const TKikimrTableDescription& table, TExprContext& ctx) { + auto settings = FilterSettings(write.Settings().Ref(), {"AllowInconsistentWrites"}, ctx); const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); auto effect = Build(ctx, write.Pos()) .Table(BuildTableMeta(table, write.Pos(), ctx)) .Input(input.Ptr()) .Columns(columns) .ReturningColumns(write.ReturningColumns()) + .Settings(settings) .Done(); return effect; @@ -287,6 +293,7 @@ TExprBase BuildReplaceTableWithIndex(const TKiWriteTable& write, const TCoAtomLi const TCoAtomList& autoincrement, const TKikimrTableDescription& table, TExprContext& ctx) { + auto settings = FilterSettings(write.Settings().Ref(), {"AllowInconsistentWrites"}, ctx); const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, write.Pos(), ctx); auto effect = Build(ctx, write.Pos()) .Table(BuildTableMeta(table, write.Pos(), ctx)) @@ -294,6 +301,7 @@ TExprBase BuildReplaceTableWithIndex(const TKiWriteTable& write, const TCoAtomLi .Columns(columns.Ptr()) .ReturningColumns(write.ReturningColumns()) .GenerateColumnsIfInsert().Build() + .Settings(settings) .Done(); return effect; diff --git a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp index af198f729c44..d24e6d3d1410 100644 --- a/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_type_ann.cpp @@ -615,26 +615,34 @@ class TKiSinkTypeAnnotationTransformer : public TKiSinkVisitorTransformer generateColumnsIfInsert.push_back(ctx.NewAtom(node.Pos(), generatedColumn)); } - node.Ptr()->ChildRef(TKiWriteTable::idx_Settings) = Build(ctx, node.Pos()) - .Add(node.Settings()) - .Add() + TVector settings; + for (const auto& setting : node.Settings()) { + settings.push_back(setting); + } + settings.push_back( + Build(ctx, node.Pos()) .Name().Build("input_columns") .Value() .Add(columns) .Build() - .Build() - .Add() + .Done()); + settings.push_back( + Build(ctx, node.Pos()) .Name().Build("default_constraint_columns") .Value() .Add(defaultConstraintColumns) .Build() - .Build() - .Add() + .Done()); + settings.push_back( + Build(ctx, node.Pos()) .Name().Build("generate_columns_if_insert") .Value() .Add(generateColumnsIfInsert) .Build() - .Build() + .Done()); + + node.Ptr()->ChildRef(TKiWriteTable::idx_Settings) = Build(ctx, node.Pos()) + .Add(settings) .Done() .Ptr(); diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 9e3db46f023a..a33ea3c888ab 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -752,7 +752,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { stageProto.SetOutputsCount(outputsCount); // Dq sinks - bool hasTableSink = false; + bool hasTxTableSink = false; if (auto maybeOutputsNode = stage.Outputs()) { auto outputsNode = maybeOutputsNode.Cast(); for (size_t i = 0; i < outputsNode.Size(); ++i) { @@ -764,12 +764,17 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { FillSink(sinkNode, sinkProto, tablesMap, ctx); sinkProto->SetOutputIndex(FromString(TStringBuf(sinkNode.Index()))); - // Only sinks to ydb tables can be considered as effects. - hasTableSink |= IsTableSink(sinkNode.DataSink().Cast().Category()); + if (IsTableSink(sinkNode.DataSink().Cast().Category())) { + // Only sinks with transactions to ydb tables can be considered as effects. + // Inconsistent internal sinks and external sinks (like S3) aren't effects. + auto settings = sinkNode.Settings().Maybe(); + YQL_ENSURE(settings); + hasTxTableSink |= settings.InconsistentWrite().Cast().StringValue() != "true"; + } } } - stageProto.SetIsEffectsStage(hasEffects || hasTableSink); + stageProto.SetIsEffectsStage(hasEffects || hasTxTableSink); auto paramsType = CollectParameters(stage, ctx); auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry, @@ -1033,6 +1038,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { for (const auto& columnName : tableMeta->KeyColumnNames) { const auto columnMeta = tableMeta->Columns.FindPtr(columnName); YQL_ENSURE(columnMeta != nullptr, "Unknown column in sink: \"" + columnName + "\""); + auto keyColumnProto = settingsProto.AddKeyColumns(); keyColumnProto->SetId(columnMeta->Id); keyColumnProto->SetName(columnName); @@ -1062,6 +1068,10 @@ class TKqpQueryCompiler : public IKqpQueryCompiler { } } + if (const auto inconsistentWrite = settings.InconsistentWrite().Cast(); inconsistentWrite.StringValue() == "true") { + settingsProto.SetInconsistentTx(true); + } + internalSinkProto.MutableSettings()->PackFrom(settingsProto); } else { YQL_ENSURE(false, "Unsupported sink type"); diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 4638bc925a30..65b2141608e9 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -25,15 +25,16 @@ namespace { constexpr i64 kInFlightMemoryLimitPerActor = 64_MB; constexpr i64 kMemoryLimitPerMessage = 48_MB; - constexpr i64 kMaxBatchesPerMessage = 8; + constexpr i64 kMaxBatchesPerMessage = 1; struct TWriteActorBackoffSettings { - TDuration StartRetryDelay = TDuration::MilliSeconds(200); - TDuration MaxRetryDelay = TDuration::Seconds(20); + TDuration StartRetryDelay = TDuration::MilliSeconds(250); + TDuration MaxRetryDelay = TDuration::Seconds(5); double UnsertaintyRatio = 0.5; double Multiplier = 2.0; - ui64 MaxWriteAttempts = 16; + ui64 MaxWriteAttempts = 32; + ui64 MaxResolveAttempts = 5; }; const TWriteActorBackoffSettings* BackoffSettings() { @@ -92,7 +93,9 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N void CheckMemory() { const auto freeSpace = Writer.GetFreeSpace(); - if (freeSpace > LastFreeMemory) { + const auto targetMemory = Writer.MemoryLimit / 2; + if (freeSpace >= targetMemory && targetMemory > LastFreeMemory) { + YQL_ENSURE(freeSpace > 0); Writer.ResumeExecution(); } LastFreeMemory = freeSpace; @@ -108,6 +111,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N struct TEvPrivate { enum EEv { EvShardRequestTimeout = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvResolveRequestPlanned, }; struct TEvShardRequestTimeout : public TEventLocal { @@ -117,6 +121,9 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N : ShardId(shardId) { } }; + + struct TEvResolveRequestPlanned : public TEventLocal { + }; }; public: @@ -143,7 +150,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N Settings.GetInconsistentTx()) { YQL_ENSURE(std::holds_alternative(TxId)); - YQL_ENSURE(!InconsistentTx && !ImmediateTx); + YQL_ENSURE(!ImmediateTx); EgressStats.Level = args.StatsLevel; } @@ -171,7 +178,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N } i64 GetFreeSpace() const final { - const i64 result = ShardedWriteController + const i64 result = (ShardedWriteController && !IsResolving()) ? MemoryLimit - ShardedWriteController->GetMemory() : std::numeric_limits::min(); // Can't use zero here because compute can use overcommit! return result; @@ -195,7 +202,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N Finished = finished; EgressStats.Resume(); - CA_LOG_D("New data: size=" << size << ", finished=" << finished << "."); + CA_LOG_D("New data: size=" << size << ", finished=" << finished << ", used memory=" << ShardedWriteController->GetMemory() << "."); YQL_ENSURE(ShardedWriteController); try { @@ -208,7 +215,6 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N CurrentExceptionMessage(), NYql::NDqProto::StatusIds::INTERNAL_ERROR); } - ProcessBatches(); } @@ -229,7 +235,36 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N } } + bool IsResolving() const { + return ResolveAttempts > 0; + } + + void RetryResolveTable() { + if (!IsResolving()) { + ResolveTable(); + } + } + + void PlanResolveTable() { + TlsActivationContext->Schedule( + CalculateNextAttemptDelay(ResolveAttempts), + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0)); + } + void ResolveTable() { + SchemeEntry.reset(); + SchemeRequest.reset(); + + if (ResolveAttempts++ >= BackoffSettings()->MaxResolveAttempts) { + const auto error = TStringBuilder() + << "Too many table resolve attempts for Sink=" << this->SelfId() << "."; + CA_LOG_E(error); + RuntimeError( + error, + NYql::NDqProto::StatusIds::SCHEME_ERROR); + return; + } + CA_LOG_D("Resolve TableId=" << TableId); TAutoPtr request(new NSchemeCache::TSchemeCacheNavigate()); NSchemeCache::TSchemeCacheNavigate::TEntry entry; @@ -238,13 +273,16 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; entry.SyncVersion = false; request->ResultSet.emplace_back(entry); + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request)); } void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { if (ev->Get()->Request->ErrorCount > 0) { - RuntimeError(TStringBuilder() << "Failed to get table: " - << TableId << "'", NYql::NDqProto::StatusIds::SCHEME_ERROR); + CA_LOG_E(TStringBuilder() << "Failed to get table: " + << TableId << "'"); + PlanResolveTable(); return; } auto& resultSet = ev->Get()->Request->ResultSet; @@ -271,6 +309,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N void ResolveShards() { YQL_ENSURE(!SchemeRequest || InconsistentTx); YQL_ENSURE(SchemeEntry); + CA_LOG_D("Resolve shards for TableId=" << TableId); TVector columns; TVector keyColumnTypes; @@ -297,18 +336,21 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N } void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { - YQL_ENSURE(!SchemeRequest); + YQL_ENSURE(!SchemeRequest || InconsistentTx); auto* request = ev->Get()->Request.Get(); if (request->ErrorCount > 0) { - RuntimeError(TStringBuilder() << "Failed to get table: " - << TableId << "'", NYql::NDqProto::StatusIds::SCHEME_ERROR); + CA_LOG_E(TStringBuilder() << "Failed to get table: " + << TableId << "'"); + PlanResolveTable(); return; } YQL_ENSURE(request->ResultSet.size() == 1); SchemeRequest = std::move(request->ResultSet[0]); + CA_LOG_D("Resolved shards for TableId=" << TableId << ". PartitionsCount=" << SchemeRequest->KeyDescription->GetPartitions().size() << "."); + Prepare(); } @@ -324,7 +366,8 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N CA_LOG_E("Got UNSPECIFIED for table `" << SchemeEntry->TableId.PathId.ToString() << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," - << " Sink=" << this->SelfId() << "."); + << " Sink=" << this->SelfId() << "." + << getIssues().ToOneLineString()); RuntimeError( TStringBuilder() << "Got UNSPECIFIED for table `" << SchemeEntry->TableId.PathId.ToString() << "`.", @@ -343,7 +386,8 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N CA_LOG_E("Got ABORTED for table `" << SchemeEntry->TableId.PathId.ToString() << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," - << " Sink=" << this->SelfId() << "."); + << " Sink=" << this->SelfId() << "." + << getIssues().ToOneLineString()); RuntimeError( TStringBuilder() << "Got ABORTED for table `" << SchemeEntry->TableId.PathId.ToString() << "`.", @@ -355,12 +399,20 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N CA_LOG_E("Got INTERNAL ERROR for table `" << SchemeEntry->TableId.PathId.ToString() << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," - << " Sink=" << this->SelfId() << "."); - RuntimeError( - TStringBuilder() << "Got INTERNAL ERROR for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", - NYql::NDqProto::StatusIds::INTERNAL_ERROR, - getIssues()); + << " Sink=" << this->SelfId() << "." + << getIssues().ToOneLineString()); + + // TODO: Add new status for splits in datashard. This is tmp solution. + if (getIssues().ToOneLineString().Contains("in a pre/offline state assuming this is due to a finished split (wrong shard state)")) { + ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); + RetryResolveTable(); + } else { + RuntimeError( + TStringBuilder() << "Got INTERNAL ERROR for table `" + << SchemeEntry->TableId.PathId.ToString() << "`.", + NYql::NDqProto::StatusIds::INTERNAL_ERROR, + getIssues()); + } return; } case NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED: { @@ -368,15 +420,17 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N << SchemeEntry->TableId.PathId.ToString() << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." - << " Ignored this error."); - // TODO: more retries + << " Ignored this error." + << getIssues().ToOneLineString()); + // TODO: support waiting return; } case NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED: { CA_LOG_E("Got CANCELLED for table `" << SchemeEntry->TableId.PathId.ToString() << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," - << " Sink=" << this->SelfId() << "."); + << " Sink=" << this->SelfId() << "." + << getIssues().ToOneLineString()); RuntimeError( TStringBuilder() << "Got CANCELLED for table `" << SchemeEntry->TableId.PathId.ToString() << "`.", @@ -388,7 +442,8 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N CA_LOG_E("Got BAD REQUEST for table `" << SchemeEntry->TableId.PathId.ToString() << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," - << " Sink=" << this->SelfId() << "."); + << " Sink=" << this->SelfId() << "." + << getIssues().ToOneLineString()); RuntimeError( TStringBuilder() << "Got BAD REQUEST for table `" << SchemeEntry->TableId.PathId.ToString() << "`.", @@ -400,9 +455,11 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N CA_LOG_E("Got SCHEME CHANGED for table `" << SchemeEntry->TableId.PathId.ToString() << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," - << " Sink=" << this->SelfId() << "."); + << " Sink=" << this->SelfId() << "." + << getIssues().ToOneLineString()); if (InconsistentTx) { - ResolveTable(); + ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); + RetryResolveTable(); } else { RuntimeError( TStringBuilder() << "Got SCHEME CHANGED for table `" @@ -416,7 +473,8 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N CA_LOG_E("Got LOCKS BROKEN for table `" << SchemeEntry->TableId.PathId.ToString() << "`." << " ShardID=" << ev->Get()->Record.GetOrigin() << "," - << " Sink=" << this->SelfId() << "."); + << " Sink=" << this->SelfId() << "." + << getIssues().ToOneLineString()); RuntimeError( TStringBuilder() << "Got LOCKS BROKEN for table `" << SchemeEntry->TableId.PathId.ToString() << "`.", @@ -538,20 +596,30 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N if (SchemeEntry->Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) { TlsActivationContext->Schedule( CalculateNextAttemptDelay(metadata->SendAttempts), - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvShardRequestTimeout(shardId), 0, metadata->Cookie)); + new IEventHandle( + SelfId(), + SelfId(), + new TEvPrivate::TEvShardRequestTimeout(shardId), + 0, + metadata->Cookie)); } } void RetryShard(const ui64 shardId, const std::optional ifCookieEqual) { const auto metadata = ShardedWriteController->GetMessageMetadata(shardId); if (!metadata || (ifCookieEqual && metadata->Cookie != ifCookieEqual)) { + CA_LOG_D("Retry failed: not found ShardID=" << shardId << " with Cookie=" << ifCookieEqual.value_or(0)); return; } - CA_LOG_T("Retry ShardID=" << shardId << " with Cookie=" << ifCookieEqual.value_or(0)); + CA_LOG_D("Retry ShardID=" << shardId << " with Cookie=" << ifCookieEqual.value_or(0)); SendDataToShard(shardId); } + void ResetShardRetries(const ui64 shardId, const ui64 cookie) { + ShardedWriteController->ResetRetries(shardId, cookie); + } + void Handle(TEvPrivate::TEvShardRequestTimeout::TPtr& ev) { CA_LOG_W("Timeout shardID=" << ev->Get()->ShardId); RetryShard(ev->Get()->ShardId, ev->Cookie); @@ -581,6 +649,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N void Prepare() { YQL_ENSURE(SchemeEntry); + ResolveAttempts = 0; if (!ShardedWriteController) { TVector columnsMetadata; @@ -619,6 +688,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N CurrentExceptionMessage(), NYql::NDqProto::StatusIds::INTERNAL_ERROR); } + ProcessBatches(); } void ResumeExecution() { @@ -645,6 +715,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N std::optional SchemeEntry; std::optional SchemeRequest; + ui64 ResolveAttempts = 0; THashMap LocksInfo; bool Finished = false; diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index 3459c61dc8fd..8087a19f1074 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -19,7 +19,7 @@ namespace NKqp { namespace { constexpr ui64 MaxBatchBytes = 8_MB; -constexpr ui64 MaxUnshardedBatchBytes = 8_MB; +constexpr ui64 MaxUnshardedBatchBytes = 4_MB; TVector BuildColumns(const TConstArrayRef inputColumns) { TVector result; @@ -489,6 +489,7 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { TVector cells(Columns.size()); data.ForEachRow([&](const auto& row) { for (size_t index = 0; index < Columns.size(); ++index) { + // TODO: move to SerializedVector cells[WriteIndex[index]] = MakeCell( Columns[index].PType, row.GetElement(index), @@ -506,6 +507,7 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { YQL_ENSURE(datashardBatch); auto data = datashardBatch->Extract(); const auto rows = data.size() / Columns.size(); + YQL_ENSURE(data.size() == rows * Columns.size()); for (size_t rowIndex = 0; rowIndex < rows; ++rowIndex) { AddRow( @@ -633,8 +635,11 @@ class TShardsInfo { public: class TShardInfo { friend class TShardsInfo; - TShardInfo(i64& memory) - : Memory(memory) { + TShardInfo(i64& memory, ui64& nextCookie, bool& closed) + : Memory(memory) + , NextCookie(nextCookie) + , Cookie(NextCookie++) + , Closed(closed) { } public: @@ -654,10 +659,6 @@ class TShardsInfo { return IsClosed() && IsEmpty(); } - void Close() { - Closed = true; - } - void MakeNextBatches(i64 maxDataSize, ui64 maxCount) { YQL_ENSURE(BatchesInFlight == 0); i64 dataSize = 0; @@ -682,7 +683,7 @@ class TShardsInfo { Batches.pop_front(); } - ++Cookie; + Cookie = NextCookie++; SendAttempts = 0; BatchesInFlight = 0; @@ -714,12 +715,19 @@ class TShardsInfo { ++SendAttempts; } + void ResetSendAttempts() { + SendAttempts = 0; + } + private: std::deque Batches; - bool Closed = false; i64& Memory; - ui64 Cookie = 1; + ui64& NextCookie; + ui64 Cookie; + + bool& Closed; + ui32 SendAttempts = 0; size_t BatchesInFlight = 0; }; @@ -730,7 +738,7 @@ class TShardsInfo { return it->second; } - auto [insertIt, _] = ShardsInfo.emplace(shard, TShardInfo(Memory)); + auto [insertIt, _] = ShardsInfo.emplace(shard, TShardInfo(Memory, NextCookie, Closed)); return insertIt->second; } @@ -777,40 +785,61 @@ class TShardsInfo { void Clear() { ShardsInfo = {}; Memory = 0; + Closed = false; + } + + void Close() { + Closed = true; } private: THashMap ShardsInfo; i64 Memory = 0; + ui64 NextCookie = 1; + bool Closed = false; }; class TShardedWriteController : public IShardedWriteController { public: void OnPartitioningChanged(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry) override { - if (Serializer) { - FlushSerializer(true); - } + BeforePartitioningChanged(); Serializer = CreateColumnShardPayloadSerializer( schemeEntry, InputColumnsMetadata, TypeEnv); - ReshardData(); - ShardsInfo.Clear(); + AfterPartitioningChanged(); } void OnPartitioningChanged( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, NSchemeCache::TSchemeCacheRequest::TEntry&& partitionsEntry) override { - if (Serializer) { - FlushSerializer(true); - } + BeforePartitioningChanged(); Serializer = CreateDataShardPayloadSerializer( schemeEntry, std::move(partitionsEntry), InputColumnsMetadata, TypeEnv); + AfterPartitioningChanged(); + } + + void BeforePartitioningChanged() { + if (Serializer) { + if (!Closed) { + Serializer->Close(); + } + FlushSerializer(true); + } + } + + void AfterPartitioningChanged() { + ShardsInfo.Close(); ReshardData(); ShardsInfo.Clear(); + if (Closed) { + Close(); + } else { + FlushSerializer(GetMemory() >= Settings.MemoryLimitTotal); + } } void AddData(NMiniKQL::TUnboxedValueBatch&& data) override { @@ -829,9 +858,7 @@ class TShardedWriteController : public IShardedWriteController { Serializer->Close(); FlushSerializer(true); YQL_ENSURE(Serializer->IsFinished()); - for (auto& [shardId, shardInfo] : ShardsInfo.GetShards()) { - shardInfo.Close(); - } + ShardsInfo.Close(); } TVector GetPendingShards() const override { @@ -896,6 +923,14 @@ class TShardedWriteController : public IShardedWriteController { shardInfo.IncSendAttempts(); } + void ResetRetries(ui64 shardId, ui64 cookie) override { + auto& shardInfo = ShardsInfo.GetShard(shardId); + if (shardInfo.IsEmpty() || shardInfo.GetCookie() != cookie) { + return; + } + shardInfo.ResetSendAttempts(); + } + i64 GetMemory() const override { YQL_ENSURE(Serializer); return Serializer->GetMemory() + ShardsInfo.GetMemory(); @@ -960,8 +995,6 @@ class TShardedWriteController : public IShardedWriteController { } } - TString LogPrefix = "ShardedWriteController"; - TShardedWriteControllerSettings Settings; TVector InputColumnsMetadata; const NMiniKQL::TTypeEnvironment& TypeEnv; diff --git a/ydb/core/kqp/runtime/kqp_write_table.h b/ydb/core/kqp/runtime/kqp_write_table.h index d09df441e9c7..7846cb954cc6 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.h +++ b/ydb/core/kqp/runtime/kqp_write_table.h @@ -90,6 +90,8 @@ class IShardedWriteController : public TThrRefBase { virtual std::optional OnMessageAcknowledged(ui64 shardId, ui64 cookie) = 0; virtual void OnMessageSent(ui64 shardId, ui64 cookie) = 0; + virtual void ResetRetries(ui64 shardId, ui64 cookie) = 0; + virtual i64 GetMemory() const = 0; virtual bool IsClosed() const = 0; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 7cf13a2a14ee..cc43b58f04e1 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -329,8 +329,8 @@ class TKqpQueryState : public TNonCopyable { return true; } - if (HasSinkInTx(tx)) { - // At current time sinks require separate tnx with commit. + if (HasTxSinkInTx(tx)) { + // At current time transactional internal sinks require separate tnx with commit. return false; } @@ -392,7 +392,7 @@ class TKqpQueryState : public TNonCopyable { if (TxCtx->CanDeferEffects()) { // At current time sinks require separate tnx with commit. - while (tx && tx->GetHasEffects() && !HasSinkInTx(tx)) { + while (tx && tx->GetHasEffects() && !HasTxSinkInTx(tx)) { QueryData->CreateKqpValueMap(tx); bool success = TxCtx->AddDeferredEffect(tx, QueryData); YQL_ENSURE(success); @@ -409,10 +409,16 @@ class TKqpQueryState : public TNonCopyable { return tx; } - bool HasSinkInTx(const TKqpPhyTxHolder::TConstPtr& tx) const { + bool HasTxSinkInTx(const TKqpPhyTxHolder::TConstPtr& tx) const { for (const auto& stage : tx->GetStages()) { - if (!stage.GetSinks().empty()) { - return true; + for (const auto& sink : stage.GetSinks()) { + if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink && sink.GetInternalSink().GetSettings().Is()) { + NKikimrKqp::TKqpTableSinkSettings settings; + YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings"); + if (!settings.GetInconsistentTx()) { + return true; + } + } } } return false; diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index 92b4541eee4a..995bd1d50e10 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -1787,6 +1787,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) { Y_UNIT_TEST(OltpCreateAsSelect_Simple) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true); appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true); diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp index 37ae3d11ec19..576afe9ed0cc 100644 --- a/ydb/library/yql/core/yql_opt_utils.cpp +++ b/ydb/library/yql/core/yql_opt_utils.cpp @@ -555,6 +555,17 @@ TExprNode::TPtr GetSetting(const TExprNode& settings, const TStringBuf& name) { return nullptr; } +TExprNode::TPtr FilterSettings(const TExprNode& settings, const THashSet& names, TExprContext& ctx) { + TExprNode::TListType children; + for (auto setting : settings.Children()) { + if (setting->ChildrenSize() != 0 && names.contains(setting->Head().Content())) { + children.push_back(setting); + } + } + + return ctx.NewList(settings.Pos(), std::move(children)); +} + bool HasSetting(const TExprNode& settings, const TStringBuf& name) { return GetSetting(settings, name) != nullptr; } diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h index 4dc04cae82fb..69759bf7e7eb 100644 --- a/ydb/library/yql/core/yql_opt_utils.h +++ b/ydb/library/yql/core/yql_opt_utils.h @@ -51,6 +51,7 @@ const TTypeAnnotationNode* RemoveOptionalType(const TTypeAnnotationNode* type); const TTypeAnnotationNode* RemoveAllOptionals(const TTypeAnnotationNode* type); TExprNode::TPtr GetSetting(const TExprNode& settings, const TStringBuf& name); +TExprNode::TPtr FilterSettings(const TExprNode& settings, const THashSet& names, TExprContext& ctx); bool HasSetting(const TExprNode& settings, const TStringBuf& name); bool HasAnySetting(const TExprNode& settings, const THashSet& names); TExprNode::TPtr RemoveSetting(const TExprNode& settings, const TStringBuf& name, TExprContext& ctx);