Respond to review comments

kosak committed Dec 14, 2021
1 parent 7877908 commit 75fae21
Showing 21 changed files with 205 additions and 75 deletions.
@@ -263,7 +263,7 @@ public Charset charset() {
}

/**
* Should the CSVReader run asynchronously for better performance.
* Should the CSVReader run its processing steps on multiple threads for better performance.
* @return the async flag
*/
@Default
@@ -34,18 +34,6 @@ public byte back() {
return data[end - 1];
}

public void copyTo(byte[] dest, int destOffset) {
for (int cur = begin; cur != end; ++cur) {
dest[destOffset++] = data[cur];
}
}

public void copyTo(char[] dest, int destOffset) {
for (int cur = begin; cur != end; ++cur) {
dest[destOffset++] = (char)data[cur];
}
}

public byte[] data() { return data; }
public int begin() { return begin; }
public int end() { return end; }
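Since the copyTo overloads above are being removed in this hunk, a caller that still needs a copy can get one from the remaining accessors. A minimal sketch, not part of the commit (the helper name is made up), assuming data() is the backing array and [begin(), end()) is the live range:

    static byte[] sliceToArray(ByteSlice bs) {
        // Copy the live range of the slice into a fresh array.
        final int length = bs.end() - bs.begin();
        final byte[] result = new byte[length];
        System.arraycopy(bs.data(), bs.begin(), result, 0, length);
        return result;
    }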
@@ -3,9 +3,10 @@

import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.containers.CharSlice;
import io.deephaven.csv.util.CsvReaderException;

/**
* Companion to the DenseStorageWriter.
* Companion to the {@link DenseStorageWriter}. See the documentation there for details.
*/
public final class DenseStorageReader {
/**
@@ -33,19 +34,20 @@ public final class DenseStorageReader {
*/
private final int[] intHolder = new int[1];

public DenseStorageReader(QueueReader.IntReader controlReader,
QueueReader.ByteReader byteReader,
QueueReader.CharReader charReader,
QueueReader.ByteArrayReader largeByteArrayReader,
QueueReader.CharArrayReader largeCharArrayReader) {
public DenseStorageReader(final QueueReader.IntReader controlReader,
final QueueReader.ByteReader byteReader,
final QueueReader.CharReader charReader,
final QueueReader.ByteArrayReader largeByteArrayReader,
final QueueReader.CharArrayReader largeCharArrayReader) {
this.controlReader = controlReader;
this.byteReader = byteReader;
this.charReader = charReader;
this.largeByteArrayReader = largeByteArrayReader;
this.largeCharArrayReader = largeCharArrayReader;
}

public boolean tryGetNextSlice(ByteSlice bs, CharSlice cs, boolean[] nextIsBytes) {
public boolean tryGetNextSlice(final ByteSlice bs, final CharSlice cs, final boolean[] nextIsBytes)
throws CsvReaderException {
if (!controlReader.tryGetInt(intHolder)) {
return false;
}
@@ -70,10 +72,10 @@ public boolean tryGetNextSlice(ByteSlice bs, CharSlice cs, boolean[] nextIsBytes
return true;
}

private static void mustSucceed(boolean success, String what) {
private static void mustSucceed(final boolean success, final String what) throws CsvReaderException {
if (success) {
return;
}
throw new RuntimeException("Data unexpectedly exhausted: " + what);
throw new CsvReaderException("Data unexpectedly exhausted: " + what);
}
}
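For context, a hedged sketch of how a consumer might drive tryGetNextSlice. The loop is illustrative only and assumes the caller supplies reusable ByteSlice/CharSlice holders, since the constructors for those types are not shown in this diff:

    static void drain(DenseStorageReader reader, ByteSlice bs, CharSlice cs) throws CsvReaderException {
        final boolean[] nextIsBytes = new boolean[1];
        while (reader.tryGetNextSlice(bs, cs, nextIsBytes)) {
            if (nextIsBytes[0]) {
                // The cell fit in bytes: read it from bs.
            } else {
                // The cell needed 16-bit chars: read it from cs.
            }
        }
        // tryGetNextSlice returning false means the writer has finished and all data was consumed.
    }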
@@ -3,16 +3,82 @@
import io.deephaven.csv.containers.CharSlice;

/**
* The point of this object is to store data with a small fraction of overhead. "Large" objects (byte or char sequences
* with length >= a threshold) are stored directly. "Small" objects (byte or char sequences with a smaller length) are
* compacted into byte and char pools.
* The DenseStorageWriter and {@link DenseStorageReader} work in tandem, forming a FIFO queue. The DenseStorageWriter writes
* data, and the {@link DenseStorageReader} reads that data. If the {@link DenseStorageReader} "catches up", it will block until
* the DenseStorageWriter provides more data, or indicates that it is done (via the {@link #finish()} method).
* This synchronization is done at "block" granularity, so the DenseStorageReader can only proceed when the
* DenseStorageWriter has written at least a "block" of data or is done. We allow multiple independent
* {@link DenseStorageReader}s to consume the same underlying data. In our implementation this is used so our
* type inferencer can take a second "pass" over the same input data.
*
* The point of this object is to store a sequence of (character sequences aka "strings", but not java.lang.String),
* using a small fraction of overhead. The problem with storing every character sequence as a java.lang.String is:
* <ol>
* <li>Per-object overhead (8 or 16 bytes)</li>
* <li>The memory cost of holding a reference to that String (4 or 8 bytes)</li>
* <li>The string has to know its length (4 bytes)</li>
* <li>Java characters are 2 bytes even though in practice many strings are ASCII-only and their chars can fit in a byte</li>
* </ol>
*
* For small strings (say the word "hello" or the input text "12345.6789") the overhead can be 100% or worse.
*
* For our purposes we:
* <ol>
* <li>Only need sequential access, i.e., we don't need random access into the sequence of "strings". So we can support
* a model where we can have a forward-only cursor moving over the sequence of "strings".</li>
* <li>Don't need to give our caller a data structure that they can hold on to. The caller only gets a "view"
* (a slice) of the current "string" data. The view is invalidated when they move to the next "string"</li>
* </ol>
*
* Furthermore we:
* <ol>
* <li>Offer a FIFO model where the reader (in a separate thread) can chase the writer but there is not an
* inordinate amount of synchronization overhead.</li>
* <li>Have the ability to make multiple Readers which pass over the same underlying data. This is our
* low-drama way of allowing our client to make multiple passes over the data, without complicating the iteration
* interface with, e.g., a reset method.</li>
* <li>Use a linked-list structure so that when all existing readers have moved past a block of data, that
* block can be freed by the garbage collector.</li>
* </ol>
*
* If you are familiar with the structure of our inference, you may initially think that this reader-chasing-writer
* garbage collection trick doesn't buy us much because we have a two-phase parser. However, when the inferencer has
* gotten to the last parser in its set of allowable parsers (say, the String parser), or the user has specified
* that there is only one parser for this column, then the code doesn't need to do any inference and can parse the
* column in one pass. (TODO(kosak): one-pass not implemented yet. Coming shortly).
*
* The implementation used here is to look at the "string" being added to the writer and categorize it along two
* dimensions:
* <ul>
* <li>Small vs large</li>
* <li>Byte vs char</li>
* </ul>
*
* These dimensions are broken out in the following way:
* <ul>
* <li>Small byte "strings" are packed into a byte block, and we maintain a linked list of these byte blocks</li>
* <li>Small char "strings" are packed into a char block, and we maintain a linked list of these char blocks</li>
* <li>"Large" objects (byte or char sequences with length >= a threshold) are stored directly, meaning a byte[] or char[]
* array is allocated for their data, then a reference to that array is added to a byte-array or char-array block.
* (And again, we maintain a linked list of these byte-array or char-array blocks).
* It is not typical for CSV data to contain a cell this large, but the feature is there for completeness.
* We do not want large "strings" to contaminate our byte and char blocks because they would not likely pack
* into them tightly. It's OK to keep them on their own because by definition, large "strings" are not going to have
* much overhead, as a percentage of their size.
* </li>
* </ul>
*/
public final class DenseStorageWriter {
/**
* The ints in this array indicate where the next item is stored: Integer.MIN_VALUE: largeStringWriter
* Integer.MAX_VALUE: largeByteWriter == 0: no bytes or characters, so they're not stored anywhere otherwise < 0:
* charWriter (the number of chars is the negative of this value) otherwise >= 0 : byteWriter (the number of bytes
* is equal to this value)
* The ints in this array indicate where the next item is stored:
* <ul>
* <li>Integer.MIN_VALUE: largeStringWriter.</li>
* <li>Integer.MAX_VALUE: largeByteWriter.</li>
* <li>== 0: no bytes or characters, so they're not stored anywhere. Will be interpreted as a ByteSlice with
* arbitrary byte data and length 0.</li>
* <li>&lt; 0: charWriter (the number of chars is the negative of this value)</li>
* <li>&gt; 0: byteWriter (the number of bytes is equal to this value)</li>
* </ul>
*/
private final QueueWriter.IntWriter controlWriter;
/**
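To make the control-int encoding above concrete, here is a hedged sketch (not code from this commit) of how a reader might branch on one control value; the comments simply mirror the list in the javadoc:

    static void dispatch(int control) {
        if (control == Integer.MIN_VALUE) {
            // Next item was routed to the large char-array ("large string") queue.
        } else if (control == Integer.MAX_VALUE) {
            // Next item was routed to the large byte-array queue.
        } else if (control == 0) {
            // Empty cell: interpreted as a zero-length ByteSlice.
        } else if (control < 0) {
            // -control chars are packed in the current char block.
        } else {
            // control bytes are packed in the current byte block.
        }
    }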
@@ -1,10 +1,18 @@
package io.deephaven.csv.densestorage;

/**
* Linked list node that holds data for a {@link DenseStorageWriter} or {@link DenseStorageReader}.
* All fields are immutable except the "next" field. Synchronization for reading/writing the "next" field
* is managed by the {@link DenseStorageWriter} and {@link DenseStorageReader}.
*/
public final class QueueNode<TARRAY> {
public final TARRAY data;
public final int begin;
public final int end;
public final boolean isLast;
/**
* Readers and writers of this field have arranged to synchronize with each other.
*/
public QueueNode<TARRAY> next;

public QueueNode(TARRAY data, int begin, int end, boolean isLast) {
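Because "next" is the only mutable field, a reader only has to wait until the writer links a new node. The following is a speculative sketch of that hand-off; the commit does not show this code, and the shared "sync" monitor and the notifyAll convention are assumptions based on the sync field visible in QueueReader:

    // Returns the next node, blocking until the writer has linked one.
    // Returns null only if 'node' was the last node.
    static <TARRAY> QueueNode<TARRAY> awaitNext(final Object sync, final QueueNode<TARRAY> node)
            throws InterruptedException {
        synchronized (sync) {
            while (node.next == null && !node.isLast) {
                sync.wait(); // writer is assumed to call sync.notifyAll() after setting 'next'
            }
            return node.next;
        }
    }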
@@ -3,6 +3,9 @@
import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.containers.CharSlice;

/**
* Companion to the {@link QueueWriter}. See the documentation there for details.
*/
public class QueueReader<TARRAY> {
private final Object sync;
private QueueNode<TARRAY> node;
@@ -105,6 +108,9 @@ public boolean tryGetChars(int size, CharSlice cs) {
}
}

/**
* A QueueReader specialized for bytes.
*/
public static final class ByteReader extends QueueReader<byte[]> {
private byte[] typedBlock;

@@ -131,6 +137,9 @@ public boolean tryGetBytes(int size, ByteSlice bs) {
}
}

/**
* A QueueReader specialized for ints.
*/
public static final class IntReader extends QueueReader<int[]> {
private int[] typedBlock;

@@ -157,6 +166,9 @@ public boolean tryGetInt(int[] result) {
}
}

/**
* A QueueReader specialized for byte arrays.
*/
public static final class ByteArrayReader extends QueueReader<byte[][]> {
private byte[][] typedBlock;

@@ -183,6 +195,9 @@ public boolean tryGetBytes(ByteSlice bs) {
}
}

/**
* A QueueReader specialized for char arrays.
*/
public static final class CharArrayReader extends QueueReader<char[][]> {
private char[][] typedBlock;

@@ -5,6 +5,22 @@
import java.util.function.BiFunction;
import java.util.function.IntFunction;

/**
* The various QueueWriters ({@link ByteWriter}, {@link CharWriter}, etc.)
* work in tandem with their corresponding {@link QueueReader}s ({@link QueueReader.ByteReader},
* {@link QueueReader.CharReader}, etc.), forming a FIFO queue. The QueueWriter writes
* data, and the {@link QueueReader} reads that data. If the {@link QueueReader} "catches up", it will block until
* the QueueWriter provides more data, or indicates that it is done (via the {@link #finish()} method).
* This synchronization is done at "block" granularity, so the {@link QueueReader} can only proceed when the
* QueueWriter has written at least a "block" of data or is done. We allow multiple independent
* {@link QueueReader}s to consume the same underlying data. In our implementation this is used so our
* type inferencer can take a second "pass" over the same input data.
*
* In our implementation the {@link DenseStorageWriter} and {@link DenseStorageReader} are built out of
* various QueueWriters and {@link QueueReader}s. This explains why the semantics of
* {@link DenseStorageWriter} and {@link DenseStorageReader} are similar to those of the underlying
* QueueWriters and {@link QueueReader}s.
*/
public class QueueWriter<TARRAY, TREADER> {
private final Object sync;
private QueueNode<TARRAY> tail;
@@ -17,9 +33,9 @@ public class QueueWriter<TARRAY, TREADER> {
protected int current;
protected int end;

protected QueueWriter(int blobSize,
IntFunction<TARRAY> arrayFactory,
BiFunction<Object, QueueNode<TARRAY>, TREADER> readerFactory) {
protected QueueWriter(final int blobSize,
final IntFunction<TARRAY> arrayFactory,
final BiFunction<Object, QueueNode<TARRAY>, TREADER> readerFactory) {
this.sync = new Object();
// Placeholder object at head of linked list
this.tail = new QueueNode<>(null, 0, 0, false);
@@ -33,6 +49,9 @@ protected QueueWriter(int blobSize,
this.end = 0;
}

/**
* Caller is finished writing.
*/
public void finish() {
flush(true);
genericBlock = null; // hygiene
@@ -41,6 +60,10 @@ public void finish() {
end = 0;
}

/**
* Make a {@link QueueReader} corresponding to this QueueWriter. You can make as many
* {@link QueueReader}s as you want, but you should make them before you start writing data.
*/
public TREADER newReader() {
if (!allowReaderCreation) {
throw new RuntimeException("Must allocate readers before writing any data");
@@ -49,8 +72,8 @@ public TREADER newReader() {
}

/**
* This supports an "early flush" for callers like DenseStorageWriter who want to flush all their queues from time
* to time.
* This supports an "early flush" for callers like {@link DenseStorageWriter} who want to flush all their queues
* from time to time.
*/
public void flush() {
flush(false);
@@ -64,8 +87,8 @@ public void flush() {
*/
private void flush(boolean isLast) {
// Sometimes our users ask us to flush even if there is nothing to flush.
// We need to flush "isLast" blocks (whether or not they contain data) and
// we need to flush blocks containing data. We don't need to flush empty blocks.
// If the block is an "isLast" block, we need to flush it regardless of whether it contains data.
// Otherwise (if the block is not an "isLast" block), we only flush it if it contains data.
if (!isLast && (begin == end)) {
// No need to flush.
return;
@@ -93,6 +116,9 @@ protected final TARRAY flushAndAllocate(int additional) {
return genericBlock;
}

/**
* A QueueWriter specialized for chars.
*/
public static final class CharWriter extends QueueWriter<char[], QueueReader.CharReader> {
private char[] block = null;

@@ -115,6 +141,9 @@ public boolean addChars(CharSlice cs) {
}
}

/**
* A QueueWriter specialized for bytes.
*/
public static final class ByteWriter extends QueueWriter<byte[], QueueReader.ByteReader> {
private byte[] block = null;

@@ -140,6 +169,9 @@ public boolean addBytesFromCharSlice(CharSlice cs) {
}
}

/**
* A QueueWriter specialized for ints.
*/
public static final class IntWriter extends QueueWriter<int[], QueueReader.IntReader> {
private int[] block = null;

@@ -160,7 +192,9 @@ public boolean addInt(int value) {
}
}


/**
* A QueueWriter specialized for byte arrays.
*/
public static final class ByteArrayWriter extends QueueWriter<byte[][], QueueReader.ByteArrayReader> {
private byte[][] block = null;

Expand All @@ -181,6 +215,9 @@ public boolean addByteArray(byte[] value) {
}
}

/**
* A QueueWriter specialized for char arrays.
*/
public static final class CharArrayWriter extends QueueWriter<char[][], QueueReader.CharArrayReader> {
private char[][] block = null;

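Putting the pieces above together, a hedged usage sketch using only methods shown in this file (newReader, addInt, flush, finish); the key constraint is the one documented on newReader, that readers must be created before any data is written:

    static void produce(QueueWriter.IntWriter writer) {
        // Create every reader first; newReader throws once writing has begun.
        final QueueReader.IntReader inferencePass = writer.newReader();
        final QueueReader.IntReader parsePass = writer.newReader(); // second, independent pass over the same data
        writer.addInt(42);
        writer.flush();  // optional early flush so blocked readers can proceed at block granularity
        writer.finish(); // signals end-of-data; readers eventually observe it and stop
    }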
@@ -3,20 +3,22 @@
import io.deephaven.csv.sinks.Sink;
import io.deephaven.csv.tokenization.Tokenizer;
import io.deephaven.csv.parsers.context.ParseContext;
import io.deephaven.csv.util.CsvReaderException;

public final class BooleanParser extends ParserBase<byte[]> {
public static BooleanParser INSTANCE = new BooleanParser();

private BooleanParser() {}

@Override
public Sink<byte[]> tryParse(ParseContext ctx, IteratorHolder ih, IteratorHolder ihAlt) {
public Sink<byte[]> tryParse(ParseContext ctx, IteratorHolder ih, IteratorHolder ihAlt) throws CsvReaderException {
return twoPhaseDriver(ctx, ih, ihAlt, BooleanParser::tryParseHelper,
ctx.sinkFactory::makeBooleanAsByteSink);
}

private static boolean tryParseHelper(ParseContext ctx, IteratorHolder ih,
Sink<byte[]> sink, long current, long end) {
Sink<byte[]> sink, long current, long end)
throws CsvReaderException {
final byte[] chunk = new byte[DEST_BLOCK_SIZE];
final boolean[] boolHolder = new boolean[1];
final Tokenizer t = ctx.tokenizer;