Skip to content

Commit

Permalink
UPerf/MPerf now send the START RPC/message asynchronpously (https://i…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Oct 18, 2022
1 parent 5542f0e commit 6780549
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 69 deletions.
14 changes: 8 additions & 6 deletions tests/perf/org/jgroups/tests/perf/MPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -43,7 +44,6 @@ public class MPerf implements Receiver {
protected boolean oob;
protected boolean log_local=true; // default: same behavior as before
protected boolean display_msg_src=false;
protected long start_time; // set on reception of START message
protected MessageCounter received_msgs_map=new MessageCounter();
protected final List<Address> members=new CopyOnWriteArrayList<>();
protected final Log log=LogFactory.getLog(getClass());
Expand Down Expand Up @@ -277,10 +277,12 @@ public void receive(Message msg) {
if(my_rank >= 0 && my_rank > num_senders)
isSender = false;
}
start_time=System.currentTimeMillis();
Result r=sendMessages(isSender);
System.out.println("-- done");
sendNoException(msg.getSrc(), r, MPerfHeader.RESULT, Message.Flag.OOB);
final boolean is_sender=isSender;
CompletableFuture.supplyAsync(() -> {
Result r=sendMessages(is_sender);
System.out.println("-- done");
return r;
}).thenAccept(r -> sendNoException(msg.src(), r, MPerfHeader.RESULT, Message.Flag.OOB));
break;

case MPerfHeader.RESULT:
Expand Down Expand Up @@ -381,7 +383,7 @@ protected Result sendMessages(boolean isSender) {
final Thread[] senders=new Thread[num_threads];
final CountDownLatch latch=new CountDownLatch(1);
final byte[] payload=new byte[msg_size];
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicBoolean running=new AtomicBoolean(true);

received_msgs_map.reset();

Expand Down
162 changes: 99 additions & 63 deletions tests/perf/org/jgroups/tests/perf/UPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;

Expand All @@ -31,16 +32,17 @@
* @author Bela Ban
*/
public class UPerf implements Receiver {
private JChannel channel;
private Address local_addr;
private RpcDispatcher disp;
static final String groupname="uperf";
protected final List<Address> members=new ArrayList<>();
protected volatile View view;
protected volatile boolean looping=true;
protected final LongAdder num_reads=new LongAdder();
protected final LongAdder num_writes=new LongAdder();
protected ThreadFactory thread_factory;
private JChannel channel;
private Address local_addr;
private RpcDispatcher disp;
static final String groupname="uperf";
protected final List<Address> members=new ArrayList<>();
protected volatile View view;
protected volatile boolean looping=true;
protected final LongAdder num_reads=new LongAdder();
protected final LongAdder num_writes=new LongAdder();
protected ThreadFactory thread_factory;
protected final ResponseCollector<Results> results_coll=new ResponseCollector<>();



Expand All @@ -57,19 +59,20 @@ public class UPerf implements Receiver {
// ... add your own here, just don't forget to annotate them with @Property
// =======================================================

private static final Method[] METHODS=new Method[6];
private static final short START = 0;
private static final Method[] METHODS=new Method[7];
private static final short START_TEST = 0;
private static final short GET = 1;
private static final short PUT = 2;
private static final short GET_CONFIG = 3;
private static final short SET = 4;
private static final short QUIT_ALL = 5;
private static final short ACCEPT_RESULTS = 6;

protected static final Field OOB, NUM_THREADS, TIME, RPC_TIMEOUT, MSG_SIZE, ANYCAST_COUNT,
READ_PERCENTAGE, PRINT_INVOKERS, PRINT_DETAILS;


private byte[] BUFFER=new byte[msg_size];
private byte[] BUFFER=new byte[msg_size];
protected static final String format=
"[1] Start test [2] View [4] Threads (%d) [6] Time (%,ds) [7] Msg size (%s)" +
"\n[o] OOB (%b) [t] RPC timeout (%,dms) [a] Anycast count (%d) [r] Read percentage (%.2f) " +
Expand All @@ -79,12 +82,13 @@ public class UPerf implements Receiver {

static {
try {
METHODS[START] = UPerf.class.getMethod("startTest");
METHODS[START_TEST] = UPerf.class.getMethod("startTest", Address.class);
METHODS[GET] = UPerf.class.getMethod("get", long.class);
METHODS[PUT] = UPerf.class.getMethod("put", long.class, byte[].class);
METHODS[GET_CONFIG] = UPerf.class.getMethod("getConfig");
METHODS[SET] = UPerf.class.getMethod("set", String.class, Object.class);
METHODS[QUIT_ALL] = UPerf.class.getMethod("quitAll");
METHODS[ACCEPT_RESULTS] = UPerf.class.getMethod("acceptResults", Address.class, Results.class);

OOB=Util.getField(UPerf.class, "oob", true);
NUM_THREADS=Util.getField(UPerf.class, "num_threads", true);
Expand Down Expand Up @@ -154,12 +158,73 @@ public void viewAccepted(View new_view) {
System.out.println("** view: " + new_view);
members.clear();
members.addAll(new_view.getMembers());
results_coll.retainAll(members);
}


// =================================== callbacks ======================================

public Results startTest() throws Throwable {
public void startTest(Address sender) throws Throwable {
CompletableFuture.supplyAsync(() -> {
try {
return startTest();
}
catch(Throwable ex) {
System.err.printf("failure running the test: %s", ex);
return null;
}
}).thenAccept(r -> sendResults(sender, r));
}

/** Called by worker when done; response to {@link #startTest()} */
public void acceptResults(Address sender, Results results) {
results_coll.add(sender, results);
}

public void quitAll() {
System.out.println("-- received quitAll(): shutting down");
stopEventLoop();
System.exit(0);
}

protected String printAverage(long start_time) {
long tmp_time=System.currentTimeMillis() - start_time;
long reads=num_reads.sum(), writes=num_writes.sum();
double reqs_sec=(reads+writes) / (tmp_time / 1000.0);
return String.format("%,.0f reqs/sec (%,d reads %,d writes)", reqs_sec, reads, writes);
}


public void set(String field_name, Object value) {
Field field=Util.getField(this.getClass(),field_name);
if(field == null)
System.err.println("Field " + field_name + " not found");
else {
Util.setField(field, this, value);
System.out.println(field.getName() + "=" + value);
}
}

public byte[] get(@SuppressWarnings("UnusedParameters") long key) {
return BUFFER;
}


@SuppressWarnings("UnusedParameters")
public void put(long key, byte[] val) {
}

public Config getConfig() {
Config config=new Config();
for(Field field: Util.getAllDeclaredFieldsWithAnnotations(UPerf.class, Property.class)) {
if(field.isAnnotationPresent(Property.class)) {
config.add(field.getName(), Util.getField(field, this));
}
}
return config;
}

protected Results startTest() throws Throwable {
BUFFER=new byte[msg_size];

System.out.printf("running for %d seconds\n", time);
Expand Down Expand Up @@ -213,47 +278,15 @@ public Results startTest() throws Throwable {
return new Results((int)num_reads.sum(), (int)num_writes.sum(), total_time, avg_gets, avg_puts);
}

public void quitAll() {
System.out.println("-- received quitAll(): shutting down");
stopEventLoop();
System.exit(0);
}

protected String printAverage(long start_time) {
long tmp_time=System.currentTimeMillis() - start_time;
long reads=num_reads.sum(), writes=num_writes.sum();
double reqs_sec=(reads+writes) / (tmp_time / 1000.0);
return String.format("%,.0f reqs/sec (%,d reads %,d writes)", reqs_sec, reads, writes);
}


public void set(String field_name, Object value) {
Field field=Util.getField(this.getClass(),field_name);
if(field == null)
System.err.println("Field " + field_name + " not found");
else {
Util.setField(field, this, value);
System.out.println(field.getName() + "=" + value);
protected void sendResults(Address dest, Results results) {
RequestOptions options=new RequestOptions(ResponseMode.GET_NONE, 0)
.flags(Message.Flag.OOB, Message.Flag.DONT_BUNDLE, Message.Flag.NO_FC);
try {
disp.callRemoteMethod(dest, new MethodCall(ACCEPT_RESULTS, local_addr, results), options);
}
}

public byte[] get(@SuppressWarnings("UnusedParameters") long key) {
return BUFFER;
}


@SuppressWarnings("UnusedParameters")
public void put(long key, byte[] val) {
}

public Config getConfig() {
Config config=new Config();
for(Field field: Util.getAllDeclaredFieldsWithAnnotations(UPerf.class, Property.class)) {
if(field.isAnnotationPresent(Property.class)) {
config.add(field.getName(), Util.getField(field, this));
}
catch(Exception ex) {
System.err.printf("failed sending results to %s: %s", dest, ex);
}
return config;
}

protected void applyConfig(Config config) {
Expand All @@ -263,6 +296,7 @@ protected void applyConfig(Config config) {
}
}


// ================================= end of callbacks =====================================


Expand Down Expand Up @@ -342,27 +376,29 @@ public void eventLoop() {


/** Kicks off the benchmark on all cluster nodes */
void startBenchmark() {
RspList<Results> responses=null;
protected void startBenchmark() {
results_coll.reset(members);

try {
RequestOptions options=new RequestOptions(ResponseMode.GET_ALL, 0)
RequestOptions options=new RequestOptions(ResponseMode.GET_NONE, 0)
.flags(Message.Flag.OOB, Message.Flag.DONT_BUNDLE, Message.Flag.NO_FC);
responses=disp.callRemoteMethods(null, new MethodCall(START), options);
disp.callRemoteMethods(null, new MethodCall(START_TEST, local_addr), options);
}
catch(Throwable t) {
System.err.println("starting the benchmark failed: " + t);
return;
}

long total_reqs=0;
long total_time=0;
long total_reqs=0, total_time=0;
AverageMinMax avg_gets=null, avg_puts=null;
long time_to_wait=(long)(time * 1000 * 1.1); // add 10% more
results_coll.waitForAllResponses(time_to_wait);
Map<Address,Results> results=results_coll.getResults();

System.out.println("\n======================= Results: ===========================");
for(Map.Entry<Address,Rsp<Results>> entry: responses.entrySet()) {
for(Map.Entry<Address,Results> entry: results.entrySet()) {
Address mbr=entry.getKey();
Rsp<Results> rsp=entry.getValue();
Results result=rsp.getValue();
Results result=entry.getValue();
if(result != null) {
total_reqs+=result.num_gets + result.num_puts;
total_time+=result.total_time;
Expand Down

0 comments on commit 6780549

Please sign in to comment.