diff --git a/tests/perf/org/jgroups/tests/perf/MPerf.java b/tests/perf/org/jgroups/tests/perf/MPerf.java
index 396e4f25b31..20fbba9b851 100644
--- a/tests/perf/org/jgroups/tests/perf/MPerf.java
+++ b/tests/perf/org/jgroups/tests/perf/MPerf.java
@@ -15,6 +15,7 @@
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -43,7 +44,6 @@ public class MPerf implements Receiver {
protected boolean oob;
protected boolean log_local=true; // default: same behavior as before
protected boolean display_msg_src=false;
- protected long start_time; // set on reception of START message
protected MessageCounter received_msgs_map=new MessageCounter();
protected final List
members=new CopyOnWriteArrayList<>();
protected final Log log=LogFactory.getLog(getClass());
@@ -277,10 +277,12 @@ public void receive(Message msg) {
if(my_rank >= 0 && my_rank > num_senders)
isSender = false;
}
- start_time=System.currentTimeMillis();
- Result r=sendMessages(isSender);
- System.out.println("-- done");
- sendNoException(msg.getSrc(), r, MPerfHeader.RESULT, Message.Flag.OOB);
+ final boolean is_sender=isSender;
+ CompletableFuture.supplyAsync(() -> {
+ Result r=sendMessages(is_sender);
+ System.out.println("-- done");
+ return r;
+ }).thenAccept(r -> sendNoException(msg.src(), r, MPerfHeader.RESULT, Message.Flag.OOB));
break;
case MPerfHeader.RESULT:
@@ -381,7 +383,7 @@ protected Result sendMessages(boolean isSender) {
final Thread[] senders=new Thread[num_threads];
final CountDownLatch latch=new CountDownLatch(1);
final byte[] payload=new byte[msg_size];
- final AtomicBoolean running = new AtomicBoolean(true);
+ final AtomicBoolean running=new AtomicBoolean(true);
received_msgs_map.reset();
diff --git a/tests/perf/org/jgroups/tests/perf/UPerf.java b/tests/perf/org/jgroups/tests/perf/UPerf.java
index 8b121e0df61..7cfdd7cc256 100644
--- a/tests/perf/org/jgroups/tests/perf/UPerf.java
+++ b/tests/perf/org/jgroups/tests/perf/UPerf.java
@@ -21,6 +21,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;
@@ -31,16 +32,17 @@
* @author Bela Ban
*/
public class UPerf implements Receiver {
- private JChannel channel;
- private Address local_addr;
- private RpcDispatcher disp;
- static final String groupname="uperf";
- protected final List members=new ArrayList<>();
- protected volatile View view;
- protected volatile boolean looping=true;
- protected final LongAdder num_reads=new LongAdder();
- protected final LongAdder num_writes=new LongAdder();
- protected ThreadFactory thread_factory;
+ private JChannel channel;
+ private Address local_addr;
+ private RpcDispatcher disp;
+ static final String groupname="uperf";
+ protected final List members=new ArrayList<>();
+ protected volatile View view;
+ protected volatile boolean looping=true;
+ protected final LongAdder num_reads=new LongAdder();
+ protected final LongAdder num_writes=new LongAdder();
+ protected ThreadFactory thread_factory;
+ protected final ResponseCollector results_coll=new ResponseCollector<>();
@@ -57,19 +59,20 @@ public class UPerf implements Receiver {
// ... add your own here, just don't forget to annotate them with @Property
// =======================================================
- private static final Method[] METHODS=new Method[6];
- private static final short START = 0;
+ private static final Method[] METHODS=new Method[7];
+ private static final short START_TEST = 0;
private static final short GET = 1;
private static final short PUT = 2;
private static final short GET_CONFIG = 3;
private static final short SET = 4;
private static final short QUIT_ALL = 5;
+ private static final short ACCEPT_RESULTS = 6;
protected static final Field OOB, NUM_THREADS, TIME, RPC_TIMEOUT, MSG_SIZE, ANYCAST_COUNT,
READ_PERCENTAGE, PRINT_INVOKERS, PRINT_DETAILS;
- private byte[] BUFFER=new byte[msg_size];
+ private byte[] BUFFER=new byte[msg_size];
protected static final String format=
"[1] Start test [2] View [4] Threads (%d) [6] Time (%,ds) [7] Msg size (%s)" +
"\n[o] OOB (%b) [t] RPC timeout (%,dms) [a] Anycast count (%d) [r] Read percentage (%.2f) " +
@@ -79,12 +82,13 @@ public class UPerf implements Receiver {
static {
try {
- METHODS[START] = UPerf.class.getMethod("startTest");
+ METHODS[START_TEST] = UPerf.class.getMethod("startTest", Address.class);
METHODS[GET] = UPerf.class.getMethod("get", long.class);
METHODS[PUT] = UPerf.class.getMethod("put", long.class, byte[].class);
METHODS[GET_CONFIG] = UPerf.class.getMethod("getConfig");
METHODS[SET] = UPerf.class.getMethod("set", String.class, Object.class);
METHODS[QUIT_ALL] = UPerf.class.getMethod("quitAll");
+ METHODS[ACCEPT_RESULTS] = UPerf.class.getMethod("acceptResults", Address.class, Results.class);
OOB=Util.getField(UPerf.class, "oob", true);
NUM_THREADS=Util.getField(UPerf.class, "num_threads", true);
@@ -154,12 +158,73 @@ public void viewAccepted(View new_view) {
System.out.println("** view: " + new_view);
members.clear();
members.addAll(new_view.getMembers());
+ results_coll.retainAll(members);
}
// =================================== callbacks ======================================
- public Results startTest() throws Throwable {
+ public void startTest(Address sender) throws Throwable {
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ return startTest();
+ }
+ catch(Throwable ex) {
+ System.err.printf("failure running the test: %s", ex);
+ return null;
+ }
+ }).thenAccept(r -> sendResults(sender, r));
+ }
+
+ /** Called by worker when done; response to {@link #startTest()} */
+ public void acceptResults(Address sender, Results results) {
+ results_coll.add(sender, results);
+ }
+
+ public void quitAll() {
+ System.out.println("-- received quitAll(): shutting down");
+ stopEventLoop();
+ System.exit(0);
+ }
+
+ protected String printAverage(long start_time) {
+ long tmp_time=System.currentTimeMillis() - start_time;
+ long reads=num_reads.sum(), writes=num_writes.sum();
+ double reqs_sec=(reads+writes) / (tmp_time / 1000.0);
+ return String.format("%,.0f reqs/sec (%,d reads %,d writes)", reqs_sec, reads, writes);
+ }
+
+
+ public void set(String field_name, Object value) {
+ Field field=Util.getField(this.getClass(),field_name);
+ if(field == null)
+ System.err.println("Field " + field_name + " not found");
+ else {
+ Util.setField(field, this, value);
+ System.out.println(field.getName() + "=" + value);
+ }
+ }
+
+ public byte[] get(@SuppressWarnings("UnusedParameters") long key) {
+ return BUFFER;
+ }
+
+
+ @SuppressWarnings("UnusedParameters")
+ public void put(long key, byte[] val) {
+ }
+
+ public Config getConfig() {
+ Config config=new Config();
+ for(Field field: Util.getAllDeclaredFieldsWithAnnotations(UPerf.class, Property.class)) {
+ if(field.isAnnotationPresent(Property.class)) {
+ config.add(field.getName(), Util.getField(field, this));
+ }
+ }
+ return config;
+ }
+
+ protected Results startTest() throws Throwable {
BUFFER=new byte[msg_size];
System.out.printf("running for %d seconds\n", time);
@@ -213,47 +278,15 @@ public Results startTest() throws Throwable {
return new Results((int)num_reads.sum(), (int)num_writes.sum(), total_time, avg_gets, avg_puts);
}
- public void quitAll() {
- System.out.println("-- received quitAll(): shutting down");
- stopEventLoop();
- System.exit(0);
- }
-
- protected String printAverage(long start_time) {
- long tmp_time=System.currentTimeMillis() - start_time;
- long reads=num_reads.sum(), writes=num_writes.sum();
- double reqs_sec=(reads+writes) / (tmp_time / 1000.0);
- return String.format("%,.0f reqs/sec (%,d reads %,d writes)", reqs_sec, reads, writes);
- }
-
-
- public void set(String field_name, Object value) {
- Field field=Util.getField(this.getClass(),field_name);
- if(field == null)
- System.err.println("Field " + field_name + " not found");
- else {
- Util.setField(field, this, value);
- System.out.println(field.getName() + "=" + value);
+ protected void sendResults(Address dest, Results results) {
+ RequestOptions options=new RequestOptions(ResponseMode.GET_NONE, 0)
+ .flags(Message.Flag.OOB, Message.Flag.DONT_BUNDLE, Message.Flag.NO_FC);
+ try {
+ disp.callRemoteMethod(dest, new MethodCall(ACCEPT_RESULTS, local_addr, results), options);
}
- }
-
- public byte[] get(@SuppressWarnings("UnusedParameters") long key) {
- return BUFFER;
- }
-
-
- @SuppressWarnings("UnusedParameters")
- public void put(long key, byte[] val) {
- }
-
- public Config getConfig() {
- Config config=new Config();
- for(Field field: Util.getAllDeclaredFieldsWithAnnotations(UPerf.class, Property.class)) {
- if(field.isAnnotationPresent(Property.class)) {
- config.add(field.getName(), Util.getField(field, this));
- }
+ catch(Exception ex) {
+ System.err.printf("failed sending results to %s: %s", dest, ex);
}
- return config;
}
protected void applyConfig(Config config) {
@@ -263,6 +296,7 @@ protected void applyConfig(Config config) {
}
}
+
// ================================= end of callbacks =====================================
@@ -342,27 +376,29 @@ public void eventLoop() {
/** Kicks off the benchmark on all cluster nodes */
- void startBenchmark() {
- RspList responses=null;
+ protected void startBenchmark() {
+ results_coll.reset(members);
+
try {
- RequestOptions options=new RequestOptions(ResponseMode.GET_ALL, 0)
+ RequestOptions options=new RequestOptions(ResponseMode.GET_NONE, 0)
.flags(Message.Flag.OOB, Message.Flag.DONT_BUNDLE, Message.Flag.NO_FC);
- responses=disp.callRemoteMethods(null, new MethodCall(START), options);
+ disp.callRemoteMethods(null, new MethodCall(START_TEST, local_addr), options);
}
catch(Throwable t) {
System.err.println("starting the benchmark failed: " + t);
return;
}
- long total_reqs=0;
- long total_time=0;
+ long total_reqs=0, total_time=0;
AverageMinMax avg_gets=null, avg_puts=null;
+ long time_to_wait=(long)(time * 1000 * 1.1); // add 10% more
+ results_coll.waitForAllResponses(time_to_wait);
+ Map results=results_coll.getResults();
System.out.println("\n======================= Results: ===========================");
- for(Map.Entry> entry: responses.entrySet()) {
+ for(Map.Entry entry: results.entrySet()) {
Address mbr=entry.getKey();
- Rsp rsp=entry.getValue();
- Results result=rsp.getValue();
+ Results result=entry.getValue();
if(result != null) {
total_reqs+=result.num_gets + result.num_puts;
total_time+=result.total_time;