Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable/disable ssl connections, return connection_string in API #7040

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ydb/core/grpc_services/rpc_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include <google/protobuf/util/time_util.h>

#include <util/string/builder.h>

namespace NKikimr::NGRpcService {

using namespace Ydb;
Expand Down Expand Up @@ -138,9 +140,18 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
}

static TString BuildConnectionString(const NKikimrReplication::TConnectionParams& params) {
return TStringBuilder()
<< (params.GetEnableSsl() ? "grpcs://" : "grpc://")
<< params.GetEndpoint()
<< "/?database=" << params.GetDatabase();
}

static void ConvertConnectionParams(const NKikimrReplication::TConnectionParams& from, Ydb::Replication::ConnectionParams& to) {
to.set_endpoint(from.GetEndpoint());
to.set_database(from.GetDatabase());
to.set_enable_ssl(from.GetEnableSsl());
to.set_connection_string(BuildConnectionString(from));

switch (from.GetCredentialsCase()) {
case NKikimrReplication::TConnectionParams::kStaticCredentials:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
const auto parseResult = NYdb::ParseConnectionString(*connectionString);
params.SetEndpoint(parseResult.Endpoint);
params.SetDatabase(parseResult.Database);
params.SetEnableSsl(parseResult.EnableSsl);
}
if (const auto& endpoint = settings.Settings.Endpoint) {
params.SetEndpoint(*endpoint);
Expand Down
55 changes: 55 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6365,6 +6365,61 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}

void AsyncReplicationConnectionParams(TKikimrRunner& kikimr, const TString& connectionParam, bool ssl = false) {
using namespace NReplication;

auto repl = TReplicationClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{
auto query = R"(
--!syntax_v1
CREATE TABLE `/Root/table` (Key Uint64, Value String, PRIMARY KEY (Key));
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
{
auto query = Sprintf(R"(
--!syntax_v1
CREATE ASYNC REPLICATION `/Root/replication` FOR
`/Root/table` AS `/Root/replica`
WITH (
%s, TOKEN = "root@builtin"
);
)", connectionParam.c_str());

const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
{
const auto result = repl.DescribeReplication("/Root/replication").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

const auto& params = result.GetReplicationDescription().GetConnectionParams();
UNIT_ASSERT_VALUES_EQUAL(params.GetDiscoveryEndpoint(), kikimr.GetEndpoint());
UNIT_ASSERT_VALUES_EQUAL(params.GetDatabase(), "/Root");
UNIT_ASSERT_VALUES_EQUAL(params.GetEnableSsl(), ssl);
}
}

Y_UNIT_TEST(AsyncReplicationConnectionString) {
TKikimrRunner kikimr;
AsyncReplicationConnectionParams(kikimr, Sprintf(R"(CONNECTION_STRING = "grpc://%s/?database=/Root")", kikimr.GetEndpoint().c_str()));
}

Y_UNIT_TEST(AsyncReplicationConnectionStringWithSsl) {
TKikimrRunner kikimr;
AsyncReplicationConnectionParams(kikimr, Sprintf(R"(CONNECTION_STRING = "grpcs://%s/?database=/Root")", kikimr.GetEndpoint().c_str()), true);
}

Y_UNIT_TEST(AsyncReplicationEndpointAndDatabase) {
TKikimrRunner kikimr;
AsyncReplicationConnectionParams(kikimr, Sprintf(R"(ENDPOINT = "%s", DATABASE = "/Root")", kikimr.GetEndpoint().c_str()));
}

Y_UNIT_TEST(DisableResourcePools) {
TKikimrRunner kikimr(TKikimrSettings().SetEnableResourcePools(false));
auto db = kikimr.GetTableClient();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message TOAuthToken {
message TConnectionParams {
optional string Endpoint = 1;
optional string Database = 2;
optional bool EnableSsl = 5;
// credentials
oneof Credentials {
TStaticCredentials StaticCredentials = 3;
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/replication/controller/replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,22 @@ class TReplication::TImpl: public TLagProvider {
if (!YdbProxy && !(State == EState::Removing && !Targets)) {
THolder<IActor> ydbProxy;
const auto& params = Config.GetSrcConnectionParams();
const auto& endpoint = params.GetEndpoint();
const auto& database = params.GetDatabase();
const bool ssl = params.GetEnableSsl();

switch (params.GetCredentialsCase()) {
case NKikimrReplication::TConnectionParams::kStaticCredentials:
if (!params.GetStaticCredentials().HasPassword()) {
return ResolveSecret(params.GetStaticCredentials().GetPasswordSecretName(), ctx);
}
ydbProxy.Reset(CreateYdbProxy(params.GetEndpoint(), params.GetDatabase(), params.GetStaticCredentials()));
ydbProxy.Reset(CreateYdbProxy(endpoint, database, ssl, params.GetStaticCredentials()));
break;
case NKikimrReplication::TConnectionParams::kOAuthToken:
if (!params.GetOAuthToken().HasToken()) {
return ResolveSecret(params.GetOAuthToken().GetTokenSecretName(), ctx);
}
ydbProxy.Reset(CreateYdbProxy(params.GetEndpoint(), params.GetDatabase(), params.GetOAuthToken().GetToken()));
ydbProxy.Reset(CreateYdbProxy(endpoint, database, ssl, params.GetOAuthToken().GetToken()));
break;
default:
ErrorState(TStringBuilder() << "Unexpected credentials: " << params.GetCredentialsCase());
Expand Down
22 changes: 15 additions & 7 deletions ydb/core/tx/replication/service/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ class TSessionInfo {

}; // TSessionInfo

struct TCredentialsKey: std::tuple<TString, TString, TString> {
explicit TCredentialsKey(const TString& endpoint, const TString& database, const TString& user)
: std::tuple<TString, TString, TString>(endpoint, database, user)
struct TCredentialsKey: std::tuple<TString, TString, bool, TString> {
explicit TCredentialsKey(const TString& endpoint, const TString& database, bool ssl, const TString& user)
: std::tuple<TString, TString, bool, TString>(endpoint, database, ssl, user)
{
}

Expand All @@ -139,12 +139,20 @@ struct TCredentialsKey: std::tuple<TString, TString, TString> {
return std::get<1>(*this);
}

bool EnableSsl() const {
return std::get<2>(*this);
}

static TCredentialsKey FromParams(const NKikimrReplication::TConnectionParams& params) {
const auto& endpoint = params.GetEndpoint();
const auto& database = params.GetDatabase();
const bool ssl = params.GetEnableSsl();

switch (params.GetCredentialsCase()) {
case NKikimrReplication::TConnectionParams::kStaticCredentials:
return TCredentialsKey(params.GetEndpoint(), params.GetDatabase(), params.GetStaticCredentials().GetUser());
return TCredentialsKey(endpoint, database, ssl, params.GetStaticCredentials().GetUser());
case NKikimrReplication::TConnectionParams::kOAuthToken:
return TCredentialsKey(params.GetEndpoint(), params.GetDatabase(), params.GetOAuthToken().GetToken() /* TODO */);
return TCredentialsKey(endpoint, database, ssl, params.GetOAuthToken().GetToken());
default:
Y_ABORT("Unexpected credentials");
}
Expand All @@ -155,7 +163,7 @@ struct TCredentialsKey: std::tuple<TString, TString, TString> {
} // NKikimr::NReplication::NService

template <>
struct THash<NKikimr::NReplication::NService::TCredentialsKey> : THash<std::tuple<TString, TString, TString>> {};
struct THash<NKikimr::NReplication::NService::TCredentialsKey> : THash<std::tuple<TString, TString, bool, TString>> {};

namespace NKikimr::NReplication {

Expand Down Expand Up @@ -212,7 +220,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
const TActorId& GetOrCreateYdbProxy(TCredentialsKey&& key, Args&&... args) {
auto it = YdbProxies.find(key);
if (it == YdbProxies.end()) {
auto ydbProxy = Register(CreateYdbProxy(key.Endpoint(), key.Database(), std::forward<Args>(args)...));
auto ydbProxy = Register(CreateYdbProxy(key.Endpoint(), key.Database(), key.EnableSsl(), std::forward<Args>(args)...));
CyberROFL marked this conversation as resolved.
Show resolved Hide resolved
auto res = YdbProxies.emplace(std::move(key), std::move(ydbProxy));
Y_ABORT_UNLESS(res.second);
it = res.first;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/ut_helpers/test_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class TEnv {
Database = "/" + ToString(DomainName);

YdbProxy = Server.GetRuntime()->Register(CreateYdbProxy(
Endpoint, UseDatabase ? Database : "", std::forward<Args>(args)...));
Endpoint, UseDatabase ? Database : "", false /* ssl */, std::forward<Args>(args)...));
Sender = Server.GetRuntime()->AllocateEdgeActor();
}

Expand Down
25 changes: 13 additions & 12 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,20 +419,21 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
Call<TEvYdbProxy::TEvCommitOffsetResponse>(ev, &TTopicClient::CommitOffset);
}

static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database) {
static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, bool ssl) {
return TCommonClientSettings()
.DiscoveryEndpoint(endpoint)
.DiscoveryMode(EDiscoveryMode::Async)
.Database(database);
.Database(database)
.SslCredentials(ssl);
}

static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, const TString& token) {
return MakeSettings(endpoint, database)
static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, bool ssl, const TString& token) {
return MakeSettings(endpoint, database, ssl)
.AuthToken(token);
}

static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, const TStaticCredentials& credentials) {
return MakeSettings(endpoint, database)
static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, bool ssl, const TStaticCredentials& credentials) {
return MakeSettings(endpoint, database, ssl)
.CredentialsProviderFactory(CreateLoginCredentialsProviderFactory({
.User = credentials.GetUser(),
.Password = credentials.GetPassword(),
Expand Down Expand Up @@ -485,16 +486,16 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {

}; // TYdbProxy

IActor* CreateYdbProxy(const TString& endpoint, const TString& database) {
return new TYdbProxy(endpoint, database);
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl) {
return new TYdbProxy(endpoint, database, ssl);
}

IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const TString& token) {
return new TYdbProxy(endpoint, database, token);
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl, const TString& token) {
return new TYdbProxy(endpoint, database, ssl, token);
}

IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const TStaticCredentials& credentials) {
return new TYdbProxy(endpoint, database, credentials);
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl, const TStaticCredentials& credentials) {
return new TYdbProxy(endpoint, database, ssl, credentials);
}

}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/replication/ydb_proxy/ydb_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ struct TEvYdbProxy {

#pragma pop_macro("RemoveDirectory")

IActor* CreateYdbProxy(const TString& endpoint, const TString& database);
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const TString& token);
IActor* CreateYdbProxy(const TString& endpoint, const TString& database,
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl);
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl, const TString& token);
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl,
const NKikimrReplication::TStaticCredentials& credentials);

}
2 changes: 2 additions & 0 deletions ydb/public/api/protos/draft/ydb_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ message ConnectionParams {

string endpoint = 1;
string database = 2;
bool enable_ssl = 5;
string connection_string = 6;

oneof credentials {
StaticCredentials static_credentials = 3;
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/sdk/cpp/client/draft/ydb_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace NReplication {
TConnectionParams::TConnectionParams(const Ydb::Replication::ConnectionParams& params) {
DiscoveryEndpoint(params.endpoint());
Database(params.database());
SslCredentials(params.enable_ssl());

switch (params.credentials_case()) {
case Ydb::Replication::ConnectionParams::kStaticCredentials:
Expand Down Expand Up @@ -47,6 +48,10 @@ const TString& TConnectionParams::GetDatabase() const {
return *Database_;
}

bool TConnectionParams::GetEnableSsl() const {
return SslCredentials_->IsEnabled;
}

TConnectionParams::ECredentials TConnectionParams::GetCredentials() const {
return static_cast<ECredentials>(Credentials_.index());
}
Expand Down
1 change: 1 addition & 0 deletions ydb/public/sdk/cpp/client/draft/ydb_replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TConnectionParams: private TCommonClientSettings {

const TString& GetDiscoveryEndpoint() const;
const TString& GetDatabase() const;
bool GetEnableSsl() const;

ECredentials GetCredentials() const;
const TStaticCredentials& GetStaticCredentials() const;
Expand Down
Loading