From d20392c2c71cf93e23ee7481127070deab855a6e Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 9 Mar 2021 17:13:41 +0800 Subject: [PATCH 1/6] [NSE-145] Support decimal in columnar window --- .../com/intel/oap/ColumnarGuardRule.scala | 3 +- .../scala/com/intel/oap/ColumnarPlugin.scala | 19 +- .../oap/execution/ColumnarWindowExec.scala | 71 +++++- .../com/intel/oap/tpc/ds/TPCDSSuite.scala | 28 +++ .../com/intel/oap/tpc/ds/TPCDSTableGen.scala | 158 +++++++------- cpp/src/CMakeLists.txt | 8 +- .../codegen/arrow_compute/ext/kernels_ext.h | 11 +- .../arrow_compute/ext/window_kernel.cc | 81 ++++--- .../arrow_compute/ext/window_sort_kernel.h | 142 ++---------- cpp/src/tests/CMakeLists.txt | 2 + cpp/src/tests/arrow_compute_test_window.cc | 202 ++++++++++++++++++ 11 files changed, 473 insertions(+), 252 deletions(-) create mode 100644 cpp/src/tests/arrow_compute_test_window.cc diff --git a/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala b/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala index 2d9934166..eb6a153c0 100644 --- a/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala +++ b/core/src/main/scala/com/intel/oap/ColumnarGuardRule.scala @@ -160,11 +160,12 @@ case class ColumnarGuardRule(conf: SparkConf) extends Rule[SparkPlan] { plan.isSkewJoin) case plan: WindowExec => if (!enableColumnarWindow) return false - new ColumnarWindowExec( + val window = ColumnarWindowExec.create( plan.windowExpression, plan.partitionSpec, plan.orderSpec, plan.child) + window case p => p } diff --git a/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala b/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala index afa4d995b..8272ba667 100644 --- a/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala +++ b/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala @@ -67,13 +67,17 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] { new ColumnarBatchScanExec(plan.output, plan.scan) } case plan: ProjectExec => - val columnarPlan = replaceWithColumnarPlan(plan.child) + val columnarChild = replaceWithColumnarPlan(plan.child) logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") - if (columnarPlan.isInstanceOf[ColumnarConditionProjectExec]) { - val cur_plan = columnarPlan.asInstanceOf[ColumnarConditionProjectExec] - ColumnarConditionProjectExec(cur_plan.condition, plan.projectList, cur_plan.child) - } else { - ColumnarConditionProjectExec(null, plan.projectList, columnarPlan) + columnarChild match { + case ch: ColumnarConditionProjectExec => + if (ch.projectList == null) { + ColumnarConditionProjectExec(ch.condition, plan.projectList, ch.child) + } else { + ColumnarConditionProjectExec(null, plan.projectList, columnarChild) + } + case _ => + ColumnarConditionProjectExec(null, plan.projectList, columnarChild) } case plan: FilterExec => val child = replaceWithColumnarPlan(plan.child) @@ -234,11 +238,12 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] { } logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") try { - return new ColumnarWindowExec( + val window = ColumnarWindowExec.create( plan.windowExpression, plan.partitionSpec, plan.orderSpec, coalesceBatchRemoved) + return window } catch { case _: Throwable => logInfo("Columnar Window: Falling back to regular Window...") diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala index 4ea693dca..65b70dc20 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala @@ -27,20 +27,21 @@ import org.apache.arrow.gandiva.expression.TreeBuilder import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} import org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, NamedExpression, Rank, SortOrder, WindowExpression, WindowFunction} -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, Sum} +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Cast, Descending, Expression, MakeDecimal, NamedExpression, Rank, SortOrder, UnscaledValue, WindowExpression, WindowFunction} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, Average, Sum} import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.ArrayType +import org.apache.spark.sql.types.{ArrayType, DecimalType} import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ExecutorManager import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer +import scala.util.Random class ColumnarWindowExec(windowExpression: Seq[NamedExpression], partitionSpec: Seq[Expression], @@ -248,3 +249,67 @@ class ColumnarWindowExec(windowExpression: Seq[NamedExpression], override def isComplex: Boolean = false } } + +object ColumnarWindowExec { + + def createWithProjection( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): SparkPlan = { + + def makeInputProject(ex: Expression, inputProjects: ListBuffer[NamedExpression]): Expression = { + ex match { + case ae: AggregateExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects))) + case ae: WindowExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects))) + case func @ (_: AggregateFunction | _:WindowFunction) => + val params = func.children + func.withNewChildren(params.map { + case param @ (_: Cast | _: UnscaledValue) => + val aliasName = "__alias_%d__".format(Random.nextLong()) + val alias = Alias(param, aliasName)() + inputProjects.append(alias) + alias.toAttribute + case other => other + }) + case other => other + } + } + + def makeOutputProject(ex: Expression, windows: ListBuffer[NamedExpression], inputProjects: ListBuffer[NamedExpression]): Expression = { + ex match { + case we: WindowExpression => + val aliasName = "__alias_%d__".format(Random.nextLong()) + val alias = Alias(makeInputProject(we, inputProjects), aliasName)() + windows.append(alias) + alias.toAttribute + case _ => + ex.withNewChildren(ex.children.map(makeOutputProject(_, windows, inputProjects))) + } + } + + val windows = ListBuffer[NamedExpression]() + val inProjectExpressions = ListBuffer[NamedExpression]() + val outProjectExpressions = windowExpression.map(e => e.asInstanceOf[Alias]) + .map { a => + a.withNewChildren(List(makeOutputProject(a.child, windows, inProjectExpressions))) + .asInstanceOf[NamedExpression] + } + + val inputProject = ColumnarConditionProjectExec(null, child.output ++ inProjectExpressions, child) + + val window = new ColumnarWindowExec(windows, partitionSpec, orderSpec, inputProject) + + val outputProject = ColumnarConditionProjectExec(null, child.output ++ outProjectExpressions, window) + + outputProject + } + + def create( + windowExpression: Seq[NamedExpression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], + child: SparkPlan): SparkPlan = { + createWithProjection(windowExpression, partitionSpec, orderSpec, child) + } +} diff --git a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala index feedd53d6..b1382a78d 100644 --- a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala +++ b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala @@ -88,6 +88,34 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { test("window query") { runner.runTPCQuery("q67", 1, true) } + + test("window function with non-decimal input") { + val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_category_id)" + + " OVER (PARTITION BY i_class_id) FROM item LIMIT 1000") + df.explain() + df.show() + } + + test("window function with decimal input") { + val df = spark.sql("SELECT i_item_sk, i_class_id, SUM(i_current_price)" + + " OVER (PARTITION BY i_class_id) FROM item LIMIT 1000") + df.explain() + df.show() + } + + test("window function with decimal input 2") { + val df = spark.sql("SELECT i_item_sk, i_class_id, RANK()" + + " OVER (PARTITION BY i_class_id ORDER BY i_current_price) FROM item LIMIT 1000") + df.explain() + df.show() + } + + test("window function with decimal input 3") { + val df = spark.sql("SELECT i_item_sk, i_class_id, AVG(i_current_price)" + + " OVER (PARTITION BY i_class_id) FROM item LIMIT 1000") + df.explain() + df.show() + } } object TPCDSSuite { diff --git a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala index e2bdaa264..0cbd13232 100644 --- a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala +++ b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala @@ -177,21 +177,21 @@ object TPCDSTableGen { StructField("cs_promo_sk", LongType), StructField("cs_order_number", LongType), StructField("cs_quantity", LongType), - StructField("cs_wholesale_cost", DoubleType), - StructField("cs_list_price", DoubleType), - StructField("cs_sales_price", DoubleType), - StructField("cs_ext_discount_amt", DoubleType), - StructField("cs_ext_sales_price", DoubleType), - StructField("cs_ext_wholesale_cost", DoubleType), - StructField("cs_ext_list_price", DoubleType), - StructField("cs_ext_tax", DoubleType), - StructField("cs_coupon_amt", DoubleType), - StructField("cs_ext_ship_cost", DoubleType), - StructField("cs_net_paid", DoubleType), - StructField("cs_net_paid_inc_tax", DoubleType), - StructField("cs_net_paid_inc_ship", DoubleType), - StructField("cs_net_paid_inc_ship_tax", DoubleType), - StructField("cs_net_profit", DoubleType) + StructField("cs_wholesale_cost", DecimalType(7, 2)), + StructField("cs_list_price", DecimalType(7, 2)), + StructField("cs_sales_price", DecimalType(7, 2)), + StructField("cs_ext_discount_amt", DecimalType(7, 2)), + StructField("cs_ext_sales_price", DecimalType(7, 2)), + StructField("cs_ext_wholesale_cost", DecimalType(7, 2)), + StructField("cs_ext_list_price", DecimalType(7, 2)), + StructField("cs_ext_tax", DecimalType(7, 2)), + StructField("cs_coupon_amt", DecimalType(7, 2)), + StructField("cs_ext_ship_cost", DecimalType(7, 2)), + StructField("cs_net_paid", DecimalType(7, 2)), + StructField("cs_net_paid_inc_tax", DecimalType(7, 2)), + StructField("cs_net_paid_inc_ship", DecimalType(7, 2)), + StructField("cs_net_paid_inc_ship_tax", DecimalType(7, 2)), + StructField("cs_net_profit", DecimalType(7, 2)) )) } private def catalogReturnsSchema = { @@ -214,15 +214,15 @@ object TPCDSTableGen { StructField("cr_reason_sk", LongType), StructField("cr_order_number", LongType), StructField("cr_return_quantity", LongType), - StructField("cr_return_amount", DoubleType), - StructField("cr_return_tax", DoubleType), - StructField("cr_return_amt_inc_tax", DoubleType), - StructField("cr_fee", DoubleType), - StructField("cr_return_ship_cost", DoubleType), - StructField("cr_refunded_cash", DoubleType), - StructField("cr_reversed_charge", DoubleType), - StructField("cr_store_credit", DoubleType), - StructField("cr_net_loss", DoubleType) + StructField("cr_return_amount", DecimalType(7, 2)), + StructField("cr_return_tax", DecimalType(7, 2)), + StructField("cr_return_amt_inc_tax", DecimalType(7, 2)), + StructField("cr_fee", DecimalType(7, 2)), + StructField("cr_return_ship_cost", DecimalType(7, 2)), + StructField("cr_refunded_cash", DecimalType(7, 2)), + StructField("cr_reversed_charge", DecimalType(7, 2)), + StructField("cr_store_credit", DecimalType(7, 2)), + StructField("cr_net_loss", DecimalType(7, 2)) )) } private def inventorySchema = { @@ -246,18 +246,18 @@ object TPCDSTableGen { StructField("ss_promo_sk", LongType), StructField("ss_ticket_number", LongType), StructField("ss_quantity", LongType), - StructField("ss_wholesale_cost", DoubleType), - StructField("ss_list_price", DoubleType), - StructField("ss_sales_price", DoubleType), - StructField("ss_ext_discount_amt", DoubleType), - StructField("ss_ext_sales_price", DoubleType), - StructField("ss_ext_wholesale_cost", DoubleType), - StructField("ss_ext_list_price", DoubleType), - StructField("ss_ext_tax", DoubleType), - StructField("ss_coupon_amt", DoubleType), - StructField("ss_net_paid", DoubleType), - StructField("ss_net_paid_inc_tax", DoubleType), - StructField("ss_net_profit", DoubleType) + StructField("ss_wholesale_cost", DecimalType(7, 2)), + StructField("ss_list_price", DecimalType(7, 2)), + StructField("ss_sales_price", DecimalType(7, 2)), + StructField("ss_ext_discount_amt", DecimalType(7, 2)), + StructField("ss_ext_sales_price", DecimalType(7, 2)), + StructField("ss_ext_wholesale_cost", DecimalType(7, 2)), + StructField("ss_ext_list_price", DecimalType(7, 2)), + StructField("ss_ext_tax", DecimalType(7, 2)), + StructField("ss_coupon_amt", DecimalType(7, 2)), + StructField("ss_net_paid", DecimalType(7, 2)), + StructField("ss_net_paid_inc_tax", DecimalType(7, 2)), + StructField("ss_net_profit", DecimalType(7, 2)) )) } private def storeReturnsSchema = { @@ -273,15 +273,15 @@ object TPCDSTableGen { StructField("sr_reason_sk", LongType), StructField("sr_ticket_number", LongType), StructField("sr_return_quantity", LongType), - StructField("sr_return_amt", DoubleType), - StructField("sr_return_tax", DoubleType), - StructField("sr_return_amt_inc_tax", DoubleType), - StructField("sr_fee", DoubleType), - StructField("sr_return_ship_cost", DoubleType), - StructField("sr_refunded_cash", DoubleType), - StructField("sr_reversed_charge", DoubleType), - StructField("sr_store_credit", DoubleType), - StructField("sr_net_loss", DoubleType) + StructField("sr_return_amt", DecimalType(7, 2)), + StructField("sr_return_tax", DecimalType(7, 2)), + StructField("sr_return_amt_inc_tax", DecimalType(7, 2)), + StructField("sr_fee", DecimalType(7, 2)), + StructField("sr_return_ship_cost", DecimalType(7, 2)), + StructField("sr_refunded_cash", DecimalType(7, 2)), + StructField("sr_reversed_charge", DecimalType(7, 2)), + StructField("sr_store_credit", DecimalType(7, 2)), + StructField("sr_net_loss", DecimalType(7, 2)) )) } private def webSalesSchema = { @@ -305,21 +305,21 @@ object TPCDSTableGen { StructField("ws_promo_sk", LongType), StructField("ws_order_number", LongType), StructField("ws_quantity", LongType), - StructField("ws_wholesale_cost", DoubleType), - StructField("ws_list_price", DoubleType), - StructField("ws_sales_price", DoubleType), - StructField("ws_ext_discount_amt", DoubleType), - StructField("ws_ext_sales_price", DoubleType), - StructField("ws_ext_wholesale_cost", DoubleType), - StructField("ws_ext_list_price", DoubleType), - StructField("ws_ext_tax", DoubleType), - StructField("ws_coupon_amt", DoubleType), - StructField("ws_ext_ship_cost", DoubleType), - StructField("ws_net_paid", DoubleType), - StructField("ws_net_paid_inc_tax", DoubleType), - StructField("ws_net_paid_inc_ship", DoubleType), - StructField("ws_net_paid_inc_ship_tax", DoubleType), - StructField("ws_net_profit", DoubleType) + StructField("ws_wholesale_cost", DecimalType(7, 2)), + StructField("ws_list_price", DecimalType(7, 2)), + StructField("ws_sales_price", DecimalType(7, 2)), + StructField("ws_ext_discount_amt", DecimalType(7, 2)), + StructField("ws_ext_sales_price", DecimalType(7, 2)), + StructField("ws_ext_wholesale_cost", DecimalType(7, 2)), + StructField("ws_ext_list_price", DecimalType(7, 2)), + StructField("ws_ext_tax", DecimalType(7, 2)), + StructField("ws_coupon_amt", DecimalType(7, 2)), + StructField("ws_ext_ship_cost", DecimalType(7, 2)), + StructField("ws_net_paid", DecimalType(7, 2)), + StructField("ws_net_paid_inc_tax", DecimalType(7, 2)), + StructField("ws_net_paid_inc_ship", DecimalType(7, 2)), + StructField("ws_net_paid_inc_ship_tax", DecimalType(7, 2)), + StructField("ws_net_profit", DecimalType(7, 2)) )) } private def webReturnsSchema = { @@ -339,15 +339,15 @@ object TPCDSTableGen { StructField("wr_reason_sk", LongType), StructField("wr_order_number", LongType), StructField("wr_return_quantity", LongType), - StructField("wr_return_amt", DoubleType), - StructField("wr_return_tax", DoubleType), - StructField("wr_return_amt_inc_tax", DoubleType), - StructField("wr_fee", DoubleType), - StructField("wr_return_ship_cost", DoubleType), - StructField("wr_refunded_cash", DoubleType), - StructField("wr_reversed_charge", DoubleType), - StructField("wr_account_credit", DoubleType), - StructField("wr_net_loss", DoubleType) + StructField("wr_return_amt", DecimalType(7, 2)), + StructField("wr_return_tax", DecimalType(7, 2)), + StructField("wr_return_amt_inc_tax", DecimalType(7, 2)), + StructField("wr_fee", DecimalType(7, 2)), + StructField("wr_return_ship_cost", DecimalType(7, 2)), + StructField("wr_refunded_cash", DecimalType(7, 2)), + StructField("wr_reversed_charge", DecimalType(7, 2)), + StructField("wr_account_credit", DecimalType(7, 2)), + StructField("wr_net_loss", DecimalType(7, 2)) )) } private def callCenterSchema = { @@ -381,8 +381,8 @@ object TPCDSTableGen { StructField("cc_state", StringType), StructField("cc_zip", StringType), StructField("cc_country", StringType), - StructField("cc_gmt_offset", DoubleType), - StructField("cc_tax_percentage", DoubleType) + StructField("cc_gmt_offset", DecimalType(5, 2)), + StructField("cc_tax_percentage", DecimalType(5, 2)) )) } private def catalogPageSchema = { @@ -433,7 +433,7 @@ object TPCDSTableGen { StructField("ca_state", StringType), StructField("ca_zip", StringType), StructField("ca_country", StringType), - StructField("ca_gmt_offset", DoubleType), + StructField("ca_gmt_offset", DecimalType(5, 2)), StructField("ca_location_type", StringType) )) } @@ -505,8 +505,8 @@ object TPCDSTableGen { StructField("i_rec_start_date", StringType), StructField("i_rec_end_date", StringType), StructField("i_item_desc", StringType), - StructField("i_current_price", DoubleType), - StructField("i_wholesale_cost", DoubleType), + StructField("i_current_price", DecimalType(7, 2)), + StructField("i_wholesale_cost", DecimalType(7, 2)), StructField("i_brand_id", LongType), StructField("i_brand", StringType), StructField("i_class_id", LongType), @@ -531,7 +531,7 @@ object TPCDSTableGen { StructField("p_start_date_sk", LongType), StructField("p_end_date_sk", LongType), StructField("p_item_sk", LongType), - StructField("p_cost", DoubleType), + StructField("p_cost", DecimalType(15, 2)), StructField("p_response_target", LongType), StructField("p_promo_name", StringType), StructField("p_channel_dmail", StringType), @@ -593,8 +593,8 @@ object TPCDSTableGen { StructField("s_state", StringType), StructField("s_zip", StringType), StructField("s_country", StringType), - StructField("s_gmt_offset", DoubleType), - StructField("s_tax_precentage", DoubleType) + StructField("s_gmt_offset", DecimalType(5, 2)), + StructField("s_tax_precentage", DecimalType(5, 2)) )) } private def timeDimSchema = { @@ -626,7 +626,7 @@ object TPCDSTableGen { StructField("w_state", StringType), StructField("w_zip", StringType), StructField("w_country", StringType), - StructField("w_gmt_offset", DoubleType) + StructField("w_gmt_offset", DecimalType(5, 2)) )) } private def webPageSchema = { @@ -674,7 +674,7 @@ object TPCDSTableGen { StructField("web_zip", StringType), StructField("web_country", StringType), StructField("web_gmt_offset", StringType), - StructField("web_tax_percentage", DoubleType) + StructField("web_tax_percentage", DecimalType(5, 2)) )) } } diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index c2ac266c7..236237e2e 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -116,6 +116,10 @@ add_dependencies(jni_proto protobuf::libprotobuf) set(PROTO_SRCS "${PROTO_OUTPUT_DIR}/Exprs.pb.cc") set(PROTO_HDRS "${PROTO_OUTPUT_DIR}/Exprs.pb.h") +if(DEBUG) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -O0 -DDEBUG -DDEBUG_LEVEL_1 -DDEBUG_LEVEL_2") +endif() + if(USE_AVX512) # Only enable additional instruction sets if they are supported message(STATUS "System processor: ${CMAKE_SYSTEM_PROCESSOR}") @@ -167,10 +171,6 @@ endmacro() add_subdirectory(benchmarks) endif() -if(DEBUG) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -O0 -DDEBUG -DDEBUG_LEVEL_1 -DDEBUG_LEVEL_2") -endif() - find_library(ARROW_LIB arrow) find_library(GANDIVA_LIB gandiva) diff --git a/cpp/src/codegen/arrow_compute/ext/kernels_ext.h b/cpp/src/codegen/arrow_compute/ext/kernels_ext.h index c8051150b..4ede6fd30 100644 --- a/cpp/src/codegen/arrow_compute/ext/kernels_ext.h +++ b/cpp/src/codegen/arrow_compute/ext/kernels_ext.h @@ -134,10 +134,17 @@ class WindowAggregateFunctionKernel : public KernalBase { std::shared_ptr* out); arrow::Status Evaluate(const ArrayList& in) override; arrow::Status Finish(ArrayList* out) override; - template - arrow::Status Finish0(ArrayList* out); private: + template + arrow::Status Finish0(ArrayList* out, std::shared_ptr data_type); + + template + typename arrow::enable_if_decimal128>> createBuilder(std::shared_ptr data_type); + + template + typename arrow::enable_if_number>> createBuilder(std::shared_ptr data_type); + arrow::compute::ExecContext* ctx_; std::shared_ptr action_; std::vector> accumulated_group_ids_; diff --git a/cpp/src/codegen/arrow_compute/ext/window_kernel.cc b/cpp/src/codegen/arrow_compute/ext/window_kernel.cc index 1d793f08a..d8ec9db3c 100644 --- a/cpp/src/codegen/arrow_compute/ext/window_kernel.cc +++ b/cpp/src/codegen/arrow_compute/ext/window_kernel.cc @@ -33,12 +33,13 @@ class WindowAggregateFunctionKernel::ActionFactory { static arrow::Status Make(std::string action_name, arrow::compute::ExecContext *ctx, std::shared_ptr type, + std::shared_ptr return_type, std::shared_ptr *out) { std::shared_ptr action; if (action_name == "sum") { - RETURN_NOT_OK(MakeSumAction(ctx, type, {type}, &action)); + RETURN_NOT_OK(MakeSumAction(ctx, type, {return_type}, &action)); } else if (action_name == "avg") { - RETURN_NOT_OK(MakeAvgAction(ctx, type, {type}, &action)); + RETURN_NOT_OK(MakeAvgAction(ctx, type, {return_type}, &action)); } else { return arrow::Status::Invalid("window aggregate function: unsupported action name: " + action_name); } @@ -65,7 +66,7 @@ arrow::Status WindowAggregateFunctionKernel::Make(arrow::compute::ExecContext *c std::shared_ptr action; if (function_name == "sum" || function_name == "avg") { - RETURN_NOT_OK(ActionFactory::Make(function_name, ctx, type_list[0], &action)); + RETURN_NOT_OK(ActionFactory::Make(function_name, ctx, type_list[0], result_type, &action)); } else { return arrow::Status::Invalid("window function not supported: " + function_name); } @@ -127,45 +128,48 @@ arrow::Status WindowAggregateFunctionKernel::Evaluate(const ArrayList &in) { return arrow::Status::OK(); } -#define PROCESS_SUPPORTED_TYPES(PROCESS) \ - PROCESS(arrow::UInt8Type) \ - PROCESS(arrow::Int8Type) \ - PROCESS(arrow::UInt16Type) \ - PROCESS(arrow::Int16Type) \ - PROCESS(arrow::UInt32Type) \ - PROCESS(arrow::Int32Type) \ - PROCESS(arrow::UInt64Type) \ - PROCESS(arrow::Int64Type) \ - PROCESS(arrow::FloatType) \ - PROCESS(arrow::DoubleType) +#define PROCESS_SUPPORTED_TYPES_WINDOW(PROC) \ + PROC(arrow::UInt8Type, arrow::UInt8Builder, arrow::UInt8Array) \ + PROC(arrow::Int8Type, arrow::Int8Builder, arrow::Int8Array) \ + PROC(arrow::UInt16Type, arrow::UInt16Builder, arrow::UInt16Array) \ + PROC(arrow::Int16Type, arrow::Int16Builder, arrow::Int16Array) \ + PROC(arrow::UInt32Type, arrow::UInt32Builder, arrow::UInt32Array) \ + PROC(arrow::Int32Type, arrow::Int32Builder, arrow::Int32Array) \ + PROC(arrow::UInt64Type, arrow::UInt64Builder, arrow::UInt64Array) \ + PROC(arrow::Int64Type, arrow::Int64Builder, arrow::Int64Array) \ + PROC(arrow::FloatType, arrow::FloatBuilder, arrow::FloatArray) \ + PROC(arrow::DoubleType, arrow::DoubleBuilder, arrow::DoubleArray) \ + PROC(arrow::Decimal128Type, arrow::Decimal128Builder, arrow::Decimal128Array) arrow::Status WindowAggregateFunctionKernel::Finish(ArrayList *out) { std::shared_ptr value_type = result_type_; switch (value_type->id()) { -#define PROCESS(NUMERIC_TYPE) \ - case NUMERIC_TYPE::type_id: { \ - RETURN_NOT_OK(Finish0(out)); \ + +#define PROCESS(VALUE_TYPE, BUILDER_TYPE, ARRAY_TYPE) \ + case VALUE_TYPE::type_id: { \ + RETURN_NOT_OK((Finish0(out, value_type))); \ } break; - PROCESS_SUPPORTED_TYPES(PROCESS) + + PROCESS_SUPPORTED_TYPES_WINDOW(PROCESS) #undef PROCESS - default:return arrow::Status::Invalid("window function: unsupported input type"); + default: return arrow::Status::Invalid("window function: unsupported input type: " + value_type->name()); } return arrow::Status::OK(); } -template -arrow::Status WindowAggregateFunctionKernel::Finish0(ArrayList *out) { +template +arrow::Status WindowAggregateFunctionKernel::Finish0(ArrayList *out, std::shared_ptr data_type) { ArrayList action_output; RETURN_NOT_OK(action_->Get()->Finish(&action_output)); if (action_output.size() != 1) { return arrow::Status::Invalid("window function: got invalid result from corresponding action"); } - auto action_output_values = std::dynamic_pointer_cast>(action_output.at(0)); + auto action_output_values = std::dynamic_pointer_cast(action_output.at(0)); for (const auto &accumulated_group_ids_single_part : accumulated_group_ids_) { - std::unique_ptr> output_builder - = std::make_unique>(ctx_->memory_pool()); + std::shared_ptr output_builder; + ARROW_ASSIGN_OR_RAISE(output_builder, (createBuilder(data_type))) for (int i = 0; i < accumulated_group_ids_single_part->length(); i++) { if (accumulated_group_ids_single_part->IsNull(i)) { @@ -182,6 +186,18 @@ arrow::Status WindowAggregateFunctionKernel::Finish0(ArrayList *out) { return arrow::Status::OK(); } +template +typename arrow::enable_if_decimal128>> + WindowAggregateFunctionKernel::createBuilder(std::shared_ptr data_type) { + return std::make_shared(data_type, ctx_->memory_pool()); +} + +template +typename arrow::enable_if_number>> + WindowAggregateFunctionKernel::createBuilder(std::shared_ptr data_type) { + return std::make_shared(ctx_->memory_pool()); +} + WindowRankKernel::WindowRankKernel(arrow::compute::ExecContext *ctx, std::vector> type_list, std::shared_ptr sorter, @@ -215,15 +231,15 @@ arrow::Status WindowRankKernel::Make(arrow::compute::ExecContext *ctx, result_schema, nulls_first, asc)); } else { switch (key_field->type()->id()) { -#define PROCESS(InType) \ +#define PROCESS(InType, BUILDER_TYPE, ARRAY_TYPE) \ case InType::type_id: { \ - using CType = typename arrow::TypeTraits::CType; \ + using CType = typename TypeTraits::CType; \ sorter.reset(new WindowSortOnekeyKernel(ctx, key_fields, result_schema, nulls_first, asc)); \ } break; - PROCESS_SUPPORTED_TYPES(PROCESS) + PROCESS_SUPPORTED_TYPES_WINDOW(PROCESS) #undef PROCESS default: { - std::cout << "WindowSortOnekeyKernel type not supported, type is " + std::cout << "WindowRankKernel type not supported, type is " << key_field->type() << std::endl; } break; @@ -352,12 +368,11 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) { bool s; std::shared_ptr type = type_list_.at(column_id); switch (type->id()) { -#define PROCESS(InType) \ +#define PROCESS(InType, BUILDER_TYPE, ARRAY_TYPE) \ case InType::type_id: { \ - using ArrayType = typename arrow::TypeTraits::ArrayType; \ - RETURN_NOT_OK(AreTheSameValue(values, column_id, index, last_index, &s)); \ + RETURN_NOT_OK(AreTheSameValue(values, column_id, index, last_index, &s)); \ } break; - PROCESS_SUPPORTED_TYPES(PROCESS) + PROCESS_SUPPORTED_TYPES_WINDOW(PROCESS) #undef PROCESS default: { std::cout << "WindowRankKernel: type not supported: " @@ -458,6 +473,8 @@ arrow::Status WindowRankKernel::AreTheSameValue(const std::vector& va return arrow::Status::OK(); } +#undef PROCESS_SUPPORTED_TYPES_WINDOW + } } } diff --git a/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h b/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h index 5ada38824..087c15577 100644 --- a/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h +++ b/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h @@ -696,115 +696,6 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl { uint64_t num_batches_ = 0; uint64_t col_num_; int key_id_; - -#define PROCESS_SUPPORTED_TYPES(PROCESS) \ - PROCESS(arrow::UInt8Type) \ - PROCESS(arrow::Int8Type) \ - PROCESS(arrow::UInt16Type) \ - PROCESS(arrow::Int16Type) \ - PROCESS(arrow::UInt32Type) \ - PROCESS(arrow::Int32Type) \ - PROCESS(arrow::UInt64Type) \ - PROCESS(arrow::Int64Type) \ - PROCESS(arrow::FloatType) \ - PROCESS(arrow::DoubleType) - class SorterResultIterator : public ResultIterator { - public: - SorterResultIterator(arrow::compute::ExecContext* ctx, - std::shared_ptr schema, - std::shared_ptr result_schema, - std::shared_ptr indices_in, - std::vector& cached) - : ctx_(ctx), - result_schema_(result_schema), - indices_in_cache_(indices_in), - total_length_(indices_in->length()), - cached_in_(cached) { - col_num_ = result_schema->num_fields(); - indices_begin_ = (ArrayItemIndex*)indices_in->value_data(); - for (uint64_t i = 0; i < col_num_; i++) { - auto field = result_schema->field(i); - if (field->type()->id() == arrow::Type::STRING) { - auto app_ptr = std::make_shared>(ctx); - auto appender = std::dynamic_pointer_cast(app_ptr); - appender_list_.push_back(appender); - } else { - switch (field->type()->id()) { -#define PROCESS(InType) \ - case InType::type_id: { \ - auto app_ptr = std::make_shared>(ctx); \ - auto appender = std::dynamic_pointer_cast(app_ptr); \ - appender_list_.push_back(appender); \ - } break; - PROCESS_SUPPORTED_TYPES(PROCESS) - default: { - std::cout << "WindowSortOnekeyKernel type not supported, type is " - << field->type() << std::endl; - } break; -#undef PROCESS - } - } - } - for (int i = 0; i < col_num_; i++) { - arrow::ArrayVector array_vector = cached_in_[i]; - int array_num = array_vector.size(); - for (int array_id = 0; array_id < array_num; array_id++) { - auto arr = array_vector[array_id]; - appender_list_[i]->AddArray(arr); - } - } - batch_size_ = GetBatchSize(); - } - - std::string ToString() override { return "SortArraysToIndicesResultIterator"; } - - bool HasNext() override { - if (offset_ >= total_length_) { - return false; - } - return true; - } - - arrow::Status Next(std::shared_ptr* out) { - auto length = (total_length_ - offset_) > batch_size_ ? batch_size_ - : (total_length_ - offset_); - uint64_t count = 0; - for (int i = 0; i < col_num_; i++) { - while (count < length) { - auto item = indices_begin_ + offset_ + count++; - RETURN_NOT_OK(appender_list_[i]->Append(item->array_id, item->id)); - } - count = 0; - } - offset_ += length; - ArrayList arrays; - for (int i = 0; i < col_num_; i++) { - std::shared_ptr out_; - RETURN_NOT_OK(appender_list_[i]->Finish(&out_)); - arrays.push_back(out_); - appender_list_[i]->Reset(); - } - - *out = arrow::RecordBatch::Make(result_schema_, length, arrays); - return arrow::Status::OK(); - } - - private: - uint64_t offset_ = 0; - const uint64_t total_length_; - std::shared_ptr schema_; - std::shared_ptr result_schema_; - arrow::compute::ExecContext* ctx_; - uint64_t batch_size_; - uint64_t col_num_; - ArrayItemIndex* indices_begin_; - std::vector cached_in_; - std::vector> type_list_; - std::vector> appender_list_; - std::vector> array_list_; - std::shared_ptr indices_in_cache_; - }; -#undef PROCESS_SUPPORTED_TYPES }; arrow::Status WindowSortKernel::Make( @@ -816,17 +707,20 @@ arrow::Status WindowSortKernel::Make( nulls_first, asc); return arrow::Status::OK(); } -#define PROCESS_SUPPORTED_TYPES(PROCESS) \ - PROCESS(arrow::UInt8Type) \ - PROCESS(arrow::Int8Type) \ - PROCESS(arrow::UInt16Type) \ - PROCESS(arrow::Int16Type) \ - PROCESS(arrow::UInt32Type) \ - PROCESS(arrow::Int32Type) \ - PROCESS(arrow::UInt64Type) \ - PROCESS(arrow::Int64Type) \ - PROCESS(arrow::FloatType) \ - PROCESS(arrow::DoubleType) + +#define PROCESS_SUPPORTED_TYPES_WINDOW_SORT(PROC) \ + PROC(arrow::UInt8Type, arrow::UInt8Builder, arrow::UInt8Array) \ + PROC(arrow::Int8Type, arrow::Int8Builder, arrow::Int8Array) \ + PROC(arrow::UInt16Type, arrow::UInt16Builder, arrow::UInt16Array) \ + PROC(arrow::Int16Type, arrow::Int16Builder, arrow::Int16Array) \ + PROC(arrow::UInt32Type, arrow::UInt32Builder, arrow::UInt32Array) \ + PROC(arrow::Int32Type, arrow::Int32Builder, arrow::Int32Array) \ + PROC(arrow::UInt64Type, arrow::UInt64Builder, arrow::UInt64Array) \ + PROC(arrow::Int64Type, arrow::Int64Builder, arrow::Int64Array) \ + PROC(arrow::FloatType, arrow::FloatBuilder, arrow::FloatArray) \ + PROC(arrow::DoubleType, arrow::DoubleBuilder, arrow::DoubleArray) \ + PROC(arrow::Decimal128Type, arrow::Decimal128Builder, arrow::Decimal128Array) + WindowSortKernel::WindowSortKernel( arrow::compute::ExecContext* ctx, std::vector> key_field_list, @@ -841,12 +735,12 @@ WindowSortKernel::WindowSortKernel( result_schema, nulls_first, asc)); } else { switch (key_field_list[0]->type()->id()) { -#define PROCESS(InType) \ +#define PROCESS(InType, BUILDER_TYPE, ARRAY_TYPE) \ case InType::type_id: { \ - using CType = typename arrow::TypeTraits::CType; \ + using CType = typename TypeTraits::CType; \ impl_.reset(new WindowSortOnekeyKernel(ctx, key_field_list, result_schema, nulls_first, asc)); \ } break; - PROCESS_SUPPORTED_TYPES(PROCESS) + PROCESS_SUPPORTED_TYPES_WINDOW_SORT(PROCESS) #undef PROCESS default: { std::cout << "WindowSortOnekeyKernel type not supported, type is " @@ -864,7 +758,7 @@ WindowSortKernel::WindowSortKernel( } kernel_name_ = "WindowSortKernel"; } -#undef PROCESS_SUPPORTED_TYPES +#undef PROCESS_SUPPORTED_TYPES_WINDOW_SORT arrow::Status WindowSortKernel::Evaluate(const ArrayList& in) { return impl_->Evaluate(in); diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index f01c5c149..2d677f18a 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -7,3 +7,5 @@ package_add_test(TestArrowComputeCondition arrow_compute_test_check_condition.cc package_add_test(TestArrowComputeWSCG arrow_compute_test_wscg.cc) package_add_test(TestArrowComputeJoinWOCG arrow_compute_test_join_wocg.cc) package_add_test(TestShuffleSplit shuffle_split_test.cc) +package_add_test(TestArrowComputeWindow arrow_compute_test_window.cc) + diff --git a/cpp/src/tests/arrow_compute_test_window.cc b/cpp/src/tests/arrow_compute_test_window.cc new file mode 100644 index 000000000..b2039ce86 --- /dev/null +++ b/cpp/src/tests/arrow_compute_test_window.cc @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "precompile/array.h" +#include "tests/test_utils.h" +#include "codegen/code_generator.h" +#include "codegen/code_generator_factory.h" +#include "precompile/gandiva.h" + +using arrow::int64; +using arrow::uint32; +using gandiva::TreeExprBuilder; + +namespace sparkcolumnarplugin { +namespace codegen { + +TEST(TestArrowComputeWindow, DoubleTest) { + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dou", arrow::float64())}); + std::vector input_data = { + "[1, 2, 1]", + "[35.612, 37.244, 82.664]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::float64()); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("sum", + {TreeExprBuilder::MakeField(field("col_dou", arrow::float64()))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[118.276, 37.244, 118.276]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +TEST(TestArrowComputeWindow, DecimalTest) { + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dec", arrow::decimal128(8, 3))}); + std::vector input_data = { + "[1, 2, 1]", + "[\"35.612\", \"37.244\", \"82.664\"]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::decimal128(8, 3)); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("sum", + {TreeExprBuilder::MakeField(field("col_dec", arrow::decimal128(8, 3)))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[\"118.276\", \"37.244\", \"118.276\"]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +TEST(TestArrowComputeWindow, DecimalAvgTest) { + return; // fixme decimal avg not supported? + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dec", arrow::decimal128(8, 3))}); + std::vector input_data = { + "[1, 2, 1]", + "[\"35.612\", \"37.244\", \"82.664\"]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::decimal128(8, 3)); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("avg", + {TreeExprBuilder::MakeField(field("col_dec", arrow::decimal128(8, 3)))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[\"118.276\", \"37.244\", \"118.276\"]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +TEST(TestArrowComputeWindow, DecimalRankTest) { + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dec", arrow::decimal128(8, 3))}); + std::vector input_data = { + "[1, 2, 1]", + "[\"35.612\", \"37.244\", \"35.613\"]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::int32()); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("rank_desc", + {TreeExprBuilder::MakeField(field("col_dec", arrow::decimal128(8, 3)))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[2, 1, 1]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +TEST(TestArrowComputeWindow, DecimalRankTest2) { + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dec", arrow::decimal128(8, 3))}); + std::vector input_data = { + "[1, 2, 1]", + "[\"35.612\", \"37.244\", \"35.612\"]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::int32()); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("rank_desc", + {TreeExprBuilder::MakeField(field("col_dec", arrow::decimal128(8, 3)))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[1, 1, 1]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + +} // namespace codegen +} // namespace sparkcolumnarplugin From 693b64fa39e05cac3e9470894686166ace118546 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 10 Mar 2021 21:08:44 +0800 Subject: [PATCH 2/6] avg fix --- .../oap/execution/ColumnarWindowExec.scala | 22 +++++++++++-- cpp/src/tests/arrow_compute_test_window.cc | 33 +++++++++++++++++++ 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala index 65b70dc20..1f6f1b007 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, DecimalType} +import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, DoubleType, LongType} import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ExecutorManager @@ -262,9 +262,25 @@ object ColumnarWindowExec { ex match { case ae: AggregateExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects))) case ae: WindowExpression => ae.withNewChildren(ae.children.map(makeInputProject(_, inputProjects))) - case func @ (_: AggregateFunction | _:WindowFunction) => + case func @ (_: AggregateFunction | _: WindowFunction) => val params = func.children - func.withNewChildren(params.map { + // rewrite + val rewritten = func match { + case _: Average => + // rewrite params for AVG + params.map { + param => + param.dataType match { + case _: LongType | _: DecimalType => + Cast(param, DoubleType) + case _ => param + } + } + case _ => params + } + + // alias + func.withNewChildren(rewritten.map { case param @ (_: Cast | _: UnscaledValue) => val aliasName = "__alias_%d__".format(Random.nextLong()) val alias = Alias(param, aliasName)() diff --git a/cpp/src/tests/arrow_compute_test_window.cc b/cpp/src/tests/arrow_compute_test_window.cc index b2039ce86..cd98793ab 100644 --- a/cpp/src/tests/arrow_compute_test_window.cc +++ b/cpp/src/tests/arrow_compute_test_window.cc @@ -65,6 +65,39 @@ TEST(TestArrowComputeWindow, DoubleTest) { ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); } +TEST(TestArrowComputeWindow, LongAvgTest) { + std::shared_ptr input_batch; + auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_long", arrow::int64())}); + std::vector input_data = { + "[1, 2, 1]", + "[35612, 37244, 82664]"}; + MakeInputBatch(input_data, sch, &input_batch); + + std::shared_ptr res = field("window_res", arrow::int64()); + + auto f_window = TreeExprBuilder::MakeExpression(TreeExprBuilder::MakeFunction("window", { + TreeExprBuilder::MakeFunction("avg", + {TreeExprBuilder::MakeField(field("col_long", arrow::int64()))}, null()), + TreeExprBuilder::MakeFunction("partitionSpec", + {TreeExprBuilder::MakeField(field("col_int", arrow::int32()))}, null()), + }, binary()), res); + + arrow::compute::ExecContext ctx; + std::shared_ptr expr; + std::vector> out; + ASSERT_NOT_OK( + CreateCodeGenerator(ctx.memory_pool(), sch, {f_window}, {res}, &expr, true)) + ASSERT_NOT_OK(expr->evaluate(input_batch, nullptr)) + ASSERT_NOT_OK(expr->finish(&out)) + + std::shared_ptr expected_result; + std::vector expected_output_data = { + "[59138, 37244, 59138]"}; + + MakeInputBatch(expected_output_data, arrow::schema({res}), &expected_result); + ASSERT_NOT_OK(Equals(*expected_result.get(), *(out.at(0).get()))); +} + TEST(TestArrowComputeWindow, DecimalTest) { std::shared_ptr input_batch; auto sch = arrow::schema({field("col_int", arrow::int32()), field("col_dec", arrow::decimal128(8, 3))}); From 361ad0858eac16061eb2c9174ead255184156a46 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 11 Mar 2021 10:17:25 +0800 Subject: [PATCH 3/6] type fix --- .../oap/execution/ColumnarWindowExec.scala | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala index 1f6f1b007..9414e87d6 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala @@ -292,8 +292,18 @@ object ColumnarWindowExec { } } + def sameType(from: DataType, to: DataType): Boolean = { + if (from == null || to == null) { + return false + } + if (from == to) { + return true + } + DataType.equalsStructurally(from, to) + } + def makeOutputProject(ex: Expression, windows: ListBuffer[NamedExpression], inputProjects: ListBuffer[NamedExpression]): Expression = { - ex match { + val out = ex match { case we: WindowExpression => val aliasName = "__alias_%d__".format(Random.nextLong()) val alias = Alias(makeInputProject(we, inputProjects), aliasName)() @@ -302,6 +312,13 @@ object ColumnarWindowExec { case _ => ex.withNewChildren(ex.children.map(makeOutputProject(_, windows, inputProjects))) } + // forcibly cast to original type against possible rewriting + val casted = if (sameType(ex.dataType, out.dataType)) { + out + } else { + Cast(out, ex.dataType) + } + casted } val windows = ListBuffer[NamedExpression]() From 9e3dec00bf91f22cfb117c5c7664919bf27ffc7c Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 11 Mar 2021 10:22:14 +0800 Subject: [PATCH 4/6] update resources --- .../q1.sql | 0 .../q10.sql | 0 .../q11.sql | 0 .../q12.sql | 0 .../q13.sql | 0 .../q14a.sql | 0 .../q14b.sql | 0 .../q15.sql | 0 .../q16.sql | 0 .../q17.sql | 0 .../q18.sql | 14 +++++------ .../q19.sql | 0 .../q2.sql | 0 .../q20.sql | 0 .../q21.sql | 0 .../q22.sql | 0 .../q23a.sql | 0 .../q23b.sql | 0 .../q24a.sql | 0 .../q24b.sql | 0 .../q25.sql | 0 .../q26.sql | 0 .../q27.sql | 0 .../q28.sql | 0 .../q29.sql | 0 .../q3.sql | 0 .../q30.sql | 0 .../q31.sql | 0 .../q32.sql | 0 .../q33.sql | 0 .../q34.sql | 0 .../q35.sql | 0 .../q36.sql | 0 .../q37.sql | 0 .../q38.sql | 0 .../q39a.sql | 0 .../q39b.sql | 0 .../q4.sql | 0 .../q40.sql | 0 .../q41.sql | 0 .../q42.sql | 0 .../q43.sql | 0 .../q44.sql | 0 .../q45.sql | 0 .../q46.sql | 0 .../q47.sql | 0 .../q48.sql | 0 .../q49.sql | 24 +++++++++---------- .../q5.sql | 24 +++++++++---------- .../q50.sql | 0 .../q51.sql | 0 .../q52.sql | 0 .../q53.sql | 0 .../q54.sql | 0 .../q55.sql | 0 .../q56.sql | 0 .../q57.sql | 0 .../q58.sql | 0 .../q59.sql | 0 .../q6.sql | 0 .../q60.sql | 0 .../q61.sql | 2 +- .../q62.sql | 0 .../q63.sql | 0 .../q64.sql | 0 .../q65.sql | 0 .../q66.sql | 0 .../q67.sql | 0 .../q68.sql | 0 .../q69.sql | 0 .../q7.sql | 0 .../q70.sql | 0 .../q71.sql | 0 .../q72.sql | 0 .../q73.sql | 0 .../q74.sql | 0 .../q75.sql | 2 +- .../q76.sql | 0 .../q77.sql | 0 .../q78.sql | 0 .../q79.sql | 0 .../q8.sql | 0 .../q80.sql | 0 .../q81.sql | 0 .../q82.sql | 0 .../q83.sql | 0 .../q84.sql | 0 .../q85.sql | 0 .../q86.sql | 0 .../q87.sql | 0 .../q88.sql | 0 .../q89.sql | 0 .../q9.sql | 0 .../q90.sql | 2 +- .../q91.sql | 0 .../q92.sql | 0 .../q93.sql | 0 .../q94.sql | 0 .../q95.sql | 0 .../q96.sql | 0 .../q97.sql | 0 .../q98.sql | 0 .../q99.sql | 0 .../q1.sql | 0 .../q10.sql | 0 .../q11.sql | 0 .../q12.sql | 0 .../q13.sql | 0 .../q14.sql | 0 .../q15.sql | 0 .../q16.sql | 0 .../q17.sql | 0 .../q18.sql | 0 .../q19.sql | 0 .../q2.sql | 0 .../q20.sql | 2 +- .../q21.sql | 0 .../q22.sql | 0 .../q3.sql | 0 .../q4.sql | 0 .../q5.sql | 0 .../q6.sql | 0 .../q7.sql | 0 .../q8.sql | 0 .../q9.sql | 0 .../com/intel/oap/tpc/ds/TPCDSSuite.scala | 2 +- .../scala/com/intel/oap/tpc/h/TPCHSuite.scala | 2 +- 127 files changed, 37 insertions(+), 37 deletions(-) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q1.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q10.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q11.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q12.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q13.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q14a.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q14b.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q15.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q16.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q17.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q18.sql (66%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q19.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q2.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q20.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q21.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q22.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q23a.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q23b.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q24a.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q24b.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q25.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q26.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q27.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q28.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q29.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q3.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q30.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q31.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q32.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q33.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q34.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q35.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q36.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q37.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q38.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q39a.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q39b.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q4.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q40.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q41.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q42.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q43.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q44.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q45.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q46.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q47.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q48.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q49.sql (75%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q5.sql (85%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q50.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q51.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q52.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q53.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q54.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q55.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q56.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q57.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q58.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q59.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q6.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q60.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q61.sql (93%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q62.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q63.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q64.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q65.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q66.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q67.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q68.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q69.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q7.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q70.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q71.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q72.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q73.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q74.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q75.sql (96%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q76.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q77.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q78.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q79.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q8.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q80.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q81.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q82.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q83.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q84.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q85.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q86.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q87.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q88.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q89.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q9.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q90.sql (91%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q91.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q92.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q93.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q94.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q95.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q96.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q97.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q98.sql (100%) rename core/src/test/resources/{tpcds-queries-double => tpcds-queries}/q99.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q1.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q10.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q11.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q12.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q13.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q14.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q15.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q16.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q17.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q18.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q19.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q2.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q20.sql (92%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q21.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q22.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q3.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q4.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q5.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q6.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q7.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q8.sql (100%) rename core/src/test/resources/{tpch-queries-double => tpch-queries}/q9.sql (100%) diff --git a/core/src/test/resources/tpcds-queries-double/q1.sql b/core/src/test/resources/tpcds-queries/q1.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q1.sql rename to core/src/test/resources/tpcds-queries/q1.sql diff --git a/core/src/test/resources/tpcds-queries-double/q10.sql b/core/src/test/resources/tpcds-queries/q10.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q10.sql rename to core/src/test/resources/tpcds-queries/q10.sql diff --git a/core/src/test/resources/tpcds-queries-double/q11.sql b/core/src/test/resources/tpcds-queries/q11.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q11.sql rename to core/src/test/resources/tpcds-queries/q11.sql diff --git a/core/src/test/resources/tpcds-queries-double/q12.sql b/core/src/test/resources/tpcds-queries/q12.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q12.sql rename to core/src/test/resources/tpcds-queries/q12.sql diff --git a/core/src/test/resources/tpcds-queries-double/q13.sql b/core/src/test/resources/tpcds-queries/q13.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q13.sql rename to core/src/test/resources/tpcds-queries/q13.sql diff --git a/core/src/test/resources/tpcds-queries-double/q14a.sql b/core/src/test/resources/tpcds-queries/q14a.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q14a.sql rename to core/src/test/resources/tpcds-queries/q14a.sql diff --git a/core/src/test/resources/tpcds-queries-double/q14b.sql b/core/src/test/resources/tpcds-queries/q14b.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q14b.sql rename to core/src/test/resources/tpcds-queries/q14b.sql diff --git a/core/src/test/resources/tpcds-queries-double/q15.sql b/core/src/test/resources/tpcds-queries/q15.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q15.sql rename to core/src/test/resources/tpcds-queries/q15.sql diff --git a/core/src/test/resources/tpcds-queries-double/q16.sql b/core/src/test/resources/tpcds-queries/q16.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q16.sql rename to core/src/test/resources/tpcds-queries/q16.sql diff --git a/core/src/test/resources/tpcds-queries-double/q17.sql b/core/src/test/resources/tpcds-queries/q17.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q17.sql rename to core/src/test/resources/tpcds-queries/q17.sql diff --git a/core/src/test/resources/tpcds-queries-double/q18.sql b/core/src/test/resources/tpcds-queries/q18.sql similarity index 66% rename from core/src/test/resources/tpcds-queries-double/q18.sql rename to core/src/test/resources/tpcds-queries/q18.sql index 567d4da66..4055c80fd 100755 --- a/core/src/test/resources/tpcds-queries-double/q18.sql +++ b/core/src/test/resources/tpcds-queries/q18.sql @@ -3,13 +3,13 @@ SELECT ca_country, ca_state, ca_county, - avg(cast(cs_quantity AS DOUBLE)) agg1, - avg(cast(cs_list_price AS DOUBLE)) agg2, - avg(cast(cs_coupon_amt AS DOUBLE)) agg3, - avg(cast(cs_sales_price AS DOUBLE)) agg4, - avg(cast(cs_net_profit AS DOUBLE)) agg5, - avg(cast(c_birth_year AS DOUBLE)) agg6, - avg(cast(cd1.cd_dep_count AS DOUBLE)) agg7 + avg(cast(cs_quantity AS DECIMAL(12, 2))) agg1, + avg(cast(cs_list_price AS DECIMAL(12, 2))) agg2, + avg(cast(cs_coupon_amt AS DECIMAL(12, 2))) agg3, + avg(cast(cs_sales_price AS DECIMAL(12, 2))) agg4, + avg(cast(cs_net_profit AS DECIMAL(12, 2))) agg5, + avg(cast(c_birth_year AS DECIMAL(12, 2))) agg6, + avg(cast(cd1.cd_dep_count AS DECIMAL(12, 2))) agg7 FROM catalog_sales, customer_demographics cd1, customer_demographics cd2, customer, customer_address, date_dim, item WHERE cs_sold_date_sk = d_date_sk AND diff --git a/core/src/test/resources/tpcds-queries-double/q19.sql b/core/src/test/resources/tpcds-queries/q19.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q19.sql rename to core/src/test/resources/tpcds-queries/q19.sql diff --git a/core/src/test/resources/tpcds-queries-double/q2.sql b/core/src/test/resources/tpcds-queries/q2.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q2.sql rename to core/src/test/resources/tpcds-queries/q2.sql diff --git a/core/src/test/resources/tpcds-queries-double/q20.sql b/core/src/test/resources/tpcds-queries/q20.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q20.sql rename to core/src/test/resources/tpcds-queries/q20.sql diff --git a/core/src/test/resources/tpcds-queries-double/q21.sql b/core/src/test/resources/tpcds-queries/q21.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q21.sql rename to core/src/test/resources/tpcds-queries/q21.sql diff --git a/core/src/test/resources/tpcds-queries-double/q22.sql b/core/src/test/resources/tpcds-queries/q22.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q22.sql rename to core/src/test/resources/tpcds-queries/q22.sql diff --git a/core/src/test/resources/tpcds-queries-double/q23a.sql b/core/src/test/resources/tpcds-queries/q23a.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q23a.sql rename to core/src/test/resources/tpcds-queries/q23a.sql diff --git a/core/src/test/resources/tpcds-queries-double/q23b.sql b/core/src/test/resources/tpcds-queries/q23b.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q23b.sql rename to core/src/test/resources/tpcds-queries/q23b.sql diff --git a/core/src/test/resources/tpcds-queries-double/q24a.sql b/core/src/test/resources/tpcds-queries/q24a.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q24a.sql rename to core/src/test/resources/tpcds-queries/q24a.sql diff --git a/core/src/test/resources/tpcds-queries-double/q24b.sql b/core/src/test/resources/tpcds-queries/q24b.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q24b.sql rename to core/src/test/resources/tpcds-queries/q24b.sql diff --git a/core/src/test/resources/tpcds-queries-double/q25.sql b/core/src/test/resources/tpcds-queries/q25.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q25.sql rename to core/src/test/resources/tpcds-queries/q25.sql diff --git a/core/src/test/resources/tpcds-queries-double/q26.sql b/core/src/test/resources/tpcds-queries/q26.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q26.sql rename to core/src/test/resources/tpcds-queries/q26.sql diff --git a/core/src/test/resources/tpcds-queries-double/q27.sql b/core/src/test/resources/tpcds-queries/q27.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q27.sql rename to core/src/test/resources/tpcds-queries/q27.sql diff --git a/core/src/test/resources/tpcds-queries-double/q28.sql b/core/src/test/resources/tpcds-queries/q28.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q28.sql rename to core/src/test/resources/tpcds-queries/q28.sql diff --git a/core/src/test/resources/tpcds-queries-double/q29.sql b/core/src/test/resources/tpcds-queries/q29.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q29.sql rename to core/src/test/resources/tpcds-queries/q29.sql diff --git a/core/src/test/resources/tpcds-queries-double/q3.sql b/core/src/test/resources/tpcds-queries/q3.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q3.sql rename to core/src/test/resources/tpcds-queries/q3.sql diff --git a/core/src/test/resources/tpcds-queries-double/q30.sql b/core/src/test/resources/tpcds-queries/q30.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q30.sql rename to core/src/test/resources/tpcds-queries/q30.sql diff --git a/core/src/test/resources/tpcds-queries-double/q31.sql b/core/src/test/resources/tpcds-queries/q31.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q31.sql rename to core/src/test/resources/tpcds-queries/q31.sql diff --git a/core/src/test/resources/tpcds-queries-double/q32.sql b/core/src/test/resources/tpcds-queries/q32.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q32.sql rename to core/src/test/resources/tpcds-queries/q32.sql diff --git a/core/src/test/resources/tpcds-queries-double/q33.sql b/core/src/test/resources/tpcds-queries/q33.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q33.sql rename to core/src/test/resources/tpcds-queries/q33.sql diff --git a/core/src/test/resources/tpcds-queries-double/q34.sql b/core/src/test/resources/tpcds-queries/q34.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q34.sql rename to core/src/test/resources/tpcds-queries/q34.sql diff --git a/core/src/test/resources/tpcds-queries-double/q35.sql b/core/src/test/resources/tpcds-queries/q35.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q35.sql rename to core/src/test/resources/tpcds-queries/q35.sql diff --git a/core/src/test/resources/tpcds-queries-double/q36.sql b/core/src/test/resources/tpcds-queries/q36.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q36.sql rename to core/src/test/resources/tpcds-queries/q36.sql diff --git a/core/src/test/resources/tpcds-queries-double/q37.sql b/core/src/test/resources/tpcds-queries/q37.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q37.sql rename to core/src/test/resources/tpcds-queries/q37.sql diff --git a/core/src/test/resources/tpcds-queries-double/q38.sql b/core/src/test/resources/tpcds-queries/q38.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q38.sql rename to core/src/test/resources/tpcds-queries/q38.sql diff --git a/core/src/test/resources/tpcds-queries-double/q39a.sql b/core/src/test/resources/tpcds-queries/q39a.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q39a.sql rename to core/src/test/resources/tpcds-queries/q39a.sql diff --git a/core/src/test/resources/tpcds-queries-double/q39b.sql b/core/src/test/resources/tpcds-queries/q39b.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q39b.sql rename to core/src/test/resources/tpcds-queries/q39b.sql diff --git a/core/src/test/resources/tpcds-queries-double/q4.sql b/core/src/test/resources/tpcds-queries/q4.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q4.sql rename to core/src/test/resources/tpcds-queries/q4.sql diff --git a/core/src/test/resources/tpcds-queries-double/q40.sql b/core/src/test/resources/tpcds-queries/q40.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q40.sql rename to core/src/test/resources/tpcds-queries/q40.sql diff --git a/core/src/test/resources/tpcds-queries-double/q41.sql b/core/src/test/resources/tpcds-queries/q41.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q41.sql rename to core/src/test/resources/tpcds-queries/q41.sql diff --git a/core/src/test/resources/tpcds-queries-double/q42.sql b/core/src/test/resources/tpcds-queries/q42.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q42.sql rename to core/src/test/resources/tpcds-queries/q42.sql diff --git a/core/src/test/resources/tpcds-queries-double/q43.sql b/core/src/test/resources/tpcds-queries/q43.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q43.sql rename to core/src/test/resources/tpcds-queries/q43.sql diff --git a/core/src/test/resources/tpcds-queries-double/q44.sql b/core/src/test/resources/tpcds-queries/q44.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q44.sql rename to core/src/test/resources/tpcds-queries/q44.sql diff --git a/core/src/test/resources/tpcds-queries-double/q45.sql b/core/src/test/resources/tpcds-queries/q45.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q45.sql rename to core/src/test/resources/tpcds-queries/q45.sql diff --git a/core/src/test/resources/tpcds-queries-double/q46.sql b/core/src/test/resources/tpcds-queries/q46.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q46.sql rename to core/src/test/resources/tpcds-queries/q46.sql diff --git a/core/src/test/resources/tpcds-queries-double/q47.sql b/core/src/test/resources/tpcds-queries/q47.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q47.sql rename to core/src/test/resources/tpcds-queries/q47.sql diff --git a/core/src/test/resources/tpcds-queries-double/q48.sql b/core/src/test/resources/tpcds-queries/q48.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q48.sql rename to core/src/test/resources/tpcds-queries/q48.sql diff --git a/core/src/test/resources/tpcds-queries-double/q49.sql b/core/src/test/resources/tpcds-queries/q49.sql similarity index 75% rename from core/src/test/resources/tpcds-queries-double/q49.sql rename to core/src/test/resources/tpcds-queries/q49.sql index 2f7f497cf..9568d8b92 100755 --- a/core/src/test/resources/tpcds-queries-double/q49.sql +++ b/core/src/test/resources/tpcds-queries/q49.sql @@ -18,10 +18,10 @@ FROM ( FROM (SELECT ws.ws_item_sk AS item, - (cast(sum(coalesce(wr.wr_return_quantity, 0)) AS DOUBLE) / - cast(sum(coalesce(ws.ws_quantity, 0)) AS DOUBLE)) AS return_ratio, - (cast(sum(coalesce(wr.wr_return_amt, 0)) AS DOUBLE) / - cast(sum(coalesce(ws.ws_net_paid, 0)) AS DOUBLE)) AS currency_ratio + (cast(sum(coalesce(wr.wr_return_quantity, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(ws.ws_quantity, 0)) AS DECIMAL(15, 4))) AS return_ratio, + (cast(sum(coalesce(wr.wr_return_amt, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(ws.ws_net_paid, 0)) AS DECIMAL(15, 4))) AS currency_ratio FROM web_sales ws LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number AND @@ -60,10 +60,10 @@ FROM ( FROM (SELECT cs.cs_item_sk AS item, - (cast(sum(coalesce(cr.cr_return_quantity, 0)) AS DOUBLE) / - cast(sum(coalesce(cs.cs_quantity, 0)) AS DOUBLE)) AS return_ratio, - (cast(sum(coalesce(cr.cr_return_amount, 0)) AS DOUBLE) / - cast(sum(coalesce(cs.cs_net_paid, 0)) AS DOUBLE)) AS currency_ratio + (cast(sum(coalesce(cr.cr_return_quantity, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(cs.cs_quantity, 0)) AS DECIMAL(15, 4))) AS return_ratio, + (cast(sum(coalesce(cr.cr_return_amount, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(cs.cs_net_paid, 0)) AS DECIMAL(15, 4))) AS currency_ratio FROM catalog_sales cs LEFT OUTER JOIN catalog_returns cr ON (cs.cs_order_number = cr.cr_order_number AND @@ -102,10 +102,10 @@ FROM ( FROM (SELECT sts.ss_item_sk AS item, - (cast(sum(coalesce(sr.sr_return_quantity, 0)) AS DOUBLE) / - cast(sum(coalesce(sts.ss_quantity, 0)) AS DOUBLE)) AS return_ratio, - (cast(sum(coalesce(sr.sr_return_amt, 0)) AS DOUBLE) / - cast(sum(coalesce(sts.ss_net_paid, 0)) AS DOUBLE)) AS currency_ratio + (cast(sum(coalesce(sr.sr_return_quantity, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(sts.ss_quantity, 0)) AS DECIMAL(15, 4))) AS return_ratio, + (cast(sum(coalesce(sr.sr_return_amt, 0)) AS DECIMAL(15, 4)) / + cast(sum(coalesce(sts.ss_net_paid, 0)) AS DECIMAL(15, 4))) AS currency_ratio FROM store_sales sts LEFT OUTER JOIN store_returns sr ON (sts.ss_ticket_number = sr.sr_ticket_number AND sts.ss_item_sk = sr.sr_item_sk) diff --git a/core/src/test/resources/tpcds-queries-double/q5.sql b/core/src/test/resources/tpcds-queries/q5.sql similarity index 85% rename from core/src/test/resources/tpcds-queries-double/q5.sql rename to core/src/test/resources/tpcds-queries/q5.sql index e242d008e..b87cf3a44 100755 --- a/core/src/test/resources/tpcds-queries-double/q5.sql +++ b/core/src/test/resources/tpcds-queries/q5.sql @@ -11,15 +11,15 @@ WITH ssr AS ss_sold_date_sk AS date_sk, ss_ext_sales_price AS sales_price, ss_net_profit AS profit, - cast(0 AS DOUBLE) AS return_amt, - cast(0 AS DOUBLE) AS net_loss + cast(0 AS DECIMAL(7, 2)) AS return_amt, + cast(0 AS DECIMAL(7, 2)) AS net_loss FROM store_sales UNION ALL SELECT sr_store_sk AS store_sk, sr_returned_date_sk AS date_sk, - cast(0 AS DOUBLE) AS sales_price, - cast(0 AS DOUBLE) AS profit, + cast(0 AS DECIMAL(7, 2)) AS sales_price, + cast(0 AS DECIMAL(7, 2)) AS profit, sr_return_amt AS return_amt, sr_net_loss AS net_loss FROM store_returns) @@ -42,15 +42,15 @@ WITH ssr AS cs_sold_date_sk AS date_sk, cs_ext_sales_price AS sales_price, cs_net_profit AS profit, - cast(0 AS DOUBLE) AS return_amt, - cast(0 AS DOUBLE) AS net_loss + cast(0 AS DECIMAL(7, 2)) AS return_amt, + cast(0 AS DECIMAL(7, 2)) AS net_loss FROM catalog_sales UNION ALL SELECT cr_catalog_page_sk AS page_sk, cr_returned_date_sk AS date_sk, - cast(0 AS DOUBLE) AS sales_price, - cast(0 AS DOUBLE) AS profit, + cast(0 AS DECIMAL(7, 2)) AS sales_price, + cast(0 AS DECIMAL(7, 2)) AS profit, cr_return_amount AS return_amt, cr_net_loss AS net_loss FROM catalog_returns @@ -74,15 +74,15 @@ WITH ssr AS ws_sold_date_sk AS date_sk, ws_ext_sales_price AS sales_price, ws_net_profit AS profit, - cast(0 AS DOUBLE) AS return_amt, - cast(0 AS DOUBLE) AS net_loss + cast(0 AS DECIMAL(7, 2)) AS return_amt, + cast(0 AS DECIMAL(7, 2)) AS net_loss FROM web_sales UNION ALL SELECT ws_web_site_sk AS wsr_web_site_sk, wr_returned_date_sk AS date_sk, - cast(0 AS DOUBLE) AS sales_price, - cast(0 AS DOUBLE) AS profit, + cast(0 AS DECIMAL(7, 2)) AS sales_price, + cast(0 AS DECIMAL(7, 2)) AS profit, wr_return_amt AS return_amt, wr_net_loss AS net_loss FROM web_returns diff --git a/core/src/test/resources/tpcds-queries-double/q50.sql b/core/src/test/resources/tpcds-queries/q50.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q50.sql rename to core/src/test/resources/tpcds-queries/q50.sql diff --git a/core/src/test/resources/tpcds-queries-double/q51.sql b/core/src/test/resources/tpcds-queries/q51.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q51.sql rename to core/src/test/resources/tpcds-queries/q51.sql diff --git a/core/src/test/resources/tpcds-queries-double/q52.sql b/core/src/test/resources/tpcds-queries/q52.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q52.sql rename to core/src/test/resources/tpcds-queries/q52.sql diff --git a/core/src/test/resources/tpcds-queries-double/q53.sql b/core/src/test/resources/tpcds-queries/q53.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q53.sql rename to core/src/test/resources/tpcds-queries/q53.sql diff --git a/core/src/test/resources/tpcds-queries-double/q54.sql b/core/src/test/resources/tpcds-queries/q54.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q54.sql rename to core/src/test/resources/tpcds-queries/q54.sql diff --git a/core/src/test/resources/tpcds-queries-double/q55.sql b/core/src/test/resources/tpcds-queries/q55.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q55.sql rename to core/src/test/resources/tpcds-queries/q55.sql diff --git a/core/src/test/resources/tpcds-queries-double/q56.sql b/core/src/test/resources/tpcds-queries/q56.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q56.sql rename to core/src/test/resources/tpcds-queries/q56.sql diff --git a/core/src/test/resources/tpcds-queries-double/q57.sql b/core/src/test/resources/tpcds-queries/q57.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q57.sql rename to core/src/test/resources/tpcds-queries/q57.sql diff --git a/core/src/test/resources/tpcds-queries-double/q58.sql b/core/src/test/resources/tpcds-queries/q58.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q58.sql rename to core/src/test/resources/tpcds-queries/q58.sql diff --git a/core/src/test/resources/tpcds-queries-double/q59.sql b/core/src/test/resources/tpcds-queries/q59.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q59.sql rename to core/src/test/resources/tpcds-queries/q59.sql diff --git a/core/src/test/resources/tpcds-queries-double/q6.sql b/core/src/test/resources/tpcds-queries/q6.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q6.sql rename to core/src/test/resources/tpcds-queries/q6.sql diff --git a/core/src/test/resources/tpcds-queries-double/q60.sql b/core/src/test/resources/tpcds-queries/q60.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q60.sql rename to core/src/test/resources/tpcds-queries/q60.sql diff --git a/core/src/test/resources/tpcds-queries-double/q61.sql b/core/src/test/resources/tpcds-queries/q61.sql similarity index 93% rename from core/src/test/resources/tpcds-queries-double/q61.sql rename to core/src/test/resources/tpcds-queries/q61.sql index 79e5d975c..b0a872b4b 100755 --- a/core/src/test/resources/tpcds-queries-double/q61.sql +++ b/core/src/test/resources/tpcds-queries/q61.sql @@ -1,7 +1,7 @@ SELECT promotions, total, - cast(promotions AS DOUBLE) / cast(total AS DOUBLE) * 100 + cast(promotions AS DECIMAL(15, 4)) / cast(total AS DECIMAL(15, 4)) * 100 FROM (SELECT sum(ss_ext_sales_price) promotions FROM store_sales, store, promotion, date_dim, customer, customer_address, item diff --git a/core/src/test/resources/tpcds-queries-double/q62.sql b/core/src/test/resources/tpcds-queries/q62.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q62.sql rename to core/src/test/resources/tpcds-queries/q62.sql diff --git a/core/src/test/resources/tpcds-queries-double/q63.sql b/core/src/test/resources/tpcds-queries/q63.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q63.sql rename to core/src/test/resources/tpcds-queries/q63.sql diff --git a/core/src/test/resources/tpcds-queries-double/q64.sql b/core/src/test/resources/tpcds-queries/q64.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q64.sql rename to core/src/test/resources/tpcds-queries/q64.sql diff --git a/core/src/test/resources/tpcds-queries-double/q65.sql b/core/src/test/resources/tpcds-queries/q65.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q65.sql rename to core/src/test/resources/tpcds-queries/q65.sql diff --git a/core/src/test/resources/tpcds-queries-double/q66.sql b/core/src/test/resources/tpcds-queries/q66.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q66.sql rename to core/src/test/resources/tpcds-queries/q66.sql diff --git a/core/src/test/resources/tpcds-queries-double/q67.sql b/core/src/test/resources/tpcds-queries/q67.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q67.sql rename to core/src/test/resources/tpcds-queries/q67.sql diff --git a/core/src/test/resources/tpcds-queries-double/q68.sql b/core/src/test/resources/tpcds-queries/q68.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q68.sql rename to core/src/test/resources/tpcds-queries/q68.sql diff --git a/core/src/test/resources/tpcds-queries-double/q69.sql b/core/src/test/resources/tpcds-queries/q69.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q69.sql rename to core/src/test/resources/tpcds-queries/q69.sql diff --git a/core/src/test/resources/tpcds-queries-double/q7.sql b/core/src/test/resources/tpcds-queries/q7.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q7.sql rename to core/src/test/resources/tpcds-queries/q7.sql diff --git a/core/src/test/resources/tpcds-queries-double/q70.sql b/core/src/test/resources/tpcds-queries/q70.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q70.sql rename to core/src/test/resources/tpcds-queries/q70.sql diff --git a/core/src/test/resources/tpcds-queries-double/q71.sql b/core/src/test/resources/tpcds-queries/q71.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q71.sql rename to core/src/test/resources/tpcds-queries/q71.sql diff --git a/core/src/test/resources/tpcds-queries-double/q72.sql b/core/src/test/resources/tpcds-queries/q72.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q72.sql rename to core/src/test/resources/tpcds-queries/q72.sql diff --git a/core/src/test/resources/tpcds-queries-double/q73.sql b/core/src/test/resources/tpcds-queries/q73.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q73.sql rename to core/src/test/resources/tpcds-queries/q73.sql diff --git a/core/src/test/resources/tpcds-queries-double/q74.sql b/core/src/test/resources/tpcds-queries/q74.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q74.sql rename to core/src/test/resources/tpcds-queries/q74.sql diff --git a/core/src/test/resources/tpcds-queries-double/q75.sql b/core/src/test/resources/tpcds-queries/q75.sql similarity index 96% rename from core/src/test/resources/tpcds-queries-double/q75.sql rename to core/src/test/resources/tpcds-queries/q75.sql index 3f7b67926..2a143232b 100755 --- a/core/src/test/resources/tpcds-queries-double/q75.sql +++ b/core/src/test/resources/tpcds-queries/q75.sql @@ -71,6 +71,6 @@ WHERE curr_yr.i_brand_id = prev_yr.i_brand_id AND curr_yr.i_manufact_id = prev_yr.i_manufact_id AND curr_yr.d_year = 2002 AND prev_yr.d_year = 2002 - 1 - AND CAST(curr_yr.sales_cnt AS DOUBLE) / CAST(prev_yr.sales_cnt AS DOUBLE) < 0.9 + AND CAST(curr_yr.sales_cnt AS DECIMAL(17, 2)) / CAST(prev_yr.sales_cnt AS DECIMAL(17, 2)) < 0.9 ORDER BY sales_cnt_diff LIMIT 100 diff --git a/core/src/test/resources/tpcds-queries-double/q76.sql b/core/src/test/resources/tpcds-queries/q76.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q76.sql rename to core/src/test/resources/tpcds-queries/q76.sql diff --git a/core/src/test/resources/tpcds-queries-double/q77.sql b/core/src/test/resources/tpcds-queries/q77.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q77.sql rename to core/src/test/resources/tpcds-queries/q77.sql diff --git a/core/src/test/resources/tpcds-queries-double/q78.sql b/core/src/test/resources/tpcds-queries/q78.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q78.sql rename to core/src/test/resources/tpcds-queries/q78.sql diff --git a/core/src/test/resources/tpcds-queries-double/q79.sql b/core/src/test/resources/tpcds-queries/q79.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q79.sql rename to core/src/test/resources/tpcds-queries/q79.sql diff --git a/core/src/test/resources/tpcds-queries-double/q8.sql b/core/src/test/resources/tpcds-queries/q8.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q8.sql rename to core/src/test/resources/tpcds-queries/q8.sql diff --git a/core/src/test/resources/tpcds-queries-double/q80.sql b/core/src/test/resources/tpcds-queries/q80.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q80.sql rename to core/src/test/resources/tpcds-queries/q80.sql diff --git a/core/src/test/resources/tpcds-queries-double/q81.sql b/core/src/test/resources/tpcds-queries/q81.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q81.sql rename to core/src/test/resources/tpcds-queries/q81.sql diff --git a/core/src/test/resources/tpcds-queries-double/q82.sql b/core/src/test/resources/tpcds-queries/q82.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q82.sql rename to core/src/test/resources/tpcds-queries/q82.sql diff --git a/core/src/test/resources/tpcds-queries-double/q83.sql b/core/src/test/resources/tpcds-queries/q83.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q83.sql rename to core/src/test/resources/tpcds-queries/q83.sql diff --git a/core/src/test/resources/tpcds-queries-double/q84.sql b/core/src/test/resources/tpcds-queries/q84.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q84.sql rename to core/src/test/resources/tpcds-queries/q84.sql diff --git a/core/src/test/resources/tpcds-queries-double/q85.sql b/core/src/test/resources/tpcds-queries/q85.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q85.sql rename to core/src/test/resources/tpcds-queries/q85.sql diff --git a/core/src/test/resources/tpcds-queries-double/q86.sql b/core/src/test/resources/tpcds-queries/q86.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q86.sql rename to core/src/test/resources/tpcds-queries/q86.sql diff --git a/core/src/test/resources/tpcds-queries-double/q87.sql b/core/src/test/resources/tpcds-queries/q87.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q87.sql rename to core/src/test/resources/tpcds-queries/q87.sql diff --git a/core/src/test/resources/tpcds-queries-double/q88.sql b/core/src/test/resources/tpcds-queries/q88.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q88.sql rename to core/src/test/resources/tpcds-queries/q88.sql diff --git a/core/src/test/resources/tpcds-queries-double/q89.sql b/core/src/test/resources/tpcds-queries/q89.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q89.sql rename to core/src/test/resources/tpcds-queries/q89.sql diff --git a/core/src/test/resources/tpcds-queries-double/q9.sql b/core/src/test/resources/tpcds-queries/q9.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q9.sql rename to core/src/test/resources/tpcds-queries/q9.sql diff --git a/core/src/test/resources/tpcds-queries-double/q90.sql b/core/src/test/resources/tpcds-queries/q90.sql similarity index 91% rename from core/src/test/resources/tpcds-queries-double/q90.sql rename to core/src/test/resources/tpcds-queries/q90.sql index 2ecf7d571..85e35bf8b 100755 --- a/core/src/test/resources/tpcds-queries-double/q90.sql +++ b/core/src/test/resources/tpcds-queries/q90.sql @@ -1,4 +1,4 @@ -SELECT cast(amc AS DOUBLE) / cast(pmc AS DOUBLE) am_pm_ratio +SELECT cast(amc AS DECIMAL(15, 4)) / cast(pmc AS DECIMAL(15, 4)) am_pm_ratio FROM (SELECT count(*) amc FROM web_sales, household_demographics, time_dim, web_page WHERE ws_sold_time_sk = time_dim.t_time_sk diff --git a/core/src/test/resources/tpcds-queries-double/q91.sql b/core/src/test/resources/tpcds-queries/q91.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q91.sql rename to core/src/test/resources/tpcds-queries/q91.sql diff --git a/core/src/test/resources/tpcds-queries-double/q92.sql b/core/src/test/resources/tpcds-queries/q92.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q92.sql rename to core/src/test/resources/tpcds-queries/q92.sql diff --git a/core/src/test/resources/tpcds-queries-double/q93.sql b/core/src/test/resources/tpcds-queries/q93.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q93.sql rename to core/src/test/resources/tpcds-queries/q93.sql diff --git a/core/src/test/resources/tpcds-queries-double/q94.sql b/core/src/test/resources/tpcds-queries/q94.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q94.sql rename to core/src/test/resources/tpcds-queries/q94.sql diff --git a/core/src/test/resources/tpcds-queries-double/q95.sql b/core/src/test/resources/tpcds-queries/q95.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q95.sql rename to core/src/test/resources/tpcds-queries/q95.sql diff --git a/core/src/test/resources/tpcds-queries-double/q96.sql b/core/src/test/resources/tpcds-queries/q96.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q96.sql rename to core/src/test/resources/tpcds-queries/q96.sql diff --git a/core/src/test/resources/tpcds-queries-double/q97.sql b/core/src/test/resources/tpcds-queries/q97.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q97.sql rename to core/src/test/resources/tpcds-queries/q97.sql diff --git a/core/src/test/resources/tpcds-queries-double/q98.sql b/core/src/test/resources/tpcds-queries/q98.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q98.sql rename to core/src/test/resources/tpcds-queries/q98.sql diff --git a/core/src/test/resources/tpcds-queries-double/q99.sql b/core/src/test/resources/tpcds-queries/q99.sql similarity index 100% rename from core/src/test/resources/tpcds-queries-double/q99.sql rename to core/src/test/resources/tpcds-queries/q99.sql diff --git a/core/src/test/resources/tpch-queries-double/q1.sql b/core/src/test/resources/tpch-queries/q1.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q1.sql rename to core/src/test/resources/tpch-queries/q1.sql diff --git a/core/src/test/resources/tpch-queries-double/q10.sql b/core/src/test/resources/tpch-queries/q10.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q10.sql rename to core/src/test/resources/tpch-queries/q10.sql diff --git a/core/src/test/resources/tpch-queries-double/q11.sql b/core/src/test/resources/tpch-queries/q11.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q11.sql rename to core/src/test/resources/tpch-queries/q11.sql diff --git a/core/src/test/resources/tpch-queries-double/q12.sql b/core/src/test/resources/tpch-queries/q12.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q12.sql rename to core/src/test/resources/tpch-queries/q12.sql diff --git a/core/src/test/resources/tpch-queries-double/q13.sql b/core/src/test/resources/tpch-queries/q13.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q13.sql rename to core/src/test/resources/tpch-queries/q13.sql diff --git a/core/src/test/resources/tpch-queries-double/q14.sql b/core/src/test/resources/tpch-queries/q14.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q14.sql rename to core/src/test/resources/tpch-queries/q14.sql diff --git a/core/src/test/resources/tpch-queries-double/q15.sql b/core/src/test/resources/tpch-queries/q15.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q15.sql rename to core/src/test/resources/tpch-queries/q15.sql diff --git a/core/src/test/resources/tpch-queries-double/q16.sql b/core/src/test/resources/tpch-queries/q16.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q16.sql rename to core/src/test/resources/tpch-queries/q16.sql diff --git a/core/src/test/resources/tpch-queries-double/q17.sql b/core/src/test/resources/tpch-queries/q17.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q17.sql rename to core/src/test/resources/tpch-queries/q17.sql diff --git a/core/src/test/resources/tpch-queries-double/q18.sql b/core/src/test/resources/tpch-queries/q18.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q18.sql rename to core/src/test/resources/tpch-queries/q18.sql diff --git a/core/src/test/resources/tpch-queries-double/q19.sql b/core/src/test/resources/tpch-queries/q19.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q19.sql rename to core/src/test/resources/tpch-queries/q19.sql diff --git a/core/src/test/resources/tpch-queries-double/q2.sql b/core/src/test/resources/tpch-queries/q2.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q2.sql rename to core/src/test/resources/tpch-queries/q2.sql diff --git a/core/src/test/resources/tpch-queries-double/q20.sql b/core/src/test/resources/tpch-queries/q20.sql similarity index 92% rename from core/src/test/resources/tpch-queries-double/q20.sql rename to core/src/test/resources/tpch-queries/q20.sql index 472a3b8c5..e161d340b 100644 --- a/core/src/test/resources/tpch-queries-double/q20.sql +++ b/core/src/test/resources/tpch-queries/q20.sql @@ -23,7 +23,7 @@ where ) and ps_availqty > ( select - 0.5 * sum(CAST(l_quantity AS DOUBLE)) + 0.5 * sum(l_quantity) from lineitem where diff --git a/core/src/test/resources/tpch-queries-double/q21.sql b/core/src/test/resources/tpch-queries/q21.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q21.sql rename to core/src/test/resources/tpch-queries/q21.sql diff --git a/core/src/test/resources/tpch-queries-double/q22.sql b/core/src/test/resources/tpch-queries/q22.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q22.sql rename to core/src/test/resources/tpch-queries/q22.sql diff --git a/core/src/test/resources/tpch-queries-double/q3.sql b/core/src/test/resources/tpch-queries/q3.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q3.sql rename to core/src/test/resources/tpch-queries/q3.sql diff --git a/core/src/test/resources/tpch-queries-double/q4.sql b/core/src/test/resources/tpch-queries/q4.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q4.sql rename to core/src/test/resources/tpch-queries/q4.sql diff --git a/core/src/test/resources/tpch-queries-double/q5.sql b/core/src/test/resources/tpch-queries/q5.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q5.sql rename to core/src/test/resources/tpch-queries/q5.sql diff --git a/core/src/test/resources/tpch-queries-double/q6.sql b/core/src/test/resources/tpch-queries/q6.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q6.sql rename to core/src/test/resources/tpch-queries/q6.sql diff --git a/core/src/test/resources/tpch-queries-double/q7.sql b/core/src/test/resources/tpch-queries/q7.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q7.sql rename to core/src/test/resources/tpch-queries/q7.sql diff --git a/core/src/test/resources/tpch-queries-double/q8.sql b/core/src/test/resources/tpch-queries/q8.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q8.sql rename to core/src/test/resources/tpch-queries/q8.sql diff --git a/core/src/test/resources/tpch-queries-double/q9.sql b/core/src/test/resources/tpch-queries/q9.sql similarity index 100% rename from core/src/test/resources/tpch-queries-double/q9.sql rename to core/src/test/resources/tpch-queries/q9.sql diff --git a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala index b1382a78d..b9cec3729 100644 --- a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala +++ b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.test.SharedSparkSession class TPCDSSuite extends QueryTest with SharedSparkSession { private val MAX_DIRECT_MEMORY = "6g" - private val TPCDS_QUERIES_RESOURCE = "tpcds-queries-double" + private val TPCDS_QUERIES_RESOURCE = "tpcds-queries" private val TPCDS_WRITE_PATH = "/tmp/tpcds-generated" private var runner: TPCRunner = _ diff --git a/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala b/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala index 7776a2b34..81a55569b 100644 --- a/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala +++ b/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala @@ -41,7 +41,7 @@ import scala.collection.mutable.ArrayBuffer class TPCHSuite extends QueryTest with SharedSparkSession { private val MAX_DIRECT_MEMORY = "6g" - private val TPCH_QUERIES_RESOURCE = "tpch-queries-double" + private val TPCH_QUERIES_RESOURCE = "tpch-queries" private val TPCH_WRITE_PATH = "/tmp/tpch-generated" private var runner: TPCRunner = _ From 39664175e1bf24e62d61c596e1d6e7ff01a85497 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 11 Mar 2021 10:25:55 +0800 Subject: [PATCH 5/6] fix --- .../main/scala/com/intel/oap/execution/ColumnarWindowExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala index 9414e87d6..54eefc3eb 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala @@ -313,7 +313,7 @@ object ColumnarWindowExec { ex.withNewChildren(ex.children.map(makeOutputProject(_, windows, inputProjects))) } // forcibly cast to original type against possible rewriting - val casted = if (sameType(ex.dataType, out.dataType)) { + val casted = if (sameType(out.dataType, ex.dataType)) { out } else { Cast(out, ex.dataType) From 0c6c78046c8c7b58ca467a9cdabeb8276d0fb517 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 11 Mar 2021 10:34:01 +0800 Subject: [PATCH 6/6] fix --- .../intel/oap/execution/ColumnarWindowExec.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala index 54eefc3eb..e59a732dd 100644 --- a/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala +++ b/core/src/main/scala/com/intel/oap/execution/ColumnarWindowExec.scala @@ -294,7 +294,7 @@ object ColumnarWindowExec { def sameType(from: DataType, to: DataType): Boolean = { if (from == null || to == null) { - return false + throw new IllegalArgumentException("null type found during type enforcement") } if (from == to) { return true @@ -313,10 +313,16 @@ object ColumnarWindowExec { ex.withNewChildren(ex.children.map(makeOutputProject(_, windows, inputProjects))) } // forcibly cast to original type against possible rewriting - val casted = if (sameType(out.dataType, ex.dataType)) { - out - } else { - Cast(out, ex.dataType) + val casted = try { + if (sameType(out.dataType, ex.dataType)) { + out + } else { + Cast(out, ex.dataType) + } + } catch { + case t: Throwable => + System.err.println("Warning: " + t.getMessage) + Cast(out, ex.dataType) } casted }