Skip to content

Commit

Permalink
ns
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Dec 22, 2022
1 parent 61f3120 commit bdfa47d
Showing 1 changed file with 36 additions and 35 deletions.
71 changes: 36 additions & 35 deletions src/org/jgroups/protocols/PerDestinationBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import static org.jgroups.protocols.TP.MSG_OVERHEAD;

Expand All @@ -36,28 +38,32 @@ public class PerDestinationBundler implements Bundler {
* This value needs to be smaller than the largest datagram packet size in case of UDP
*/
@Property(name="max_size", type= AttributeType.BYTES,
description="Maximum number of bytes for messages to be queued until they are sent")
description="Maximum number of bytes for messages to be queued (per destination) until they are sent")
protected int max_size=64000;

@Property(description="The maximum number of queued messages per destination. When the queue is full, a new " +
"batch will be sent")
public int max_queue_size=128;
protected int max_queue_size=128;

@ManagedAttribute(description="Number of messages sent in BatchMessages",type=AttributeType.SCALAR)
protected long num_msgs_sent;
@ManagedAttribute(description="Total number of messages sent (single and batches)",type=AttributeType.SCALAR)
protected final LongAdder total_msgs_sent=new LongAdder();

@ManagedAttribute(description="Number of BatchMessages sent",type=AttributeType.SCALAR)
protected long num_batches_sent;
@ManagedAttribute(description="Number of single messages sent",type=AttributeType.SCALAR)
protected final LongAdder num_single_msgs_sent=new LongAdder();

@ManagedAttribute(description="Number of BatchMessages sent because the queue was full",type=AttributeType.SCALAR)
protected long num_batches_sent_due_to_max_size;

@ManagedAttribute(description="Number of BatchMessages sent because the max number of messages has been " +
@ManagedAttribute(description="Number of batches sent",type=AttributeType.SCALAR)
protected final LongAdder num_batches_sent=new LongAdder();

@ManagedAttribute(description="Number of batches sent because the queue was full",type=AttributeType.SCALAR)
protected final LongAdder num_sends_due_to_max_size=new LongAdder();

@ManagedAttribute(description="Number of batches sent because the max number of messages has been " +
"reached (max_queue_size)", type=AttributeType.SCALAR)
protected long num_batches_sent_due_to_full_queue;
protected final LongAdder num_sends_due_to_full_queue=new LongAdder();

@ManagedAttribute(description="Number of MessageBatches sent because the last sender thread returned",type=AttributeType.SCALAR)
protected long num_batches_sent_due_to_last_thread;
@ManagedAttribute(description="Number of batches sent because the last sender thread returned",type=AttributeType.SCALAR)
protected final LongAdder num_sends_due_to_last_thread=new LongAdder();

protected TP transport;
protected Log log;
Expand All @@ -78,16 +84,15 @@ public int size() {

@ManagedAttribute(description="Average number of messages in an BatchMessage")
public double avgBatchSize() {
if(num_batches_sent == 0 || num_msgs_sent == 0) return 0.0;
return num_msgs_sent / (double)num_batches_sent;
long num_batches=num_batches_sent.sum(), total_msgs=total_msgs_sent.sum(), single_msgs=num_single_msgs_sent.sum();
if(num_batches == 0 || total_msgs == 0) return 0.0;
long batched_msgs=total_msgs - single_msgs;
return batched_msgs / (double)num_batches;
}

@Override public void resetStats() {
num_msgs_sent=0;
num_batches_sent=0;
num_batches_sent_due_to_max_size=0;
num_batches_sent_due_to_full_queue=0;
num_batches_sent_due_to_last_thread=0;
Stream.of(total_msgs_sent, num_batches_sent, num_sends_due_to_max_size,
num_sends_due_to_full_queue, num_sends_due_to_last_thread).forEach(LongAdder::reset);
}

public void init(TP transport) {
Expand Down Expand Up @@ -119,7 +124,7 @@ public void viewChange(View view) {
.forEach(dest -> dests.putIfAbsent(dest, new SendBuffer()));

// remove left members
dests.keySet().stream().filter(dest -> !mbrs.contains(dest) && !(dest instanceof NullAddress))
dests.keySet().stream().filter(dest -> !mbrs.contains(dest) && !(dest == NULL))
.forEach(dests::remove);
}

Expand All @@ -146,19 +151,19 @@ protected void addMessage(Address dest, Message msg) {
lock.lock();
try {
if(total_bytes + msg_bytes >= max_size) {
num_batches_sent_due_to_max_size++;
num_sends_due_to_max_size.increment();
sendBatch(dest); // will not throw an exception, just log a warning
}

msgs[index++]=msg;
total_bytes+=msg_bytes;
if(index == msgs.length) {
num_batches_sent_due_to_full_queue++;
num_sends_due_to_full_queue.increment();
sendBatch(dest);
}

if(thread_count.decrementAndGet() == 0) {
num_batches_sent_due_to_last_thread++;
num_sends_due_to_last_thread.increment();
sendBatch(dest); // will not throw an exception, just log a warning
}
}
Expand All @@ -171,22 +176,18 @@ protected void sendBatch(Address destination) {
if(index == 0)
return;

Address dest=destination instanceof NullAddress? null : destination;
Address dest=destination == NULL? null : destination;
if(index == 1) { // send a single message
sendSingleMessage(dest, msgs[0]);
msgs[0]=null;
index=0;
total_bytes=0;
num_msgs_sent++;
return;
num_single_msgs_sent.increment();
}

sendMessageList(dest, local_addr, msgs, index);
// msgs = new Message[max_batch_size];
num_msgs_sent+=index;
num_batches_sent++;
index=0;
else {
sendMessageList(dest, local_addr, msgs, index);
num_batches_sent.increment();
}
total_msgs_sent.add(index);
total_bytes=0;
index=0;
}

protected void sendSingleMessage(final Address dest, final Message msg) {
Expand Down

0 comments on commit bdfa47d

Please sign in to comment.