Skip to content

Commit

Permalink
fix:delete consensus level (OpenAtomFoundation#1844)
Browse files Browse the repository at this point in the history
* delete_consensus_level
  • Loading branch information
chejinge authored Aug 1, 2023
1 parent 501483e commit 1d415a4
Show file tree
Hide file tree
Showing 8 changed files with 6 additions and 139 deletions.
4 changes: 0 additions & 4 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1087,10 +1087,6 @@ void InfoCmd::InfoReplication(std::string& info) {
tmp_stream << db_name << " binlog_offset=" << filenum << " " << offset;
s = master_slot->GetSafetyPurgeBinlog(&safety_purge);
tmp_stream << ",safety_purge=" << (s.ok() ? safety_purge : "error") << "\r\n";
if (g_pika_conf->consensus_level() != 0) {
LogOffset last_log = master_slot->ConsensusLastIndex();
tmp_stream << db_name << " consensus last_log=" << last_log.ToString() << "\r\n";
}
}
}

Expand Down
8 changes: 2 additions & 6 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
return c_ptr;
}
}

if (g_pika_conf->consensus_level() != 0 && c_ptr->is_write()) {
c_ptr->SetStage(Cmd::kBinlogStage);
}
Expand Down Expand Up @@ -276,11 +275,8 @@ void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared
}

std::shared_ptr<Cmd> cmd_ptr = DoCmd(argv, opt, resp_ptr);
// level == 0 or (cmd error) or (is_read)
if (g_pika_conf->consensus_level() == 0 || !cmd_ptr->res().ok() || !cmd_ptr->is_write()) {
*resp_ptr = std::move(cmd_ptr->res().message());
resp_num--;
}
*resp_ptr = std::move(cmd_ptr->res().message());
resp_num--;
}

// Initial permission status
Expand Down
28 changes: 1 addition & 27 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ LogOffset SyncProgress::InternalCalCommittedIndex(const std::unordered_map<std::
}
std::vector<LogOffset> offsets;
offsets.reserve(match_index.size());
for (const auto& index : match_index) {
for (const auto& index : match_index) {
offsets.push_back(index.second);
}
std::sort(offsets.begin(), offsets.end());
Expand Down Expand Up @@ -275,9 +275,6 @@ ConsensusCoordinator::ConsensusCoordinator(const std::string& db_name, uint32_t
context_ = std::make_shared<Context>(log_path + kContext);
stable_logger_ = std::make_shared<StableLog>(db_name, slot_id, log_path);
mem_logger_ = std::make_shared<MemLog>();
if (g_pika_conf->consensus_level() != 0) {
Init();
}
}

ConsensusCoordinator::~ConsensusCoordinator() = default;
Expand Down Expand Up @@ -419,10 +416,6 @@ Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, const std
if (!s.ok()) {
return s;
}
if (g_pika_conf->consensus_level() == 0) {
return Status::OK();
}
mem_logger_->AppendLog(MemLog::LogItem(log_offset, cmd_ptr, std::move(conn_ptr), std::move(resp_ptr)));
return Status::OK();
}

Expand All @@ -448,25 +441,6 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_pt
}

Status ConsensusCoordinator::ProcessLocalUpdate(const LogOffset& leader_commit) {
if (g_pika_conf->consensus_level() == 0) {
return Status::OK();
}

LogOffset last_index = mem_logger_->last_offset();
LogOffset committed_index = last_index < leader_commit ? last_index : leader_commit;

LogOffset updated_committed_index;
bool need_update = false;
{
std::lock_guard l(index_mu_);
need_update = InternalUpdateCommittedIndex(committed_index, &updated_committed_index);
}
if (need_update) {
Status s = ScheduleApplyFollowerLog(updated_committed_index);
if (!s.ok()) {
return s;
}
}
return Status::OK();
}

Expand Down
11 changes: 0 additions & 11 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,4 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
}
}
}


