Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
jason committed Feb 19, 2021
1 parent cb59b3d commit 04e9130
Show file tree
Hide file tree
Showing 461 changed files with 130,819 additions and 109 deletions.
26 changes: 26 additions & 0 deletions pika/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,30 @@
# pika for codis
## 3.0.4-4.1 (2020-12-7)
### optimize
* 支持单独设置binlog路径,避免binlog和普通日志文件耦合在一起
* 调整异步写binlog队列大小限制,binlog队列阻塞时每5s输出一次日志
* slowlog计数器不中断
* 异步加载key时,队列满后每5s输出一次日志
* rocksdb compact删除数据时忽略快照,避免读命令高时,垃圾数据无法删除
* 细化慢日志处理时间

## 3.0.4-4.0 (2020-10-26)
### optimize
* 支持redis rdb版本为7的数据格式slot迁移到pika,跳过了quicklist结构,后续会支持

## 3.0.4-3.9 (2020-9-3)
### bug fix
* 修复slave-priority配置参数读取错误问题

## 3.0.4-3.8 (2020-7-27)
### New Features
* rocksdb日志移到pika日志目录下
* 静态链接lzma库
* 限制单条慢日志大小为256字节(之前为1kb)

### bug fix
* 修复发送异常二进制流命令,可能导致进程崩溃问题

## 3.0.4-3.7 (2020-6-8)
### optimize
* 优化gc加锁流程,解决gc时锁竞争导致读命令产生的延时抖动
Expand Down
75 changes: 41 additions & 34 deletions pika/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CLEAN_FILES = # deliberately empty, so we can append below.
CXX=g++
PLATFORM_LDFLAGS= -lpthread -lrt -llzma
PLATFORM_LDFLAGS= -lpthread -lrt
PLATFORM_CXXFLAGS= -std=c++11 -fno-builtin-memcmp -msse -msse4.2
PROFILING_FLAGS=-pg
OPT=
Expand Down Expand Up @@ -115,50 +115,53 @@ ifndef UNWIND_PATH
UNWIND_PATH=$(THIRD_PATH)/libunwind-1.3.1
endif





ifndef LZMA_PATH
LZMA_PATH=$(THIRD_PATH)/xz-5.0.4
endif


ifeq ($(static_compile_flag), 1)
GLOG := $(GLOG_PATH)/.libs/libglog.a
SNAPPY :=$(SNAPPY_PATH)/.libs/libsnappy.a
TCMALLOC := $(TCMALLOC_PATH)/.libs/libtcmalloc.a
UNWIND := $(UNWIND_PATH)/src/.libs/libunwind.a
LZMA := $(LZMA_PATH)/src/liblzma/.libs/liblzma.a
endif

INCLUDE_PATH = -I./include \
-I$(SLASH_PATH)/ \
-I$(PINK_PATH)/ \
-I$(BLACKWIDOW_PATH)/include/ \
-I$(DORY_PATH)/include \
-I$(ROCKSDB_PATH)/ \
-I$(ROCKSDB_PATH)/include \
-I$(REDISDB_PATH) \
-I$(UTILS_PATH) \
-I$(SLASH_PATH)/ \
-I$(PINK_PATH)/ \
-I$(BLACKWIDOW_PATH)/include/ \
-I$(DORY_PATH)/include \
-I$(ROCKSDB_PATH)/ \
-I$(ROCKSDB_PATH)/include \
-I$(REDISDB_PATH) \
-I$(UTILS_PATH) \

ifeq ($(static_compile_flag),1)
INCLUDE_PATH += -I$(GLOG_PATH)/src/
INCLUDE_PATH += -I$(SNAPPY_PATH)/
INCLUDE_PATH += -I$(UNWIND_PATH)/include/
INCLUDE_PATH += -I$(UNWIND_PATH)/src/
INCLUDE_PATH += -I$(TCMALLOC_PATH)/src/
INCLUDE_PATH += -I$(LZMA_PATH)/src/
INCLUDE_PATH += -I$(LZMA_PATH)/src/liblzma/api/
endif

