Skip to content

Commit

Permalink
#384: Add | teragrep exec foreachbatch command that sets processing…
Browse files Browse the repository at this point in the history
… to batch mode (#390)

* update pth_03 to 9.1.0, initial step object and transformation for Teragrep ForEachBatch

* implement TeragrepForEachBatchStep and add tests

* apply spotless

* made TeragrepForEachBatchStep class final
  • Loading branch information
eemhu authored Oct 30, 2024
1 parent 68dba59 commit 04b3308
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<teragrep.dpf_03.version>11.0.1</teragrep.dpf_03.version>
<teragrep.jpr_01.version>3.1.1</teragrep.jpr_01.version>
<teragrep.jue_01.version>0.4.3</teragrep.jue_01.version>
<teragrep.pth_03.version>9.0.0</teragrep.pth_03.version>
<teragrep.pth_03.version>9.1.0</teragrep.pth_03.version>
<teragrep.pth_06.version>3.2.2</teragrep.pth_06.version>
<teragrep.rlp_01.version>4.0.1</teragrep.rlp_01.version>
<teragrep.rlp_03.version>1.7.6</teragrep.rlp_03.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,4 +607,9 @@ public Node visitT_pathParameter(DPLParser.T_pathParameterContext ctx) {
String path = new UnquotedText(new TextString(ctx.stringType().getText())).read();
return new StringNode(new Token(Token.Type.STRING, path));
}

/**
 * Produces the step node for the forEachBatch parameter.
 *
 * @param ctx parse context of the parameter; it is not inspected here
 * @return a {@link StepNode} wrapping a newly created {@link TeragrepForEachBatchStep}
 */
@Override
public Node visitT_forEachBatchParameter(DPLParser.T_forEachBatchParameterContext ctx) {
    // The step takes no arguments, so nothing is read from the context.
    final TeragrepForEachBatchStep forEachBatchStep = new TeragrepForEachBatchStep();
    return new StepNode(forEachBatchStep);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10.steps.teragrep;

import com.teragrep.pth10.steps.AbstractStep;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQueryException;

/**
 * Step that leaves the incoming dataset untouched and only contributes command properties.
 * <p>
 * On construction it registers {@code SEQUENTIAL_ONLY} and {@code USES_INTERNAL_BATCHCOLLECT}
 * in the step's properties; {@link #get(Dataset)} is an identity operation.
 */
public final class TeragrepForEachBatchStep extends AbstractStep {

    /**
     * Creates the step and registers its two command properties.
     */
    public TeragrepForEachBatchStep() {
        // Switch to sequential mode
        properties.add(CommandProperty.SEQUENTIAL_ONLY);
        // Skip using batch collect
        properties.add(CommandProperty.USES_INTERNAL_BATCHCOLLECT);
    }

    /**
     * Returns the given dataset as-is; no transformation is applied.
     *
     * @param dataset incoming dataset
     * @return the same {@code dataset} reference that was passed in
     * @throws StreamingQueryException declared in the signature; this implementation does not throw it
     */
    @Override
    public Dataset<Row> get(final Dataset<Row> dataset) throws StreamingQueryException {
        return dataset;
    }
}
28 changes: 28 additions & 0 deletions src/test/java/com/teragrep/pth10/TeragrepTransformationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -791,4 +791,32 @@ public void tgHdfsSaveAfterBloomEstimateTestUsingRegexExtract() {
Assertions.assertEquals(Collections.singletonList("5"), listOfResult);
});
}

// Pipes "teragrep exec foreachbatch" into "stats count" and expects exactly one
// result row whose "count" column renders as "5".
@Test
@DisabledIfSystemProperty(
        named = "skipSparkTest",
        matches = "true"
)
public void tgForEachBatchWithStatsTest() {
    final String query = "index=index_A | teragrep exec foreachbatch | stats count";
    streamingTestUtil.performDPLTest(query, testFile, ds -> {
        // Render the first column of every selected row as a string.
        final List<String> counts = ds
                .select("count")
                .collectAsList()
                .stream()
                .map(row -> row.getAs(0).toString())
                .collect(Collectors.toList());
        final List<String> expected = Collections.singletonList("5");
        Assertions.assertEquals(expected, counts);
    });
}

// Runs "teragrep exec foreachbatch" with no further commands and expects the
// resulting dataset to contain 5 rows.
@Test
@DisabledIfSystemProperty(
        named = "skipSparkTest",
        matches = "true"
)
public void tgForEachBatchTest() {
    final String query = "index=index_A | teragrep exec foreachbatch";
    streamingTestUtil.performDPLTest(query, testFile, ds -> Assertions.assertEquals(5, ds.count()));
}
}

0 comments on commit 04b3308

Please sign in to comment.