From db60cc7fb5a5a2215db270a64ead13e6905eb306 Mon Sep 17 00:00:00 2001
From: Bela Ban
Date: Mon, 6 Mar 2023 09:15:18 +0100
Subject: [PATCH] Modified BundlerStressTest

---
 bin/bundler-stress_test.sh                    |  18 +
 src/org/jgroups/protocols/pbcast/NAKACK2.java |   6 +-
 .../org/jgroups/tests/BundlerStressTest.java  | 368 ++++++++++++------
 3 files changed, 264 insertions(+), 128 deletions(-)
 create mode 100755 bin/bundler-stress_test.sh

diff --git a/bin/bundler-stress_test.sh b/bin/bundler-stress_test.sh
new file mode 100755
index 00000000000..36e8d82fa54
--- /dev/null
+++ b/bin/bundler-stress_test.sh
@@ -0,0 +1,18 @@
+#!/bin/bash

+## Runs BundlerStressTest, writes results to $OUTPUT (removes it first when started)
+
+## The bundlers to be tested
+BUNDLERS="no-bundler transfer-queue per-destination"
+OUTPUT="bst.txt"
+THREADS="1 8 100"
+PROPS="$HOME/tcp.xml"
+PGM=BundlerStressTest
+
+rm -f $OUTPUT
+for i in $BUNDLERS;
+  do for j in $THREADS;
+    do jt $PGM -props $PROPS -bundler $i -num_sender_threads $j -interactive false -time 30 -warmup 10 >> $OUTPUT;
+  done;
+done;
+
diff --git a/src/org/jgroups/protocols/pbcast/NAKACK2.java b/src/org/jgroups/protocols/pbcast/NAKACK2.java
index 842369d44de..c20d962319e 100644
--- a/src/org/jgroups/protocols/pbcast/NAKACK2.java
+++ b/src/org/jgroups/protocols/pbcast/NAKACK2.java
@@ -236,9 +236,9 @@ public void setResendLastSeqno(boolean flag) {
     /** Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539) */
     protected final Map<Address,Long> xmit_task_map=new ConcurrentHashMap<>();
 
-    protected volatile boolean leaving=false;
-    protected volatile boolean running=false;
-    protected TimeScheduler timer=null;
+    protected volatile boolean leaving;
+    protected volatile boolean running;
+    protected TimeScheduler timer;
     protected LastSeqnoResender last_seqno_resender;
     protected final Lock rebroadcast_lock=new ReentrantLock();
     protected final Condition rebroadcast_done=rebroadcast_lock.newCondition();
diff --git a/tests/stress/org/jgroups/tests/BundlerStressTest.java b/tests/stress/org/jgroups/tests/BundlerStressTest.java
index 3301ecac849..a241898ce21 100644
--- a/tests/stress/org/jgroups/tests/BundlerStressTest.java
+++ b/tests/stress/org/jgroups/tests/BundlerStressTest.java
@@ -1,70 +1,158 @@
 package org.jgroups.tests;
 
-import org.jgroups.Address;
-import org.jgroups.BytesMessage;
-import org.jgroups.Message;
-import org.jgroups.PhysicalAddress;
-import org.jgroups.protocols.*;
-import org.jgroups.util.AsciiString;
+import org.jgroups.*;
+import org.jgroups.protocols.TP;
+import org.jgroups.protocols.UNICAST3;
+import org.jgroups.protocols.pbcast.GMS;
+import org.jgroups.stack.Protocol;
+import org.jgroups.stack.ProtocolStack;
 import org.jgroups.util.AverageMinMax;
-import org.jgroups.util.DefaultThreadFactory;
+import org.jgroups.util.Bits;
+import org.jgroups.util.Promise;
 import org.jgroups.util.Util;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.atomic.LongAdder;
+
+import static java.util.concurrent.TimeUnit.*;
 
 /**
- * Tests bundler performance
+ * Tests bundler performance. Creates N members, always sends from the first member to a random member and waits for
+ * the response (synchronous communication). The request includes the thread's ID.
+ *
+ * Each sender adds its thread-id to a hashmap and waits on the promise (the associated value). The receiver reads the
+ * thread-id, grabs the promise and calls {@link org.jgroups.util.Promise#setResult(Object)}, waking up the sender
+ * thread to send the next message.
  * @author Bela Ban
  * @since 4.0
  */
 public class BundlerStressTest {
-    protected String bundler_type;
-    protected Bundler bundler;
-    protected int num_msgs=50000, num_senders=20, msg_size=1000;
-    protected boolean details;
-    protected static final Address[] ADDRESSES;
-    protected final TP transport=new MockTransport();
-    protected static final int BUFSIZE=50000;
-
-
-    static {
-        ADDRESSES=new Address[]{null, Util.createRandomAddress("A"), Util.createRandomAddress("B"),
-          Util.createRandomAddress("C"), Util.createRandomAddress("D"), Util.createRandomAddress("E"),
-          Util.createRandomAddress("F"), Util.createRandomAddress("G"), Util.createRandomAddress("H")};
+    protected String bundler;
+    protected int time=60 /* seconds */, warmup=time/2, nodes=4, msg_size=1000;
+    protected int num_sender_threads=1;
+    protected boolean details;
+
+    protected String cfg="tcp.xml";
+    protected JChannel[] channels;
+    protected final Map<Long,Promise<Long>> sender_threads=new ConcurrentHashMap<>();
+
+
+
+    public BundlerStressTest(String config, String bundler, int time_secs, int warmup,
+                             int nodes, int num_sender_threads, int msg_size) {
+        this.cfg=config;
+        this.bundler=bundler;
+        this.time=time_secs;
+        this.warmup=warmup;
+        this.nodes=nodes;
+        this.num_sender_threads=num_sender_threads;
+        this.msg_size=msg_size;
+    }
+
+
+    protected BundlerStressTest createChannels() throws Exception {
+        if(channels != null)
+            Util.closeReverse(channels);
+        channels=new JChannel[nodes];
+        for(int i=0; i < channels.length; i++) {
+            char ch=(char)('A' + i);
+            String name=String.valueOf(ch);
+            channels[i]=new JChannel(cfg).name(name);
+            GMS gms=channels[i].getProtocolStack().findProtocol(GMS.class);
+            if(gms != null)
+                gms.printLocalAddress(false);
+
+            channels[i].connect("bst");
+            System.out.print(".");
+            if(i == 0) {
+                TP transport=channels[0].getProtocolStack().getTransport();
+                transport.bundler(bundler);
+            }
+            else
+                channels[i].setReceiver(new BundlerTestReceiver());
+        }
+        Util.waitUntilAllChannelsHaveSameView(10000, 500, channels);
+        for(int i=0; i < channels.length; i++) {
+            UNICAST3 uni=channels[i].getProtocolStack().findProtocol(UNICAST3.class);
+            if(uni != null)
+                uni.sendPendingAcks();
+        }
+        System.out.printf("\n-- view: %s (bundler=%s)\n", channels[0].getView(), getBundlerType());
+        for(int i=0; i < channels.length; i++) {
+            // UNICAST3.sendPendingAcks() (called by stop()) would cause an NPE (down_prot is null)
+            UNICAST3 uni=channels[i].getProtocolStack().findProtocol(UNICAST3.class);
+            if(uni != null) {
+                uni.stopRetransmitTask();
+                uni.sendPendingAcks();
+            }
+        }
+        return removeProtocols();
     }
 
+    // Removes all protocols but the transport
+    protected BundlerStressTest removeProtocols() {
+        for(JChannel ch: channels) {
+            ProtocolStack stack=ch.getProtocolStack();
+            Protocol prot=stack.getTopProtocol();
+            while(prot != null && !(prot instanceof TP)) {
+                try {
+                    stack.removeProtocol(prot);
+                }
+                catch(Throwable ignored) {}
+                prot=stack.getTopProtocol();
+            }
+        }
+        return this;
+    }
 
-    public BundlerStressTest(String bundler_type) {
-        this.bundler_type=bundler_type;
+    protected void start(boolean interactive) throws Exception {
+        try {
+            createChannels();
+            if(interactive)
+                loop();
+            else {
+                sendMessages(true);  // warmup run
+                sendMessages(false); // measured run
+            }
+        }
+        finally {
+            stop();
+        }
     }
 
-    protected void start() throws Exception {
-        this.bundler=TP.createBundler(bundler_type, getClass());
-        this.bundler.init(transport);
-        this.bundler.start();
-        loop();
+    protected void stop() {
+        Util.closeReverse(channels);
+        channels=null;
+    }
+
+    protected String getBundlerType() {
+        return channels[0].getProtocolStack().getTransport().getBundler().getClass().getSimpleName();
     }
 
     protected void loop() {
         boolean looping=true;
         while(looping) {
-            int c=Util.keyPress(String.format("[1] send [2] num_msgs (%d) [3] senders (%d) [4] msg size (%d bytes)\n" +
-                                                "[b] change bundler (%s) [d] details (%b) [x] exit\nbundler: %s\n",
-                                              num_msgs, num_senders, msg_size, bundler.getClass().getSimpleName(),
-                                              details, bundler.toString()));
+            int c=Util.keyPress(String.format("[1] send [2] num_sender_threads (%d) [3] nodes (%d) " +
                                                "[4] msg size (%d bytes) [5] time (%s)\n" +
+                                                "[b] change bundler (%s) [d] details (%b) [x] exit\n",
+                                              num_sender_threads, nodes, msg_size, Util.printTime(time, SECONDS),
+                                              getBundlerType(), details));
             try {
                 switch(c) {
                     case '1':
-                        sendMessages();
+                        sendMessages(false);
                         break;
                     case '2':
-                        num_msgs=Util.readIntFromStdin("num_msgs: ");
+                        num_sender_threads=Util.readIntFromStdin("num_sender_threads: ");
                        break;
                     case '3':
-                        num_senders=Util.readIntFromStdin("num_senders: ");
+                        int old=nodes;
+                        nodes=Util.readIntFromStdin("nodes: ");
+                        if(old != nodes)
+                            createChannels();
                         break;
                     case '4':
                         msg_size=Util.readIntFromStdin("msg_size: ");
@@ -73,12 +161,8 @@ protected void loop() {
                         String type=null;
                         try {
                             type=Util.readStringFromStdin("new bundler type: ");
-                            Bundler old=this.bundler;
-                            this.bundler=TP.createBundler(type, getClass());
-                            this.bundler.init(transport);
-                            this.bundler.start();
-                            if(old != null)
-                                old.stop();
+                            TP tp=channels[0].getProtocolStack().getTransport();
+                            tp.bundler(type);
                         }
                         catch(Throwable t) {
                             System.err.printf("failed changing bundler to %s: %s\n", type, t);
@@ -97,46 +181,62 @@ protected void loop() {
                 t.printStackTrace();
             }
         }
-        if(this.bundler != null)
-            this.bundler.stop();
+        stop();
+    }
+
+
+    protected Address pickRandomDestination() {
+        if(channels == null) return null;
+        int size=channels.length;
+        int index=(int)Util.random(size-1);
+        return channels[index].getAddress();
     }
 
-    protected void sendMessages() throws Exception {
-        Message[] msgs=generateMessages(num_msgs);
+    protected void sendMessages(boolean is_warmup) throws Exception {
+        sender_threads.clear();
         CountDownLatch latch=new CountDownLatch(1);
-        AtomicInteger index=new AtomicInteger(0);
-        Sender[] senders=new Sender[num_senders];
+        LongAdder sent_msgs=new LongAdder();
+        Sender[] senders=new Sender[num_sender_threads];
         for(int i=0; i < senders.length; i++) {
-            senders[i]=new Sender(latch, msgs, index);
+            senders[i]=new Sender(latch, sent_msgs);
             senders[i].start();
         }
+        if(is_warmup)
+            System.out.printf("-- warmup for %d seconds\n", this.warmup);
+        else
+            System.out.printf("-- %d sender threads sending messages for %d seconds\n", num_sender_threads, time);
         long start=Util.micros();
         latch.countDown(); // starts all sender threads
+
+        // wait for time seconds
+        long t=is_warmup? this.warmup : this.time;
+        long interval=(long)((t * 1000.0) / 10.0), sent_since_last_interval=0;
+        for(int i=1; i <= 10; i++) {
+            Util.sleep(interval);
+            long curr=sent_msgs.sum();
+            long sent=curr - sent_since_last_interval;
+            sent_since_last_interval=curr;
+            double reqs_sec=sent / (t/10.0);
+            if(is_warmup)
+                System.out.print(".");
+            else
+                System.out.printf("%d: %,.2f reqs/sec, %s / req\n", i, reqs_sec,
+                                  Util.printTime(senders[0].send.getAverage(), NANOSECONDS));
+        }
+
+        for(Sender sender: senders)
+            sender.stopThread();
         for(Sender sender: senders)
             sender.join();
-
-        // wait until the bundler has no pending msgs left
-        long park_time=1;
-        for(int i=0; i < 1_000_000; i++) {
-            int pending_msgs=bundler.size();
-            if(pending_msgs == 0)
-                break;
-
-            LockSupport.parkNanos(park_time);
-            if(i % 10000 == 0) {
-                park_time=Math.min(park_time*2, 1_000_000); // 1 ms max park time
-            }
-
+        if(is_warmup) {
+            System.out.println();
+            return;
         }
-        if(bundler.size() > 0)
-            throw new Exception(String.format("bundler still has %d pending messages", bundler.size()));
-
         long time_us=Util.micros()-start;
         AverageMinMax send_avg=null;
         for(Sender sender: senders) {
-            if(details)
+            if(details && !is_warmup)
                 System.out.printf("[%d] count=%d, send-time = %s\n", sender.getId(), sender.send.count(), sender.send);
             if(send_avg == null)
                 send_avg=sender.send;
@@ -144,68 +244,105 @@ protected void sendMessages() throws Exception {
             else
                 send_avg.merge(sender.send);
         }
-
-
-
-        double msgs_sec=num_msgs / (time_us / 1_000.0);
-        System.out.printf(Util.bold("\n\nreqs/ms = %.2f (time: %d us)" +
-                                      "\nsend-time = min/avg/max: %d / %.2f / %d ns\n"),
-                          msgs_sec, time_us, send_avg.min(), send_avg.average(), send_avg.max());
+        long num_msgs=sent_msgs.sum();
+        double msgs_sec=num_msgs / (time_us / 1000.0 / 1000.0);
+        System.out.printf(Util.bold("\n" +
+                                      "\nbundler: %s" +
+                                      "\nsender threads: %s" +
+                                      "\nreqs/sec: %,.2f (time: %s)" +
+                                      "\nsend-time: %s / %s / %s (min/avg/max)\n"),
+                          getBundlerType(), num_sender_threads,
+                          msgs_sec, Util.printTime(time_us, MICROSECONDS), Util.printTime(send_avg.min(), NANOSECONDS),
+                          Util.printTime(send_avg.average(), NANOSECONDS),
+                          Util.printTime(send_avg.max(), NANOSECONDS));
     }
 
-    protected Message[] generateMessages(int num) {
-        Message[] msgs=new Message[num];
-        for(int i=0; i < msgs.length; i++)
-            msgs[i]=new BytesMessage(pickAddress(), new byte[msg_size]);
-        return msgs;
-    }
-
-    protected static Address pickAddress() {
-        return Util.pickRandomElement(ADDRESSES);
-    }
-
     public static void main(String[] args) throws Exception {
-        String bundler="ring-buffer-lockless2";
+        String bundler="transfer-queue", props="tcp.xml";
+        int time=60, warmup=time/2, nodes=4, num_sender_threads=1, msg_size=1000;
+        boolean interactive=true;
         for(int i=0; i < args.length; i++) {
             if(args[i].equals("-bundler")) {
                 bundler=args[++i];
                 continue;
             }
-            System.out.print("BundlerStressTest [-bundler bundler-type]\n");
+            if("-time".equals(args[i])) {
+                time=Integer.parseInt(args[++i]);
+                warmup=time/2;
+                continue;
+            }
+            if("-warmup".equals(args[i])) {
+                warmup=Integer.parseInt(args[++i]);
+                continue;
+            }
+            if("-nodes".equals(args[i])) {
+                nodes=Integer.parseInt(args[++i]);
+                continue;
+            }
+            if("-num_sender_threads".equals(args[i])) {
+                num_sender_threads=Integer.parseInt(args[++i]);
+                continue;
+            }
+            if("-msg_size".equals(args[i])) {
+                msg_size=Integer.parseInt(args[++i]);
+                continue;
+            }
+            if("-interactive".equals(args[i])) {
+                interactive=Boolean.parseBoolean(args[++i]);
+                continue;
+            }
+            if("-props".equals(args[i])) {
+                props=args[++i];
+                continue;
+            }
+            System.out.print("BundlerStressTest [-props config] [-bundler bundler-type] [-time secs] [-warmup secs] " +
+                               "[-num_sender_threads num] [-nodes num] [-msg_size size] [-interactive false|true]\n");
             return;
         }
-        new BundlerStressTest(bundler).start();
+        BundlerStressTest test=new BundlerStressTest(props, bundler, time, warmup, nodes, num_sender_threads, msg_size);
+        test.start(interactive);
     }
 
     protected class Sender extends Thread {
         protected final CountDownLatch latch;
-        protected final Message[] msgs;
-        protected final AtomicInteger index;
         protected final AverageMinMax send=new AverageMinMax(); // ns
+        protected long thread_id;
+        protected Promise<Long> promise;
+        protected final LongAdder sent_msgs;
+        protected volatile boolean running=true;
 
-        public Sender(CountDownLatch latch, Message[] msgs, AtomicInteger index) {
+        public Sender(CountDownLatch latch, LongAdder sent_msgs) {
             this.latch=latch;
-            this.msgs=msgs;
-            this.index=index;
+            this.sent_msgs=sent_msgs;
+        }
+
+        public void stopThread() {
+            running=false;
         }
 
         public void run() {
+            thread_id=Thread.currentThread().getId();
+            sender_threads.put(thread_id, promise=new Promise<>());
             try {
                 latch.await();
             }
            catch(InterruptedException e) {
             }
-            while(true) {
-                int idx=index.getAndIncrement();
-                if(idx >= msgs.length)
-                    break;
+            byte[] buf=new byte[msg_size]; // the first 8 bytes are the thread_id
+            Bits.writeLong(thread_id, buf, 0);
+            while(running) {
                 try {
+                    promise.reset(false);
+                    Address dest=pickRandomDestination();
+                    Message msg=new BytesMessage(dest, buf);
                     long start=System.nanoTime();
-                    bundler.send(msgs[idx]);
+                    channels[0].send(msg);
+                    promise.getResult(0);
+                    sent_msgs.increment();
                     long time_ns=System.nanoTime()-start;
-                    send.add(time_ns);
+                    send.add(time_ns); // only this sender updates its own average; no lock needed
                 }
                 catch(Exception e) {
                     e.printStackTrace();
@@ -214,32 +351,13 @@ public void run() {
         }
     }
 
-    protected static class MockTransport extends TP {
-
-
-        public MockTransport() {
-            this.cluster_name=new AsciiString("mock");
-            thread_factory=new DefaultThreadFactory("", false);
-        }
-
-        public boolean supportsMulticasting() {
-            return false;
-        }
-
-        public void doSend(byte[] buf, int offset, int length, Address dest) throws Exception {
-
-        }
-
-        public void sendUnicast(PhysicalAddress dest, byte[] data, int offset, int length) throws Exception {
-
-        }
-
-        public String getInfo() {
-            return null;
-        }
-
-        protected PhysicalAddress getPhysicalAddress() {
-            return null;
+    protected class BundlerTestReceiver implements Receiver {
+        @Override
+        public void receive(Message msg) {
+            byte[] buf=msg.getArray();
+            long thread_id=Bits.readLong(buf, msg.getOffset());
+            Promise<Long> promise=sender_threads.get(thread_id);
+            promise.setResult(thread_id); // wakes up sender
         }
     }
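
Note (illustration, not part of the patch): the request/response correlation described in the javadoc above -- the sender writes its thread-id into the first 8 bytes of the payload, registers a promise under that id, and blocks until the receiver completes it -- can be sketched in plain Java as follows. A BlockingQueue stands in for the JChannel transport and CompletableFuture for org.jgroups.util.Promise; the class name PromisePatternSketch and all other names are illustrative only.

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PromisePatternSketch {
    // maps a sender's thread-id to the future that its reply completes
    static final Map<Long,CompletableFuture<Long>> pending=new ConcurrentHashMap<>();
    static final BlockingQueue<byte[]> transport=new LinkedBlockingQueue<>();

    public static void main(String[] args) throws Exception {
        // plays the role of BundlerTestReceiver: reads the thread-id from the
        // first 8 bytes and wakes the corresponding sender
        Thread receiver=new Thread(() -> {
            try {
                for(;;) {
                    byte[] buf=transport.take();
                    long id=ByteBuffer.wrap(buf).getLong(); // Bits.readLong(buf, offset) in the test
                    pending.get(id).complete(id);           // Promise.setResult(thread_id) in the test
                }
            }
            catch(InterruptedException ignored) {}
        });
        receiver.setDaemon(true);
        receiver.start();

        long thread_id=Thread.currentThread().getId();
        byte[] buf=new byte[1000];               // first 8 bytes carry the thread-id
        ByteBuffer.wrap(buf).putLong(thread_id); // Bits.writeLong(thread_id, buf, 0) in the test
        for(int i=0; i < 5; i++) {
            CompletableFuture<Long> promise=new CompletableFuture<>();
            pending.put(thread_id, promise);     // register before sending, as in Sender.run()
            long start=System.nanoTime();
            transport.put(buf);                  // channels[0].send(msg) in the test
            promise.get(5, TimeUnit.SECONDS);    // promise.getResult(0) in the test
            System.out.printf("round-trip %d: %d ns%n", i, System.nanoTime()-start);
        }
    }
}

Because each sender blocks on its own promise before sending the next request, the measured send time is a full round-trip, which is what makes the per-thread AverageMinMax above meaningful.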