LIB_PATH = -L./ \
-L$(SLASH_PATH)/slash/lib/ \
-L$(PINK_PATH)/pink/lib/ \
-L$(BLACKWIDOW_PATH)/lib/ \
-L$(DORY_PATH)/lib \
-L$(ROCKSDB_PATH)/ \
-L$(REDISDB_PATH)
-L$(SLASH_PATH)/slash/lib/ \
-L$(PINK_PATH)/pink/lib/ \
-L$(BLACKWIDOW_PATH)/lib/ \
-L$(DORY_PATH)/lib \
-L$(ROCKSDB_PATH)/ \
-L$(REDISDB_PATH)

ifeq ($(static_compile_flag),1)
LIB_PATH += -L$(GLOG_PATH)/.libs/
LIB_PATH += -L$(SNAPPY_PATH)/.libs/
LIB_PATH += -L$(UNWIND_PATH)/src/.libs/
LIB_PATH += -L$(TCMALLOC_PATH)/.libs/
LIB_PATH += -L$(LZMA_PATH)/src/liblzma/.libs/
endif


Expand All @@ -167,16 +170,17 @@ LDFLAGS += $(LIB_PATH)
ifeq ($(static_compile_flag),1)
LDFLAGS += -Wl,-Bstatic
endif
LDFLAGS += -lpink$(DEBUG_SUFFIX) \
-lblackwidow$(DEBUG_SUFFIX) \
-ldory$(DEBUG_SUFFIX) \
-lrocksdb$(DEBUG_SUFFIX) \
-lredisdb$(DEBUG_SUFFIX) \
-lslash$(DEBUG_SUFFIX) \
-lglog \
-ltcmalloc \
-lunwind \
-lsnappy \
LDFLAGS += -lpink$(DEBUG_SUFFIX) \
-lblackwidow$(DEBUG_SUFFIX) \
-ldory$(DEBUG_SUFFIX) \
-lrocksdb$(DEBUG_SUFFIX) \
-lredisdb$(DEBUG_SUFFIX) \
-lslash$(DEBUG_SUFFIX) \
-lglog \
-ltcmalloc \
-lunwind \
-lsnappy \
-llzma

ifeq ($(static_compile_flag),1)
LDFLAGS += -Wl,-Bdynamic
Expand Down Expand Up @@ -270,7 +274,7 @@ all: $(BINARY)

dbg: $(BINARY)

$(BINARY): $(PINK) $(ROCKSDB) $(REDISDB) $(BLACKWIDOW) $(DORY) $(SLASH) $(GLOG) $(LIBOBJECTS) $(OBJS) $(SNAPPY) $(TCMALLOC) $(UNWIND)
$(BINARY): $(PINK) $(ROCKSDB) $(REDISDB) $(BLACKWIDOW) $(DORY) $(SLASH) $(GLOG) $(LIBOBJECTS) $(OBJS) $(SNAPPY) $(TCMALLOC) $(UNWIND) $(LZMA)
$(AM_V_at)rm -f $@
$(AM_V_at)$(AM_LINK)
$(AM_V_at)rm -rf $(OUTPUT)
Expand Down Expand Up @@ -298,16 +302,19 @@ $(DORY):
$(AM_V_at)make -C $(DORY_PATH) BLACKWIDOW_PATH=$(BLACKWIDOW_PATH) SLASH_PATH=$(SLASH_PATH) ROCKSDB_PATH=$(ROCKSDB_PATH) REDISDB_PATH=$(REDISDB_PATH) DEBUG_LEVEL=$(DEBUG_LEVEL)

$(GLOG):
cd $(THIRD_PATH)/glog; if [ ! -f ./Makefile ]; then ./configure --disable-shared; fi; make; echo '*' > $(CURDIR)/third/glog/.gitignore;
cd $(GLOG_PATH); if [ ! -f ./Makefile ]; then ./configure --disable-shared; fi; make; echo '*' > $(CURDIR)/third/glog/.gitignore;

