Skip to content

Commit

Permalink
[#3220] Extract CleanupTransaction from YBTransaction
Browse files Browse the repository at this point in the history
Summary:
When sealed transaction is applied, coordinator cleans up its intents.
We already have such functionality in YBTransaction, i.e. client part of transaction.

So extracted it to separate utility function.

Test Plan: Jenkins

Reviewers: dmitry, mikhail, timur

Reviewed By: timur

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D7744
  • Loading branch information
spolitov committed Dec 26, 2019
1 parent beac0ea commit 1166e5a
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 63 deletions.
1 change: 1 addition & 0 deletions src/yb/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ set(CLIENT_SRCS
table_handle.cc
tablet_rpc.cc
transaction.cc
transaction_cleanup.cc
transaction_manager.cc
transaction_pool.cc
transaction_rpc.cc
Expand Down
67 changes: 4 additions & 63 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "yb/client/in_flight_op.h"
#include "yb/client/meta_cache.h"
#include "yb/client/tablet_rpc.h"
#include "yb/client/transaction_cleanup.h"
#include "yb/client/transaction_manager.h"
#include "yb/client/transaction_rpc.h"
#include "yb/client/yb_op.h"
Expand Down Expand Up @@ -596,60 +597,9 @@ class YBTransaction::Impl final {
tablet_ids.assign(tablets_with_metadata_.begin(), tablets_with_metadata_.end());
}

for (const auto& tablet_id : tablet_ids) {
manager_->client()->LookupTabletById(
tablet_id,
TransactionRpcDeadline(),
std::bind(&Impl::LookupTabletForCleanupDone, this, _1, transaction),
client::UseCache::kTrue);
}
}

void LookupTabletForCleanupDone(const Result<internal::RemoteTabletPtr>& remote_tablet,
const YBTransactionPtr& transaction) {
if (!remote_tablet.ok()) {
// Intents will be cleaned up later in this case.
LOG(WARNING) << "Tablet lookup failed: " << remote_tablet.status();
return;
}
VLOG_WITH_PREFIX(1) << "Lookup tablet for cleanup done: " << yb::ToString(*remote_tablet);
auto remote_tablet_servers = (**remote_tablet).GetRemoteTabletServers(
internal::IncludeFailedReplicas::kTrue);

constexpr auto kCallTimeout = 15s;
auto now = manager_->Now().ToUint64();

{
std::unique_lock<std::mutex> lock(mutex_);
abort_requests_.reserve(abort_requests_.size() + remote_tablet_servers.size());
for (auto* server : remote_tablet_servers) {
VLOG_WITH_PREFIX(2) << "Sending cleanup to: " << yb::ToString(*server);
auto status = server->InitProxy(manager_->client());
if (!status.ok()) {
LOG(WARNING) << "Failed to init proxy to " << server->ToString() << ": " << status;
continue;
}
abort_requests_.emplace_back();
auto& abort_request = abort_requests_.back();

auto& request = abort_request.request;
request.set_tablet_id((**remote_tablet).tablet_id());
request.set_propagated_hybrid_time(now);
auto& state = *request.mutable_state();
state.set_transaction_id(metadata_.transaction_id.begin(), metadata_.transaction_id.size());
state.set_status(TransactionStatus::CLEANUP);

abort_request.controller.set_timeout(kCallTimeout);

server->proxy()->UpdateTransactionAsync(
request, &abort_request.response, &abort_request.controller,
std::bind(&Impl::ProcessResponse, this, transaction));
}
}
}

void ProcessResponse(const YBTransactionPtr& transaction) {
VLOG_WITH_PREFIX(3) << "Cleanup intents for Abort done";
CleanupTransaction(
manager_->client(), manager_->clock(), metadata_.transaction_id, Sealed::kFalse,
tablet_ids);
}

void CommitDone(const Status& status,
Expand Down Expand Up @@ -916,15 +866,6 @@ class YBTransaction::Impl final {
rpc::Rpcs::Handle commit_handle_;
rpc::Rpcs::Handle abort_handle_;

// RPC data for abort requests.
struct AbortRequest {
tserver::UpdateTransactionRequestPB request;
tserver::UpdateTransactionResponsePB response;
rpc::RpcController controller;
};

boost::container::stable_vector<AbortRequest> abort_requests_;

typedef std::unordered_set<TabletId> TabletIds;

std::mutex mutex_;
Expand Down
127 changes: 127 additions & 0 deletions src/yb/client/transaction_cleanup.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/client/transaction_cleanup.h"

#include "yb/client/client.h"
#include "yb/client/meta_cache.h"

#include "yb/tserver/tserver_service.proxy.h"

using namespace std::literals;
using namespace std::placeholders;

namespace yb {
namespace client {

namespace {

class TransactionCleanup : public std::enable_shared_from_this<TransactionCleanup> {
public:
TransactionCleanup(
YBClient* client, const scoped_refptr<ClockBase>& clock, const TransactionId& transaction_id,
Sealed sealed)
: client_(client), clock_(clock), transaction_id_(transaction_id), sealed_(sealed) {
}

void Perform(const std::vector<TabletId>& tablet_ids) {
auto self = shared_from_this();
for (const auto& tablet_id : tablet_ids) {
client_->LookupTabletById(
tablet_id,
TransactionRpcDeadline(),
std::bind(&TransactionCleanup::LookupTabletDone, this, _1, self),
client::UseCache::kTrue);
}
}

private:
void LookupTabletDone(const Result<internal::RemoteTabletPtr>& remote_tablet,
const std::shared_ptr<TransactionCleanup>& self) {
if (!remote_tablet.ok()) {
// Intents will be cleaned up later in this case.
LOG_WITH_PREFIX(WARNING) << "Tablet lookup failed: " << remote_tablet.status();
return;
}
VLOG_WITH_PREFIX(1) << "Lookup tablet for cleanup done: " << yb::ToString(*remote_tablet);
auto remote_tablet_servers = (**remote_tablet).GetRemoteTabletServers(
internal::IncludeFailedReplicas::kTrue);

constexpr auto kCallTimeout = 15s;
auto now = clock_->Now().ToUint64();

const auto& tablet_id = (**remote_tablet).tablet_id();

std::lock_guard<std::mutex> lock(mutex_);
calls_.reserve(calls_.size() + remote_tablet_servers.size());
for (auto* server : remote_tablet_servers) {
VLOG_WITH_PREFIX(2) << "Sending cleanup to T " << (**remote_tablet).tablet_id() << " P "
<< server->permanent_uuid();
auto status = server->InitProxy(client_);
if (!status.ok()) {
LOG_WITH_PREFIX(WARNING) << "Failed to init proxy to " << server->ToString() << ": "
<< status;
continue;
}
calls_.emplace_back();
auto& call = calls_.back();

auto& request = call.request;
request.set_tablet_id(tablet_id);
request.set_propagated_hybrid_time(now);
auto& state = *request.mutable_state();
state.set_transaction_id(transaction_id_.begin(), transaction_id_.size());
state.set_status(TransactionStatus::CLEANUP);
state.set_sealed(sealed_);

call.controller.set_timeout(kCallTimeout);

server->proxy()->UpdateTransactionAsync(
request, &call.response, &call.controller,
[this, self, remote_tablet = *remote_tablet, server] {
VLOG_WITH_PREFIX(3) << "Cleaned intents at T " << remote_tablet->tablet_id() << " P "
<< server->permanent_uuid();
});
}
}

std::string LogPrefix() const {
return Format("ID $0: ", transaction_id_);
}

struct Call {
tserver::UpdateTransactionRequestPB request;
tserver::UpdateTransactionResponsePB response;
rpc::RpcController controller;
};

YBClient* const client_;
const scoped_refptr<ClockBase> clock_;
const TransactionId transaction_id_;
const Sealed sealed_;

std::mutex mutex_;
boost::container::stable_vector<Call> calls_ GUARDED_BY(mutex_);
};

} // namespace

void CleanupTransaction(
YBClient* client, const scoped_refptr<ClockBase>& clock, const TransactionId& transaction_id,
Sealed sealed, const std::vector<TabletId>& tablets) {
auto cleanup = std::make_shared<TransactionCleanup>(client, clock, transaction_id, sealed);
cleanup->Perform(tablets);
}

} // namespace client
} // namespace yb
45 changes: 45 additions & 0 deletions src/yb/client/transaction_cleanup.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#ifndef YB_CLIENT_TRANSACTION_CLEANUP_H
#define YB_CLIENT_TRANSACTION_CLEANUP_H

#include <memory>

#include <boost/container/stable_vector.hpp>

#include "yb/client/client_fwd.h"

#include "yb/common/common_fwd.h"
#include "yb/common/entity_ids.h"
#include "yb/common/transaction.h"

#include "yb/rpc/rpc_controller.h"

#include "yb/tserver/tserver_service.pb.h"

namespace yb {
namespace client {

YB_STRONGLY_TYPED_BOOL(Sealed);

// Sends cleanup intents request to provided tablets.
// sealed - whether transaction was previously sealed.
void CleanupTransaction(
YBClient* client, const scoped_refptr<ClockBase>& clock, const TransactionId& transaction_id,
Sealed sealed, const std::vector<TabletId>& tablets);

} // namespace client
} // namespace yb

#endif // YB_CLIENT_TRANSACTION_CLEANUP_H
4 changes: 4 additions & 0 deletions src/yb/tserver/tserver.proto
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ message TransactionStatePB {

// Relevant only in APPLYING state.
optional fixed64 commit_hybrid_time = 4;

// Relevant only in CLEANUP status, true when we cleanup intents of applied transaction,
// that was previously sealed.
optional bool sealed = 6;
}

// Truncate tablet request.
Expand Down

0 comments on commit 1166e5a

Please sign in to comment.