Skip to content

Commit

Permalink
Yq 3322 Shared reading (to stable) (#9915)
Browse files Browse the repository at this point in the history
Co-authored-by: Pisarenko Grigoriy <grigoriypisar@ydb.tech>
  • Loading branch information
kardymonds and GrigoriyPA authored Oct 2, 2024
1 parent 97e9a76 commit afee6e2
Show file tree
Hide file tree
Showing 104 changed files with 7,578 additions and 247 deletions.
1 change: 1 addition & 0 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void FillPqClusterConfig(NYql::TPqClusterConfig& clusterConfig,
clusterConfig.SetUseSsl(ds.secure());
clusterConfig.SetAddBearerToToken(useBearerForYdb);
clusterConfig.SetClusterType(TPqClusterConfig::CT_DATA_STREAMS);
clusterConfig.SetSharedReading(ds.shared_reading());
FillClusterAuth(clusterConfig, ds.auth(), authToken, accountIdSignatures);
}

Expand Down
7 changes: 7 additions & 0 deletions ydb/core/fq/libs/actors/logging/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@
#define LOG_STREAMS_STORAGE_SERVICE_AS_WARN(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, WARN, STREAMS_STORAGE_SERVICE, logRecordStream)
#define LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, ERROR, STREAMS_STORAGE_SERVICE, logRecordStream)

// Component: ROW_DISPATCHER.
#define LOG_ROW_DISPATCHER_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, FQ_ROW_DISPATCHER, LogPrefix << logRecordStream)
#define LOG_ROW_DISPATCHER_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, FQ_ROW_DISPATCHER, LogPrefix << logRecordStream)
#define LOG_ROW_DISPATCHER_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, FQ_ROW_DISPATCHER, LogPrefix << logRecordStream)
#define LOG_ROW_DISPATCHER_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, FQ_ROW_DISPATCHER, LogPrefix << logRecordStream)
#define LOG_ROW_DISPATCHER_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, FQ_ROW_DISPATCHER, LogPrefix << logRecordStream)

// Component: STREAMS_SCHEDULER_SERVICE.
#define LOG_STREAMS_SCHEDULER_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_SCHEDULER_SERVICE, logRecordStream)
#define LOG_STREAMS_SCHEDULER_SERVICE_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_SCHEDULER_SERVICE, logRecordStream)
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include <ydb/public/api/protos/draft/fq.pb.h>

#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/dq/actors/compute/retry_queue.h>
#include <ydb/library/yql/dq/actors/common/retry_queue.h>
#include <ydb/library/yql/providers/dq/actors/events.h>
#include <ydb/library/yql/providers/dq/actors/task_controller_impl.h>

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/config/protos/fq_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import "ydb/core/fq/libs/config/protos/quotas_manager.proto";
import "ydb/core/fq/libs/config/protos/rate_limiter.proto";
import "ydb/core/fq/libs/config/protos/read_actors_factory.proto";
import "ydb/core/fq/libs/config/protos/resource_manager.proto";
import "ydb/core/fq/libs/config/protos/row_dispatcher.proto";
import "ydb/core/fq/libs/config/protos/test_connection.proto";
import "ydb/core/fq/libs/config/protos/token_accessor.proto";
import "ydb/library/folder_service/proto/config.proto";
Expand Down Expand Up @@ -53,4 +54,5 @@ message TConfig {
TRateLimiterConfig RateLimiter = 22;
bool EnableTaskCounters = 23;
TComputeConfig Compute = 24;
TRowDispatcherConfig RowDispatcher = 25;
}
23 changes: 23 additions & 0 deletions ydb/core/fq/libs/config/protos/row_dispatcher.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
syntax = "proto3";
option cc_enable_arenas = true;

package NFq.NConfig;
option java_package = "ru.yandex.kikimr.proto";

import "ydb/core/fq/libs/config/protos/storage.proto";

////////////////////////////////////////////////////////////

message TRowDispatcherCoordinatorConfig {
TYdbStorageConfig Database = 1;
string CoordinationNodePath = 2;
}
message TRowDispatcherConfig {
bool Enabled = 1;
uint64 TimeoutBeforeStartSessionSec = 2;
uint64 SendStatusPeriodSec = 3;
uint64 MaxSessionUsedMemory = 4;
bool WithoutConsumer = 5;
TRowDispatcherCoordinatorConfig Coordinator = 6;

}
1 change: 1 addition & 0 deletions ydb/core/fq/libs/config/protos/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ SRCS(
rate_limiter.proto
read_actors_factory.proto
resource_manager.proto
row_dispatcher.proto
storage.proto
test_connection.proto
token_accessor.proto
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/events/event_subspace.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct TYqEventSubspace {
ControlPlaneConfig,
YdbCompute,
TableOverFq,

RowDispatcher,
SubspacesEnd,
};

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/events/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ PEERDIR(
ydb/library/actors/core
ydb/core/fq/libs/graph_params/proto
ydb/core/fq/libs/protos
ydb/core/fq/libs/row_dispatcher/protos
ydb/library/yql/core/facade
ydb/library/yql/providers/common/db_id_async_resolver
ydb/library/yql/providers/dq/provider
ydb/library/yql/providers/pq/proto
ydb/library/yql/public/issue
ydb/public/api/protos
ydb/public/sdk/cpp/client/ydb_table
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <ydb/core/fq/libs/rate_limiter/events/control_plane_events.h>
#include <ydb/core/fq/libs/rate_limiter/events/data_plane.h>
#include <ydb/core/fq/libs/rate_limiter/quoter_service/quoter_service.h>
#include <ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h>
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
#include <ydb/core/fq/libs/test_connection/test_connection.h>

Expand Down Expand Up @@ -193,6 +194,18 @@ void Init(
credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent, tokenAccessorConfig.GetConnectionPoolSize());
}

if (protoConfig.GetRowDispatcher().GetEnabled()) {
auto rowDispatcher = NFq::NewRowDispatcherService(
protoConfig.GetRowDispatcher(),
protoConfig.GetCommon(),
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
credentialsFactory,
tenant,
yqCounters->GetSubgroup("subsystem", "row_dispatcher"));
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
}

auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();

if (protoConfig.GetPrivateApi().GetEnabled()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/init/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ PEERDIR(
ydb/core/fq/libs/quota_manager
ydb/core/fq/libs/rate_limiter/control_plane_service
ydb/core/fq/libs/rate_limiter/quoter_service
ydb/core/fq/libs/row_dispatcher
ydb/core/fq/libs/shared_resources
ydb/core/fq/libs/test_connection
ydb/core/protos
Expand Down
37 changes: 37 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include <ydb/core/fq/libs/row_dispatcher/actors_factory.h>

#include <ydb/core/fq/libs/row_dispatcher/topic_session.h>

namespace NFq::NRowDispatcher {


struct TActorFactory : public IActorFactory {
TActorFactory() {}

NActors::TActorId RegisterTopicSession(
const TString& topicPath,
const NConfig::TRowDispatcherConfig& config,
NActors::TActorId rowDispatcherActorId,
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters) const override {

auto actorPtr = NFq::NewTopicSession(
topicPath,
config,
rowDispatcherActorId,
partitionId,
std::move(driver),
credentialsProviderFactory,
counters
);
return NActors::TlsActivationContext->ExecutorThread.RegisterActor(actorPtr.release(), NActors::TMailboxType::HTSwap, Max<ui32>());
}
};

IActorFactory::TPtr CreateActorFactory() {
return MakeIntrusive<TActorFactory>();
}

}
25 changes: 25 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/actors_factory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
#include <util/generic/ptr.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>

namespace NFq::NRowDispatcher {

struct IActorFactory : public TThrRefBase {
using TPtr = TIntrusivePtr<IActorFactory>;

virtual NActors::TActorId RegisterTopicSession(
const TString& topicPath,
const NConfig::TRowDispatcherConfig& config,
NActors::TActorId rowDispatcherActorId,
ui32 partitionId,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters) const = 0;
};

IActorFactory::TPtr CreateActorFactory();

}
Loading

0 comments on commit afee6e2

Please sign in to comment.