Skip to content

Commit

Permalink
Methods for unacked messages metrics
Browse files Browse the repository at this point in the history
This change adds a getNumUnackedMessages(dest) method to UNICAST3/4//NAKACK4 which can be used for e.g. metrics.
  • Loading branch information
cfredri4 committed Jan 20, 2025
1 parent ace28d1 commit a98e6d4
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/org/jgroups/protocols/NAKACK4.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ public class NAKACK4 extends ReliableMulticast {
public NAKACK4 ackThreshold(int t) {ack_threshold=t; return this;}
@Override public Options sendOptions() {return SEND_OPTIONS;}

@ManagedAttribute(type = SCALAR)
public long getNumUnackedMessages() {
long minAck=ack_table.min();
return minAck > 0 ? seqno.get() - minAck : 0;
}

public long getNumUnackedMessages(Address dest) {
long minAck=ack_table.min(dest);
return minAck > 0 ? seqno.get() - minAck : 0;
}

@ManagedAttribute(description="Number of times sender threads were blocked on a full send window",type=SCALAR)
public long getNumBlockings() {
FixedBuffer<Message> buf=(FixedBuffer<Message>)sendBuf();
Expand Down
5 changes: 5 additions & 0 deletions src/org/jgroups/protocols/ReliableUnicast.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ public int getNumUnackedMessages() {
return accumulate(Buffer::size, send_table.values());
}

public int getNumUnackedMessages(Address dest) {
Entry entry=send_table.get(dest);
return entry != null ? entry.buf.size() : 0;
}

@ManagedAttribute(description="Total number of undelivered messages in all receive windows",type=SCALAR)
public int getXmitTableUndeliveredMessages() {
return accumulate(Buffer::size, recv_table.values());
Expand Down
5 changes: 5 additions & 0 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ public int getNumUnackedMessages() {
return accumulate(Table::size, send_table.values());
}

public int getNumUnackedMessages(Address dest) {
Entry entry=send_table.get(dest);
return entry != null ? entry.msgs.size() : 0;
}

@ManagedAttribute(description="Total number of undelivered messages in all receive windows",type=SCALAR)
public int getXmitTableUndeliveredMessages() {
return accumulate(Table::size, recv_table.values());
Expand Down
9 changes: 9 additions & 0 deletions src/org/jgroups/util/AckTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ public long min() {
}
}

public long min(Address mbr) {
lock.lock();
try {
return acks.getOrDefault(mbr, 0L);
} finally {
lock.unlock();
}
}

/** Adds an ACK from a sender to the map. Returns the old and new minimum */
public long[] ack(Address sender, long seqno) {
lock.lock();
Expand Down

0 comments on commit a98e6d4

Please sign in to comment.