diff --git a/extensions/csv/src/main/java/io/deephaven/csv/CsvSpecs.java b/extensions/csv/src/main/java/io/deephaven/csv/CsvSpecs.java
index 1fb56527dc1..ea6a1763bbc 100644
--- a/extensions/csv/src/main/java/io/deephaven/csv/CsvSpecs.java
+++ b/extensions/csv/src/main/java/io/deephaven/csv/CsvSpecs.java
@@ -263,7 +263,7 @@ public Charset charset() {
}
/**
- * Should the CSVReader run asynchronously for better performance.
+ * Whether the CSVReader should run its processing steps on multiple threads for better performance.
* @return the async flag
*/
@Default
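For context, a minimal usage sketch of this flag, assuming the Immutables-generated `ImmutableCsvSpecs.builder()` that typically backs an `@Default` property (the actual builder entry point may differ):

```java
// Hypothetical sketch: ImmutableCsvSpecs is the assumed Immutables-generated
// companion of CsvSpecs; the real entry point may be a CsvSpecs.builder() factory.
final CsvSpecs specs = ImmutableCsvSpecs.builder()
        .async(true) // run the reader's processing steps on multiple threads
        .build();
```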
diff --git a/extensions/csv/src/main/java/io/deephaven/csv/containers/ByteSlice.java b/extensions/csv/src/main/java/io/deephaven/csv/containers/ByteSlice.java
index 6100c8563d7..178b092a2fc 100644
--- a/extensions/csv/src/main/java/io/deephaven/csv/containers/ByteSlice.java
+++ b/extensions/csv/src/main/java/io/deephaven/csv/containers/ByteSlice.java
@@ -34,18 +34,6 @@ public byte back() {
return data[end - 1];
}
- public void copyTo(byte[] dest, int destOffset) {
- for (int cur = begin; cur != end; ++cur) {
- dest[destOffset++] = data[cur];
- }
- }
-
- public void copyTo(char[] dest, int destOffset) {
- for (int cur = begin; cur != end; ++cur) {
- dest[destOffset++] = (char)data[cur];
- }
- }
-
public byte[] data() { return data; }
public int begin() { return begin; }
public int end() { return end; }
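With the `copyTo` overloads removed, callers can still copy a slice's contents through the remaining public accessors; a minimal sketch using only the `data()`, `begin()`, and `end()` methods shown above:

```java
// Copy a ByteSlice into a destination buffer; System.arraycopy replaces the
// removed per-byte copyTo loop. For the removed char overload, a widening
// per-element loop over data()[i] would still be needed.
static void copySlice(final ByteSlice bs, final byte[] dest, final int destOffset) {
    System.arraycopy(bs.data(), bs.begin(), dest, destOffset, bs.end() - bs.begin());
}
```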
diff --git a/extensions/csv/src/main/java/io/deephaven/csv/densestorage/DenseStorageReader.java b/extensions/csv/src/main/java/io/deephaven/csv/densestorage/DenseStorageReader.java
index 1cfdfc6ae84..5271dbd52ec 100644
--- a/extensions/csv/src/main/java/io/deephaven/csv/densestorage/DenseStorageReader.java
+++ b/extensions/csv/src/main/java/io/deephaven/csv/densestorage/DenseStorageReader.java
@@ -3,9 +3,10 @@
import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.containers.CharSlice;
+import io.deephaven.csv.util.CsvReaderException;
/**
- * Companion to the DenseStorageWriter.
+ * Companion to the {@link DenseStorageWriter}. See the documentation there for details.
*/
public final class DenseStorageReader {
/**
@@ -33,11 +34,11 @@ public final class DenseStorageReader {
*/
private final int[] intHolder = new int[1];
- public DenseStorageReader(QueueReader.IntReader controlReader,
- QueueReader.ByteReader byteReader,
- QueueReader.CharReader charReader,
- QueueReader.ByteArrayReader largeByteArrayReader,
- QueueReader.CharArrayReader largeCharArrayReader) {
+ public DenseStorageReader(final QueueReader.IntReader controlReader,
+ final QueueReader.ByteReader byteReader,
+ final QueueReader.CharReader charReader,
+ final QueueReader.ByteArrayReader largeByteArrayReader,
+ final QueueReader.CharArrayReader largeCharArrayReader) {
this.controlReader = controlReader;
this.byteReader = byteReader;
this.charReader = charReader;
@@ -45,7 +46,8 @@ public DenseStorageReader(QueueReader.IntReader controlReader,
this.largeCharArrayReader = largeCharArrayReader;
}
- public boolean tryGetNextSlice(ByteSlice bs, CharSlice cs, boolean[] nextIsBytes) {
+ public boolean tryGetNextSlice(final ByteSlice bs, final CharSlice cs, final boolean[] nextIsBytes)
+ throws CsvReaderException {
if (!controlReader.tryGetInt(intHolder)) {
return false;
}
@@ -70,10 +72,10 @@ public boolean tryGetNextSlice(ByteSlice bs, CharSlice cs, boolean[] nextIsBytes
return true;
}
- private static void mustSucceed(boolean success, String what) {
+ private static void mustSucceed(final boolean success, final String what) throws CsvReaderException {
if (success) {
return;
}
- throw new RuntimeException("Data unexpectedly exhausted: " + what);
+ throw new CsvReaderException("Data unexpectedly exhausted: " + what);
}
}
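To illustrate the calling convention of `tryGetNextSlice`, a hedged consumer-loop sketch (the no-arg slice constructors are assumptions, not taken from this diff):

```java
// Drain a DenseStorageReader: each successful call fills either the ByteSlice
// or the CharSlice, and nextIsBytes[0] reports which one was filled.
static void drain(final DenseStorageReader reader) throws CsvReaderException {
    final ByteSlice bs = new ByteSlice();
    final CharSlice cs = new CharSlice();
    final boolean[] nextIsBytes = new boolean[1];
    while (reader.tryGetNextSlice(bs, cs, nextIsBytes)) {
        if (nextIsBytes[0]) {
            // consume bs here; the view is invalidated by the next call
        } else {
            // consume cs here; the view is invalidated by the next call
        }
    }
    // false means the writer finished and all queued data has been consumed
}
```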
diff --git a/extensions/csv/src/main/java/io/deephaven/csv/densestorage/DenseStorageWriter.java b/extensions/csv/src/main/java/io/deephaven/csv/densestorage/DenseStorageWriter.java
index 4fdfc09e9ab..e5b34a3a650 100644
--- a/extensions/csv/src/main/java/io/deephaven/csv/densestorage/DenseStorageWriter.java
+++ b/extensions/csv/src/main/java/io/deephaven/csv/densestorage/DenseStorageWriter.java
@@ -3,16 +3,82 @@
import io.deephaven.csv.containers.CharSlice;
/**
- * The point of this object is to store data with a small fraction of overhead. "Large" objects (byte or char sequences
- * with length >= a threshold) are stored directly. "Small" objects (byte or char sequences with a smaller length) are
- * compacted into byte and char pools.
+ * The DenseStorageWriter and {@link DenseStorageReader} work in tandem, forming a FIFO queue. The DenseStorageWriter writes
+ * data, and the {@link DenseStorageReader} reads that data. If the {@link DenseStorageReader} "catches up", it will block until
+ * the DenseStorageWriter provides more data, or indicates that it is done (via the {@link #finish()} method).
+ * This synchronization is done at "block" granularity, so the DenseStorageReader can only proceed when the
+ * DenseStorageWriter has written at least a "block" of data or is done. We allow multiple independent
+ * {@link DenseStorageReader}s to consume the same underlying data. In our implementation this is used so our
+ * type inferencer can take a second "pass" over the same input data.
+ *
+ * The point of this object is to store a sequence of character sequences ("strings", though not java.lang.String
+ * objects) with a small fraction of overhead. The problem with storing every character sequence as a
+ * java.lang.String is:
+ *
+ * <ul>
+ * <li>Per-object overhead (8 or 16 bytes)</li>
+ * <li>The memory cost of holding a reference to that String (4 or 8 bytes)</li>
+ * <li>The string has to know its length (4 bytes)</li>
+ * <li>Java characters are 2 bytes even though in practice many strings are ASCII-only and their chars can fit in a
+ * byte</li>
+ * </ul>
+ *
+ * For small strings (say the word "hello" or the input text "12345.6789") the overhead can be 100% or worse.
+ *
+ * For our purposes we:
+ *
+ * <ul>
+ * <li>Only need sequential access, i.e. we don't need random access into the sequence of "strings". So we can
+ * support a model where we have a forward-only cursor moving over the sequence of "strings".</li>
+ * <li>Don't need to give our caller a data structure that they can hold on to. The caller only gets a "view"
+ * (a slice) of the current "string" data. The view is invalidated when they move to the next "string".</li>
+ * </ul>
+ *
+ * Furthermore we:
+ *
+ * <ul>
+ * <li>Offer a FIFO model where the reader (in a separate thread) can chase the writer but there is not an
+ * inordinate amount of synchronization overhead.</li>
+ * <li>Have the ability to make multiple Readers which pass over the same underlying data. This is our
+ * low-drama way of allowing our client to make multiple passes over the data, without complicating the iteration
+ * interface with, e.g., a reset method.</li>
+ * <li>Use a linked-list structure so that when all existing readers have moved past a block of data, that
+ * block can be freed by the garbage collector.</li>
+ * </ul>
+ *
+ * If you are familiar with the structure of our inference, you may initially think that this reader-chasing-writer
+ * garbage collection trick doesn't buy us much because we have a two-phase parser. However, when the inferencer has
+ * gotten to the last parser in its set of allowable parsers (say, the String parser), or the user has specified
+ * that there is only one parser for this column, then the code doesn't need to do any inference and can parse the
+ * column in one pass. (TODO(kosak): one-pass not implemented yet. Coming shortly).
+ *
+ * The implementation used here is to look at the "string" being added to the writer and categorize it along two
+ * dimensions:
+ *
+ * <ul>
+ * <li>Small vs large</li>
+ * <li>Byte vs char</li>
+ * </ul>
+ *
+ * These dimensions are broken out in the following way:
+ *
+ * <ul>
+ * <li>Small byte "strings" are packed into a byte block, and we maintain a linked list of these byte blocks</li>
+ * <li>Small char "strings" are packed into a char block, and we maintain a linked list of these char blocks</li>
+ * <li>"Large" objects (byte or char sequences with length >= a threshold) are stored directly, meaning a byte[] or
+ * char[] array is allocated for their data, then a reference to that array is added to a byte-array or char-array
+ * block. (And again, we maintain a linked list of these byte-array or char-array blocks.) It is not typical for
+ * CSV data to contain a cell this large, but the feature is there for completeness. We do not want large "strings"
+ * to contaminate our byte and char blocks because they would not likely pack into them tightly. It's OK to keep
+ * them on their own because, by definition, large "strings" are not going to have much overhead as a percentage of
+ * their size.</li>
+ * </ul>
*/
public final class DenseStorageWriter {
/**
- * The ints in this array indicate where the next item is stored: Integer.MIN_VALUE: largeStringWriter
- * Integer.MAX_VALUE: largeByteWriter == 0: no bytes or characters, so they're not stored anywhere otherwise < 0:
- * charWriter (the number of chars is the negative of this value) otherwise >= 0 : byteWriter (the number of bytes
- * is equal to this value)
+ * The ints in this array indicate where the next item is stored:
+ *
+ * <ul>
+ * <li>Integer.MIN_VALUE: largeStringWriter</li>
+ * <li>Integer.MAX_VALUE: largeByteWriter</li>
+ * <li>== 0: no bytes or characters, so they're not stored anywhere. Will be interpreted as a ByteSlice with
+ * arbitrary byte data and length 0.</li>
+ * <li>< 0: charWriter (the number of chars is the negative of this value)</li>
+ * <li>> 0: byteWriter (the number of bytes is equal to this value)</li>
+ * </ul>
*/
private final QueueWriter.IntWriter controlWriter;
/**
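The reader decodes each control int with the inverse of the scheme documented above; a sketch of the implied dispatch (the CharArrayReader method name, the empty-slice handling, and the ignored return values, which the real reader checks via `mustSucceed`, are assumptions or simplifications):

```java
// Dispatch on one control int, mirroring the encoding documented above.
final int control = intHolder[0];
if (control == Integer.MIN_VALUE) {
    largeCharArrayReader.tryGetChars(cs);   // large char "string" (assumed method name)
} else if (control == Integer.MAX_VALUE) {
    largeByteArrayReader.tryGetBytes(bs);   // large byte "string"
} else if (control == 0) {
    bs.reset(EMPTY, 0, 0);                  // zero-length ByteSlice (assumed reset method and constant)
} else if (control < 0) {
    charReader.tryGetChars(-control, cs);   // -control chars from the char queue
} else {
    byteReader.tryGetBytes(control, bs);    // control bytes from the byte queue
}
```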
diff --git a/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueNode.java b/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueNode.java
index b231cb7ca07..ad6436c6c67 100644
--- a/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueNode.java
+++ b/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueNode.java
@@ -1,10 +1,18 @@
package io.deephaven.csv.densestorage;
+/**
+ * Linked list node that holds data for a {@link DenseStorageWriter} or {@link DenseStorageReader}.
+ * All fields are immutable except the "next" field. Synchronization for reading/writing the "next" field
+ * is managed by the {@link DenseStorageWriter} and {@link DenseStorageReader}.
+ */
public final class QueueNode<TARRAY> {
public final TARRAY data;
public final int begin;
public final int end;
public final boolean isLast;
+ /**
+ * Readers and writers of this field have arranged to synchronize with each other.
+ */
public QueueNode<TARRAY> next;
public QueueNode(TARRAY data, int begin, int end, boolean isLast) {
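To make that externally managed synchronization concrete, a writer-side sketch of linking a freshly filled block (`sync`, `tail`, and the block variables are assumed from the surrounding classes, not taken from this diff):

```java
// Writer side: publish a completed block by linking it at the tail under the
// shared monitor, then wake any reader blocked waiting on "next".
final QueueNode<byte[]> newNode = new QueueNode<>(block, 0, blockSize, false);
synchronized (sync) {
    tail.next = newNode;
    tail = newNode;
    sync.notifyAll();
}
```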
diff --git a/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueReader.java b/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueReader.java
index d40c1941eb6..0f4c1b8b112 100644
--- a/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueReader.java
+++ b/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueReader.java
@@ -3,6 +3,9 @@
import io.deephaven.csv.containers.ByteSlice;
import io.deephaven.csv.containers.CharSlice;
+/**
+ * Companion to the {@link QueueWriter}. See the documentation there for details.
+ */
public class QueueReader<TARRAY> {
private final Object sync;
private QueueNode<TARRAY> node;
@@ -105,6 +108,9 @@ public boolean tryGetChars(int size, CharSlice cs) {
}
}
+ /**
+ * A QueueReader specialized for bytes.
+ */
public static final class ByteReader extends QueueReader<byte[]> {
private byte[] typedBlock;
@@ -131,6 +137,9 @@ public boolean tryGetBytes(int size, ByteSlice bs) {
}
}
+ /**
+ * A QueueReader specialized for ints.
+ */
public static final class IntReader extends QueueReader<int[]> {
private int[] typedBlock;
@@ -157,6 +166,9 @@ public boolean tryGetInt(int[] result) {
}
}
+ /**
+ * A QueueReader specialized for byte arrays.
+ */
public static final class ByteArrayReader extends QueueReader<byte[][]> {
private byte[][] typedBlock;
@@ -183,6 +195,9 @@ public boolean tryGetBytes(ByteSlice bs) {
}
}
+ /**
+ * A QueueReader specialized for char arrays.
+ */
public static final class CharArrayReader extends QueueReader<char[][]> {
private char[][] typedBlock;
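The reader half of the FIFO contract blocks until the writer links another node; a sketch of what the shared refill path presumably looks like (the method name is invented):

```java
// Reader side: advance to the next block, blocking until the writer links it.
// Nodes left behind become unreachable once every reader moves past them, so
// the garbage collector can free their blocks.
private void advanceToNextBlock() throws InterruptedException {
    synchronized (sync) {
        while (node.next == null) {
            sync.wait(); // woken by the writer's notifyAll()
        }
        node = node.next;
    }
}
```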
diff --git a/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueWriter.java b/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueWriter.java
index 8b86c668c89..7885109537c 100644
--- a/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueWriter.java
+++ b/extensions/csv/src/main/java/io/deephaven/csv/densestorage/QueueWriter.java
@@ -5,6 +5,22 @@
import java.util.function.BiFunction;
import java.util.function.IntFunction;
+/**
+ * The various QueueWriters ({@link ByteWriter}, {@link CharWriter}, etc.)
+ * work in tandem with their corresponding {@link QueueReader}s ({@link QueueReader.ByteReader},
+ * {@link QueueReader.CharReader}, etc), forming a FIFO queue. The QueueWriter writes
+ * data, and the {@link QueueReader} reads that data. If the {@link QueueReader} "catches up", it will block until
+ * the QueueWriter provides more data, or indicates that it is done (via the {@link #finish()} method).
+ * This synchronization is done at "block" granularity, so the {@link QueueReader} can only proceed when the
+ * QueueWriter has written at least a "block" of data or is done. We allow multiple independent
+ * {@link QueueReader}s to consume the same underlying data. In our implementation this is used so our
+ * type inferencer can take a second "pass" over the same input data.
+ *
+ * In our implementation the {@link DenseStorageWriter} and {@link DenseStorageReader} are built out of
+ * various QueueWriters and {@link QueueReader}s. This explains why the semantics of
+ * {@link DenseStorageWriter} and {@link DenseStorageReader} are similar to those of the underlying
+ * QueueWriters and {@link QueueReader}s.
+ */
public class QueueWriter<TARRAY> {
private final Object sync;
private QueueNode<TARRAY> tail;
@@ -17,9 +33,9 @@ public class QueueWriter {
protected int current;
protected int end;
- protected QueueWriter(int blobSize,
- IntFunction arrayFactory,
- BiFunction
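Finally, the `finish()` contract described in this javadoc amounts to publishing the last, possibly partially filled, block with `isLast` set so blocked readers can drain it and then observe end-of-stream. A hypothetical sketch (`currentBlock` and `currentSize` are invented names; only `sync` and `tail` appear in this diff):

```java
// Hypothetical sketch of finish(): publish the final block and wake readers.
public void finish() {
    final QueueNode<TARRAY> lastNode = new QueueNode<>(currentBlock, 0, currentSize, true); // isLast == true
    synchronized (sync) {
        tail.next = lastNode;
        tail = lastNode;
        sync.notifyAll(); // readers waiting on node.next re-check and observe isLast
    }
}
```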