Skip to content

Commit

Permalink
Fix memory leak in external data source manager (ydb-platform#3961)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored and kardymonds committed Jun 10, 2024
1 parent 28ec4dd commit d6358b0
Showing 1 changed file with 24 additions and 18 deletions.
42 changes: 24 additions & 18 deletions ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,22 +182,25 @@ NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus> TExternalD
TInternalModificationContext& context) const {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

auto ev = MakeHolder<TRequest>();
ev->Record.SetDatabaseName(context.GetExternalData().GetDatabase());
if (context.GetExternalData().GetUserToken()) {
ev->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken());
}

auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
NKikimrSchemeOp::TModifyScheme schemeTx;
FillCreateExternalDataSourceCommand(schemeTx, settings, context);

auto validationFuture = ValidateCreateExternalDatasource(schemeTx.GetCreateExternalDataSource(), context);

return validationFuture.Apply([ev = ev.Release(), context, schemeTx](const NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus>& f) {
return validationFuture.Apply([context, schemeTx](const NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus>& f) {
if (const auto& value = f.GetValue(); value.IsFail()) {
return NThreading::MakeFuture(value);
}
return SendSchemeRequest(ev, context.GetExternalData().GetActorSystem(), schemeTx.GetFailedOnAlreadyExists(), schemeTx.GetSuccessOnNotExist());

auto ev = MakeHolder<TRequest>();
ev->Record.SetDatabaseName(context.GetExternalData().GetDatabase());
if (context.GetExternalData().GetUserToken()) {
ev->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken());
}

*ev->Record.MutableTransaction()->MutableModifyScheme() = schemeTx;

return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem(), schemeTx.GetFailedOnAlreadyExists(), schemeTx.GetSuccessOnNotExist());
});
}

Expand Down Expand Up @@ -256,14 +259,8 @@ NThreading::TFuture<NMetadata::NModifications::IOperationsManager::TYqlConclusio
const ui32 /*nodeId*/, const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& context) const {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

auto ev = MakeHolder<TRequest>();
ev->Record.SetDatabaseName(context.GetDatabase());
if (context.GetUserToken()) {
ev->Record.SetUserToken(context.GetUserToken()->GetSerializedToken());
}

auto validationFuture = NThreading::MakeFuture(TExternalDataSourceManager::TYqlConclusionStatus::Success());
auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme();
NKikimrSchemeOp::TModifyScheme schemeTx;
switch (schemeOperation.GetOperationCase()) {
case NKqpProto::TKqpSchemeOperation::kCreateExternalDataSource: {
const auto& createExternalDataSource = schemeOperation.GetCreateExternalDataSource();
Expand All @@ -282,11 +279,20 @@ NThreading::TFuture<NMetadata::NModifications::IOperationsManager::TYqlConclusio
TStringBuilder() << "Execution of prepare operation for EXTERNAL_DATA_SOURCE object: unsupported operation: " << int(schemeOperation.GetOperationCase())));
}

return validationFuture.Apply([ev = ev.Release(), context, schemeTx](const NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus>& f) {
return validationFuture.Apply([context, schemeTx](const NThreading::TFuture<TExternalDataSourceManager::TYqlConclusionStatus>& f) {
if (const auto& value = f.GetValue(); value.IsFail()) {
return NThreading::MakeFuture(value);
}
return SendSchemeRequest(ev, context.GetActorSystem(), schemeTx.GetFailedOnAlreadyExists(), schemeTx.GetSuccessOnNotExist());

auto ev = MakeHolder<TRequest>();
ev->Record.SetDatabaseName(context.GetDatabase());
if (context.GetUserToken()) {
ev->Record.SetUserToken(context.GetUserToken()->GetSerializedToken());
}

*ev->Record.MutableTransaction()->MutableModifyScheme() = schemeTx;

return SendSchemeRequest(ev.Release(), context.GetActorSystem(), schemeTx.GetFailedOnAlreadyExists(), schemeTx.GetSuccessOnNotExist());
});
}

Expand Down

0 comments on commit d6358b0

Please sign in to comment.