From 9a527007ecbc37a147e732928c3aaea130a6d2ca Mon Sep 17 00:00:00 2001 From: wanghenshui Date: Sat, 18 Feb 2023 21:56:32 +0800 Subject: [PATCH] remove proxy code. proxy made performence low --- include/pika_proxy.h | 54 ------------ include/pika_proxy_cli.h | 93 --------------------- include/pika_proxy_conn.h | 103 ----------------------- src/pika.cc | 6 -- src/pika_client_conn.cc | 20 +---- src/pika_cluster.cc | 4 +- src/pika_consensus.cc | 6 +- src/pika_proxy.cc | 131 ----------------------------- src/pika_proxy_cli.cc | 158 ---------------------------------- src/pika_proxy_conn.cc | 172 -------------------------------------- 10 files changed, 8 insertions(+), 739 deletions(-) delete mode 100644 include/pika_proxy.h delete mode 100644 include/pika_proxy_cli.h delete mode 100644 include/pika_proxy_conn.h delete mode 100644 src/pika_proxy.cc delete mode 100644 src/pika_proxy_cli.cc delete mode 100644 src/pika_proxy_conn.cc diff --git a/include/pika_proxy.h b/include/pika_proxy.h deleted file mode 100644 index 00c37b82c3..0000000000 --- a/include/pika_proxy.h +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#ifndef PIKA_PROXY_H_ -#define PIKA_PROXY_H_ - -#include "include/pika_proxy_conn.h" -#include "include/pika_proxy_cli.h" -#include "include/pika_client_conn.h" - -class ProxyCliManager { - public: - ProxyCliManager(int conn_every_backend, int keepalive_time); - ~ProxyCliManager(); - int Start(); - int Stop(); - // no need mutex lock here - Status ChooseForwardToBackend(ProxyTask* task); - - private: - Status ForwardNextAvailableConn(ProxyTask* task); - std::vector> clis_; - std::atomic rr_counter_; - int conn_every_backend_; -}; - -class PikaProxy { - public: - PikaProxy(); - ~PikaProxy(); - int Start(); - int Stop(); - // write to client_thread and put it into task_queue - static void ForwardToBackend(void* arg); - // grep task_queue and - static void WritebackToCliConn(void* arg); - // bypass to g_pika_server - void ScheduleForwardToBackend( - const std::shared_ptr& conn_ptr, - const std::vector& redis_cmds, - const std::vector& dst); - void MayScheduleWritebackToCliConn(std::shared_ptr conn_ptr, - std::shared_ptr cli, const std::string res); - std::shared_ptr cli_manager() { - return cli_manager_ptr_; - } - - private: - std::shared_ptr cli_manager_ptr_; -}; - -#endif // PIKA_PROXY_H_ diff --git a/include/pika_proxy_cli.h b/include/pika_proxy_cli.h deleted file mode 100644 index 0cc3de3fe2..0000000000 --- a/include/pika_proxy_cli.h +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#ifndef PIKA_PROXY_CLI_H_ -#define PIKA_PROXY_CLI_H_ - -#include "pink/include/pink_conn.h" -#include "pink/include/client_thread.h" - -#include "include/pika_proxy_conn.h" - -using slash::Status; -class ProxyCli; -class ProxyFactory : public pink::ConnFactory { - public: - explicit ProxyFactory(std::shared_ptr proxy_cli); - virtual std::shared_ptr NewPinkConn( - int connfd, - const std::string &ip_port, - pink::Thread *thread, - void* worker_specific_data, - pink::PinkEpoll* pink_epoll) const override { - return std::static_pointer_cast - (std::make_shared( - connfd, ip_port, thread, pink_epoll, proxy_cli_)); - } - private: - std::shared_ptr proxy_cli_; -}; - -class ProxyHandle : public pink::ClientHandle { - public: - explicit ProxyHandle(std::shared_ptr proxy_cli) : ClientHandle() { - proxy_cli_ = proxy_cli; - } - ~ProxyHandle() { - } - void CronHandle() const override { - } - void FdTimeoutHandle(int fd, const std::string& ip_port) const override { - } - void FdClosedHandle(int fd, const std::string& ip_port) const override; - bool AccessHandle(std::string& ip) const override { - return true; - } - int CreateWorkerSpecificData(void** data) const override { - return 0; - } - int DeleteWorkerSpecificData(void* data) const override { - return 0; - } - void DestConnectFailedHandle( - std::string ip_port, std::string reason) const override { - } - - private: - std::shared_ptr proxy_cli_; -}; - -class ProxyCli : public std::enable_shared_from_this { - public: - ProxyCli(int cron_interval, int keepalive_timeout); - int Start(); - int Stop(); - Status ForwardToBackend(ProxyTask* task); - Status WritebackUpdate(const std::string& ip_port, - const std::string& res, bool* write_back, ProxyTask** res_task); - - struct ProxyCliTask { - std::shared_ptr conn_ptr; - std::shared_ptr resp_ptr; - }; - void LostConn(const std::string& ip_port); - ~ProxyCli(); - - private: - int cron_interval_; - int keepalive_timeout_; - ProxyFactory* proxy_factory_; - ProxyHandle* proxy_handle_; - - slash::Mutex input_l_; - std::shared_ptr client_ptr_; - // pair(backend conn ip + port, std::deque) - std::unordered_map> backend_task_queue_; - // pair(client ip + port, ProxyTask*) - std::unordered_map task_queue_; -}; - -#endif // PIKA_PROXY_CLI_H_ -# diff --git a/include/pika_proxy_conn.h b/include/pika_proxy_conn.h deleted file mode 100644 index d36f77cdb4..0000000000 --- a/include/pika_proxy_conn.h +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#ifndef PIKA_PROXY_CONN_H_ -#define PIKA_PROXY_CONN_H_ - -#include "pink/include/redis_conn.h" -#include "pink/include/backend_thread.h" -#include "include/pika_client_conn.h" -#include -class ProxyCli; -class PikaProxyConn; - -struct ProxyTask { - ProxyTask() { - } - std::shared_ptr conn_ptr; - std::vector redis_cmds; - std::vector redis_cmds_forward_dst; -}; - -class PikaProxyConn: public pink::RedisConn { - public: - PikaProxyConn(int fd, std::string ip_port, - pink::Thread *server_thread, - pink::PinkEpoll* pink_epoll, - std::shared_ptr proxy_cli); - bool IsAuthed() { return isAuthed_; } - bool IsSelected() { return isSelected_; } - bool IsClosed() { return closed_; } - void SetClose() { closed_ = true;} - virtual ~PikaProxyConn() {} - - private: - int DealMessage( - const pink::RedisCmdArgsType& argv, - std::string* response) override; - std::shared_ptr proxy_cli_; - std::string auth_; - bool isAuthed_; - int table_; - bool isSelected_; - bool closed_; -}; - -struct ConnConfig { - ConnConfig( int table, const std::string& auth, int parallel) - : table_(table), auth_(auth), parallel_(parallel) {} - int table_ = 0; - std::string auth_; - int parallel_ = 10; -}; - -class ParallelConn { - public: -ParallelConn(const std::string& addr, ConnConfig& config, - std::shared_ptr client); - - Status Connect(); - Status Start(); - void Close(); - void Retain(); - bool Release(); - std::string Addr() { return addr_; } - int GetTable() { return config_.table_; } - Status PrepareConn(); - private: - std::shared_ptr GetConn(int fd); - void VerifyAuth(int fd); - void SelectConn(int fd); - void KeepAlive(); - void KeepAliveConn(int fd); - - - //std::vector> parallelConn_; - std::map parallelConn_; - std::set tmpConns_; - std::string addr_; - ConnConfig config_; - std::atomic refCount_; - std::shared_ptr client_; -}; - -class ConnectionPool { - public: - ConnectionPool(const ConnConfig& config, - std::shared_ptr client) - : config_(config), client_(client) { } - void Retain(std::string addr); - void Release(std::string addr); - void AddParallel(const std::string& addr); - private: - // addr and ptr - ConnConfig config_; - std::unordered_map pool_; - std::shared_ptr client_; -}; - - - -#endif // PIKA_PROXY_CONN_H_ diff --git a/src/pika.cc b/src/pika.cc index aa5c6fadb2..f50acf2721 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -9,7 +9,6 @@ #include "slash/include/env.h" #include "include/pika_rm.h" -#include "include/pika_proxy.h" #include "include/pika_server.h" #include "include/pika_command.h" #include "include/pika_conf.h" @@ -25,7 +24,6 @@ PikaConf* g_pika_conf; PikaServer* g_pika_server; PikaReplicaManager* g_pika_rm; -PikaProxy* g_pika_proxy; PikaCmdTableManager* g_pika_cmd_table_manager; @@ -194,13 +192,11 @@ int main(int argc, char *argv[]) { g_pika_cmd_table_manager = new PikaCmdTableManager(); g_pika_server = new PikaServer(); g_pika_rm = new PikaReplicaManager(); - g_pika_proxy = new PikaProxy(); if (g_pika_conf->daemonize()) { close_std(); } - g_pika_proxy->Start(); g_pika_rm->Start(); g_pika_server->Start(); @@ -212,11 +208,9 @@ int main(int argc, char *argv[]) { // may references to dead PikaServer g_pika_rm->Stop(); - g_pika_proxy->Stop(); delete g_pika_server; delete g_pika_rm; - delete g_pika_proxy; delete g_pika_cmd_table_manager; ::google::ShutdownGoogleLogging(); delete g_pika_conf; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index d6eb739a60..5851f8100b 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -15,13 +15,11 @@ #include "include/pika_cmd_table_manager.h" #include "include/pika_admin.h" #include "include/pika_rm.h" -#include "include/pika_proxy.h" extern PikaConf* g_pika_conf; extern PikaServer* g_pika_server; extern PikaReplicaManager* g_pika_rm; extern PikaCmdTableManager* g_pika_cmd_table_manager; -extern PikaProxy* g_pika_proxy; PikaClientConn::PikaClientConn(int fd, std::string ip_port, pink::Thread* thread, @@ -196,27 +194,15 @@ void PikaClientConn::DoBackgroundTask(void* arg) { conn_ptr->NotifyEpoll(false); return; } - for (auto argv : bg_arg->redis_cmds) { + for (const auto& argv : bg_arg->redis_cmds) { if (argv.size() == 0) { delete bg_arg; conn_ptr->NotifyEpoll(false); return; } } - std::vector dst; - bool all_local = true; - Status s = g_pika_server->GetCmdRouting( - bg_arg->redis_cmds, &dst, &all_local); - if (!s.ok()) { - delete bg_arg; - conn_ptr->NotifyEpoll(false); - return; - } - if (!all_local) { - g_pika_proxy->ScheduleForwardToBackend(conn_ptr, bg_arg->redis_cmds, dst); - } else { - conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds); - } + + conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds); delete bg_arg; } diff --git a/src/pika_cluster.cc b/src/pika_cluster.cc index a652fdb579..12e73d8466 100644 --- a/src/pika_cluster.cc +++ b/src/pika_cluster.cc @@ -154,7 +154,7 @@ bool PkClusterInfoCmd::ParseInfoTableSubCmd() { void PkClusterInfoCmd::ClusterInfoSlotRange(const std::string& table_name, const std::set slots, std::string* info) { std::stringstream tmp_stream; - for (auto partition_id : slots) { + for (const auto& partition_id : slots) { std::string p_info; Status s = GetSlotInfo(table_name, partition_id, &p_info); if (!s.ok()) { @@ -588,7 +588,7 @@ void PkClusterSlotsSlaveofCmd::Do(std::shared_ptr partition) { } } - for (auto to_del : to_del_slots) { + for (const auto& to_del : to_del_slots) { slots_.erase(to_del); } diff --git a/src/pika_consensus.cc b/src/pika_consensus.cc index 351b294219..abd45c42f4 100644 --- a/src/pika_consensus.cc +++ b/src/pika_consensus.cc @@ -188,7 +188,7 @@ LogOffset SyncProgress::InternalCalCommittedIndex(std::unordered_map offsets; - for (auto index : match_index) { + for (const auto& index : match_index) { offsets.push_back(index.second); } std::sort(offsets.begin(), offsets.end()); @@ -557,7 +557,7 @@ Status ConsensusCoordinator::ScheduleApplyLog(const LogOffset& committed_index) if (!s.ok()) { return Status::NotFound("committed index not found " + committed_index.ToString()); } - for (auto log : logs) { + for (const auto& log : logs) { context_->PrepareUpdateAppliedIndex(log.offset); InternalApply(log); } @@ -572,7 +572,7 @@ Status ConsensusCoordinator::ScheduleApplyFollowerLog(const LogOffset& committed if (!s.ok()) { return Status::NotFound("committed index not found " + committed_index.ToString()); } - for (auto log : logs) { + for (const auto& log : logs) { context_->PrepareUpdateAppliedIndex(log.offset); InternalApplyFollower(log); } diff --git a/src/pika_proxy.cc b/src/pika_proxy.cc deleted file mode 100644 index f40b0714c1..0000000000 --- a/src/pika_proxy.cc +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include "include/pika_proxy.h" -#include "include/pika_server.h" - -extern PikaProxy* g_pika_proxy; -extern PikaServer* g_pika_server; - -/* ProxyCliManager */ - -ProxyCliManager::ProxyCliManager( - int conn_every_backend, int keepalive_time) - : rr_counter_(0), conn_every_backend_(conn_every_backend) { - for (int i = 0; i < conn_every_backend; ++i) { - clis_.push_back( - std::make_shared(10 /*cron interval*/, keepalive_time)); - } -} - -ProxyCliManager::~ProxyCliManager() { -} - -Status ProxyCliManager::ChooseForwardToBackend(ProxyTask* task) { - return ForwardNextAvailableConn(task); -} - -Status ProxyCliManager::ForwardNextAvailableConn(ProxyTask* task) { - uint64_t counter = rr_counter_.load() % conn_every_backend_; - rr_counter_++; - return clis_[counter]->ForwardToBackend(task); -} - -int ProxyCliManager::Start() { - for (auto& cli : clis_) { - int res = cli->Start(); - if (res != pink::kSuccess) { - LOG(ERROR) << "ProxyCliManager Start Failed: " << - (res == pink::kCreateThreadError ? - ": create thread error " : ": other error"); - return res; - } - } - return pink::kSuccess; -} - -int ProxyCliManager::Stop() { - for (auto& cli : clis_) { - cli->Stop(); - } - return pink::kSuccess; -} - -/* PikaProxy */ - -PikaProxy::PikaProxy() { - // conn_every_backend: 10, keepalive_time: 60s - cli_manager_ptr_ = std::make_shared(10, 60); -} - -PikaProxy::~PikaProxy() { -} - -int PikaProxy::Start() { - int res = cli_manager_ptr_->Start(); - if (res != pink::kSuccess) { - LOG(ERROR) << "PikaProxy Start Failed: " << - (res == pink::kCreateThreadError ? - ": create thread error " : ": other error"); - return res; - } - return pink::kSuccess; -} - -int PikaProxy::Stop() { - cli_manager_ptr_->Stop(); - return pink::kSuccess; -} - -void PikaProxy::ForwardToBackend(void* arg) { - ProxyTask* task = reinterpret_cast(arg); - Status s = g_pika_proxy->cli_manager()->ChooseForwardToBackend(task); - if (!s.ok()) { - delete task; - task = NULL; - LOG(WARNING) << "Forward to backend" << s.ToString(); - } -} - -// just one therad invoke this, no lock guard -void PikaProxy::WritebackToCliConn(void* arg) { - ProxyTask* task = reinterpret_cast(arg); - std::shared_ptr conn_ptr = task->conn_ptr; - // TODO(AZ) build redis resp - // for (auto& resp : conn_ptr->resp_array) { - // conn_ptr->WriteResp(std::move(*resp) + "\r\n"); - // } - conn_ptr->resp_array.clear(); - conn_ptr->NotifyEpoll(true); - delete task; -} - -void PikaProxy::MayScheduleWritebackToCliConn( - std::shared_ptr conn_ptr, - std::shared_ptr cli, const std::string res) { - bool write_back = false; - ProxyTask* task = NULL; - Status s = cli->WritebackUpdate(conn_ptr->ip_port(), res, &write_back, &task); - if (!s.ok()) { - LOG(WARNING) << "WritebaclUpdate failed " + s.ToString(); - return; - } - if (!write_back) { - return; - } - g_pika_server->ScheduleClientPool(&PikaProxy::WritebackToCliConn, task); -} - -void PikaProxy::ScheduleForwardToBackend( - const std::shared_ptr& conn_ptr, - const std::vector& redis_cmds, - const std::vector& dst) { - ProxyTask* arg = new ProxyTask(); - arg->conn_ptr = conn_ptr; - arg->redis_cmds = std::move(redis_cmds); - arg->redis_cmds_forward_dst = std::move(dst); - // choose ip and update - g_pika_server->ScheduleClientPool(&PikaProxy::ForwardToBackend, arg); -} diff --git a/src/pika_proxy_cli.cc b/src/pika_proxy_cli.cc deleted file mode 100644 index 3da346258e..0000000000 --- a/src/pika_proxy_cli.cc +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include "include/pika_proxy_cli.h" - -#include - -#include - -/* ProxyFactory */ - -ProxyFactory::ProxyFactory(std::shared_ptr proxy_cli) - : proxy_cli_(proxy_cli) { -} - -/* ProxyHandle */ - -void ProxyHandle::FdClosedHandle(int fd, const std::string& ip_port) const { - proxy_cli_->LostConn(ip_port); -} - -/* ProxyCli */ - -ProxyCli::ProxyCli(int cron_interval, int keepalive_timeout) - : cron_interval_(cron_interval), - keepalive_timeout_(keepalive_timeout) { -} - -int ProxyCli::Start() { - ProxyFactory* proxy_factory_ = new ProxyFactory(shared_from_this()); - ProxyHandle* proxy_handle_ = new ProxyHandle(shared_from_this()); - client_ptr_ = std::make_shared( - proxy_factory_, cron_interval_, - keepalive_timeout_, proxy_handle_, nullptr); - - int res = client_ptr_->StartThread(); - if (res != pink::kSuccess) { - LOG(FATAL) << "Start Proxy ClientThread Error: " - << res << (res == pink::kCreateThreadError ? - ": create thread error " : ": other error"); - return res; - } - return pink::kSuccess; -} - -int ProxyCli::Stop() { - client_ptr_->StopThread(); - return pink::kSuccess; -} - -Status ProxyCli::WritebackUpdate( - const std::string& ip_port, - const std::string& res, - bool* write_back, - ProxyTask** res_task) { - slash::MutexLock l(&input_l_); - auto iter = backend_task_queue_.find(ip_port); - if (iter == backend_task_queue_.end()) { - return Status::NotFound(ip_port); - } - std::deque& queue = iter->second; - ProxyCliTask cli_task; - if (!queue.empty()) { - cli_task = queue.front(); - } else { - backend_task_queue_.erase(iter); - return Status::NotFound(ip_port); - } - queue.pop_front(); - if (queue.empty()) { - backend_task_queue_.erase(iter); - } - - cli_task.resp_ptr->append(res); - std::shared_ptr conn_ptr = cli_task.conn_ptr; - conn_ptr->resp_num--; - - if (conn_ptr->resp_num.load() == 0) { - *write_back = true; - const auto& iter = task_queue_.find(conn_ptr->ip_port()); - if (iter == task_queue_.end()) { - LOG(WARNING) << "find ip_port()" << conn_ptr->ip_port() << " not found"; - return Status::Corruption(conn_ptr->ip_port()); - } - *res_task = iter->second; - task_queue_.erase(iter); - } - - return Status::OK(); -} - -Status ProxyCli::ForwardToBackend(ProxyTask* task) { - std::shared_ptr conn_ptr = task->conn_ptr; - conn_ptr->resp_num.store(task->redis_cmds.size()); - - slash::MutexLock l(&input_l_); - size_t loopsize = - task->redis_cmds.size() == task->redis_cmds_forward_dst.size() - ? task->redis_cmds.size() : 0; - if (loopsize == 0) { - return Status::Corruption("cmd and calculated routing not match"); - } - for (size_t i = 0; i < loopsize; ++i) { - std::shared_ptr resp_ptr = std::make_shared(); - conn_ptr->resp_array.push_back(resp_ptr); - ProxyCliTask cli_task; - cli_task.conn_ptr = conn_ptr; - cli_task.resp_ptr = resp_ptr; - pink::RedisCmdArgsType& redis_cmd = task->redis_cmds[i]; - - std::string redis_cmd_str; - // TODO(AZ) build more complex redis command - redis_cmd_str.append("*" + std::to_string(redis_cmd.size()) + "\r\n"); - for (auto& cmd_param : redis_cmd) { - redis_cmd_str.append( - "$" + std::to_string(cmd_param.size()) + "\r\n" + cmd_param + "\r\n"); - } - - Node& node = task->redis_cmds_forward_dst[i]; - Status s = client_ptr_->Write(node.Ip(), node.Port(), redis_cmd_str); - - std::string ip_port = node.Ip() + ":" + std::to_string(node.Port()); - backend_task_queue_[ip_port].push_back(cli_task); - } - //TODO to be pipline - std::string ip_port = conn_ptr->ip_port(); - if (task_queue_.find(ip_port) != task_queue_.end()) { - ProxyTask* tmp_task = task_queue_[ip_port]; - if (tmp_task) { - delete tmp_task; - } - } - task_queue_[ip_port] = task; - return Status::OK(); -} - -void ProxyCli::LostConn(const std::string& ip_port) { - slash::MutexLock l(&input_l_); - auto iter = backend_task_queue_.find(ip_port); - if (iter == backend_task_queue_.end()) { - return; - } - std::deque& queue = iter->second; - // all client whole cmd which sheduled to this ip_port will timeout - for (auto& cli_task : queue) { - std::shared_ptr conn_ptr = cli_task.conn_ptr; - auto iter = task_queue_.find(conn_ptr->ip_port()); - ProxyTask* proxy_task = iter->second; - task_queue_.erase(iter); - delete proxy_task; - } -} -ProxyCli::~ProxyCli() { - delete proxy_factory_; - delete proxy_handle_; -} diff --git a/src/pika_proxy_conn.cc b/src/pika_proxy_conn.cc deleted file mode 100644 index 560aef3df0..0000000000 --- a/src/pika_proxy_conn.cc +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#include "include/pika_proxy_conn.h" - -#include "pink/include/redis_cli.h" - -#include "include/pika_proxy.h" - -extern PikaProxy* g_pika_proxy; - -PikaProxyConn::PikaProxyConn(int fd, std::string ip_port, - pink::Thread* thread, - pink::PinkEpoll* pink_epoll, - std::shared_ptr proxy_cli) - : RedisConn(fd, ip_port, thread, pink_epoll, - pink::HandleType::kSynchronous, PIKA_MAX_CONN_RBUF_HB), - proxy_cli_(proxy_cli) { -} - - -int PikaProxyConn::DealMessage( - const pink::RedisCmdArgsType& argv, std::string* response) { - std::string res; - for (auto& arg : argv) { - res += arg; - } - g_pika_proxy->MayScheduleWritebackToCliConn( - std::dynamic_pointer_cast(shared_from_this()), - proxy_cli_, res); - return 0; -} - -ParallelConn::ParallelConn(const std::string& addr, ConnConfig& config, - std::shared_ptr client) - : addr_(addr), config_(config), client_(client) { - refCount_ = 1; -} - - -Status ParallelConn::Connect() { - int num = parallelConn_.size() + tmpConns_.size(); - if (num > config_.parallel_) { - return Status::OK(); - } - for (int i = 0; i < num; i++) { - std::string ip; - int port, fd; - if (!slash::ParseIpPortString(addr_, ip, port)) { - LOG(INFO) << "parser addr " << addr_ << " error"; - return Status::InvalidArgument("paser addr error, addr: ", addr_); - } - Status s = client_->Connect(ip, port, &fd); - if (!s.ok()) { - LOG(INFO) << "connect addr: " << addr_ << "error: " << s.ToString(); - return s; - } - LOG(INFO) << "connect addr: " << addr_ << " fd: " << std::to_string(fd); - tmpConns_.insert(fd); - } - return Status::OK(); -} - -std::shared_ptr ParallelConn::GetConn(int fd) { - return client_->GetConn(fd); -} - -void ParallelConn::VerifyAuth(int fd) { - -} - -void ParallelConn::SelectConn(int fd) { - -} - -void ParallelConn::KeepAlive() { - -} - -void ParallelConn::KeepAliveConn(int fd) { - -} - -Status ParallelConn::PrepareConn() { - for(auto item : tmpConns_) { - auto conn = std::dynamic_pointer_cast(GetConn(item)); - if (conn->IsAuthed()) { - SelectConn(item); - } else { - VerifyAuth(item); - SelectConn(item); - } - } - return Status::OK(); -} - -Status ParallelConn::Start() { - Status s = Connect(); - if (!s.ok()) { - return s; - } - s = PrepareConn(); - if (!s.ok()) { - return s; - } - return Status::OK(); -} - -void ParallelConn::Close() { - for (auto item : parallelConn_) { - client_->Close(item.second); - } - parallelConn_.clear(); - for (auto item : tmpConns_) { - client_->Close(item); - } - tmpConns_.clear(); -} - -void ParallelConn::Retain() { - int expect = 0; - if (refCount_.compare_exchange_strong(expect, -1)) { - LOG(INFO) << "retain parallel conn ref count error"; - return; - } - refCount_++; -} - -bool ParallelConn::Release() { - int expect = 0; - if (refCount_.compare_exchange_strong(expect, -1)) { - LOG(INFO) << "release parallel conn ref count error"; - return true; - } - refCount_--; - if (refCount_.compare_exchange_strong(expect, -1)) { - return true; - } - return false; -} - - -void ConnectionPool::Release(std::string addr) { - if (pool_.find(addr) == pool_.end()) { - return; - } - auto parallel = pool_.find(addr)->second; - if (parallel->Release()) { - parallel->Close(); - delete parallel; - pool_.erase(addr); - LOG(INFO) << "release parallel conn :" << parallel->Addr() << " table :" - << std::to_string(parallel->GetTable()); - } -} - -void ConnectionPool::AddParallel(const std::string& addr) { - auto conns = new ParallelConn(addr, config_, client_); - pool_.insert(make_pair(addr, conns)); - conns->Start(); -} - -void ConnectionPool::Retain(std::string addr) { - auto iter = pool_.find(addr); - if (iter != pool_.end()) { - iter->second->Retain(); - return; - } - AddParallel(addr); -}