Skip to content

Commit

Permalink
Allow multiple concurrent DynamicTableRow writers (#2805)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind authored and devinrsmith committed Sep 8, 2022
1 parent 8de443f commit 97876a3
Showing 1 changed file with 28 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
* The DynamicTableWriter creates an in-memory table using ArrayBackedColumnSources of the type specified in the
* constructor. You can retrieve the table using the {@code getTable} function.
* <p>
* This class is not thread safe, you must synchronize externally.
* This class is not thread safe, you must synchronize externally. However, multiple setters may safely log
* concurrently.
*/
public class DynamicTableWriter implements TableWriter {
private final UpdateSourceQueryTable table;
Expand Down Expand Up @@ -782,32 +783,34 @@ public PermissiveRowSetter getSetter(final String name) {

@Override
public void writeRow() {
boolean doFlush = false;
switch (flags) {
case SingleRow:
doFlush = true;
case StartTransaction:
if (lastCommittedRow != lastSetterRow) {
lastSetterRow = lastCommittedRow + 1;
}
break;
case EndTransaction:
doFlush = true;
break;
case None:
break;
}
row = lastSetterRow++;
synchronized (DynamicTableWriter.this) {
boolean doFlush = false;
switch (flags) {
case SingleRow:
doFlush = true;
case StartTransaction:
if (lastCommittedRow != lastSetterRow) {
lastSetterRow = lastCommittedRow + 1;
}
break;
case EndTransaction:
doFlush = true;
break;
case None:
break;
}
row = lastSetterRow++;

// Before this row can be returned to a pool, it needs to ensure that the underlying sources
// are appropriately sized to avoid race conditions.
ensureCapacity(row);
columnToSetter.values().forEach((x) -> x.setRow(row));
// Before this row can be returned to a pool, it needs to ensure that the underlying sources
// are appropriately sized to avoid race conditions.
ensureCapacity(row);
columnToSetter.values().forEach((x) -> x.setRow(row));

// The row has been committed during set, we just need to insert the row keys into the table
if (doFlush) {
DynamicTableWriter.this.addRangeToTableIndex(lastCommittedRow + 1, row);
lastCommittedRow = row;
// The row has been committed during set, we just need to insert the row keys into the table
if (doFlush) {
DynamicTableWriter.this.addRangeToTableIndex(lastCommittedRow + 1, row);
lastCommittedRow = row;
}
}
}

Expand Down

0 comments on commit 97876a3

Please sign in to comment.