Skip to content

Commit

Permalink
Dynamic S3 Listing (ydb-platform#2398)
Browse files Browse the repository at this point in the history
Co-authored-by: Yaroslav Plishan <80714170+MetaGigachad@users.noreply.github.com>
  • Loading branch information
Hor911 and MetaGigachad authored Mar 3, 2024
1 parent 3f96a63 commit dd39aa2
Show file tree
Hide file tree
Showing 52 changed files with 1,242 additions and 396 deletions.
3 changes: 3 additions & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ struct TKikimrEvents : TEvents {
ES_TABLE_CREATOR,
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
ES_REPLICATION_SERVICE,
ES_CHANGE_EXCHANGE_NEW, // rename to ES_CHANGE_EXCHANGE once the patch renaming the old ES_CHANGE_EXCHANGE to ES_CHANGE_EXCHANGE_DATASHARD is merged
ES_S3_FILE_QUEUE,
};
};

Expand Down
2 changes: 0 additions & 2 deletions ydb/core/fq/libs/actors/proxy_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ namespace NKikimr {

namespace NFq {

NActors::TActorId MakeYqPrivateProxyId();

NActors::IActor* CreateYqlAnalyticsPrivateProxy(
const NConfig::TPrivateProxyConfig& privateProxyConfig,
TIntrusivePtr<ITimeProvider> timeProvider,
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,16 @@ void TCheckpointCoordinator::Handle(NActors::TEvInterconnect::TEvNodeConnected::
}
}

// Handles a TEvUndelivered notification.
// If the sender is one of the actors tracked in AllActors, the notification is
// first offered to that actor's retry events queue. Regardless of that lookup,
// the notification is then passed on to the task controller base class.
void TCheckpointCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
    CC_LOG_D("Handle undelivered");

    // Only senders registered in AllActors own a retry queue to notify.
    const auto knownActor = AllActors.find(ev->Sender);
    if (knownActor != AllActors.end()) {
        // The queue's return value is intentionally not inspected here.
        knownActor->second->EventsQueue.HandleUndelivered(ev);
    }

    // The base controller always receives the event as well.
    NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnUndelivered(ev);
}

void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
CC_LOG_D("Got TEvPoison");
Send(ev->Sender, new NActors::TEvents::TEvPoisonTaken(), 0, ev->Cookie);
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class TCheckpointCoordinator : public NYql::TTaskControllerImpl<TCheckpointCoord
void Handle(NActors::TEvents::TEvPoison::TPtr&);
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev);
void Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&);
void HandleException(const std::exception& err);

Expand Down Expand Up @@ -89,13 +90,13 @@ class TCheckpointCoordinator : public NYql::TTaskControllerImpl<TCheckpointCoord
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle)

hFunc(NActors::TEvents::TEvPoison, Handle)
hFunc(NActors::TEvents::TEvUndelivered, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnUndelivered)
hFunc(NActors::TEvents::TEvWakeup, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnWakeup)

hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle)
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle),
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle)
hFunc(NActors::TEvents::TEvUndelivered, Handle)

ExceptionFunc(std::exception, HandleException)
, ExceptionFunc(std::exception, HandleException)
)

static constexpr char ActorName[] = "YQ_CHECKPOINT_COORDINATOR";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "config.h"
#include "control_plane_proxy.h"
#include "probes.h"
#include "utils.h"

#include <ydb/core/fq/libs/actors/logging/log.h>
#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h>
Expand All @@ -23,6 +22,7 @@
#include <ydb/core/fq/libs/control_plane_proxy/actors/utils.h>
#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h>
#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
#include <ydb/core/fq/libs/control_plane_proxy/utils/utils.h>
#include <ydb/public/lib/fq/scope.h>

#include <ydb/library/actors/core/actor.h>
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/control_plane_proxy/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,6 @@ struct TEvControlPlaneProxy {
};
};

NActors::TActorId ControlPlaneProxyActorId();

}
9 changes: 9 additions & 0 deletions ydb/core/fq/libs/control_plane_proxy/utils/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
LIBRARY()

PEERDIR(
ydb/public/api/protos
)

YQL_LAST_ABI_VERSION()

END()
1 change: 1 addition & 0 deletions ydb/core/fq/libs/control_plane_proxy/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ END()
RECURSE(
actors
events
utils
)

RECURSE_FOR_TESTS(
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ struct TEvents {
};
};

NActors::TActorId MakeYqPrivateProxyId();

} // namespace NFq

template<>
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/grpc_services/rpc_fq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
#include <ydb/core/grpc_services/service_fq.h>
#include <ydb/core/fq/libs/audit/events/events.h>
#include <ydb/core/fq/libs/audit/yq_audit_service.h>
#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h>
#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
#include <ydb/core/fq/libs/control_plane_proxy/utils.h>
#include <ydb/core/fq/libs/control_plane_proxy/utils/utils.h>
#include <ydb/public/api/protos/draft/fq.pb.h>
#include <ydb/public/lib/fq/scope.h>

Expand Down
1 change: 0 additions & 1 deletion ydb/core/grpc_services/rpc_fq_internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "rpc_deferrable.h"

