Skip to content

Commit

Permalink
NAKACK2.stable(): reduce number of retransmissions (https://issues.re…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Mar 6, 2023
1 parent 269f5b3 commit b43fdf1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
12 changes: 9 additions & 3 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ public void setResendLastSeqno(boolean flag) {
protected Future<?> xmit_task;
/** Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539) */
protected final Map<Address,Long> xmit_task_map=new ConcurrentHashMap<>();
//* Used by stable to reduce the number of retransmissions (https://issues.redhat.com/browse/JGRP-2678) */
protected final Map<Address,Long> stable_xmit_map=new ConcurrentHashMap<>();

protected volatile boolean leaving;
protected volatile boolean running;
Expand Down Expand Up @@ -541,6 +543,7 @@ public void stop() {
become_server_queue.clear();
stopRetransmitTask();
xmit_task_map.clear();
stable_xmit_map.clear();
reset();
}

Expand Down Expand Up @@ -585,6 +588,7 @@ public Object down(Event evt) {
if(suppress_log_non_member != null)
suppress_log_non_member.removeExpired(suppress_time_non_member_warnings);
xmit_task_map.keySet().retainAll(mbrs);
stable_xmit_map.keySet().retainAll(mbrs);
break;

case Event.BECOME_SERVER:
Expand Down Expand Up @@ -1434,11 +1438,13 @@ protected void stable(Digest digest) {
Table<Message> buf=xmit_table.get(member);
if(buf != null) {
long my_hr=buf.getHighestReceived();
if(hr >= 0 && hr > my_hr) {
Long prev_hr=stable_xmit_map.get(member);
if(prev_hr != null && prev_hr > my_hr) {
log.trace("%s: my_highest_rcvd (%d) < stability_highest_rcvd (%d): requesting retransmission of %s",
local_addr, my_hr, hr, member + "#" + hr);
retransmit(hr, hr, member, false);
local_addr, my_hr, prev_hr, member + "#" + prev_hr);
retransmit(prev_hr, prev_hr, member, false);
}
stable_xmit_map.put(member, hr);
}

// delete *delivered* msgs that are stable (all messages with seqnos <= seqno)
Expand Down
12 changes: 3 additions & 9 deletions tests/junit/org/jgroups/tests/LastMessageDroppedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class LastMessageDroppedTest extends ChannelTestBase {
makeUnique(a,b);
changeNAKACK2(a,b);
// it should take between 0 and 6s to retransmit the last missing msg. if dropped, may have to run multiple times
changeDesiredGossipTime(3000, a,b);
changeDesiredGossipTime(2000, a,b);
a.connect("LastMessageDroppedTest");
b.connect("LastMessageDroppedTest");
Util.waitUntilAllChannelsHaveSameView(10000, 500, a, b);
Expand All @@ -58,10 +58,7 @@ public void testLastMessageDropped() throws Exception {
a.send(null, 3);

Collection<Integer> list=receiver.getMsgs();
for(int i=0; i < 20 && list.size() < 3; i++) {
System.out.println("list=" + list);
Util.sleep(1000);
}
Util.waitUntil(20000, 500, () -> list.size() == 3);
System.out.println("list=" + list);
assert list.size() == 3 : "list=" + list;
}
Expand All @@ -84,10 +81,7 @@ public void testLastMessageAndLastSeqnoDropped() throws Exception {
a.send(null, 3);

Collection<Integer> list=receiver.getMsgs();
for(int i=0; i < 20 && list.size() < 3; i++) {
System.out.println("list=" + list);
Util.sleep(1000);
}
Util.waitUntil(20000, 500, () -> list.size() == 3);
System.out.println("list=" + list);
assert list.size() == 3 : "list=" + list;
}
Expand Down

0 comments on commit b43fdf1

Please sign in to comment.