Skip to content

Commit

Permalink
Merge branch 'OpenAtomFoundation:unstable' into incr
Browse files Browse the repository at this point in the history
  • Loading branch information
chejinge authored Aug 1, 2024
2 parents 02a6a25 + 2f3bedb commit 1e0c753
Show file tree
Hide file tree
Showing 20 changed files with 1,215 additions and 275 deletions.
12 changes: 10 additions & 2 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct TimeStat {
void Reset() {
enqueue_ts_ = dequeue_ts_ = 0;
process_done_ts_ = 0;
before_queue_ts_ = 0;
}

uint64_t start_ts() const {
Expand All @@ -37,8 +38,13 @@ struct TimeStat {
return process_done_ts_ > dequeue_ts_ ? process_done_ts_ - dequeue_ts_ : 0;
}

uint64_t before_queue_time() const {
return process_done_ts_ > dequeue_ts_ ? before_queue_ts_ - enqueue_ts_ : 0;
}

uint64_t enqueue_ts_;
uint64_t dequeue_ts_;
uint64_t before_queue_ts_;
uint64_t process_done_ts_;
};

Expand Down Expand Up @@ -67,8 +73,11 @@ class PikaClientConn : public net::RedisConn {
const net::HandleType& handle_type, int max_conn_rbuf_size);
~PikaClientConn() = default;

bool IsInterceptedByRTC(std::string& opt);

void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async, std::string* response) override;