$(SNAPPY):
cd $(THIRD_PATH)/snappy-1.1.4; if [ ! -f ./Makefile ]; then ./autogen.sh; ./configure --with-pic --enable-static --disable-shared; fi; make;
cd $(SNAPPY_PATH); if [ ! -f ./Makefile ]; then ./autogen.sh; ./configure --with-pic --enable-static --disable-shared; fi; make;

$(TCMALLOC):
cd $(THIRD_PATH)/gperftools-2.6.3; if [ ! -f ./Makefile ]; then ./autogen.sh;./configure; fi; make -j$(PROCESSOR_NUMS);
cd $(TCMALLOC_PATH); if [ ! -f ./Makefile ]; then ./autogen.sh; ./configure; fi; make -j$(PROCESSOR_NUMS);

$(UNWIND):
cd $(THIRD_PATH)/libunwind-1.3.1; if [ ! -f ./Makefile ]; then ./autogen.sh;./configure; fi; make -j$(PROCESSOR_NUMS);
cd $(UNWIND_PATH); if [ ! -f ./Makefile ]; then ./autogen.sh; ./configure; fi; make -j$(PROCESSOR_NUMS);

$(LZMA):
cd $(LZMA_PATH); if [ ! -f ./Makefile ]; then ./configure; fi; make -j$(PROCESSOR_NUMS);

clean:
rm -rf $(OUTPUT)
Expand Down
8 changes: 5 additions & 3 deletions pika/conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ thread-num : 1
sync-thread-num : 6
# Item count of sync thread queue
sync-buffer-size : 10
# Pika log path
# Pika and Rocksdb log path
log-path : ./log/
# Pika binlog path
binlog-path : ./binlog/
# Pika glog level: only INFO and ERROR
loglevel : info
# Pika glog size : default is 1800MB
Expand Down Expand Up @@ -56,8 +58,8 @@ max-bytes-for-level-base : 268435456
expire-logs-days : 7
# Expire-logs-nums
expire-logs-nums : 10
# binlog_writer_queue_size is the max size for binlog_queue, default is 1000
binlog-writer-queue-size : 1000
# binlog_writer_queue_size is the max size for binlog_queue, default is 10000
binlog-writer-queue-size : 10000
# binlog_writer working mode: only sync and async.
binlog-writer-method : async
# Number of binlog-writer thread
Expand Down
7 changes: 6 additions & 1 deletion pika/include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
#ifndef PIKA_CLIENT_CONN_H_
#define PIKA_CLIENT_CONN_H_

#include <glog/logging.h>
#include <atomic>

#include <glog/logging.h>

#include "pink/include/redis_conn.h"
#include "pink/include/pink_thread.h"
#include "slash/include/slash_mutex.h"
#include "pika_command.h"