if (g_pika_conf->consensus_level() != 0) {
std::shared_ptr<SyncMasterSlot> slot =
g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name, slot_id));
if (!slot) {
LOG(WARNING) << "Sync Master Slot not exist " << db_name << slot_id;
return;
}
slot->ConsensusUpdateAppliedIndex(offset);
}
}
21 changes: 0 additions & 21 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,27 +181,6 @@ Status PikaReplClient::SendSlotTrySync(const std::string& ip, uint32_t port, con
binlog_offset->set_filenum(boffset.filenum);
binlog_offset->set_offset(boffset.offset);

if (g_pika_conf->consensus_level() != 0) {
std::shared_ptr<SyncMasterSlot> slot =
g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name, slot_id));
if (!slot) {
return Status::Corruption("Slot not found");
}
LogOffset last_index = slot->ConsensusLastIndex();
uint32_t term = slot->ConsensusTerm();
LOG(INFO) << SlotInfo(db_name, slot_id).ToString() << " TrySync Increase self term from " << term
<< " to " << term + 1;
term++;
slot->ConsensusUpdateTerm(term);
InnerMessage::ConsensusMeta* consensus_meta = request.mutable_consensus_meta();
consensus_meta->set_term(term);
InnerMessage::BinlogOffset* pb_offset = consensus_meta->mutable_log_offset();
pb_offset->set_filenum(last_index.b_offset.filenum);
pb_offset->set_offset(last_index.b_offset.offset);
pb_offset->set_term(last_index.l_offset.term);
pb_offset->set_index(last_index.l_offset.index);
}

std::string to_send;
if (!request.SerializeToString(&to_send)) {
LOG(WARNING) << "Serialize Slot TrySync Request Failed, to Master (" << ip << ":" << port << ")";
Expand Down
25 changes: 0 additions & 25 deletions src/pika_repl_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,31 +106,6 @@ void PikaReplServer::BuildBinlogSyncResp(const std::vector<WriteTask>& tasks, In
BuildBinlogOffset(task.binlog_chip_.offset_, boffset);
binlog_sync->set_binlog(task.binlog_chip_.binlog_);
}
if (g_pika_conf->consensus_level() > 0) {
SlotInfo p_info;
if (!tasks.empty()) {
p_info = tasks[0].rm_node_.NodeSlotInfo();
} else {
LOG(WARNING) << "Task size is zero";
return;
}

// write consensus_meta
InnerMessage::ConsensusMeta* consensus_meta = response->mutable_consensus_meta();
InnerMessage::BinlogOffset* last_log = consensus_meta->mutable_log_offset();
BuildBinlogOffset(prev_offset, last_log);
// commit
LogOffset committed_index;
std::shared_ptr<SyncMasterSlot> slot = g_pika_rm->GetSyncMasterSlotByName(p_info);
if (!slot) {
LOG(WARNING) << "SyncSlot " << p_info.ToString() << " Not Found.";
return;
}
committed_index = slot->ConsensusCommittedIndex();
InnerMessage::BinlogOffset* committed = consensus_meta->mutable_commit();
BuildBinlogOffset(committed_index, committed);
consensus_meta->set_term(slot->ConsensusTerm());
}
}

