Skip to content

Commit

Permalink
merge: #14492
Browse files Browse the repository at this point in the history
14492: [Backport stable/8.2] [Backport release-8.2.15] Backport Reduced AppendListener Scope r=github-actions[bot] a=backport-action

# Description
Backport of #14488 to `stable/8.2`.

relates to #14312 #14275 #14275
original author: `@megglos`

Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and ChrisKujawa authored Sep 27, 2023
2 parents 65f2228 + 45d10cd commit c50db78
Show file tree
Hide file tree
Showing 14 changed files with 116 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@
*/
package io.atomix.raft;

import io.atomix.raft.storage.log.IndexedRaftLogEntry;

/**
* This listener will only be called by the Leader, when it commits an entry. If RAFT is currently
* running in a follower role, it will not call this listener.
* This listener will only be called by the Leader, when it commits an application entry.
*
* <p>If RAFT is currently running in a follower role, it will not call this listener.
*/
@FunctionalInterface
public interface RaftCommittedEntryListener {
public interface RaftApplicationEntryCommittedPositionListener {

/**
* @param indexedRaftLogEntry the new committed entry
* @param committedPosition the new committed position which is related to the application entries
*/
void onCommit(IndexedRaftLogEntry indexedRaftLogEntry);
void onCommit(long committedPosition);
}
21 changes: 11 additions & 10 deletions atomix/cluster/src/main/java/io/atomix/raft/impl/RaftContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.raft.ElectionTimer;
import io.atomix.raft.RaftApplicationEntryCommittedPositionListener;
import io.atomix.raft.RaftCommitListener;
import io.atomix.raft.RaftCommittedEntryListener;
import io.atomix.raft.RaftException.ProtocolException;
import io.atomix.raft.RaftRoleChangeListener;
import io.atomix.raft.RaftServer;
Expand Down Expand Up @@ -55,7 +55,6 @@
import io.atomix.raft.roles.RaftRole;
import io.atomix.raft.storage.RaftStorage;
import io.atomix.raft.storage.StorageException;
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.storage.log.RaftLog;
import io.atomix.raft.storage.system.MetaStore;
import io.atomix.raft.utils.StateUtil;
Expand Down Expand Up @@ -103,7 +102,7 @@ public class RaftContext implements AutoCloseable, HealthMonitorable {
private final Set<Consumer<State>> stateChangeListeners = new CopyOnWriteArraySet<>();
private final Set<Consumer<RaftMember>> electionListeners = new CopyOnWriteArraySet<>();
private final Set<RaftCommitListener> commitListeners = new CopyOnWriteArraySet<>();
private final Set<RaftCommittedEntryListener> committedEntryListeners =
private final Set<RaftApplicationEntryCommittedPositionListener> committedEntryListeners =
new CopyOnWriteArraySet<>();
private final Set<SnapshotReplicationListener> snapshotReplicationListeners =
new CopyOnWriteArraySet<>();
Expand Down Expand Up @@ -417,21 +416,23 @@ public void removeCommitListener(final RaftCommitListener commitListener) {
* <p>Note that it will be called on the Raft thread, and as such should not perform any heavy
* computation.
*
* @param raftCommittedEntryListener the listener to add
* @param raftApplicationEntryCommittedPositionListener the listener to add
*/
public void addCommittedEntryListener(
final RaftCommittedEntryListener raftCommittedEntryListener) {
committedEntryListeners.add(raftCommittedEntryListener);
final RaftApplicationEntryCommittedPositionListener
raftApplicationEntryCommittedPositionListener) {
committedEntryListeners.add(raftApplicationEntryCommittedPositionListener);
}

/**
* Removes registered committedEntryListener
*
* @param raftCommittedEntryListener the listener to remove
* @param raftApplicationEntryCommittedPositionListener the listener to remove
*/
public void removeCommittedEntryListener(
final RaftCommittedEntryListener raftCommittedEntryListener) {
committedEntryListeners.remove(raftCommittedEntryListener);
final RaftApplicationEntryCommittedPositionListener
raftApplicationEntryCommittedPositionListener) {
committedEntryListeners.remove(raftApplicationEntryCommittedPositionListener);
}

/**
Expand All @@ -448,7 +449,7 @@ public void notifyCommitListeners(final long lastCommitIndex) {
*
* @param committedEntry the most recently committed entry
*/
public void notifyCommittedEntryListeners(final IndexedRaftLogEntry committedEntry) {
public void notifyApplicationEntryCommittedPositionListeners(final long committedEntry) {
committedEntryListeners.forEach(listener -> listener.onCommit(committedEntry));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.primitive.partition.Partition;
import io.atomix.primitive.partition.PartitionMetadata;
import io.atomix.raft.RaftApplicationEntryCommittedPositionListener;
import io.atomix.raft.RaftCommitListener;
import io.atomix.raft.RaftCommittedEntryListener;
import io.atomix.raft.RaftRoleChangeListener;
import io.atomix.raft.RaftServer;
import io.atomix.raft.RaftServer.Role;
Expand Down Expand Up @@ -241,16 +241,20 @@ public void removeCommitListener(final RaftCommitListener commitListener) {
}

/**
* @see io.atomix.raft.impl.RaftContext#addCommittedEntryListener(RaftCommittedEntryListener)
* @see
* io.atomix.raft.impl.RaftContext#addCommittedEntryListener(RaftApplicationEntryCommittedPositionListener)
*/
public void addCommittedEntryListener(final RaftCommittedEntryListener commitListener) {
public void addCommittedEntryListener(
final RaftApplicationEntryCommittedPositionListener commitListener) {
server.getContext().addCommittedEntryListener(commitListener);
}

/**
* @see io.atomix.raft.impl.RaftContext#removeCommittedEntryListener(RaftCommittedEntryListener)
* @see
* io.atomix.raft.impl.RaftContext#removeCommittedEntryListener(RaftApplicationEntryCommittedPositionListener)
*/
public void removeCommittedEntryListener(final RaftCommittedEntryListener commitListener) {
public void removeCommittedEntryListener(
final RaftApplicationEntryCommittedPositionListener commitListener) {
server.getContext().removeCommittedEntryListener(commitListener);
}

Expand Down
55 changes: 35 additions & 20 deletions atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java
Original file line number Diff line number Diff line change
Expand Up @@ -599,27 +599,42 @@ private void safeAppendEntry(final ApplicationEntry entry, final AppendListener

private void replicate(final IndexedRaftLogEntry indexed, final AppendListener appendListener) {
raft.checkThread();
appender
.appendEntries(indexed.index())
.whenCompleteAsync(
(commitIndex, commitError) -> {
if (!isRunning()) {
return;
}
final var appendEntriesFuture = appender.appendEntries(indexed.index());

// have the state machine apply the index which should do nothing but ensures it keeps
// up to date with the latest entries, so it can handle configuration and initial
// entries properly on fail over
if (commitError == null) {
appendListener.onCommit(indexed);
raft.notifyCommittedEntryListeners(indexed);
} else {
appendListener.onCommitError(indexed, commitError);
// replicating the entry will be retried on the next append request
log.error("Failed to replicate entry: {}", indexed, commitError);
}
},
raft.getThreadContext());
if (indexed.isApplicationEntry()) {
// We have some services which are waiting for the application records, especially position
// to be committed. This is our glue code to notify them, instead of
// passing the complete object (IndexedRaftLogEntry) threw the listeners and
// keep them in heap until they are committed. This had the risk of going out of OOM
// if records can't be committed, see https://github.com/camunda/zeebe/issues/14275
final var committedPosition = indexed.getApplicationEntry().highestPosition();
appendEntriesFuture.whenCompleteAsync(
(commitIndex, commitError) -> {
if (isRunning() && commitError == null) {
raft.notifyApplicationEntryCommittedPositionListeners(committedPosition);
}
},
raft.getThreadContext());
}

appendEntriesFuture.whenCompleteAsync(
(commitIndex, commitError) -> {
if (!isRunning()) {
return;
}

// have the state machine apply the index which should do nothing but ensures it keeps
// up to date with the latest entries, so it can handle configuration and initial
// entries properly on fail over
if (commitError == null) {
appendListener.onCommit(commitIndex);
} else {
appendListener.onCommitError(commitIndex, commitError);
// replicating the entry will be retried on the next append request
log.error("Failed to replicate entry: {}", commitIndex, commitError);
}
},
raft.getThreadContext());
}

public synchronized void onInitialEntriesCommitted(final Runnable runnable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
/**
* A log appender provides a central entry point to append to the local Raft log such that it is
* automatically replicated and eventually committed, and the ability for callers to be notified of
* various events, e.g. {@link AppendListener#onCommit(IndexedRaftLogEntry)}.
* various events, e.g. {@link AppendListener#onCommit(long)}.
*/
@FunctionalInterface
public interface ZeebeLogAppender {
Expand Down Expand Up @@ -86,17 +86,17 @@ default void onWriteError(final Throwable error) {}
/**
* Called when the entry has been committed.
*
* @param indexed the entry that was committed
* @param index the index of the entry that was committed
*/
default void onCommit(final IndexedRaftLogEntry indexed) {}
default void onCommit(final long index) {}

/**
* Called when an error occurred while replicating or committing an entry, typically when if an
* append operation was pending when shutting down the server or stepping down as leader.
*
* @param indexed the entry that should have been committed
* @param index the index of the entry that should have been committed
* @param error the error that occurred
*/
default void onCommitError(final IndexedRaftLogEntry indexed, final Throwable error) {}
default void onCommitError(final long index, final Throwable error) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,9 @@ static class DataLossChecker implements AppendListener {
final AppendListener delegate;

// Keep track of committed entries and its checksum.
final Map<Long, Long> indexToChecksumMap = new HashMap<>();
final Map<Long, Long> committedIndexToChecksumMap = new HashMap<>();

final Map<Long, Long> pendingWriteToBeCommitted = new HashMap<>();

private String failMessage = "";
private boolean dataloss = false;
Expand All @@ -644,6 +646,9 @@ static class DataLossChecker implements AppendListener {

@Override
public void onWrite(final IndexedRaftLogEntry indexed) {
final long index = indexed.index();
final var entryChecksum = indexed.getPersistedRaftRecord().checksum();
pendingWriteToBeCommitted.put(index, entryChecksum);
delegate.onWrite(indexed);
}

Expand All @@ -653,23 +658,26 @@ public void onWriteError(final Throwable error) {
}

@Override
public void onCommit(final IndexedRaftLogEntry indexed) {
final var entryChecksum = indexed.getPersistedRaftRecord().checksum();
final long index = indexed.index();
if (indexToChecksumMap.containsKey(index) && indexToChecksumMap.get(index) != entryChecksum) {
public void onCommit(final long index) {
if (committedIndexToChecksumMap.containsKey(index)
&& pendingWriteToBeCommitted.containsKey(index)
&& pendingWriteToBeCommitted.get(index).equals(committedIndexToChecksumMap.get(index))) {
failMessage =
"Committed entry at index %d checksum %d is being overwritten by entry with checksum %d"
.formatted(index, indexToChecksumMap.get(index), entryChecksum);
.formatted(
index,
committedIndexToChecksumMap.get(index),
pendingWriteToBeCommitted.get(index));
LOG.info(failMessage);
dataloss = true;
}
indexToChecksumMap.put(index, entryChecksum);
delegate.onCommit(indexed);
committedIndexToChecksumMap.put(index, pendingWriteToBeCommitted.remove(index));
delegate.onCommit(index);
}

@Override
public void onCommitError(final IndexedRaftLogEntry indexed, final Throwable error) {
delegate.onCommitError(indexed, error);
public void onCommitError(final long index, final Throwable error) {
delegate.onCommitError(index, error);
}

public String getFailMessage() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.atomix.raft;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -82,14 +82,14 @@ public void shouldNotifyCommitListenerOnAllNodes() throws Throwable {
@Test
public void shouldNotifyCommittedEntryListenerOnLeaderOnly() throws Throwable {
// given
final var committedEntryListener = mock(RaftCommittedEntryListener.class);
final var committedEntryListener = mock(RaftApplicationEntryCommittedPositionListener.class);
raftRule.addCommittedEntryListener(committedEntryListener);

// when
raftRule.appendEntry(); // awaits commit

// then
verify(committedEntryListener, timeout(1000L).times(1)).onCommit(any());
verify(committedEntryListener, timeout(1000L).times(1)).onCommit(anyLong());
}

@Test
Expand Down
13 changes: 8 additions & 5 deletions atomix/cluster/src/test/java/io/atomix/raft/RaftRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,12 @@ public void addCommitListener(final RaftCommitListener raftCommitListener) {
}

public void addCommittedEntryListener(
final RaftCommittedEntryListener raftCommittedEntryListener) {
final RaftApplicationEntryCommittedPositionListener
raftApplicationEntryCommittedPositionListener) {
servers.forEach(
(id, raft) -> raft.getContext().addCommittedEntryListener(raftCommittedEntryListener));
(id, raft) ->
raft.getContext()
.addCommittedEntryListener(raftApplicationEntryCommittedPositionListener));
}

public void partition(final RaftServer follower) {
Expand Down Expand Up @@ -688,12 +691,12 @@ public void onWriteError(final Throwable error) {
}

@Override
public void onCommit(final IndexedRaftLogEntry indexed) {
commitFuture.complete(indexed.index());
public void onCommit(final long index) {
commitFuture.complete(index);
}

@Override
public void onCommitError(final IndexedRaftLogEntry indexed, final Throwable error) {
public void onCommitError(final long index, final Throwable error) {
commitFuture.completeExceptionally(error);
}

Expand Down
6 changes: 3 additions & 3 deletions atomix/cluster/src/test/java/io/atomix/raft/RaftTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -573,12 +573,12 @@ public void onWriteError(final Throwable error) {
}

@Override
public void onCommit(final IndexedRaftLogEntry indexed) {
commitFuture.complete(indexed.index());
public void onCommit(final long index) {
commitFuture.complete(index);
}

@Override
public void onCommitError(final IndexedRaftLogEntry indexed, final Throwable error) {
public void onCommitError(final long index, final Throwable error) {
fail("Unexpected write error: " + error.getMessage());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public void shouldNotifyOnCommit() {
append();

// then
final IndexedRaftLogEntry appended = appenderListener.pollCommitted();
assertNotNull(appended);
final var committed = appenderListener.pollCommitted();
assertNotNull(committed);
assertEquals(0, appenderListener.getErrors().size());
}

Expand Down
Loading

0 comments on commit c50db78

Please sign in to comment.