class PikaWorkerSpecificData;
Expand Down Expand Up @@ -50,6 +52,9 @@ class PikaClientConn: public pink::RedisConn {
CmdTable* const cmds_table_;
bool is_pubsub_;

static std::atomic<uint64_t> slowlog_count_;
static slash::Mutex slowlog_mutex_;

std::string DoCmd(const PikaCmdArgsType& argv,
const std::string& opt,
uint64_t recv_cmd_time_us);
Expand Down
2 changes: 2 additions & 0 deletions pika/include/pika_commonfunc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class PikaCommonFunc
static void BinlogPut(const std::string &key, const std::string &raw_args);

static std::string TimestampToDate(int64_t timestamp);

static std::string AppendSubDirectory(const std::string& db_path, const std::string& sub_path);

private:
PikaCommonFunc();
Expand Down
2 changes: 2 additions & 0 deletions pika/include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class PikaConf : public slash::BaseConf {
int log_level() { return log_level_; }
int max_log_size() { return max_log_size_; }
std::string db_path() { RWLock l(&rwlock_, false); return db_path_; }
std::string binlog_path() { RWLock l(&rwlock_, false); return binlog_path_; }
std::string db_sync_path() { RWLock l(&rwlock_, false); return db_sync_path_; }
int db_sync_speed() { return db_sync_speed_; }
std::string compact_cron() { RWLock l(&rwlock_, false); return compact_cron_; }
Expand Down Expand Up @@ -217,6 +218,7 @@ class PikaConf : public slash::BaseConf {
std::atomic<int> sync_buffer_size_;
std::string log_path_;
std::string db_path_;
std::string binlog_path_;
std::string db_sync_path_;
std::atomic<int> expire_dump_days_;
std::atomic<int> db_sync_speed_;
Expand Down
6 changes: 3 additions & 3 deletions pika/include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class PikaServer {
void SlowlogReset(void);
uint32_t SlowlogLen(void);
void SlowlogObtain(int64_t number, std::vector<SlowlogEntry>* slowlogs);
void SlowlogPushEntry(const PikaCmdArgsType& argv, int32_t time, int64_t duration);
void SlowlogPushEntry(const PikaCmdArgsType& argv, uint64_t id, int32_t time, int64_t duration);

PikaSlowlog *slowlog_;

Expand Down Expand Up @@ -465,8 +465,8 @@ class PikaServer {
void PlusThreadQuerynum();
uint64_t ServerQueryNum();
uint64_t ServerCurrentQps();
uint32_t FastThreadPoolSize();
uint32_t SlowThreadPoolSize();
uint32_t FastThreadPoolTasks();
uint32_t SlowThreadPoolTasks();
void ResetLastSecQuerynum(); /* Invoked in PikaDispatchThread's CronHandle */
uint64_t accumulative_connections() {
return statistic_data_.accumulative_connections;
Expand Down
5 changes: 2 additions & 3 deletions pika/include/pika_slowlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


struct SlowlogEntry {
int64_t id;
uint64_t id;
int64_t start_time;
int64_t duration;
PikaCmdArgsType argv;
Expand All @@ -21,7 +21,7 @@ class PikaSlowlog
PikaSlowlog();
~PikaSlowlog();

void Push(const PikaCmdArgsType& argv, int32_t time, int64_t duration);
void Push(const PikaCmdArgsType& argv, uint64_t id, int32_t time, int64_t duration);
void GetInfo(uint32_t number, std::vector<SlowlogEntry> *slowlogs);
void Trim(void);
void Reset(void);
Expand All @@ -32,7 +32,6 @@ class PikaSlowlog
PikaSlowlog& operator=(const PikaSlowlog&);

private:
std::atomic<uint64_t> entry_id_;
std::list<SlowlogEntry> slowlog_list_;
slash::Mutex slowlog_mutex_;
};
Expand Down
4 changes: 2 additions & 2 deletions pika/include/pika_version.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#define PIKA_MAJOR 3
#define PIKA_MINOR 0
#define PIKA_PATCH 4
#define PIKA_XMLY_MAJOR 3
#define PIKA_XMLY_MINOR 7
#define PIKA_XMLY_MAJOR 4
#define PIKA_XMLY_MINOR 1

#endif // INCLUDE_PIKA_VERSION_H_
19 changes: 13 additions & 6 deletions pika/src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -611,8 +611,8 @@ void InfoCmd::InfoStats(std::string &info) {

tmp_stream << "total_connections_received:" << g_pika_server->accumulative_connections() << "\r\n";
tmp_stream << "instantaneous_ops_per_sec:" << g_pika_server->ServerCurrentQps() << "\r\n";
tmp_stream << "fast_thread_pool_size:" << g_pika_server->FastThreadPoolSize() << "\r\n";
tmp_stream << "slow_thread_pool_size:" << g_pika_server->SlowThreadPoolSize() << "\r\n";
tmp_stream << "fast_thread_pool_tasks:" << g_pika_server->FastThreadPoolTasks() << "\r\n";
tmp_stream << "slow_thread_pool_tasks:" << g_pika_server->SlowThreadPoolTasks() << "\r\n";
tmp_stream << "total_commands_processed:" << g_pika_server->ServerQueryNum() << "\r\n";
PikaServer::BGSaveInfo bgsave_info = g_pika_server->bgsave_info();
bool is_bgsaving = g_pika_server->bgsaving();
Expand Down Expand Up @@ -984,6 +984,12 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeString(&config_body, "log-path");
EncodeString(&config_body, g_pika_conf->log_path());
}

if (slash::stringmatch(pattern.data(), "binlog-path", 1)) {
elements += 2;
EncodeString(&config_body, "binlog-path");
EncodeString(&config_body, g_pika_conf->binlog_path());
}

if (slash::stringmatch(pattern.data(), "loglevel", 1)) {
elements += 2;
Expand Down Expand Up @@ -1657,13 +1663,14 @@ void ConfigCmd::ConfigSet(std::string& ret) {
g_pika_conf->SetWriteBinlog(write_binlog);
ret = "+OK\r\n";
} else if (set_item == "binlog-writer-queue-size") {
if (!slash::string2l(value.data(), value.size(), &ival) || ival <= 0 || ival > 10000) {
if (!slash::string2l(value.data(), value.size(), &ival) || ival <= 0) {
ret = "-ERR Invalid argument " + value + " for CONFIG SET 'binlog-writer-queue-size'\r\n";
return;
}
g_pika_conf->SetBinlogWriterQueueSize(ival);
int tmp_val = (1 > ival || 100000 < ival) ? 10000 : ival;
g_pika_conf->SetBinlogWriterQueueSize(tmp_val);
for (int i=0; i<g_pika_conf->binlog_writer_num(); i++) {
g_pika_server->binlog_write_thread_[i]->SetMaxCmdsQueueSize(ival);
g_pika_server->binlog_write_thread_[i]->SetMaxCmdsQueueSize(tmp_val);
}
ret = "+OK\r\n";
} else if (set_item == "root-connection-num") {
Expand Down Expand Up @@ -1866,7 +1873,7 @@ void ConfigCmd::ConfigSet(std::string& ret) {
}
ret = "+OK\r\n";
} else if (set_item == "slave-priority") {
if (!slash::string2l(value.data(), value.size(), &ival) || ival <= 0) {
if (!slash::string2l(value.data(), value.size(), &ival) || ival < 0) {
ret = "-ERR Invalid argument " + value + " for CONFIG SET 'slave-priority'\r\n";
return;
}
Expand Down
7 changes: 7 additions & 0 deletions pika/src/pika_binlog_writer_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,15 @@ Status PikaBinlogWriterThread::WriteBinlog(const std::string &raw_args, bool is_
SetBinlogIoError(true);
}
} else {
static uint64_t last_log_time_us = 0;
slash::MutexLock lm(&binlog_mutex_protector_);
while (cmds_deque_.size() >= max_cmds_deque_size_) {
// 5s打印一次日志
if (slash::NowMicros() - last_log_time_us > 5000000) {
LOG(WARNING) << "PikaBinlogWriterThread::WriteBinlog waiting...";
last_log_time_us = slash::NowMicros();
}

binlog_write_cond_.Wait();
}

Expand Down
10 changes: 5 additions & 5 deletions pika/src/pika_cache_load_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#define CACHE_LOAD_QUEUE_MAX_SIZE 2048
#define CACHE_VALUE_ITEM_MAX_SIZE 2048
#define CACHE_LOAD_NUM_ONE_TIME 256
#define WRITE_LOG_CRON 10

extern PikaServer *g_pika_server;

Expand Down Expand Up @@ -40,10 +39,11 @@ PikaCacheLoadThread::Push(const char key_type, std::string &key)
slash::MutexLock lm(&loadkeys_map_mutex_);

if (CACHE_LOAD_QUEUE_MAX_SIZE < loadkeys_queue_.size()) {
static int push_couter = 0;
if (WRITE_LOG_CRON == push_couter++) {
LOG(WARNING) << "PikaCacheLoadThread::Push key:" << key << " failed, becasue queue full. max_size:" << CACHE_LOAD_QUEUE_MAX_SIZE;
push_couter = 0;
// 5s打印一次日志
static uint64_t last_log_time_us = 0;
if (slash::NowMicros() - last_log_time_us > 5000000) {
LOG(WARNING) << "PikaCacheLoadThread::Push waiting...";
last_log_time_us = slash::NowMicros();
}
return;
}
Expand Down
Loading

0 comments on commit 04e9130

Please sign in to comment.