Skip to content

Commit

Permalink
Enable tree data to be queried by a table view [PART-1]
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 authored Jan 2, 2025
1 parent be9a05f commit 256737f
Show file tree
Hide file tree
Showing 41 changed files with 3,091 additions and 1,265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
Expand All @@ -31,6 +30,7 @@
import org.apache.iotdb.db.queryengine.execution.operator.window.TimeWindow;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
Expand All @@ -52,7 +52,6 @@
import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import java.io.IOException;
Expand All @@ -61,7 +60,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.satisfiedTimeRange;
Expand All @@ -73,9 +71,6 @@

public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOperator {

private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(AbstractAggTableScanOperator.class);

private boolean finished = false;
private TsBlock inputTsBlock;

Expand Down Expand Up @@ -116,53 +111,39 @@ public abstract class AbstractAggTableScanOperator extends AbstractDataSourceOpe

private boolean allAggregatorsHasFinalResult = false;

public AbstractAggTableScanOperator(
PlanNodeId sourceId,
OperatorContext context,
List<ColumnSchema> aggColumnSchemas,
int[] aggColumnsIndexArray,
List<DeviceEntry> deviceEntries,
int deviceCount,
SeriesScanOptions seriesScanOptions,
List<String> measurementColumnNames,
Set<String> allSensors,
List<IMeasurementSchema> measurementSchemas,
List<TableAggregator> tableAggregators,
List<ColumnSchema> groupingKeySchemas,
int[] groupingKeyIndex,
ITableTimeRangeIterator tableTimeRangeIterator,
boolean ascending,
boolean canUseStatistics,
List<Integer> aggregatorInputChannels) {

this.sourceId = sourceId;
this.operatorContext = context;
this.canUseStatistics = canUseStatistics;
this.tableAggregators = tableAggregators;
this.groupingKeySchemas = groupingKeySchemas;
this.groupingKeyIndex = groupingKeyIndex;
this.groupingKeySize = groupingKeySchemas == null ? 0 : groupingKeySchemas.size();
this.aggColumnSchemas = aggColumnSchemas;
this.aggColumnsIndexArray = aggColumnsIndexArray;
this.deviceEntries = deviceEntries;
this.deviceCount = deviceCount;
public AbstractAggTableScanOperator(AbstractAggTableScanOperatorParameter parameter) {

this.sourceId = parameter.sourceId;
this.operatorContext = parameter.context;
this.canUseStatistics = parameter.canUseStatistics;
this.tableAggregators = parameter.tableAggregators;
this.groupingKeySchemas = parameter.groupingKeySchemas;
this.groupingKeyIndex = parameter.groupingKeyIndex;
this.groupingKeySize =
parameter.groupingKeySchemas == null ? 0 : parameter.groupingKeySchemas.size();
this.aggColumnSchemas = parameter.aggColumnSchemas;
this.aggColumnsIndexArray = parameter.aggColumnsIndexArray;
this.deviceEntries = parameter.deviceEntries;
this.deviceCount = parameter.deviceCount;
this.operatorContext.recordSpecifiedInfo(DEVICE_NUMBER, Integer.toString(this.deviceCount));
this.ascending = ascending;
this.scanOrder = ascending ? Ordering.ASC : Ordering.DESC;
this.seriesScanOptions = seriesScanOptions;
this.measurementColumnNames = measurementColumnNames;
this.ascending = parameter.ascending;
this.scanOrder = parameter.ascending ? Ordering.ASC : Ordering.DESC;
this.seriesScanOptions = parameter.seriesScanOptions;
this.measurementColumnNames = parameter.measurementColumnNames;
this.measurementCount = measurementColumnNames.size();
this.cachedRawDataSize =
(1L + this.measurementCount)
* TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
this.allSensors = allSensors;
this.measurementSchemas = measurementSchemas;
this.allSensors = parameter.allSensors;
this.measurementSchemas = parameter.measurementSchemas;
this.measurementColumnTSDataTypes =
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
parameter.measurementSchemas.stream()
.map(IMeasurementSchema::getType)
.collect(Collectors.toList());
this.currentDeviceIndex = 0;
this.operatorContext.recordSpecifiedInfo(CURRENT_DEVICE_INDEX_STRING, Integer.toString(0));
this.aggregatorInputChannels = aggregatorInputChannels;
this.timeIterator = tableTimeRangeIterator;
this.aggregatorInputChannels = parameter.aggregatorInputChannels;
this.timeIterator = parameter.tableTimeRangeIterator;
this.dateBinSize =
timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR
? 1
Expand All @@ -179,56 +160,6 @@ public boolean isFinished() throws Exception {
return finished;
}

@Override
public boolean hasNext() throws Exception {
if (retainedTsBlock != null) {
return true;
}

return timeIterator.hasCachedTimeRange() || timeIterator.hasNextTimeRange();
}

@Override
public TsBlock next() throws Exception {
if (retainedTsBlock != null) {
return getResultFromRetainedTsBlock();
}

// optimize for sql: select count(*) from (select count(s1), sum(s1) from table)
if (tableAggregators.isEmpty()
&& timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
&& resultTsBlockBuilder.getValueColumnBuilders().length == 0) {
resultTsBlockBuilder.reset();
currentDeviceIndex = deviceCount;
timeIterator.setFinished();
Column[] valueColumns = new Column[0];
return new TsBlock(1, new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 1), valueColumns);
}

// start stopwatch, reset leftRuntimeOfOneNextCall
long start = System.nanoTime();
leftRuntimeOfOneNextCall = 1000 * operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
long maxRuntime = leftRuntimeOfOneNextCall;

while (System.nanoTime() - start < maxRuntime
&& (timeIterator.hasCachedTimeRange() || timeIterator.hasNextTimeRange())
&& !resultTsBlockBuilder.isFull()) {

// calculate aggregation result on current time window
// return true if current time window is calc finished
if (calculateAggregationResultForCurrentTimeRange()) {
timeIterator.resetCurTimeRange();
}
}

if (resultTsBlockBuilder.isEmpty()) {
return null;
}

buildResultTsBlock();
return checkTsBlockSizeAndGetResult();
}

protected abstract void updateResultTsBlock();

protected void buildResultTsBlock() {
Expand All @@ -244,7 +175,7 @@ protected void constructAlignedSeriesScanUtil() {

if (this.deviceEntries.isEmpty() || this.deviceEntries.get(this.currentDeviceIndex) == null) {
// for device which is not exist
deviceEntry = new DeviceEntry(new StringArrayDeviceID(""), Collections.emptyList());
deviceEntry = new AlignedDeviceEntry(new StringArrayDeviceID(""), Collections.emptyList());
} else {
deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
}
Expand Down Expand Up @@ -422,12 +353,9 @@ private Column buildValueColumn(
case TIME:
return inputRegion.getTimeColumn();
case TAG:
// TODO avoid create deviceStatics multi times; count, sum can use time statistics
String id =
(String)
deviceEntries
.get(currentDeviceIndex)
.getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
getNthIdColumnValue(
deviceEntries.get(currentDeviceIndex), aggColumnsIndexArray[columnIdx]);
return getIdOrAttrColumn(
inputRegion.getTimeColumn().getPositionCount(),
id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET));
Expand Down Expand Up @@ -493,12 +421,9 @@ private Statistics buildStatistics(
case TIME:
return timeStatistics;
case TAG:
// TODO avoid create deviceStatics multi times; count, sum can use time statistics
String id =
(String)
deviceEntries
.get(currentDeviceIndex)
.getNthSegment(aggColumnsIndexArray[columnIdx] + 1);
getNthIdColumnValue(
deviceEntries.get(currentDeviceIndex), aggColumnsIndexArray[columnIdx]);
return getStatistics(
timeStatistics, id == null ? null : new Binary(id, TSFileConfig.STRING_CHARSET));
case ATTRIBUTE:
Expand Down Expand Up @@ -723,7 +648,7 @@ protected void appendGroupKeysToResult(List<DeviceEntry> deviceEntries, int devi
ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders();
for (int i = 0; i < groupingKeySize; i++) {
if (TsTableColumnCategory.TAG == groupingKeySchemas.get(i).getColumnCategory()) {
String id = (String) deviceEntries.get(deviceIndex).getNthSegment(groupingKeyIndex[i] + 1);
String id = getNthIdColumnValue(deviceEntries.get(deviceIndex), groupingKeyIndex[i]);
if (id == null) {
columnBuilders[i].appendNull();
} else {
Expand Down Expand Up @@ -839,19 +764,101 @@ public long calculateRetainedSizeAfterCallingNext() {
: 0;
}

@Override
public long ramBytesUsed() {
return INSTANCE_SIZE
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(seriesScanUtil)
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(sourceId)
+ (resultTsBlockBuilder == null ? 0 : resultTsBlockBuilder.getRetainedSizeInBytes())
+ RamUsageEstimator.sizeOfCollection(deviceEntries);
}

@Override
public void close() throws Exception {
super.close();
tableAggregators.forEach(TableAggregator::close);
}

abstract String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex);

public static class AbstractAggTableScanOperatorParameter {
private final String timeColumnName;
protected final PlanNodeId sourceId;
protected final OperatorContext context;
protected final List<ColumnSchema> aggColumnSchemas;
protected final int[] aggColumnsIndexArray;
protected final SeriesScanOptions seriesScanOptions;
protected final List<String> measurementColumnNames;
protected final Set<String> allSensors;
protected final List<IMeasurementSchema> measurementSchemas;
protected final List<TableAggregator> tableAggregators;
protected final List<ColumnSchema> groupingKeySchemas;
protected final int[] groupingKeyIndex;
protected final ITableTimeRangeIterator tableTimeRangeIterator;
protected final boolean ascending;
protected final boolean canUseStatistics;
protected final List<Integer> aggregatorInputChannels;

protected List<DeviceEntry> deviceEntries;
protected int deviceCount;

public AbstractAggTableScanOperatorParameter(
PlanNodeId sourceId,
OperatorContext context,
List<ColumnSchema> aggColumnSchemas,
int[] aggColumnsIndexArray,
List<DeviceEntry> deviceEntries,
int deviceCount,
SeriesScanOptions seriesScanOptions,
List<String> measurementColumnNames,
Set<String> allSensors,
List<IMeasurementSchema> measurementSchemas,
List<TableAggregator> tableAggregators,
List<ColumnSchema> groupingKeySchemas,
int[] groupingKeyIndex,
ITableTimeRangeIterator tableTimeRangeIterator,
boolean ascending,
boolean canUseStatistics,
List<Integer> aggregatorInputChannels,
String timeColumnName) {
this.sourceId = sourceId;
this.context = context;
this.aggColumnSchemas = aggColumnSchemas;
this.aggColumnsIndexArray = aggColumnsIndexArray;
this.deviceEntries = deviceEntries;
this.deviceCount = deviceCount;
this.seriesScanOptions = seriesScanOptions;
this.measurementColumnNames = measurementColumnNames;
this.allSensors = allSensors;
this.measurementSchemas = measurementSchemas;
this.tableAggregators = tableAggregators;
this.groupingKeySchemas = groupingKeySchemas;
this.groupingKeyIndex = groupingKeyIndex;
this.tableTimeRangeIterator = tableTimeRangeIterator;
this.ascending = ascending;
this.canUseStatistics = canUseStatistics;
this.aggregatorInputChannels = aggregatorInputChannels;
this.timeColumnName = timeColumnName;
}

public List<TableAggregator> getTableAggregators() {
return tableAggregators;
}

public SeriesScanOptions getSeriesScanOptions() {
return seriesScanOptions;
}

public Set<String> getAllSensors() {
return allSensors;
}

public List<String> getMeasurementColumnNames() {
return measurementColumnNames;
}

public List<IMeasurementSchema> getMeasurementSchemas() {
return measurementSchemas;
}

public String getTimeColumnName() {
return timeColumnName;
}

public void setDeviceEntries(List<DeviceEntry> deviceEntries) {
this.deviceEntries = deviceEntries;
this.deviceCount = deviceEntries.size();
}
}
}
Loading

0 comments on commit 256737f

Please sign in to comment.