
Commit

Fix deadlock when a sequence of long-ish strings appear in the input (bug deephaven#101)
kosak committed Mar 1, 2023
1 parent f182f23 commit b0a9e48
Showing 2 changed files with 49 additions and 15 deletions.
32 changes: 21 additions & 11 deletions src/main/java/io/deephaven/csv/densestorage/DenseStorageWriter.java
@@ -121,28 +121,38 @@ private DenseStorageWriter(QueueWriter.IntWriter controlWriter, QueueWriter.Byte
      * queues, depending on its size.
      */
     public void append(final ByteSlice bs) {
-        final boolean fctrl;
+        final boolean fctrl, fdata;
         final int size = bs.size();
         if (size >= DenseStorageConstants.LARGE_THRESHOLD) {
             final byte[] data = new byte[size];
             bs.copyTo(data, 0);
-            largeByteArrayWriter.addByteArray(data);
+            fdata = largeByteArrayWriter.addByteArray(data);
             fctrl = controlWriter.addInt(DenseStorageConstants.LARGE_BYTE_ARRAY_SENTINEL);
         } else {
-            byteWriter.addBytes(bs);
+            fdata = byteWriter.addBytes(bs);
             fctrl = controlWriter.addInt(size);
         }
-        // If the control queue flushed, then flush all the data queues, so the reader doesn't block for
-        // a long time waiting for some unflushed data queue. One might worry it's inefficient to flush
-        // a data queue that is not full, but (a) in practice it doesn't happen very often and (b) in
-        // our queue code, partially-filled blocks can share non-overlapping parts (slices) of their
-        // underlying storage array, so it's not particularly wasteful. Put another way, flushing an
-        // empty queue does nothing; flushing a partially-filled queue allocates a new QueueNode but
-        // not a new underlying data array; flushing a full queue will allocate a new QueueNode and
-        // (lazily deferred until the next write) a new underlying data array.
+        // If the control queue flushes, then flush the data queues, so the reader doesn't block for
+        // a long time waiting for some unflushed data queue. Conversely, if some data queue flushes, then
+        // flush the control queue for the same reasons. Importantly, we also want to do this because our
+        // flow control is based on limiting the number of data queue blocks outstanding
+        // (per DenseStorageConstants.MAX_UNOBSERVED_BLOCKS). We want to flush the control queue every
+        // time we fill a block on the data queue, so the consumer has a chance to consume the data. If we
+        // did not do this, in some cases the data queue would run too far ahead, the flow control would be
+        // invoked to block the writer, but the reader would also be blocked because it is still waiting on
+        // control queue notifications, which haven't arrived because the latest control queue block isn't
+        // full and hasn't been flushed yet. See https://github.com/deephaven/deephaven-csv/issues/101.
+        // One might worry that it is inefficient to flush a queue that is not full, but (a) in practice it
+        // doesn't happen very often and (b) in our queue code, partially-filled blocks can share
+        // non-overlapping parts (slices) of their underlying storage array, so it's not particularly wasteful.
+        // Put another way, flushing an empty queue does nothing; flushing a partially-filled queue allocates
+        // a new QueueNode but not a new underlying data array; flushing a full queue will allocate a new
+        // QueueNode and a new underlying data array (btw, that allocation is lazily deferred until the next write).
         if (fctrl) {
             byteWriter.flush();
             largeByteArrayWriter.flush();
+        } else if (fdata) {
+            controlWriter.flush();
         }
     }

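To make the comment above concrete, here is a minimal, self-contained sketch of the two-queue writer pattern it describes and why a one-sided flush can deadlock. This is not the library's actual API: the class name FlushDeadlockSketch, the constants, and the alsoFlushControlWhenDataFlushes flag are illustrative stand-ins for DenseStorageWriter's real internals.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of the deadlock described in issue #101; not the real DenseStorageWriter.
public final class FlushDeadlockSketch {
    static final int MAX_UNOBSERVED_BLOCKS = 2;   // hypothetical flow-control limit on unconsumed data blocks
    static final int DATA_BLOCK_CAPACITY = 4;     // hypothetical cells per data block before it flushes
    static final int CONTROL_BLOCK_CAPACITY = 64; // hypothetical control entries per block before it flushes

    // Flushed data blocks awaiting the reader; the bound models the flow control.
    private final BlockingQueue<List<String>> dataQueue = new ArrayBlockingQueue<>(MAX_UNOBSERVED_BLOCKS);
    // Flushed control blocks telling the reader how much data to expect.
    private final BlockingQueue<List<Integer>> controlQueue = new ArrayBlockingQueue<>(1024);

    private List<String> dataBlock = new ArrayList<>();
    private List<Integer> controlBlock = new ArrayList<>();

    void append(String cell, boolean alsoFlushControlWhenDataFlushes) throws InterruptedException {
        dataBlock.add(cell);
        controlBlock.add(cell.length());
        boolean dataFlushed = false;
        if (dataBlock.size() == DATA_BLOCK_CAPACITY) {
            dataQueue.put(dataBlock); // blocks once MAX_UNOBSERVED_BLOCKS blocks are outstanding
            dataBlock = new ArrayList<>();
            dataFlushed = true;
        }
        // Without the fix (flag false), the control block is flushed only when it is itself full.
        // Long-ish cells fill data blocks quickly while the control block stays partially filled,
        // so the writer eventually blocks in dataQueue.put() while the reader is still blocked
        // waiting on control notifications: a deadlock. The fix flushes the control queue whenever
        // a data block flushes (and, symmetrically, flushes the data queues when the control block flushes).
        if (controlBlock.size() == CONTROL_BLOCK_CAPACITY
                || (alsoFlushControlWhenDataFlushes && dataFlushed)) {
            controlQueue.put(controlBlock);
            controlBlock = new ArrayList<>();
        }
    }
}

In the real writer, the booleans returned by addInt/addBytes/addByteArray (fctrl and fdata in the diff) play the role of dataFlushed here, and flushing a partially filled block is cheap because blocks share slices of the underlying array, as the comment notes.
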
32 changes: 28 additions & 4 deletions src/test/java/io/deephaven/csv/CsvReaderTest.java
@@ -7,6 +7,7 @@
 import gnu.trove.list.array.TLongArrayList;
 import gnu.trove.list.array.TShortArrayList;
 import io.deephaven.csv.containers.ByteSlice;
+import io.deephaven.csv.densestorage.DenseStorageConstants;
 import io.deephaven.csv.parsers.DataType;
 import io.deephaven.csv.parsers.IteratorHolder;
 import io.deephaven.csv.parsers.Parser;
@@ -26,9 +27,7 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringReader;
+import java.io.*;
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
@@ -98,7 +97,7 @@ public void bug48() {
 
     /**
      * Addresses <a href="https://github.com/deephaven/deephaven-csv/issues/52">Deephaven CSV Issue #52</a>. When the
-     * bug exists, the library hangs (and this tests times out). When the bug is fixed, the test succeeds.
+     * bug exists, the library hangs (and this test times out). When the bug is fixed, the test succeeds.
      */
     @Test
     @Timeout(value = 30)
@@ -134,6 +133,31 @@ public void bug70() throws CsvReaderException {
         invokeTest(defaultCsvBuilder().parsers(Parsers.DEFAULT).build(), input, expected);
     }
 
+    /**
+     * Addresses <a href="https://github.com/deephaven/deephaven-csv/issues/101">Deephaven CSV Issue #101</a>. When the
+     * bug exists, the library deadlocks (and this test times out). When the bug is fixed, the test succeeds.
+     */
+    @Test
+    public void bug101() throws CsvReaderException {
+        final int numCharsInBigCell = DenseStorageConstants.LARGE_THRESHOLD - 1;
+        final int numStringsThatFitInAQueueBlock = DenseStorageConstants.PACKED_QUEUE_SIZE / numCharsInBigCell;
+        final int numRowsThatWillTriggerTheDeadlock = numStringsThatFitInAQueueBlock * (DenseStorageConstants.MAX_UNOBSERVED_BLOCKS + 1);
+
+        final StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < numCharsInBigCell; ++i) {
+            sb.append('X');
+        }
+        sb.append('\n');
+        final String bigCell = sb.toString();
+
+        final RepeatingInputStream inputStream =
+                new RepeatingInputStream("Col1\n", bigCell, numRowsThatWillTriggerTheDeadlock);
+        final CsvSpecs specs = defaultCsvBuilder()
+                .parsers(Collections.singletonList(Parsers.STRING))
+                .build();
+        CsvReader.read(specs, inputStream, makeMySinkFactory());
+    }
+
     @Test
     public void validates() {
         final String lengthyMessage = "CsvSpecs failed validation for the following reasons: "
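
The bug101 test relies on a RepeatingInputStream helper defined elsewhere in CsvReaderTest and not shown in this diff. As a rough, hypothetical sketch of what such a helper might do (the real implementation may differ): emit the header bytes once, then repeat the body a fixed number of times, so the test can stream enough long-ish rows to trip the deadlock without building the whole input in memory.

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the RepeatingInputStream test helper; not the real implementation.
public final class RepeatingInputStreamSketch extends InputStream {
    private final byte[] body;
    private byte[] current;   // bytes currently being served: the header first, then body repeats
    private int pos;
    private int remainingRepeats;

    public RepeatingInputStreamSketch(String header, String body, int repeats) {
        this.current = header.getBytes(StandardCharsets.UTF_8);
        this.body = body.getBytes(StandardCharsets.UTF_8);
        this.remainingRepeats = repeats;
    }

    @Override
    public int read() throws IOException {
        while (pos == current.length) {
            if (remainingRepeats == 0) {
                return -1; // end of stream after the last repetition
            }
            current = body;
            pos = 0;
            --remainingRepeats;
        }
        return current[pos++] & 0xff;
    }
}

In the test above, the header is "Col1\n" and the body is a single cell of numCharsInBigCell 'X' characters terminated by '\n', repeated numRowsThatWillTriggerTheDeadlock times.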
