Skip to content

Commit

Permalink
Add direct executor in non-concurrent case (#151)
Browse files Browse the repository at this point in the history
This allows us to perform the operations on thread as opposed to submitting to a single-threaded executor.

Additionally, breaks apart the Deephaven benchmarks into concurrent and non-concurrent versions.
  • Loading branch information
devinrsmith authored Oct 27, 2023
1 parent bfa6b67 commit 12cd517
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,18 @@ public static class ReusableStorage {
public final long[][] output = Util.makeArray(ROWS, COLS, long[]::new, long[][]::new);
}

/**
 * Benchmarks Deephaven's CSV parsing of a DATETIME column with the concurrent
 * reader disabled ({@code concurrent = false}), so parsing runs on the calling
 * thread rather than on a worker-thread pool.
 */
@Benchmark
@OperationsPerInvocation(OPERATIONS)
public BenchmarkResult<long[]> deephavenSingle(final InputProvider input, final ReusableStorage storage)
throws Exception {
// 'false' selects the non-concurrent path via CsvSpecs.concurrent(...).
return DateTimeColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, false);
}

@Benchmark
@OperationsPerInvocation(OPERATIONS)
public BenchmarkResult<long[]> deephaven(final InputProvider input, final ReusableStorage storage)
throws Exception {
return DateTimeColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output);
return DateTimeColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, true);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import java.util.Collections;

