Skip to content

Commit

Permalink
add zset limit length
Browse files Browse the repository at this point in the history
  • Loading branch information
jason committed Jan 2, 2020
1 parent cc454d3 commit 7fc07f7
Show file tree
Hide file tree
Showing 25 changed files with 1,143 additions and 66 deletions.
13 changes: 13 additions & 0 deletions pika/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
# pika for codis
## 3.0.4-3.4 (2019-12-27)
### New Features
* zset数据类型支持限长功能
* 支持动态设置检测操作系统free内存周期

### optimize
* 调整min_free_kbytes为系统总内存的3%,目的是加快系统cache内存回收,避免free内存太少,导致写QPS较高时产生延时毛刺
* zcount/zrangebyscore/zremrangebyscore/zrevrangebyscore接口从用户传入的min值开始迭代,而不是从score最小值开始迭代
* zrange接口,当起始索引大于元素总量的70%时采用反向迭代;zrevrange接口,当起始索引大于元素总量30%,采用正向迭代

### bug fix
* 修复scan命令,如果每次使用相同的游标,会导致存储游标的list持续增大,发生内存泄露

## 3.0.4-3.3 (2019-10-31)
### optimize
* 优化GC算法,每次GC任务结束后继续递归判断是否需要GC,而不是依赖compact触发GC任务,提升GC速度
Expand Down
28 changes: 25 additions & 3 deletions pika/conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ cache-maxmemory-samples: 5
# cache-lfu-decay-time
cache-lfu-decay-time: 1

########################
## Zset auto del setting
########################
# when the number of zset is more than the threshold, auto delete members. 0 means disable auto delete.
zset-auto-del-threshold : 0
# delete direction, 0 means delete from head, -1 means delete from tail.
zset-auto-del-direction : 0
# the number of deleted member
zset-auto-del-num : 1
# auto delete cron task, format: start-end, like 02-04, pika will check to schedule auto delete zset between 2 to 4 o'clock everyday
zset-auto-del-cron :
# auto delete interval, format: interval, like 6, pika will check to schedule auto delete zset every 6 hours.
zset-auto-del-interval : 0
# when delete a key members success, will sleep for a moment. sleep time = delete time * speed factor. you can set like 0.1 [0,1000]
zset-auto-del-cron-speed-factor : 0.1
# scan the number of keys from zset db at a time.
zset-auto-del-scan-round-num : 10000

###################
## Critical Settings
###################
Expand Down Expand Up @@ -152,16 +170,20 @@ disable-wal : no
use-direct-reads : no
# Use O_DIRECT for writes in background flush and compactions. [yes | no]
use-direct-io-for-flush-and-compaction : no
# check system left free memory interval. default is 60s.
check-free-mem-interval : 60
# If system free memory less than min-system-free-mem, clear system cached memory. 0 means disable clear cached memory. you can set like 1073741824(1G).
min-system-free-mem : 0
min-system-free-mem : 10737418240
# when set yes, set system min-free-kbytes to 3% of total memory. [yes | no]
optimize-min-free-kbytes : yes
# The max gc batch size at one time. default is 1073741824(1G)
max-gc-batch-size : 1073741824
# The ratio of how much discardable size of a blob file can be GC. default is 50, means 50% file size can be GC.
blob-file-discardable-ratio : 50
# gc-sample-cycle default is 604800(7day)
gc-sample-cycle : 604800
# max-gc-queue-size default is 64
max-gc-queue-size : 64
# max-gc-queue-size default is 2
max-gc-queue-size : 2
# BlockBasedTable block_size, default 4k
# block-size: 4096
# block LRU cache, 0 to disable. you can set like 268435456(256M).
Expand Down
23 changes: 22 additions & 1 deletion pika/include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class InfoCmd : public Cmd {
kInfoLog,
kInfoData,
kInfoCache,
kInfoZset,
kInfoAll
};