#include <ydb/core/fq/libs/events/events.h>
#include <ydb/core/fq/libs/actors/proxy_private.h>
#include <ydb/core/fq/libs/protos/fq_private.pb.h>

#include <ydb/library/actors/core/hfunc.h>
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/service_fq.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <memory>

#include <ydb/core/base/ticket_parser.h>
#include <ydb/core/fq/libs/control_plane_proxy/utils.h>
#include <ydb/core/fq/libs/control_plane_proxy/utils/utils.h>

namespace NKikimr {
namespace NGRpcService {
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ PEERDIR(
ydb/core/discovery
ydb/core/engine
ydb/core/formats
ydb/core/fq/libs/actors
ydb/core/fq/libs/control_plane_proxy
ydb/core/fq/libs/events
ydb/core/fq/libs/control_plane_proxy/events
ydb/core/grpc_services/base
ydb/core/grpc_services/counters
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ PEERDIR(
ydb/core/client/minikql_compile
ydb/core/formats
ydb/core/kqp/common
ydb/core/kqp/compute_actor
ydb/core/kqp/query_compiler
ydb/core/kqp/rm_service
ydb/core/kqp/topics
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1474,11 +1474,11 @@ class TKqpHost : public IKqpHost {
state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory;
state->Configuration->WriteThroughDqIntegration = true;
state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script;

state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
state->Gateway = FederatedQuerySetup->HttpGateway;

auto dataSource = NYql::CreateS3DataSource(state, FederatedQuerySetup->HttpGateway);
auto dataSink = NYql::CreateS3DataSink(state, FederatedQuerySetup->HttpGateway);
auto dataSource = NYql::CreateS3DataSource(state);
auto dataSink = NYql::CreateS3DataSink(state);

TypesCtx->AddDataSource(NYql::S3ProviderName, std::move(dataSource));
TypesCtx->AddDataSink(NYql::S3ProviderName, std::move(dataSink));
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/node_service/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ PEERDIR(
ydb/core/base
ydb/core/cms/console
ydb/core/kqp/common
ydb/core/kqp/compute_actor
ydb/core/kqp/counters
ydb/core/mind
ydb/core/protos
Expand Down
11 changes: 6 additions & 5 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,11 +944,6 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\"");
auto& externalSource = *protoSource->MutableExternalSource();
google::protobuf::Any& settings = *externalSource.MutableSettings();
TString& sourceType = *externalSource.MutableType();
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType);
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node");

// Partitioning
TVector<TString> partitionParams;
Expand All @@ -973,6 +968,12 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
externalSource.SetAuthInfo(CreateStructuredTokenParser(token).ToBuilder().RemoveSecrets().ToJson());
CreateStructuredTokenParser(token).ListReferences(SecretNames);
}

google::protobuf::Any& settings = *externalSource.MutableSettings();
TString& sourceType = *externalSource.MutableType();
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType, maxTasksPerStage);
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" didn't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" didn't fill dq source settings type for its dq source node");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ STRICT_STFUNC_EXC(TDqComputeActorCheckpoints::StateFunc,
hFunc(TEvDqCompute::TEvRun, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle);
hFunc(NActors::TEvents::TEvUndelivered, Handle);
hFunc(TEvRetryQueuePrivate::TEvRetry, Handle);
hFunc(TEvents::TEvWakeup, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);,
Expand Down Expand Up @@ -393,6 +394,13 @@ void TDqComputeActorCheckpoints::Handle(NActors::TEvInterconnect::TEvNodeConnect
EventsQueue.HandleNodeConnected(ev->Get()->NodeId);
}

// Handles a TEvUndelivered notification by offering it to the retry events
// queue; if the queue does not accept it, the event's source type is logged
// as an error.
void TDqComputeActorCheckpoints::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
    LOG_D("Handle undelivered");

    const bool acceptedByRetryQueue = EventsQueue.HandleUndelivered(ev);
    if (acceptedByRetryQueue) {
        return;
    }

    // The retry queue declined the notification: report it.
    LOG_E("TEvUndelivered: " << ev->Get()->SourceType);
}

void TDqComputeActorCheckpoints::Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
Y_UNUSED(ev);
EventsQueue.Retry();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class TDqComputeActorCheckpoints : public NActors::TActor<TDqComputeActorCheckpo
void Handle(NActors::TEvents::TEvPoison::TPtr&);
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev);
void Handle(TEvRetryQueuePrivate::TEvRetry::TPtr& ev);
void Handle(NActors::TEvents::TEvWakeup::TPtr& ev);
void HandleException(const std::exception& err);
Expand Down
10 changes: 10 additions & 0 deletions ydb/library/yql/dq/actors/compute/retry_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ void TRetryEventsQueue::HandleNodeConnected(ui32 nodeId) {
}
}