pstd::Status PikaReplServer::Write(const std::string& ip, const int port, const std::string& msg) {
Expand Down
31 changes: 0 additions & 31 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,22 +270,6 @@ bool PikaServer::readonly(const std::string& db_name, const std::string& key) {
}

bool PikaServer::ConsensusCheck(const std::string& db_name, const std::string& key) {
if (g_pika_conf->consensus_level() != 0) {
std::shared_ptr<DB> db = GetDB(db_name);
if (!db) {
return false;
}
uint32_t index = g_pika_cmd_table_manager->DistributeKey(key, db->SlotNum());

std::shared_ptr<SyncMasterSlot> master_slot =
g_pika_rm->GetSyncMasterSlotByName(SlotInfo(db_name, index));
if (!master_slot) {
LOG(WARNING) << "Sync Master Slot: " << db_name << ":" << index << ", NotFound";
return false;
}
Status s = master_slot->ConsensusSanityCheck();
return s.ok();
}
return true;
}

Expand Down Expand Up @@ -447,14 +431,6 @@ bool PikaServer::IsDBSlotExist(const std::string& db_name, uint32_t slot_id) {
}

bool PikaServer::IsCommandSupport(const std::string& command) {
if (g_pika_conf->consensus_level() != 0) {
// dont support multi key command
// used the same list as sharding mode use
bool res = ConsensusNotSupportCommands.count(command) == 0U;
if (!res) {
return res;
}
}
return true;
}

Expand Down Expand Up @@ -613,10 +589,6 @@ Status PikaServer::DoSameThingEverySlot(const TaskType& type) {

void PikaServer::BecomeMaster() {
std::lock_guard l(state_protector_);
if ((role_ & PIKA_ROLE_MASTER) == 0 && g_pika_conf->write_binlog() && g_pika_conf->consensus_level() > 0) {
LOG(INFO) << "Become new master, start protect mode to waiting binlog sync and commit";
leader_protected_mode_ = true;
}
role_ |= PIKA_ROLE_MASTER;
}

Expand Down Expand Up @@ -1063,9 +1035,6 @@ void PikaServer::DbSyncSendFile(const std::string& ip, int port, const std::stri
fix.open(fn, std::ios::in | std::ios::trunc);
if (fix.is_open()) {
fix << "0s\n" << lip << "\n" << port_ << "\n" << binlog_filenum << "\n" << binlog_offset << "\n";
if (g_pika_conf->consensus_level() != 0) {
fix << term << "\n" << index << "\n";
}
fix.close();
}
ret = pstd::RsyncSendFile(fn, remote_path + "/" + kBgsaveInfoFile, secret_file_path, remote);
Expand Down
17 changes: 3 additions & 14 deletions src/pika_slot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,7 @@ bool Slot::TryUpdateMasterOffset() {
LOG(WARNING) << "Master Slot: " << slot_name_ << " not exist";
return false;
}
if (g_pika_conf->consensus_level() != 0) {
master_slot->ConsensusReset(LogOffset(BinlogOffset(filenum, offset), LogicOffset(term, index)));
} else {
master_slot->Logger()->SetProducerStatus(filenum, offset);
}
master_slot->Logger()->SetProducerStatus(filenum, offset);
slave_slot->SetReplState(ReplState::kTryConnect);
return true;
}
Expand Down Expand Up @@ -359,9 +355,6 @@ void Slot::DoBgSave(void* arg) {
<< g_pika_server->port() << "\n"
<< info.offset.b_offset.filenum << "\n"
<< info.offset.b_offset.offset << "\n";
if (g_pika_conf->consensus_level() != 0) {
info_content << info.offset.l_offset.term << "\n" << info.offset.l_offset.index << "\n";
}
bg_task_arg->slot->snapshot_uuid_ = md5(info_content.str());
out << info_content.rdbuf();
out.close();
Expand Down Expand Up @@ -439,12 +432,8 @@ bool Slot::InitBgsaveEngine() {
{
std::lock_guard lock(db_rwlock_);
LogOffset bgsave_offset;
if (g_pika_conf->consensus_level() != 0) {
bgsave_offset = slot->ConsensusAppliedIndex();
} else {
// term, index are 0
slot->Logger()->GetProducerStatus(&(bgsave_offset.b_offset.filenum), &(bgsave_offset.b_offset.offset));
}
// term, index are 0
slot->Logger()->GetProducerStatus(&(bgsave_offset.b_offset.filenum), &(bgsave_offset.b_offset.offset));
{
std::lock_guard l(bgsave_protector_);
bgsave_info_.offset = bgsave_offset;
Expand Down

0 comments on commit 1d415a4

Please sign in to comment.