Expand All @@ -177,6 +178,8 @@ class InfoCmd : public Cmd {
const static std::string kLogSection;
const static std::string kDataSection;
const static std::string kCache;
const static std::string kZset;


virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
Expand All @@ -192,8 +195,10 @@ class InfoCmd : public Cmd {
void InfoLog(std::string &info);
void InfoData(std::string &info);
void InfoCache(std::string &info);
void InfoZset(std::string &info);

std::string CacheStatusToString(int status);
std::string TaskTypeToString(int task_type);
};

class ShutdownCmd : public Cmd {
Expand Down Expand Up @@ -263,7 +268,7 @@ class TcmallocCmd : public Cmd {
virtual void Do();
private:
int64_t type_;
int64_t rate_;
double rate_;
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};
#endif
Expand Down Expand Up @@ -307,4 +312,20 @@ class CacheCmd : public Cmd {
}
};

class ZsetAutoDelCmd : public Cmd {
public:
virtual void Do();
private:
int64_t cursor_;
double speed_factor_;
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

class ZsetAutoDelOffCmd : public Cmd {
public:
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

#endif
1 change: 0 additions & 1 deletion pika/include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,6 @@ class PikaCache
Status WriteSetToCache(std::string &key, std::vector<std::string> &members, int64_t ttl);
Status WriteZSetToCache(std::string &key, std::vector<blackwidow::ScoreMember> &score_members, int64_t ttl);
void PushKeyToAsyncLoadQueue(const char key_type, std::string &key);
std::string CacheStatusToString(int status);

private:
Status InitWithoutLock(uint32_t cache_num, dory::CacheConfig *cache_cfg);
Expand Down
2 changes: 2 additions & 0 deletions pika/include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const std::string kCmdNameTcmalloc = "tcmalloc";
const std::string kCmdNameEcho = "echo";
const std::string kCmdNameSlowlog = "slowlog";
const std::string kCmdNameCache = "cache";
const std::string kCmdNameZsetAutoDel = "zsetautodel";
const std::string kCmdNameZsetAutoDelOff = "zsetautodeloff";

//Migrate slot
const std::string kCmdNameSlotsMgrtSlot = "slotsmgrtslot";
Expand Down
5 changes: 5 additions & 0 deletions pika/include/pika_commonfunc.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define PIKA_COMMONFUNC_H_

#include <cstdint>
#include <string>

#include "pink/include/pink_cli.h"

Expand All @@ -14,6 +15,10 @@ class PikaCommonFunc
static uint32_t CRC32CheckSum(const char *buf, int len);

static bool DoAuth(pink::PinkCli *client, const std::string requirepass);

static void BinlogPut(const std::string &key, const std::string &raw_args);

static std::string TimestampToDate(int64_t timestamp);

private:
PikaCommonFunc();
Expand Down
29 changes: 29 additions & 0 deletions pika/include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,22 @@ class PikaConf : public slash::BaseConf {
bool disable_wal() { return disable_wal_; }
bool use_direct_reads() { return use_direct_reads_; }
bool use_direct_io_for_flush_and_compaction() { return use_direct_io_for_flush_and_compaction_; }
int check_free_mem_interval() { return check_free_mem_interval_; }
int64_t min_system_free_mem() { return min_system_free_mem_; }
bool optimize_min_free_kbytes() { return optimize_min_free_kbytes_; }
int64_t max_gc_batch_size() { return max_gc_batch_size_; }
int blob_file_discardable_ratio() { return blob_file_discardable_ratio_; }
int64_t gc_sample_cycle() { return gc_sample_cycle_; }
int max_gc_queue_size() { return max_gc_queue_size_; }

int zset_auto_del_threshold() { return zset_auto_del_threshold_; }
int zset_auto_del_direction() { return zset_auto_del_direction_; }
int zset_auto_del_num() { return zset_auto_del_num_; }
std::string zset_auto_del_cron(){ RWLock l(&rwlock_, false); return zset_auto_del_cron_; }
int zset_auto_del_interval() { return zset_auto_del_interval_; }
double zset_auto_del_cron_speed_factor() { return zset_auto_del_cron_speed_factor_; }
int zset_auto_del_scan_round_num() { return zset_auto_del_scan_round_num_; }

// Setter
void SetPort(const int value) { port_ = value; }
void SetThreadNum(const int value) { thread_num_ = value; }
Expand Down Expand Up @@ -164,11 +174,20 @@ class PikaConf : public slash::BaseConf {
void SetCacheLFUDecayTime(const int value) { cache_lfu_decay_time_ = value; }
void SetRateBytesPerSec(const int64_t value) { rate_bytes_per_sec_ = value; }
void SetDisableWAL(const bool value) { disable_wal_ = value; }
void SetCheckFreeMemInterval(const int value) { check_free_mem_interval_ = value; }
void SetMinSystemFreeMem(const int64_t value) { min_system_free_mem_ = value; }
void SetOptimizeMinFreeKbytes(const bool value) { optimize_min_free_kbytes_ = value; }
void SetMaxGCBatchSize(const int64_t value) { max_gc_batch_size_ = value; }
void SetBlobFileDiscardableRatio(const int value) { blob_file_discardable_ratio_ = value; }
void SetGCSampleCycle(const int64_t value) { gc_sample_cycle_ = value; }
void SetMaxGCQueueSize(const int value) { max_gc_queue_size_ = value; }
void SetZsetAutoDelThreshold(const int value) { zset_auto_del_threshold_ = value; }
void SetZsetAutoDelDirection(const int value) { zset_auto_del_direction_ = value; }
void SetZsetAutoDelNum(const int value) { zset_auto_del_num_ = value; }
void SetZsetAutoDelCron(const std::string &value) { RWLock l(&rwlock_, true); zset_auto_del_cron_ = value; }
void SetZsetAutoDelInterval(const int value) { zset_auto_del_interval_ = value; }
void SetZsetAutoDelCronSpeedFactor(const double value) { zset_auto_del_cron_speed_factor_ = value; }
void SetZsetAutoDelScanRoundNum(const int value){ zset_auto_del_scan_round_num_ = value; }

int Load();
int ConfigRewrite();
Expand Down Expand Up @@ -250,12 +269,22 @@ class PikaConf : public slash::BaseConf {
std::atomic<bool> disable_wal_;
std::atomic<bool> use_direct_reads_;
std::atomic<bool> use_direct_io_for_flush_and_compaction_;
std::atomic<int> check_free_mem_interval_;
std::atomic<int64_t> min_system_free_mem_;
std::atomic<bool> optimize_min_free_kbytes_;
std::atomic<int64_t> max_gc_batch_size_;
std::atomic<int> blob_file_discardable_ratio_;
std::atomic<int64_t> gc_sample_cycle_;
std::atomic<int> max_gc_queue_size_;

std::atomic<int> zset_auto_del_threshold_;
std::atomic<int> zset_auto_del_direction_;
std::atomic<int> zset_auto_del_num_;
std::string zset_auto_del_cron_;
std::atomic<int> zset_auto_del_interval_;
std::atomic<double> zset_auto_del_cron_speed_factor_;
std::atomic<int> zset_auto_del_scan_round_num_;

pthread_rwlock_t rwlock_;
slash::Mutex config_mutex_;
};
Expand Down
2 changes: 1 addition & 1 deletion pika/include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#define PIKA_DEFINE_H_


#define PIKA_MAX_WORKER_THREAD_NUM 24
#define PIKA_MAX_WORKER_THREAD_NUM 128

const std::string kPikaPidFile = "pika.pid";

Expand Down
12 changes: 12 additions & 0 deletions pika/include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "pika_monitor_thread.h"
#include "pika_migrate_thread.h"
#include "pika_binlog_writer_thread.h"
#include "pika_zset_auto_del_thread.h"
#include "pika_define.h"
#include "pika_binlog_bgworker.h"
#include "pika_slot.h"
Expand Down Expand Up @@ -164,6 +165,7 @@ class PikaServer {
void DoTimingTask();
void DoFreshInfoTimingTask();
void DoClearSysCachedMemory();
void DoAutoDelZsetMember();

PikaSlavepingThread* ping_thread_;

Expand Down Expand Up @@ -548,6 +550,11 @@ class PikaServer {
int CacheStatus(void);
static void DoCacheBGTask(void* arg);

// for manual zset del
Status ZsetAutoDel(int64_t cursor, double speed_factor);
Status ZsetAutoDelOff();
void GetZsetInfo(ZsetInfo &info);

private:
std::atomic<bool> exit_;
std::atomic<bool> binlog_io_error_;
Expand Down Expand Up @@ -672,6 +679,11 @@ class PikaServer {
std::vector<BinlogBGWorker*> binlogbg_workers_;
std::hash<std::string> str_hash;

/*
* Auto delete zset use
*/
PikaZsetAutoDelThread* pika_zset_auto_del_thread_;

/*
* for statistic
*/
Expand Down
2 changes: 1 addition & 1 deletion pika/include/pika_version.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@
#define PIKA_MINOR 0
#define PIKA_PATCH 4
#define PIKA_XMLY_MAJOR 3
#define PIKA_XMLY_MINOR 3
#define PIKA_XMLY_MINOR 4

#endif // INCLUDE_PIKA_VERSION_H_
84 changes: 84 additions & 0 deletions pika/include/pika_zset_auto_del_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#ifndef PIKA_ZSET_AUTO_DEL_THREAD_H_
#define PIKA_ZSET_AUTO_DEL_THREAD_H_

#include <atomic>
#include <deque>

#include "blackwidow/blackwidow.h"
#include "pink/include/pink_thread.h"
#include "slash/include/slash_mutex.h"


enum ZsetTaskType {
ZSET_NO_TASK,
ZSET_CRON_TASK,
ZSET_MANUAL_TASK
};

struct ZsetInfo {
int64_t last_finish_time;
int64_t last_spend_time;
int64_t last_all_keys_num;
int64_t last_del_keys_num;
bool last_compact_zset_db;
int current_task_type;
int64_t current_task_start_time;
int64_t current_task_spend_time;
int64_t current_cursor;
};

struct ZsetTaskItem {
ZsetTaskType task_type;
int64_t cursor;
double speed_factor;
ZsetTaskItem()
: task_type(ZSET_NO_TASK)
, cursor(0)
, speed_factor(1) {}
};

class PikaZsetAutoDelThread : public pink::Thread
{
public:
PikaZsetAutoDelThread();
~PikaZsetAutoDelThread();

void RequestCronTask();
void RequestManualTask(int64_t cursor, double speed_factor);
void StopManualTask();
void GetZsetInfo(ZsetInfo &info);
int64_t LastFinishCheckAllZsetTime() { return last_finish_check_all_zset_time_; }

private:
void CompactZsetDB();
void TrimAllZsetKeysFinished();
void WriteZsetAutoDelBinlog(const std::string &key, int start, int end);
bool BatchTrimZsetKeys(double speed_factor);
void DoZsetCronTask(double speed_factor);
void DoZsetManualTask(int64_t cursor, double speed_factor);
void DoZsetAutoDelTask(ZsetTaskItem &task_item);
virtual void* ThreadMain();

private:
std::atomic<bool> should_exit_;
slash::CondVar task_cond_;
slash::Mutex mutex_;
std::deque<ZsetTaskItem> task_queue_;

std::atomic<ZsetTaskType> current_task_type_;
std::atomic<int64_t> current_cursor_;
std::atomic<bool> stop_manual_task_;

int64_t zset_db_keys_num_;
int64_t auto_del_keys_num_;
std::atomic<int64_t> start_check_all_zset_time_;
std::atomic<int64_t> last_finish_check_all_zset_time_;

// for zset info use
std::atomic<int64_t> last_spend_time_;
std::atomic<int64_t> last_all_keys_num_;
std::atomic<int64_t> last_del_keys_num_;
std::atomic<bool> last_compact_zset_db_;
};

#endif
Loading

0 comments on commit 7fc07f7

Please sign in to comment.