Skip to content

Commit

Permalink
Msg delivery is stuck due to items in the collection recentlyJoinedCo…
Browse files Browse the repository at this point in the history
…nsumers are out-of-order (apache#23795)

(cherry picked from commit 4a01423)
(cherry picked from commit ca535a2)
  • Loading branch information
poorbarcode authored and nikhil-ctds committed Jan 3, 2025
1 parent 183b27d commit 0dfa4f2
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,44 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
&& consumerList.size() > 1
&& cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
sortRecentlyJoinedConsumersIfNeeded();
}
}
});
}

private void sortRecentlyJoinedConsumersIfNeeded() {
if (recentlyJoinedConsumers.size() == 1) {
return;
}
boolean sortNeeded = false;
PositionImpl posPre = null;
PositionImpl posAfter = null;
for (Map.Entry<Consumer, PositionImpl> entry : recentlyJoinedConsumers.entrySet()) {
if (posPre == null) {
posPre = entry.getValue();
} else {
posAfter = entry.getValue();
}
if (posPre != null && posAfter != null) {
if (posPre.compareTo(posAfter) > 0) {
sortNeeded = true;
break;
}
posPre = posAfter;
}
}

if (sortNeeded) {
List<Map.Entry<Consumer, PositionImpl>> sortedList = new ArrayList<>(recentlyJoinedConsumers.entrySet());
Collections.sort(sortedList, Map.Entry.comparingByValue());
recentlyJoinedConsumers.clear();
for (Map.Entry<Consumer, PositionImpl> entry : sortedList) {
recentlyJoinedConsumers.put(entry.getKey(), entry.getValue());
}
}
}

@Override
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
// The consumer must be removed from the selector before calling the superclass removeConsumer method.
Expand Down Expand Up @@ -548,8 +581,11 @@ public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
&& ksm.isAllowOutOfOrderDelivery() == this.allowOutOfOrderDelivery);
}

public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
return recentlyJoinedConsumers;
public synchronized LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
if (recentlyJoinedConsumers == null) {
return null;
}
return new LinkedHashMap<>(recentlyJoinedConsumers);
}

public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -171,6 +173,85 @@ public void testAddConsumerWhenClosed() throws Exception {
assertTrue(persistentDispatcher.getSelector().getConsumerKeyHashRanges().isEmpty());
}

@Test
public void testSortRecentlyJoinedConsumersIfNeeded() throws Exception {
PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher =
new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));

Consumer consumer0 = mock(Consumer.class);
when(consumer0.consumerName()).thenReturn("c0-1");
Consumer consumer1 = mock(Consumer.class);
when(consumer1.consumerName()).thenReturn("c1");
Consumer consumer2 = mock(Consumer.class);
when(consumer2.consumerName()).thenReturn("c2");
Consumer consumer3 = mock(Consumer.class);
when(consumer3.consumerName()).thenReturn("c3");
Consumer consumer4 = mock(Consumer.class);
when(consumer4.consumerName()).thenReturn("c4");
Consumer consumer5 = mock(Consumer.class);
when(consumer5.consumerName()).thenReturn("c5");
Consumer consumer6 = mock(Consumer.class);
when(consumer6.consumerName()).thenReturn("c6");

when(cursorMock.getNumberOfEntriesSinceFirstNotAckedMessage()).thenReturn(100L);
when(cursorMock.getMarkDeletedPosition()).thenReturn(PositionImpl.get(-1, -1));

when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(0, 0));
persistentDispatcher.addConsumer(consumer0).join();

when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(4, 1));
persistentDispatcher.addConsumer(consumer1).join();

when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(5, 2));
persistentDispatcher.addConsumer(consumer2).join();

when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(5, 1));
persistentDispatcher.addConsumer(consumer3).join();

when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(5, 3));
persistentDispatcher.addConsumer(consumer4).join();

when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(4, 2));
persistentDispatcher.addConsumer(consumer5).join();

when(cursorMock.getReadPosition()).thenReturn(PositionImpl.get(6, 1));
persistentDispatcher.addConsumer(consumer6).join();

assertEquals(persistentDispatcher.getRecentlyJoinedConsumers().size(), 6);

Iterator<Map.Entry<Consumer, PositionImpl>> itr
= persistentDispatcher.getRecentlyJoinedConsumers().entrySet().iterator();

Map.Entry<Consumer, PositionImpl> entry1 = itr.next();
assertEquals(entry1.getValue(), PositionImpl.get(4, 1));
assertEquals(entry1.getKey(), consumer1);

Map.Entry<Consumer, PositionImpl> entry2 = itr.next();
assertEquals(entry2.getValue(), PositionImpl.get(4, 2));
assertEquals(entry2.getKey(), consumer5);

Map.Entry<Consumer, PositionImpl> entry3 = itr.next();
assertEquals(entry3.getValue(), PositionImpl.get(5, 1));
assertEquals(entry3.getKey(), consumer3);

Map.Entry<Consumer, PositionImpl> entry4 = itr.next();
assertEquals(entry4.getValue(), PositionImpl.get(5, 2));
assertEquals(entry4.getKey(), consumer2);

Map.Entry<Consumer, PositionImpl> entry5 = itr.next();
assertEquals(entry5.getValue(), PositionImpl.get(5, 3));
assertEquals(entry5.getKey(), consumer4);

Map.Entry<Consumer, PositionImpl> entry6 = itr.next();
assertEquals(entry6.getValue(), PositionImpl.get(6, 1));
assertEquals(entry6.getKey(), consumer6);

// cleanup.
persistentDispatcher.close();
}

@Test
public void testSendMarkerMessage() {
try {
Expand Down

0 comments on commit 0dfa4f2

Please sign in to comment.