Skip to content

Commit

Permalink
Refine local tunnel (#6463)
Browse files Browse the repository at this point in the history
ref #6225
  • Loading branch information
xzhangxian1008 authored Jan 10, 2023
1 parent 104f678 commit f9167f3
Show file tree
Hide file tree
Showing 16 changed files with 982 additions and 676 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
#define APPLY_FOR_RANDOM_FAILPOINTS(M) \
M(random_tunnel_wait_timeout_failpoint) \
M(random_tunnel_init_rpc_failure_failpoint) \
M(random_receiver_local_msg_push_failure_failpoint) \
M(random_receiver_sync_msg_push_failure_failpoint) \
M(random_receiver_async_msg_push_failure_failpoint) \
M(random_limit_check_failpoint) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ struct MockReceiverContext
int source_index = 0;
int send_task_id = 0;
int recv_task_id = -1;
bool is_local = false;
};

struct Reader
Expand Down Expand Up @@ -201,22 +202,22 @@ struct MockReceiverContext
}
}

Request makeRequest(int index) const
static Request makeRequest(int index)
{
return {index, index, -1};
}

std::shared_ptr<Reader> makeReader(const Request &)
std::shared_ptr<Reader> makeSyncReader(const Request &)
{
return std::make_shared<Reader>(queue);
}

void cancelMPPTaskOnTiFlashStorageNode(LoggerPtr)
static void cancelMPPTaskOnTiFlashStorageNode(LoggerPtr)
{
throw Exception("cancelMPPTaskOnTiFlashStorageNode not implemented for MockReceiverContext");
}

void sendMPPTaskToTiFlashStorageNode(LoggerPtr, const std::vector<StorageDisaggregated::RequestAndRegionIDs> &)
static void sendMPPTaskToTiFlashStorageNode(LoggerPtr, const std::vector<StorageDisaggregated::RequestAndRegionIDs> &)
{
throw Exception("sendMPPTaskToTiFlashStorageNode not implemented for MockReceiverContext");
}
Expand All @@ -226,13 +227,16 @@ struct MockReceiverContext
return ::grpc::Status();
}

bool supportAsync(const Request &) const { return false; }
static bool supportAsync(const Request &) { return false; }

void makeAsyncReader(
const Request &,
std::shared_ptr<AsyncReader> &,
grpc::CompletionQueue *,
UnaryCallback<bool> *) const {}

void establishMPPConnectionLocal(const MockReceiverContext::Request &, size_t, LocalRequestHandler &, bool) {}

PacketQueuePtr queue;
std::vector<tipb::FieldType> field_types{};
};
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ grpc::Status FlashService::EstablishMPPConnection(grpc::ServerContext * grpc_con
{
Stopwatch stopwatch;
SyncPacketWriter writer(sync_writer);
tunnel->connect(&writer);
tunnel->connectSync(&writer);
tunnel->waitForFinish();
LOG_INFO(tunnel->getLogger(), "connection for {} cost {} ms.", tunnel->id(), stopwatch.elapsedMilliseconds());
}
Expand Down
Loading

0 comments on commit f9167f3

Please sign in to comment.