bool ReadCmdInCache(const net::RedisCmdArgsType& argv, const std::string& opt);
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) override { return 0; }
static void DoBackgroundTask(void* arg);
Expand Down Expand Up @@ -99,8 +108,7 @@ class PikaClientConn : public net::RedisConn {
void AddKeysToWatch(const std::vector<std::string>& db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string>& db_keys);
void SetAllTxnFailed();
void SetTxnFailedFromDBs(std::string db_name);
void SetTxnFailedIfKeyExists(const std::string target_db_name = "");
void ExitTxn();
bool IsInTxn();
bool IsTxnInitFailed();
Expand Down
5 changes: 4 additions & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ const std::string kCmdNameXInfo = "xinfo";

const std::string kClusterPrefix = "pkcluster";


/*
* If a type holds a key, a new data structure
* that uses the key will use this error
Expand Down Expand Up @@ -289,7 +290,7 @@ enum CmdFlags {
kCmdFlagsOperateKey = (1 << 19), // redis keySpace
kCmdFlagsStream = (1 << 20),
kCmdFlagsFast = (1 << 21),
kCmdFlagsSlow = (1 << 22),
kCmdFlagsSlow = (1 << 22)
};

void inline RedisAppendContent(std::string& str, const std::string& value);
Expand Down Expand Up @@ -536,6 +537,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool hasFlag(uint32_t flag) const;
bool is_read() const;
bool is_write() const;
bool isCacheRead() const;

bool IsLocal() const;
bool IsSuspend() const;
Expand Down Expand Up @@ -579,6 +581,7 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
void ProcessCommand(const HintKeys& hint_key = HintKeys());
void InternalProcessCommand(const HintKeys& hint_key);
void DoCommand(const HintKeys& hint_key);
bool DoReadCommandInCache();
void LogCommand() const;

std::string name_;
Expand Down
3 changes: 2 additions & 1 deletion include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class ConsensusCoordinator {
pstd::Status InternalAppendLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status InternalAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
void InternalApply(const MemLog::LogItem& log);
void InternalApplyFollower(const MemLog::LogItem& log);
void InternalApplyFollower(const std::shared_ptr<Cmd>& cmd_ptr);

pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, const BinlogOffset& end_offset,
Expand All @@ -182,6 +182,7 @@ class ConsensusCoordinator {
pstd::Status FindLogicOffset(const BinlogOffset& start_offset, uint64_t target_index, LogOffset* found_offset);
pstd::Status GetLogsBefore(const BinlogOffset& start_offset, std::vector<LogOffset>* hints);

private:
// keep members in this class works in order
pstd::Mutex order_mu_;

Expand Down
6 changes: 3 additions & 3 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <memory>
#include <string>

#include <functional>
#include "net/include/bg_thread.h"
#include "net/include/pb_conn.h"
#include "net/include/thread_pool.h"
Expand All @@ -25,13 +25,13 @@ class PikaReplBgWorker {
int StartThread();
int StopThread();
void Schedule(net::TaskFunc func, void* arg);
void QueueClear();
void Schedule(net::TaskFunc func, void* arg, std::function<void()>& call_back);
static void HandleBGWorkerWriteBinlog(void* arg);
static void HandleBGWorkerWriteDB(void* arg);
static void WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr);
void SetThreadName(const std::string& thread_name) {
bg_thread_.set_thread_name(thread_name);
}

BinlogItem binlog_item_;
net::RedisParser redis_parser_;
std::string ip_port_;
Expand Down
36 changes: 29 additions & 7 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,8 @@ struct ReplClientWriteBinlogTaskArg {

struct ReplClientWriteDBTaskArg {
const std::shared_ptr<Cmd> cmd_ptr;
LogOffset offset;
std::string db_name;
ReplClientWriteDBTaskArg(std::shared_ptr<Cmd> _cmd_ptr, const LogOffset& _offset, std::string _db_name)
: cmd_ptr(std::move(_cmd_ptr)),
offset(_offset),
db_name(std::move(_db_name)) {}
explicit ReplClientWriteDBTaskArg(std::shared_ptr<Cmd> _cmd_ptr)
: cmd_ptr(std::move(_cmd_ptr)) {}
~ReplClientWriteDBTaskArg() = default;
};

Expand All @@ -68,7 +64,7 @@ class PikaReplClient {
void ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name);
void ScheduleWriteBinlogTask(const std::string& db_name, const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name);

pstd::Status SendMetaSync();
pstd::Status SendDBSync(const std::string& ip, uint32_t port, const std::string& db_name,
Expand All @@ -80,6 +76,24 @@ class PikaReplClient {
const std::string& local_ip, bool is_first_send);
pstd::Status SendRemoveSlaveNode(const std::string& ip, uint32_t port, const std::string& db_name, const std::string& local_ip);

void IncrAsyncWriteDBTaskCount(const std::string& db_name, int32_t incr_step) {
int32_t db_index = db_name.back() - '0';
assert(db_index >= 0 && db_index <= 7);
async_write_db_task_counts_[db_index].fetch_add(incr_step, std::memory_order::memory_order_seq_cst);
}

void DecrAsyncWriteDBTaskCount(const std::string& db_name, int32_t incr_step) {
int32_t db_index = db_name.back() - '0';
assert(db_index >= 0 && db_index <= 7);
async_write_db_task_counts_[db_index].fetch_sub(incr_step, std::memory_order::memory_order_seq_cst);
}

int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) {
int32_t db_index = db_name.back() - '0';
assert(db_index >= 0 && db_index <= 7);
return async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst);
}

private:
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);
size_t GetHashIndexByKey(const std::string& key);
Expand All @@ -88,6 +102,14 @@ class PikaReplClient {
std::unique_ptr<PikaReplClientThread> client_thread_;
int next_avail_ = 0;
std::hash<std::string> str_hash;

// async_write_db_task_counts_ is used when consuming binlog, which indicates the nums of async write-DB tasks that are
// queued or being executing by WriteDBWorkers. If a flushdb-binlog need to apply DB, it must wait
// util this count drop to zero. you can also check pika discussion #2807 to know more
// it is only used in slaveNode when consuming binlog
std::atomic<int32_t> async_write_db_task_counts_[MAX_DB_NUM];
// [NOTICE] write_db_workers_ must be declared after async_write_db_task_counts_ to ensure write_db_workers_ will be destroyed before async_write_db_task_counts_
// when PikaReplClient is de-constructing, because some of the async task that exec by write_db_workers_ will manipulate async_write_db_task_counts_
std::vector<std::unique_ptr<PikaReplBgWorker>> write_binlog_workers_;
std::vector<std::unique_ptr<PikaReplBgWorker>> write_db_workers_;
};
Expand Down
6 changes: 5 additions & 1 deletion include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class PikaReplicaManager {
void ScheduleWriteBinlogTask(const std::string& db_name,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name);
void ScheduleReplClientBGTaskByDBName(net::TaskFunc , void* arg, const std::string &db_name);
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);
Expand All @@ -205,6 +205,10 @@ class PikaReplicaManager {
return sync_slave_dbs_;
}

int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) {
return pika_repl_client_->GetUnfinishedAsyncWriteDBTaskCount(db_name);
}

private:
void InitDB();
pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip);
Expand Down
2 changes: 1 addition & 1 deletion pikatests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ function setup_pika_bin {
exit 1
fi
cp $PIKA_BIN src/redis-server
cp conf/pika.conf tests/assets/default.conf
cp tests/conf/pika.conf tests/assets/default.conf
}


Expand Down
17 changes: 13 additions & 4 deletions src/net/include/bg_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <atomic>
#include <queue>

#include <functional>
#include "net/include/net_thread.h"

#include "pstd/include/pstd_mutex.h"
Expand Down Expand Up @@ -41,7 +41,7 @@ class BGThread final : public Thread {
}

void Schedule(void (*function)(void*), void* arg);

void Schedule(void (*function)(void*), void* arg, std::function<void()>& call_back);
/*
* timeout is in millionsecond
*/
Expand All @@ -52,13 +52,22 @@ class BGThread final : public Thread {
void SwallowReadyTasks();

private:
struct BGItem {
class BGItem {
public:
void (*function)(void*);
void* arg;
//dtor_call_back is an optional call back fun
std::function<void()> dtor_call_back;
BGItem(void (*_function)(void*), void* _arg) : function(_function), arg(_arg) {}
BGItem(void (*_function)(void*), void* _arg, std::function<void()>& _dtor_call_back) : function(_function), arg(_arg), dtor_call_back(_dtor_call_back) {}
~BGItem() {
if (dtor_call_back) {
dtor_call_back();
}
}
};

std::queue<BGItem> queue_;
std::queue<std::unique_ptr<BGItem>> queue_;
std::priority_queue<TimerItem> timer_queue_;

size_t full_;
Expand Down
27 changes: 17 additions & 10 deletions src/net/src/bg_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
// of patent rights can be found in the PATENTS file in the same directory.

#include "net/include/bg_thread.h"
#include <sys/time.h>
#include <cstdlib>
#include <mutex>

#include "pstd/include/pstd_mutex.h"
#include "pstd/include/xdebug.h"

namespace net {

void BGThread::Schedule(void (*function)(void*), void* arg) {
Expand All @@ -19,11 +15,22 @@ void BGThread::Schedule(void (*function)(void*), void* arg) {
wsignal_.wait(lock, [this]() { return queue_.size() < full_ || should_stop(); });

if (!should_stop()) {
queue_.emplace(function, arg);
queue_.emplace(std::make_unique<BGItem>(function, arg));
rsignal_.notify_one();
}
}

void BGThread::Schedule(void (*function)(void*), void* arg, std::function<void()>& call_back) {
std::unique_lock lock(mu_);

wsignal_.wait(lock, [this]() { return queue_.size() < full_ || should_stop(); });

if (!should_stop()) {
queue_.emplace(std::make_unique<BGItem>(function, arg, call_back));
rsignal_.notify_one();
}
};

void BGThread::QueueSize(int* pri_size, int* qu_size) {
std::lock_guard lock(mu_);
*pri_size = static_cast<int32_t>(timer_queue_.size());
Expand All @@ -32,7 +39,7 @@ void BGThread::QueueSize(int* pri_size, int* qu_size) {

void BGThread::QueueClear() {
std::lock_guard lock(mu_);
std::queue<BGItem>().swap(queue_);
std::queue<std::unique_ptr<BGItem>>().swap(queue_);
std::priority_queue<TimerItem>().swap(timer_queue_);
wsignal_.notify_one();
}
Expand All @@ -42,10 +49,10 @@ void BGThread::SwallowReadyTasks() {
// while the schedule function would stop to add any tasks.
mu_.lock();
while (!queue_.empty()) {
auto [function, arg] = queue_.front();
std::unique_ptr<BGItem> task_item = std::move(queue_.front());
queue_.pop();
mu_.unlock();
(*function)(arg);
task_item->function(task_item->arg);
mu_.lock();
}
mu_.unlock();
Expand Down Expand Up @@ -96,11 +103,11 @@ void* BGThread::ThreadMain() {
}

if (!queue_.empty()) {
auto [function, arg] = queue_.front();
std::unique_ptr<BGItem> task_item = std::move(queue_.front());
queue_.pop();
wsignal_.notify_one();
lock.unlock();
(*function)(arg);
task_item->function(task_item->arg);
}
}
// swalloc all the remain tasks in ready and timer queue
Expand Down
Loading

0 comments on commit 1e0c753

Please sign in to comment.