Skip to content

Commit

Permalink
chore(sql): fix missing allocator initialization in parallel group by (
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz authored Oct 8, 2024
1 parent 78973fe commit c833e6f
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ private static boolean consumeQueue(
reduce(workerId, record, circuitBreaker, task, frameSequence, stealingFrameSequence);
}
} catch (Throwable th) {
LOG.error()
.$("reduce error [error=").$(th)
.$(", id=").$(frameSequence.getId())
.$(", taskType=").$(task.getType())
.$(", frameIndex=").$(task.getFrameIndex())
.$(", frameCount=").$(frameSequence.getFrameCount())
.I$();
int interruptReason = SqlExecutionCircuitBreaker.STATE_OK;
if (th instanceof CairoException) {
CairoException e = (CairoException) th;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,44 @@

package io.questdb.griffin.engine.table;

import io.questdb.cairo.*;
import io.questdb.cairo.map.*;
import io.questdb.cairo.sql.*;
import io.questdb.cairo.ArrayColumnTypes;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.ColumnTypes;
import io.questdb.cairo.ListColumnFilter;
import io.questdb.cairo.RecordSink;
import io.questdb.cairo.RecordSinkFactory;
import io.questdb.cairo.Reopenable;
import io.questdb.cairo.map.Map;
import io.questdb.cairo.map.MapFactory;
import io.questdb.cairo.map.MapKey;
import io.questdb.cairo.map.MapRecord;
import io.questdb.cairo.map.MapValue;
import io.questdb.cairo.sql.ExecutionCircuitBreaker;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.SqlExecutionCircuitBreaker;
import io.questdb.cairo.sql.StatefulAtom;
import io.questdb.cairo.sql.SymbolTableSource;
import io.questdb.cairo.vm.api.MemoryCARW;
import io.questdb.griffin.PlanSink;
import io.questdb.griffin.Plannable;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.PerWorkerLocks;
import io.questdb.griffin.engine.functions.GroupByFunction;
import io.questdb.griffin.engine.groupby.*;
import io.questdb.griffin.engine.groupby.GroupByAllocator;
import io.questdb.griffin.engine.groupby.GroupByAllocatorFactory;
import io.questdb.griffin.engine.groupby.GroupByFunctionsUpdater;
import io.questdb.griffin.engine.groupby.GroupByFunctionsUpdaterFactory;
import io.questdb.griffin.engine.groupby.GroupByUtils;
import io.questdb.jit.CompiledFilter;
import io.questdb.std.*;
import io.questdb.std.BytecodeAssembler;
import io.questdb.std.LongList;
import io.questdb.std.Misc;
import io.questdb.std.Numbers;
import io.questdb.std.ObjList;
import io.questdb.std.QuietCloseable;
import io.questdb.std.Transient;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -59,6 +84,8 @@ public class AsyncGroupByAtom implements StatefulAtom, Closeable, Reopenable, Pl
private final GroupByAllocator ownerAllocator;
private final Function ownerFilter;
private final MapFragment ownerFragment;
// Note: all function updaters should be used through a getFunctionUpdater() call
// to properly initialize group by functions' allocator.
private final GroupByFunctionsUpdater ownerFunctionUpdater;
private final ObjList<GroupByFunction> ownerGroupByFunctions;
private final ObjList<Function> ownerKeyFunctions;
Expand Down Expand Up @@ -363,6 +390,9 @@ public Map mergeOwnerMap() {
lastSharded = false;
final Map destMap = ownerFragment.reopenMap();
final int perWorkerMapCount = perWorkerFragments.size();
// Make sure to set the allocator for the owner's group by functions.
// This is done by the getFunctionUpdater() method.
final GroupByFunctionsUpdater functionUpdater = getFunctionUpdater(-1);

// Calculate medians before the merge.
final MapStats stats = lastOwnerStats;
Expand Down Expand Up @@ -391,7 +421,7 @@ public Map mergeOwnerMap() {
// Now do the actual merge.
for (int i = 0; i < perWorkerMapCount; i++) {
final Map srcMap = perWorkerFragments.getQuick(i).getMap();
destMap.merge(srcMap, ownerFunctionUpdater);
destMap.merge(srcMap, functionUpdater);
srcMap.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@
import io.questdb.griffin.SqlExecutionContext;
import io.questdb.griffin.engine.PerWorkerLocks;
import io.questdb.griffin.engine.functions.GroupByFunction;
import io.questdb.griffin.engine.groupby.*;
import io.questdb.griffin.engine.groupby.GroupByAllocator;
import io.questdb.griffin.engine.groupby.GroupByAllocatorFactory;
import io.questdb.griffin.engine.groupby.GroupByFunctionsUpdater;
import io.questdb.griffin.engine.groupby.GroupByFunctionsUpdaterFactory;
import io.questdb.griffin.engine.groupby.GroupByUtils;
import io.questdb.griffin.engine.groupby.SimpleMapValue;
import io.questdb.jit.CompiledFilter;
import io.questdb.std.BytecodeAssembler;
import io.questdb.std.Misc;
Expand All @@ -55,6 +60,8 @@ public class AsyncGroupByNotKeyedAtom implements StatefulAtom, Closeable, Planna
private final CompiledFilter compiledFilter;
private final GroupByAllocator ownerAllocator;
private final Function ownerFilter;
// Note: all function updaters should be used through a getFunctionUpdater() call
// to properly initialize group by functions' allocator.
private final GroupByFunctionsUpdater ownerFunctionUpdater;
private final ObjList<GroupByFunction> ownerGroupByFunctions;
private final SimpleMapValue ownerMapValue;
Expand Down Expand Up @@ -124,11 +131,14 @@ public AsyncGroupByNotKeyedAtom(

@Override
public void clear() {
ownerFunctionUpdater.updateEmpty(ownerMapValue);
// Make sure to set the allocator for the owner's group by functions.
// This is done by the getFunctionUpdater() method.
final GroupByFunctionsUpdater functionUpdater = getFunctionUpdater(-1);
functionUpdater.updateEmpty(ownerMapValue);
ownerMapValue.setNew(true);
for (int i = 0, n = perWorkerMapValues.size(); i < n; i++) {
SimpleMapValue value = perWorkerMapValues.getQuick(i);
ownerFunctionUpdater.updateEmpty(value);
functionUpdater.updateEmpty(value);
value.setNew(true);
}
if (perWorkerGroupByFunctions != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
import io.questdb.MessageBus;
import io.questdb.cairo.AbstractRecordCursorFactory;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.sql.*;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.PageFrameMemory;
import io.questdb.cairo.sql.PageFrameMemoryRecord;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.cairo.sql.SqlExecutionCircuitBreaker;
import io.questdb.cairo.sql.async.PageFrameReduceTask;
import io.questdb.cairo.sql.async.PageFrameReduceTaskFactory;
import io.questdb.cairo.sql.async.PageFrameReducer;
Expand All @@ -41,7 +47,11 @@
import io.questdb.griffin.engine.groupby.SimpleMapValue;
import io.questdb.jit.CompiledFilter;
import io.questdb.mp.SCSequence;
import io.questdb.std.*;
import io.questdb.std.BytecodeAssembler;
import io.questdb.std.DirectLongList;
import io.questdb.std.Misc;
import io.questdb.std.ObjList;
import io.questdb.std.Transient;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -305,7 +315,7 @@ static void applyFilter(Function filter, DirectLongList rows, PageFrameMemoryRec
protected void _close() {
Misc.free(base);
Misc.free(cursor);
Misc.freeObjList(groupByFunctions);
Misc.free(frameSequence);
Misc.freeObjList(groupByFunctions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,21 @@
package io.questdb.griffin.engine.table;

import io.questdb.MessageBus;
import io.questdb.cairo.*;
import io.questdb.cairo.AbstractRecordCursorFactory;
import io.questdb.cairo.ArrayColumnTypes;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.ListColumnFilter;
import io.questdb.cairo.RecordSink;
import io.questdb.cairo.map.Map;
import io.questdb.cairo.map.MapKey;
import io.questdb.cairo.map.MapValue;
import io.questdb.cairo.sql.*;
import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.PageFrameMemory;
import io.questdb.cairo.sql.PageFrameMemoryRecord;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.cairo.sql.RecordCursorFactory;
import io.questdb.cairo.sql.RecordMetadata;
import io.questdb.cairo.sql.SqlExecutionCircuitBreaker;
import io.questdb.cairo.sql.async.PageFrameReduceTask;
import io.questdb.cairo.sql.async.PageFrameReduceTaskFactory;
import io.questdb.cairo.sql.async.PageFrameReducer;
Expand All @@ -43,7 +53,11 @@
import io.questdb.griffin.engine.groupby.GroupByRecordCursorFactory;
import io.questdb.jit.CompiledFilter;
import io.questdb.mp.SCSequence;
import io.questdb.std.*;
import io.questdb.std.BytecodeAssembler;
import io.questdb.std.DirectLongList;
import io.questdb.std.Misc;
import io.questdb.std.ObjList;
import io.questdb.std.Transient;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -391,7 +405,7 @@ private static void filterAndAggregate(
protected void _close() {
Misc.free(base);
Misc.free(cursor);
Misc.freeObjList(recordFunctions); // groupByFunctions are included in recordFunctions
Misc.free(frameSequence);
Misc.freeObjList(recordFunctions); // groupByFunctions are included in recordFunctions
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentTcpSenderBootstrapTest extends AbstractBootstrapTest {

private static final int CONCURRENCY_LEVEL = 64;

@Before
Expand All @@ -59,6 +58,11 @@ public void setUp() {
dbPath.parent().$();
}

@Override
public void tearDown() throws Exception {
super.tearDown();
ILP_WORKER_COUNT = 1;
}

@Test
public void testConcurrentAuth() throws Exception {
Expand Down

0 comments on commit c833e6f

Please sign in to comment.