From bdfa47deef87ed51cb2eb1c5a9a76c2fd6a6cd6f Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Thu, 22 Dec 2022 14:53:11 +0100 Subject: [PATCH] ns --- .../protocols/PerDestinationBundler.java | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/src/org/jgroups/protocols/PerDestinationBundler.java b/src/org/jgroups/protocols/PerDestinationBundler.java index 8903296eae7..0a5474dd79b 100644 --- a/src/org/jgroups/protocols/PerDestinationBundler.java +++ b/src/org/jgroups/protocols/PerDestinationBundler.java @@ -15,8 +15,10 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Stream; import static org.jgroups.protocols.TP.MSG_OVERHEAD; @@ -36,28 +38,32 @@ public class PerDestinationBundler implements Bundler { * This value needs to be smaller than the largest datagram packet size in case of UDP */ @Property(name="max_size", type= AttributeType.BYTES, - description="Maximum number of bytes for messages to be queued until they are sent") + description="Maximum number of bytes for messages to be queued (per destination) until they are sent") protected int max_size=64000; @Property(description="The maximum number of queued messages per destination. When the queue is full, a new " + "batch will be sent") - public int max_queue_size=128; + protected int max_queue_size=128; - @ManagedAttribute(description="Number of messages sent in BatchMessages",type=AttributeType.SCALAR) - protected long num_msgs_sent; + @ManagedAttribute(description="Total number of messages sent (single and batches)",type=AttributeType.SCALAR) + protected final LongAdder total_msgs_sent=new LongAdder(); - @ManagedAttribute(description="Number of BatchMessages sent",type=AttributeType.SCALAR) - protected long num_batches_sent; + @ManagedAttribute(description="Number of single messages sent",type=AttributeType.SCALAR) + protected final LongAdder num_single_msgs_sent=new LongAdder(); - @ManagedAttribute(description="Number of BatchMessages sent because the queue was full",type=AttributeType.SCALAR) - protected long num_batches_sent_due_to_max_size; - @ManagedAttribute(description="Number of BatchMessages sent because the max number of messages has been " + + @ManagedAttribute(description="Number of batches sent",type=AttributeType.SCALAR) + protected final LongAdder num_batches_sent=new LongAdder(); + + @ManagedAttribute(description="Number of batches sent because the queue was full",type=AttributeType.SCALAR) + protected final LongAdder num_sends_due_to_max_size=new LongAdder(); + + @ManagedAttribute(description="Number of batches sent because the max number of messages has been " + "reached (max_queue_size)", type=AttributeType.SCALAR) - protected long num_batches_sent_due_to_full_queue; + protected final LongAdder num_sends_due_to_full_queue=new LongAdder(); - @ManagedAttribute(description="Number of MessageBatches sent because the last sender thread returned",type=AttributeType.SCALAR) - protected long num_batches_sent_due_to_last_thread; + @ManagedAttribute(description="Number of batches sent because the last sender thread returned",type=AttributeType.SCALAR) + protected final LongAdder num_sends_due_to_last_thread=new LongAdder(); protected TP transport; protected Log log; @@ -78,16 +84,15 @@ public int size() { @ManagedAttribute(description="Average number of messages in an BatchMessage") public double avgBatchSize() { - if(num_batches_sent == 0 || num_msgs_sent == 0) return 0.0; - return num_msgs_sent / (double)num_batches_sent; + long num_batches=num_batches_sent.sum(), total_msgs=total_msgs_sent.sum(), single_msgs=num_single_msgs_sent.sum(); + if(num_batches == 0 || total_msgs == 0) return 0.0; + long batched_msgs=total_msgs - single_msgs; + return batched_msgs / (double)num_batches; } @Override public void resetStats() { - num_msgs_sent=0; - num_batches_sent=0; - num_batches_sent_due_to_max_size=0; - num_batches_sent_due_to_full_queue=0; - num_batches_sent_due_to_last_thread=0; + Stream.of(total_msgs_sent, num_batches_sent, num_sends_due_to_max_size, + num_sends_due_to_full_queue, num_sends_due_to_last_thread).forEach(LongAdder::reset); } public void init(TP transport) { @@ -119,7 +124,7 @@ public void viewChange(View view) { .forEach(dest -> dests.putIfAbsent(dest, new SendBuffer())); // remove left members - dests.keySet().stream().filter(dest -> !mbrs.contains(dest) && !(dest instanceof NullAddress)) + dests.keySet().stream().filter(dest -> !mbrs.contains(dest) && !(dest == NULL)) .forEach(dests::remove); } @@ -146,19 +151,19 @@ protected void addMessage(Address dest, Message msg) { lock.lock(); try { if(total_bytes + msg_bytes >= max_size) { - num_batches_sent_due_to_max_size++; + num_sends_due_to_max_size.increment(); sendBatch(dest); // will not throw an exception, just log a warning } msgs[index++]=msg; total_bytes+=msg_bytes; if(index == msgs.length) { - num_batches_sent_due_to_full_queue++; + num_sends_due_to_full_queue.increment(); sendBatch(dest); } if(thread_count.decrementAndGet() == 0) { - num_batches_sent_due_to_last_thread++; + num_sends_due_to_last_thread.increment(); sendBatch(dest); // will not throw an exception, just log a warning } } @@ -171,22 +176,18 @@ protected void sendBatch(Address destination) { if(index == 0) return; - Address dest=destination instanceof NullAddress? null : destination; + Address dest=destination == NULL? null : destination; if(index == 1) { // send a single message sendSingleMessage(dest, msgs[0]); - msgs[0]=null; - index=0; - total_bytes=0; - num_msgs_sent++; - return; + num_single_msgs_sent.increment(); } - - sendMessageList(dest, local_addr, msgs, index); - // msgs = new Message[max_batch_size]; - num_msgs_sent+=index; - num_batches_sent++; - index=0; + else { + sendMessageList(dest, local_addr, msgs, index); + num_batches_sent.increment(); + } + total_msgs_sent.add(index); total_bytes=0; + index=0; } protected void sendSingleMessage(final Address dest, final Message msg) {