Skip to content

Commit

Permalink
Initial efforts for sending noops commands (#760)
Browse files Browse the repository at this point in the history
  • Loading branch information
yontyon authored Jul 22, 2020
1 parent cda1764 commit c621a10
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 1 deletion.
2 changes: 1 addition & 1 deletion bftengine/src/bftengine/ControlStateManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace bftEngine {

void ControlStateManager::setStopAtNextCheckpoint() {}

std::optional<uint64_t> ControlStateManager::getStopCheckpointToStopAt() { return UINT64_MAX; }
std::optional<uint64_t> ControlStateManager::getStopCheckpointToStopAt() { return {}; }

ControlStateManager::ControlStateManager(IStateTransfer& state_transfer) : state_transfer_{state_transfer} {
state_transfer_.getStatus(); // Temporary, to escape not-used compilation error
Expand Down
1 change: 1 addition & 0 deletions bftengine/src/bftengine/ReplicaBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class ReplicaBase {
///////////////////////////////////////////////////
// ControlStateManger
std::shared_ptr<ControlStateManager> controlStateManager_;
bool stopAtNextCheckpoint_ = false;
};

} // namespace bftEngine::impl
27 changes: 27 additions & 0 deletions bftengine/src/bftengine/ReplicaImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,12 @@ void ReplicaImp::tryToSendPrePrepareMsg(bool batchingLogic) {
}

pp->finishAddingRequests();
startConsensusProcess(pp);
}

void ReplicaImp::startConsensusProcess(PrePrepareMsg *pp) {
if (!isCurrentPrimary()) return;
auto firstPath = pp->firstPath();
if (config_.debugStatisticsEnabled) {
DebugStatistics::onSendPrePrepareMessage(pp->numberOfRequests(), requestsQueueOfPrimary.size());
}
Expand Down Expand Up @@ -407,6 +412,22 @@ void ReplicaImp::tryToSendPrePrepareMsg(bool batchingLogic) {
}
}

void ReplicaImp::sendInternalNoopPrePrepareMsg(CommitPath firstPath) {
PrePrepareMsg *pp = new PrePrepareMsg(config_.replicaId, curView, (primaryLastUsedSeqNum + 1), firstPath, 0);
startConsensusProcess(pp);
}

void ReplicaImp::bringTheSystemToTheNextCheckpointBySendingNoopCommands(CommitPath firstPath) {
if (!isCurrentPrimary()) return;
// TODO: According to Ittai, it is better to reach to the next next checkpoint to prevent the follwing:
// 1. The current sequence number is 290 (next checkpoint is 300)
// 2. We decide on 291 --> 291 + concurrency level - 1 (say 29)
// 3. We decide on 290
while (primaryLastUsedSeqNum % checkpointWindowSize != 0) {
sendInternalNoopPrePrepareMsg(firstPath);
}
}

template <typename T>
bool ReplicaImp::relevantMsgForActiveView(const T *msg) {
const SeqNum msgSeqNum = msg->seqNumber();
Expand Down Expand Up @@ -3426,6 +3447,12 @@ void ReplicaImp::executeNextCommittedRequests(concordUtils::SpanWrapper &parent_
auto span = concordUtils::startChildSpan("bft_execute_next_committed_requests", parent_span);

while (lastExecutedSeqNum < lastStableSeqNum + kWorkWindowSize) {
if (!stopAtNextCheckpoint_ && controlStateManager_->getStopCheckpointToStopAt().has_value()) {
// If, following the last execution, we discover that we need to jump to the
// next checkpoint, the primary sends noop commands until filling the working window.
bringTheSystemToTheNextCheckpointBySendingNoopCommands();
stopAtNextCheckpoint_ = true;
}
SeqNum nextExecutedSeqNum = lastExecutedSeqNum + 1;
SeqNumInfo &seqNumInfo = mainLog->get(nextExecutedSeqNum);

Expand Down
3 changes: 3 additions & 0 deletions bftengine/src/bftengine/ReplicaImp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ class ReplicaImp : public InternalReplicaApi, public ReplicaForStateTransfer {

private:
void addTimers();
void startConsensusProcess(PrePrepareMsg* pp);
void sendInternalNoopPrePrepareMsg(CommitPath firstPath = CommitPath::SLOW);
void bringTheSystemToTheNextCheckpointBySendingNoopCommands(CommitPath firstPath = CommitPath::SLOW);
};

} // namespace bftEngine::impl

0 comments on commit c621a10

Please sign in to comment.