// Processes a TEvUndelivered notification on behalf of this queue.
// Returns true only when the notification was sent by RecipientId and its
// reason is Disconnected; in that case the queue is marked as not connected
// and a retry is scheduled. Returns false, with no state change, otherwise.
bool TRetryEventsQueue::HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
    // Notifications from anyone but the current recipient are not ours.
    if (!(ev->Sender == RecipientId)) {
        return false;
    }

    // Only the Disconnected reason triggers the retry path.
    if (!(ev->Get()->Reason == NActors::TEvents::TEvUndelivered::Disconnected)) {
        return false;
    }

    Connected = false;
    ScheduleRetry();
    return true;
}

void TRetryEventsQueue::Retry() {
RetryScheduled = false;
if (!Connected) {
Expand Down
8 changes: 7 additions & 1 deletion ydb/library/yql/dq/actors/compute/retry_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,16 @@ class TRetryEventsQueue {
}
return false;
}

// Invokes the RemoveConfirmedEvents overload with MyConfirmedSeqNo and then
// reports whether Events is still non-empty.
bool RemoveConfirmedEvents() {
    RemoveConfirmedEvents(MyConfirmedSeqNo);
    const bool hasRemainingEvents = !Events.empty();
    return hasRemainingEvents;
}

void OnNewRecipientId(const NActors::TActorId& recipientId, bool unsubscribe = true);
void HandleNodeConnected(ui32 nodeId);
void HandleNodeDisconnected(ui32 nodeId);
bool HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev);
void Retry();
void Unsubscribe();

Expand Down Expand Up @@ -160,7 +166,7 @@ class TRetryEventsQueue {
THolder<T> ev = MakeHolder<T>();
ev->Record = Event->Record;
ev->Record.MutableTransportMeta()->SetConfirmedSeqNo(confirmedSeqNo);
return MakeHolder<NActors::IEventHandle>(Recipient, Sender, ev.Release(), 0, Cookie);
return MakeHolder<NActors::IEventHandle>(Recipient, Sender, ev.Release(), NActors::IEventHandle::FlagTrackDelivery, Cookie);
}

private:
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/integration/yql_dq_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class IDqIntegration {
virtual bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) = 0;
virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0;
virtual bool CanFallback() = 0;
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0;
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t maxPartitions) = 0;
virtual void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) = 0;
virtual void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) = 0;
virtual void Annotate(const TExprNode& node, THashMap<TString, TString>& params) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
return 0ULL;
}

void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType) override {
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t) override {
const TDqSource source(&node);
if (const auto maySettings = source.Settings().Maybe<TClSourceSettings>()) {
const auto settings = maySettings.Cast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ bool TDqIntegrationBase::CanFallback() {
return false;
}

void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {
void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&, size_t) {
}

void TDqIntegrationBase::FillSinkSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TDqIntegrationBase: public IDqIntegration {
bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) override;
TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) override;
bool CanFallback() override;
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) override;
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t) override;
void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) override;
void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) override;
void Annotate(const TExprNode& node, THashMap<TString, TString>& params) override;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/planner/execution_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ namespace NYql::NDqs {
TString sourceType;
if (dqSource) {
sourceSettings.ConstructInPlace();
dqIntegration->FillSourceSettings(*read, *sourceSettings, sourceType);
dqIntegration->FillSourceSettings(*read, *sourceSettings, sourceType, maxPartitions);
YQL_ENSURE(!sourceSettings->type_url().empty(), "Data source provider \"" << dataSourceName << "\" did't fill dq source settings for its dq source node");
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceName << "\" did't fill dq source settings type for its dq source node");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
.Ptr();
::google::protobuf::Any settings;
TString sourceType;
dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType);
dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType, 1);
UNIT_ASSERT_STRINGS_EQUAL(sourceType, "PostgreSqlGeneric");
UNIT_ASSERT(settings.Is<Generic::TSource>());
settings.UnpackTo(DqSourceSettings_);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ namespace NYql {
}

void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings,
TString& sourceType) override {
TString& sourceType, size_t) override {
const TDqSource source(&node);
if (const auto maybeSettings = source.Settings().Maybe<TGenSourceSettings>()) {
const auto settings = maybeSettings.Cast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
}
}

void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType) override {
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t) override {
if (auto maybeDqSource = TMaybeNode<TDqSource>(&node)) {
auto settings = maybeDqSource.Cast().Settings();
if (auto maybeTopicSource = TMaybeNode<TDqPqTopicSource>(settings.Raw())) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/providers/s3/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ PEERDIR(
library/cpp/string_utils/base64
library/cpp/string_utils/quote
library/cpp/xml/document
ydb/core/base
ydb/core/fq/libs/events
ydb/library/yql/dq/actors/compute
ydb/library/yql/minikql/computation
Expand All @@ -37,6 +38,8 @@ PEERDIR(
ydb/library/yql/providers/s3/credentials
ydb/library/yql/providers/s3/object_listers
ydb/library/yql/providers/s3/proto
ydb/library/yql/providers/s3/range_helpers
ydb/library/yql/public/issue
ydb/library/yql/public/types
ydb/library/yql/udfs/common/clickhouse/client
)
Expand Down
Loading

0 comments on commit dd39aa2

Please sign in to comment.