diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala index d902305c8..933202117 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/extension/ColumnarOverrides.scala @@ -209,8 +209,15 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] { right) case plan: BroadcastQueryStageExec => logDebug( - s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan.getClass}.") - plan + s"Columnar Processing for ${plan.getClass} is currently supported, actual plan is ${plan.plan}.") + plan.plan match { + case ReusedExchangeExec(_, originalBroadcastPlan: ColumnarBroadcastExchangeAdaptor) => + val newBroadcast = BroadcastExchangeExec( + originalBroadcastPlan.mode, + DataToArrowColumnarExec(plan.plan, 1)) + SparkShimLoader.getSparkShims.newBroadcastQueryStageExec(plan.id, newBroadcast) + case other => plan + } case plan: BroadcastExchangeExec => val child = replaceWithColumnarPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")