Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Pattern parsing pruning: When pattern is at the level below database, the parsing logic can be skipped if tsfiles / tablets completely match with the pattern #12049

Merged
merged 7 commits into from
Feb 20, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,12 @@ public void skipParsingTime() {
isTimeParsed = true;
}

public boolean shouldParsePatternOrTime() {
return !isPatternParsed || !isTimeParsed;
public boolean shouldParseTimeOrPattern() {
return shouldParseTime() || shouldParsePattern();
}

public boolean shouldParsePattern() {
return !isPatternParsed;
}

public boolean shouldParseTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ public Tablet convertToTablet() {

/////////////////////////// parsePatternOrTime ///////////////////////////

@Override
public boolean shouldParsePattern() {
final InsertNode node = getInsertNodeViaCacheIfPossible();
return super.shouldParsePattern()
&& (Objects.isNull(node) || !node.getDevicePath().getFullPath().startsWith(pattern));
}

public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
return new PipeRawTabletInsertionEvent(
convertToTablet(), isAligned, pipeName, pipeTaskMeta, this, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public boolean isAligned() {
}

public Tablet convertToTablet() {
if (!shouldParsePatternOrTime()) {
if (!shouldParseTimeOrPattern()) {
return tablet;
}

Expand All @@ -206,7 +206,7 @@ public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() {
}

public boolean hasNoNeedParsingAndIsEmpty() {
return !shouldParsePatternOrTime() && tablet.rowSize == 0;
return !shouldParseTimeOrPattern() && tablet.rowSize == 0;
}

/////////////////////////// Object ///////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,41 @@ public boolean isEventTimeOverlappedWithTimeRange() {

/////////////////////////// TsFileInsertionEvent ///////////////////////////

@Override
public boolean shouldParseTimeOrPattern() {
boolean shouldParseTimeOrPattern = false;
try {
shouldParseTimeOrPattern = super.shouldParseTimeOrPattern();
return shouldParseTimeOrPattern;
} finally {
// Super method will call shouldParsePattern() and then init dataContainer at
// shouldParsePattern(). If shouldParsePattern() returns false, dataContainer will
// not be used, so we need to close the resource here.
if (!shouldParseTimeOrPattern) {
close();
}
}
}

@Override
public boolean shouldParsePattern() {
return super.shouldParsePattern() && initDataContainer().shouldParsePattern();
}

@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
return initDataContainer().toTabletInsertionEvents();
}

private TsFileInsertionDataContainer initDataContainer() {
try {
if (dataContainer == null) {
waitForTsFileClose();
dataContainer =
new TsFileInsertionDataContainer(
tsFile, getPattern(), startTime, endTime, pipeTaskMeta, this);
}
return dataContainer.toTabletInsertionEvents();
return dataContainer;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class TsFileInsertionDataContainer implements AutoCloseable {
private final Map<String, Boolean> deviceIsAlignedMap;
private final Map<String, TSDataType> measurementDataTypeMap;

private boolean shouldParsePattern = false;

public TsFileInsertionDataContainer(File tsFile, String pattern, long startTime, long endTime)
throws IOException {
this(tsFile, pattern, startTime, endTime, null, null);
Expand Down Expand Up @@ -161,6 +163,9 @@ else if (pattern.length() > deviceId.length() && pattern.startsWith(deviceId)) {
// high cost check comes later
&& pattern.endsWith(TsFileConstant.PATH_SEPARATOR + measurement)) {
filteredMeasurements.add(measurement);
} else {
// Parse pattern iff there are measurements filtered out
shouldParsePattern = true;
}
}

Expand Down Expand Up @@ -254,6 +259,10 @@ public TabletInsertionEvent next() {
};
}

public boolean shouldParsePattern() {
return shouldParsePattern;
}

@Override
public void close() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public synchronized void collect(Event event) {
}

private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent sourceEvent) {
if (sourceEvent.shouldParsePatternOrTime()) {
if (sourceEvent.shouldParseTimeOrPattern()) {
final PipeRawTabletInsertionEvent parsedEvent = sourceEvent.parseEventWithPatternOrTime();
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
collectEvent(parsedEvent);
Expand All @@ -89,7 +89,7 @@ private void parseAndCollectEvent(PipeInsertNodeTabletInsertionEvent sourceEvent
}

private void parseAndCollectEvent(PipeRawTabletInsertionEvent sourceEvent) {
if (sourceEvent.shouldParsePatternOrTime()) {
if (sourceEvent.shouldParseTimeOrPattern()) {
final PipeRawTabletInsertionEvent parsedEvent = sourceEvent.parseEventWithPatternOrTime();
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
collectEvent(parsedEvent);
Expand All @@ -107,7 +107,7 @@ private void parseAndCollectEvent(PipeTsFileInsertionEvent sourceEvent) throws E
return;
}

if (!sourceEvent.shouldParsePatternOrTime()) {
if (!sourceEvent.shouldParseTimeOrPattern()) {
collectEvent(sourceEvent);
return;
}
Expand Down
Loading