From 93f8525eb160ac8527cefd9400fd99d7516eab1d Mon Sep 17 00:00:00 2001 From: Chendi Xue Date: Mon, 4 Jan 2021 10:49:14 +0800 Subject: [PATCH 1/2] [SCALA] Fix ColumnarBroadcastExchange didn't fallback issue when DPP is enabled Signed-off-by: Chendi Xue --- core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala b/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala index bf42b5e9a..1757b4d31 100644 --- a/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala +++ b/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala @@ -151,6 +151,10 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] { */ private def insertRowGuardRecursive(plan: SparkPlan): SparkPlan = { plan match { + case p: ShuffleExchangeExec => + RowGuard(p.withNewChildren(p.children.map(insertRowGuardOrNot))) + case p: BroadcastExchangeExec => + RowGuard(p.withNewChildren(p.children.map(insertRowGuardOrNot))) case p: ShuffledHashJoinExec => RowGuard(p.withNewChildren(p.children.map(insertRowGuardRecursive))) case p if !supportCodegen(p) => From 9c51360edf3f347a644e716fcdf5d3b39b6f91e0 Mon Sep 17 00:00:00 2001 From: Chendi Xue Date: Mon, 4 Jan 2021 13:48:30 +0800 Subject: [PATCH 2/2] [SCALA & JAVA] Fix ExpressionEvaluator api change null pointer issue Signed-off-by: Chendi Xue --- .../java/com/intel/oap/vectorized/ExpressionEvaluator.java | 1 + .../spark/sql/execution/ColumnarBroadcastExchangeExec.scala | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java b/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java index 2da52068b..8b86c92ea 100644 --- a/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java +++ b/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java @@ -238,6 +238,7 @@ public void close() { } byte[] getSchemaBytesBuf(Schema schema) throws IOException { + if (schema == null) return null; ByteArrayOutputStream out = new ByteArrayOutputStream(); MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema); return out.toByteArray(); diff --git a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala index 7831ea3a2..36d0dfd0e 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala @@ -190,7 +190,8 @@ class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) throw e } finally { val timeout: Int = SQLConf.get.broadcastTimeout.toInt - relation.asInstanceOf[ColumnarHashedRelation].countDownClose(timeout) + if (relation != null) + relation.asInstanceOf[ColumnarHashedRelation].countDownClose(timeout) } } }