public final class DateTimeColumnParserDeephaven {
public static BenchmarkResult<long[]> read(final InputStream in, final long[][] storage) throws Exception {
public static BenchmarkResult<long[]> read(final InputStream in, final long[][] storage, boolean concurrent)
throws Exception {
final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory(null, null, null, null, null, storage);
final CsvSpecs specs = CsvSpecs.builder()
.parsers(Collections.singleton(Parsers.DATETIME))
.hasHeaderRow(true)
.concurrent(concurrent)
.build();
final CsvReader.Result result = CsvReader.read(specs, in, sinkFactory);
final long[][] data = Arrays.stream(result.columns())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ public static class ReusableStorage {
public final double[][] output = Util.makeArray(ROWS, COLS, double[]::new, double[][]::new);
}

/**
 * Benchmarks Deephaven's CSV parsing of a DOUBLE column with the concurrent
 * reader disabled ({@code concurrent = false}), so parsing runs on the calling
 * thread rather than on a worker-thread pool.
 */
@Benchmark
@OperationsPerInvocation(OPERATIONS)
public BenchmarkResult<double[]> deephavenSingle(final InputProvider input, final ReusableStorage storage)
throws Exception {
// 'false' selects the non-concurrent path via CsvSpecs.concurrent(...).
return DoubleColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, false);
}

@Benchmark
@OperationsPerInvocation(OPERATIONS)
public BenchmarkResult<double[]> deephaven(final InputProvider input, final ReusableStorage storage)
throws Exception {
return DoubleColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output);
return DoubleColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, true);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import java.util.Collections;

public final class DoubleColumnParserDeephaven {
public static BenchmarkResult<double[]> read(final InputStream in, final double[][] storage) throws Exception {
public static BenchmarkResult<double[]> read(final InputStream in, final double[][] storage, boolean concurrent)
throws Exception {
final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory(null, null, null, storage, null, null);
final CsvSpecs specs = CsvSpecs.builder()
.parsers(Collections.singleton(Parsers.DOUBLE))
.hasHeaderRow(true)
.concurrent(concurrent)
.build();
final CsvReader.Result result = CsvReader.read(specs, in, sinkFactory);
final double[][] data = Arrays.stream(result.columns())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,17 @@ public static class ReusableStorage {
public final int[][] output = Util.makeArray(ROWS, COLS, int[]::new, int[][]::new);
}

/**
 * Benchmarks Deephaven's CSV parsing of an INT column with the concurrent
 * reader disabled ({@code concurrent = false}), so parsing runs on the calling
 * thread rather than on a worker-thread pool.
 */
@Benchmark
@OperationsPerInvocation(OPERATIONS)
public BenchmarkResult<int[]> deephavenSingle(final InputProvider input, final ReusableStorage storage)
throws Exception {
// 'false' selects the non-concurrent path via CsvSpecs.concurrent(...).
return IntColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, false);
}

@Benchmark
@OperationsPerInvocation(OPERATIONS)
public BenchmarkResult<int[]> deephaven(final InputProvider input, final ReusableStorage storage) throws Exception {
return IntColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output);
return IntColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output, true);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import java.util.Collections;

public final class IntColumnParserDeephaven {
public static BenchmarkResult<int[]> read(final InputStream in, final int[][] storage) throws Exception {
public static BenchmarkResult<int[]> read(final InputStream in, final int[][] storage, boolean concurrent)
throws Exception {
final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory(null, storage, null, null, null, null);
final CsvSpecs specs = CsvSpecs.builder()
.parsers(Collections.singleton(Parsers.INT))
.hasHeaderRow(true)
.concurrent(concurrent)
.build();
final CsvReader.Result result = CsvReader.read(specs, in, sinkFactory);
final int[][] data = Arrays.stream(result.columns())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,15 @@ public static class ReusableStorage {
public final Results results = new Results(ROWS);
}

/**
 * Benchmarks Deephaven's CSV parsing of the large numeric-only table with the
 * concurrent reader disabled ({@code concurrent = false}), so parsing runs
 * in-line on the calling thread.
 *
 * @param ip      supplies a fresh CSV input stream for each invocation
 * @param storage reusable result storage, recycled across invocations
 * @param bh      JMH blackhole preventing dead-code elimination of the result
 * @throws Exception if the CSV read fails
 */
@Benchmark
public void deephavenSingle(final InputProvider ip, final ReusableStorage storage, final Blackhole bh)
        throws Exception {
    // 'false' selects the non-concurrent path via CsvSpecs.concurrent(...).
    final Results results = LargeNumericOnlyDeephaven.read(ip.makeStream(), storage.results, false);
    bh.consume(results);
}

@Benchmark
public void deephaven(InputProvider ip, final ReusableStorage storage, final Blackhole bh) throws Exception {
final Results results = LargeNumericOnlyDeephaven.read(ip.makeStream(), storage.results);
final Results results = LargeNumericOnlyDeephaven.read(ip.makeStream(), storage.results, true);
bh.consume(results);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
import io.deephaven.csv.benchmark.util.SinkFactories;
import io.deephaven.csv.parsers.Parsers;
import io.deephaven.csv.reading.CsvReader;
import io.deephaven.csv.sinks.Sink;
import io.deephaven.csv.sinks.SinkFactory;

import java.io.InputStream;

public class LargeNumericOnlyDeephaven {
public static Results read(final InputStream in, final Results results) throws Exception {
public static Results read(final InputStream in, final Results results, boolean concurrent) throws Exception {
final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory(
null,
null,
Expand All @@ -21,6 +20,7 @@ public static Results read(final InputStream in, final Results results) throws E

final CsvSpecs specs = CsvSpecs.builder()
.hasHeaderRow(true)
.concurrent(concurrent)
.putParserForIndex(1, Parsers.LONG)
.putParserForIndex(2, Parsers.LONG)
.putParserForIndex(3, Parsers.LONG)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.deephaven.csv.benchmark.largetable;

import io.deephaven.csv.benchmark.doublecol.DoubleColumnBenchmark;
import io.deephaven.csv.benchmark.util.Util;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
Expand Down Expand Up @@ -141,9 +140,15 @@ public static class ReusableStorage {
public final Results results = new Results(ROWS);
}

/**
 * Benchmarks Deephaven's CSV parsing of the large mixed-type table with the
 * concurrent reader disabled ({@code concurrent = false}), so parsing runs
 * in-line on the calling thread.
 *
 * @param ip      supplies a fresh CSV input stream for each invocation
 * @param storage reusable result storage, recycled across invocations
 * @param bh      JMH blackhole preventing dead-code elimination of the result
 * @throws Exception if the CSV read fails
 */
@Benchmark
public void deephavenSingle(final InputProvider ip, final ReusableStorage storage, final Blackhole bh)
        throws Exception {
    // 'false' selects the non-concurrent path via CsvSpecs.concurrent(...).
    final Results results = LargeTableDeephaven.read(ip.makeStream(), storage.results, false);
    bh.consume(results);
}

@Benchmark
public void deephaven(InputProvider ip, final ReusableStorage storage, final Blackhole bh) throws Exception {
final Results results = LargeTableDeephaven.read(ip.makeStream(), storage.results);
final Results results = LargeTableDeephaven.read(ip.makeStream(), storage.results, true);
bh.consume(results);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
import io.deephaven.csv.benchmark.util.SinkFactories;
import io.deephaven.csv.parsers.Parsers;
import io.deephaven.csv.reading.CsvReader;
import io.deephaven.csv.sinks.Sink;
import io.deephaven.csv.sinks.SinkFactory;

import java.io.InputStream;

public class LargeTableDeephaven {
public static Results read(final InputStream in, final Results results) throws Exception {
public static Results read(final InputStream in, final Results results, boolean concurrent) throws Exception {
final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory(
new byte[][] {results.boolsAsBytes},
null,
Expand All @@ -20,6 +19,7 @@ public static Results read(final InputStream in, final Results results) throws E
new long[][] {results.timestamps});
final CsvSpecs specs = CsvSpecs.builder()
.hasHeaderRow(true)
.concurrent(concurrent)
.putParserForIndex(1, Parsers.DATETIME)
.putParserForIndex(2, Parsers.STRING)
.putParserForIndex(3, Parsers.BOOLEAN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,18 @@ public String[][] output() {
}
}

/**
 * Benchmarks Deephaven's CSV parsing of a STRING column with the concurrent
 * reader disabled ({@code concurrent = false}), so parsing runs on the calling
 * thread rather than on a worker-thread pool.
 */
@Benchmark
@OperationsPerInvocation(OPERATIONS)
public BenchmarkResult<String[]> deephavenSingle(final InputProvider input, final ReusableStorage storage)
throws Exception {
// 'false' selects the non-concurrent path via CsvSpecs.concurrent(...).
return StringColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output(), false);
}

@Benchmark
@OperationsPerInvocation(OPERATIONS)
public BenchmarkResult<String[]> deephaven(final InputProvider input, final ReusableStorage storage)
throws Exception {
return StringColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output());
return StringColumnParserDeephaven.read(input.tableMaker.makeStream(), storage.output(), true);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import java.util.Collections;

public final class StringColumnParserDeephaven {
public static BenchmarkResult<String[]> read(final InputStream in, final String[][] storage) throws Exception {
public static BenchmarkResult<String[]> read(final InputStream in, final String[][] storage, boolean concurrent)
throws Exception {
final SinkFactory sinkFactory = SinkFactories.makeRecyclingSinkFactory(null, null, null, null, storage, null);
final CsvSpecs specs = CsvSpecs.builder()
.parsers(Collections.singleton(Parsers.STRING))
.hasHeaderRow(true)
.concurrent(concurrent)
.build();
final CsvReader.Result result = CsvReader.read(specs, in, sinkFactory);
final String[][] data = Arrays.stream(result.columns())
Expand Down
30 changes: 22 additions & 8 deletions src/main/java/io/deephaven/csv/reading/CsvReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,15 @@ public static Result read(final CsvSpecs specs, final InputStream stream, final
dsrs.add(new Moveable<>(pair.second));
}

// Select an Excecutor based on whether the user wants the code to run asynchronously
// or not.
final ExecutorService exec =
specs.concurrent()
? Executors.newFixedThreadPool(numOutputCols + 1)
: Executors.newSingleThreadExecutor();
// Select an Executor based on whether the user wants the code to run asynchronously or not.
final Executor exec;
final ExecutorService executorService;
if (specs.concurrent()) {
exec = executorService = Executors.newFixedThreadPool(numOutputCols + 1);
} else {
exec = DirectExecutor.INSTANCE;
executorService = null;
}
// We are generic on Object because we have a diversity of Future types (Long vs
// ParseDenseStorageToColumn.Result)
final ExecutorCompletionService<Object> ecs = new ExecutorCompletionService<>(exec);
Expand Down Expand Up @@ -155,8 +158,10 @@ public static Result read(final CsvSpecs specs, final InputStream stream, final
} catch (Throwable throwable) {
throw new CsvReaderException("Caught exception", throwable);
} finally {
// Tear down everything (interrupting the threads if necessary).
exec.shutdownNow();
if (executorService != null) {
// Tear down everything (interrupting the threads if necessary).
executorService.shutdownNow();
}
}
}

Expand Down Expand Up @@ -369,4 +374,13 @@ public DataType dataType() {
return dataType;
}
}

/**
 * An {@link Executor} that runs each submitted task synchronously on the
 * calling thread. Used when {@code specs.concurrent()} is false, so work is
 * performed in-line instead of being submitted to a single-threaded
 * executor service. Enum singleton: stateless and inherently thread-safe.
 */
private enum DirectExecutor implements Executor {
INSTANCE;

// Executes the task immediately on the caller's thread; no queuing, no new threads.
@Override
public void execute(@NotNull Runnable command) {
command.run();
}
}
}

0 comments on commit 12cd517

Please sign in to comment.