This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

When can Hive user-defined UDFs be supported? #940

Closed
zwx109473 opened this issue May 26, 2022 · 16 comments
Labels
enhancement New feature or request

Comments

@zwx109473

We are using the native-sql-engine plugin, but in real scenarios Spark's built-in UDFs cannot meet our business requirements.
Our SQL statements contain many user-defined UDFs, which causes the operators to fall back to row-based execution, so performance does not improve significantly.
Is there a solution to this problem, or a plan to address it?

zwx109473 added the enhancement label May 26, 2022
@PHILO-HE
Collaborator

Hi @zwx109473, we recently added support for a Hive UDF, and it works well.
Please refer to #925. We will document how to support a Hive UDF soon.

@PHILO-HE
Collaborator

PHILO-HE commented Jun 1, 2022

We have drafted a doc summarizing how to support Hive UDFs in Gazelle. See #945.

@PHILO-HE
Collaborator

PHILO-HE commented Jun 7, 2022

Do you mean our implemented columnar UrlDecoder doesn't actually replace the original UDF at runtime? You can verify this by checking whether there is a columnar projection in the Spark DAG for your test cases.

@zwx109473
Author

zwx109473 commented Jun 8, 2022

Thank you for your help, @PHILO-HE.
I've solved these problems, but I've found a new one. When I create a permanent function as follows:

spark.sql("CREATE FUNCTION UrlDecoder AS 'com.zl.udf.UrlDecoderUDF' USING JAR 'hdfs://tmp/test_udf.jar'")
spark.sql("select UrlDecoder(name) from default.zl").show

this UDF is not executed as a columnar expression.
How should I modify ColumnarUDF.scala and arrow/gandiva?
Can you give an example of a permanent function?

@PHILO-HE
Collaborator

PHILO-HE commented Jun 8, 2022

Yes, the current code has not considered permanent UDFs. I just found that a permanent UDF will be renamed as {YourDatabaseName}.UrlDecoder in your case, not UrlDecoder as we know it for a temporary UDF.
To let it be supported in Gazelle, you can just change the two lines below by replacing "urldecoder" with "default.urldecoder", assuming the default database is used. If you are using a user-created database, {YourDatabaseName}.urldecoder (in lower case) should be used instead. No other code change should be required. Please have a try.
Line 90, Line 102

@zwx109473
Author

Thank you for your answer. I managed to get it working. @PHILO-HE

@zwx109473
Author

zwx109473 commented Jun 11, 2022

Hi @PHILO-HE,
I want to add a UDF isempty, which accepts a string, determines whether it is an empty string, and returns a Boolean value.
My doColumnarCodeGen looks like this:

override def doColumnarCodeGen(args: Object): (TreeNode, ArrowType) = {
  val (inputNode, _): (TreeNode, ArrowType) =
    input.asInstanceOf[ColumnarExpression].doColumnarCodeGen(args)
  val resultType = new ArrowType.Bool()
  val funcNode =
    TreeBuilder.makeFunction(
      "is_empty",
      Lists.newArrayList(inputNode),
      resultType)
  (funcNode, resultType)
}

The other methods are written by referring to the urldecoder function.
In string_ops.cc, I added the following method:

FORCE_INLINE
const bool is_empty(gdv_int64 context, const char* input, gdv_int32 input_len, gdv_int32* out_len) {
  if (input_len == 0 || input == nullptr) {
    return true;
  }
  return false;
}

In function_registry_string.cc, I added the following registration:

NativeFunction("is_empty", {}, DataTypeVector{utf8()}, utf8(),
               kResultNullIfNull, "is_empty",
               NativeFunction::kNeedsContext | NativeFunction::kCanReturnErrors)};

The Spark plan is as follows:

== Physical Plan ==
CollectLimit (5)
+- ArrowColumnarToRow (4)
   +- ColumnarConditionProject (3)
      +- ArrowRowToColumnarExec (2)
         +- Scan hive default.zl (1)

(1) Scan hive default.zl
Output [1]: [name#40]
Arguments: [name#40], HiveTableRelation [default.zl, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#39, name#40], Partition Cols: []]

(2) ArrowRowToColumnarExec
Input [1]: [name#40]

(3) ColumnarConditionProject
Input [1]: [name#40]
Arguments: [cast(HiveSimpleUDF#com.zl.udf.common.IsEmptyUDF(name#40) as string) AS default.isempty(name)#44]

(4) ArrowColumnarToRow
Input [1]: [default.isempty(name)#44]

(5) CollectLimit
Input [1]: [default.isempty(name)#44]
Arguments: 21

However, an error is reported at runtime: "org.apache.arrow.gandiva.exceptions.GandivaException: Failed to make LLVM module due to Function bool is_empty(string) not supported yet."
The executor reports an error like this:

ERROR ColumnarConditionProjector:
originalInputAttributes is ArrayBuffer(name#1) ArrayBuffer(StringType),
projectionSchema is Schema<c_0: Utf8>,
resultSchema is Schema<result: Utf8>,
Projection is List(root {
  fnNode {
    functionName: "castVARCHAR"
    inArgs {
      fnNode {
        functionName: "is_empty"
        inArgs {
          fieldNode {
            field {
              name: "c_0"
              type {
                type: UTF8
              }
              nullable: true
            }
          }
        }
        returnType {
          type: BOOL
        }
      }
    }
    inArgs {
      longNode {
        value: 10
      }
    }
    returnType {
      type: UTF8
    }
  }
}
resultType {
  name: "result"
  type {
    type: UTF8
  }
  nullable: true
}
)
22/06/11 15:35:50 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
Failed to make LLVM module due to Function bool is_empty(string) not supported yet.

at org.apache.arrow.gandiva.evaluator.JniWrapper.buildProjector(Native Method)
at org.apache.arrow.gandiva.evaluator.Projector.make(Projector.java:199)
at org.apache.arrow.gandiva.evaluator.Projector.make(Projector.java:78)
at com.intel.oap.expression.ColumnarConditionProjector$FieldOptimizedProjector.<init>(ColumnarConditionProjector.scala:537)
at com.intel.oap.expression.ColumnarConditionProjector.createProjector(ColumnarConditionProjector.scala:158)
at com.intel.oap.expression.ColumnarConditionProjector.<init>(ColumnarConditionProjector.scala:114)
at com.intel.oap.expression.ColumnarConditionProjector$.create(ColumnarConditionProjector.scala:472)
at com.intel.oap.execution.ColumnarConditionProjectExec.$anonfun$doExecuteColumnar$1(ColumnarBasicPhysicalOperators.scala:262)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Why can't my function be found by the FunctionRegistry?

@PHILO-HE
Collaborator

In function_registry_string.cc, DataTypeVector{utf8()}, utf8() should be changed to DataTypeVector{utf8()}, boolean() to indicate that the return type is boolean.
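
With that change, the registration would read as follows (a sketch based only on the snippet quoted earlier in this thread; the surrounding registry code is omitted):

NativeFunction("is_empty", {}, DataTypeVector{utf8()}, boolean(),
               kResultNullIfNull, "is_empty",
               NativeFunction::kNeedsContext | NativeFunction::kCanReturnErrors)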

BTW, please note that to test your changes in arrow, you should point the arrow branch to your own working branch at Line#65 of arrow-data-source/script/build_arrow.sh in Gazelle, and use -Dbuild_arrow=ON when compiling Gazelle.

@zwx109473
Author

Hi @PHILO-HE, I made the correction according to your guidance, and a new error occurred.

Both operands to ICmp instruction are not of the same type!
  %10 = icmp eq i64 %context_ptr, i8* null
Both operands to ICmp instruction are not of the same type!
  %15 = icmp eq i8* %14, i32 0
Both operands to ICmp instruction are not of the same type!
  %35 = icmp eq i8* %34, i32 0
22/06/13 11:07:23 ERROR ColumnarConditionProjector: 
originalInputAttributes is ArrayBuffer(name#11) ArrayBuffer(StringType), 
projectionSchema is Schema<c_0: Utf8>, 
resultSchema is Schema<result: Utf8>, 
Projection is List(root {
  fnNode {
    functionName: "castVARCHAR"
    inArgs {
      fnNode {
        functionName: "is_empty"
        inArgs {
          fieldNode {
            field {
              name: "c_0"
              type {
                type: UTF8
              }
              nullable: true
            }
          }
        }
        returnType {
          type: BOOL
        }
      }
    }
    inArgs {
      longNode {
        value: 10
      }
    }
    returnType {
      type: UTF8
    }
  }
}
resultType {
  name: "result"
  type {
    type: UTF8
  }
  nullable: true
}
)
22/06/13 11:07:23 ERROR Executor: Exception in task 0.3 in stage 2.0 (TID 5)
Failed to make LLVM module due to Module verification failed after optimizer

	at org.apache.arrow.gandiva.evaluator.JniWrapper.buildProjector(Native Method)
	at org.apache.arrow.gandiva.evaluator.Projector.make(Projector.java:199)
	at org.apache.arrow.gandiva.evaluator.Projector.make(Projector.java:78)
	at com.intel.oap.expression.ColumnarConditionProjector$FieldOptimizedProjector.<init>(ColumnarConditionProjector.scala:537)
	at com.intel.oap.expression.ColumnarConditionProjector.createProjector(ColumnarConditionProjector.scala:158)
	at com.intel.oap.expression.ColumnarConditionProjector.<init>(ColumnarConditionProjector.scala:114)
	at com.intel.oap.expression.ColumnarConditionProjector$.create(ColumnarConditionProjector.scala:472)
	at com.intel.oap.execution.ColumnarConditionProjectExec.$anonfun$doExecuteColumnar$1(ColumnarBasicPhysicalOperators.scala:262)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@PHILO-HE
Collaborator

In your arrow function, the gdv_int32* out_len parameter in the code below should be removed. out_len is only applicable when the return type is utf8. I am not sure whether it is the root cause of the above exception; you can have a try.

const bool is_empty(..., gdv_int32* out_len)

@zwx109473
Author

zwx109473 commented Jun 13, 2022

@PHILO-HE, I've removed the unnecessary parameter from the function, but the error is still not fixed.
The function is as follows:

const bool is_empty(const char* input, gdv_int32 input_len) {
  if (input_len == 0 || input == nullptr) {
    return true;
  }
  return false;
}

I guess it might have something to do with the cast function!

@PHILO-HE
Collaborator

Could you share the links for your arrow/gazelle patch?

@zwx109473
Author

Hi @PHILO-HE, this is my modification. Please help check the problem.
oap-project/arrow@arrow-4.0.0-oap...zwx109473:arrow-4.0.0-oap
main...zwx109473:main

@zwx109473
Author

Could you share the links for your arrow/gazelle patch?

@PHILO-HE, are there any new developments or suggestions?

@PHILO-HE
Collaborator

PHILO-HE commented Jun 17, 2022

Sorry for the late reply.

Please keep the context argument in your gandiva function below, which is consistent with the NativeFunction::kNeedsContext attribute specified in function_registry_string.cc. If context is useless to you, you can remove the NativeFunction::kNeedsContext attribute and, at the same time, omit the context argument from your is_empty function.

bool is_empty(int64_t context, const char* input, gdv_int32 input_len)

With the above correction, you should be able to resolve the reported exception.
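
Putting the pieces together, a minimal sketch of the corrected function (matching the boolean() registration above; FORCE_INLINE kept from the earlier snippet):

FORCE_INLINE
bool is_empty(gdv_int64 context, const char* input, gdv_int32 input_len) {
  // context is required because kNeedsContext is specified in the registration.
  // With kResultNullIfNull, gandiva short-circuits null inputs before calling
  // this function, so only the length check is needed here.
  return input_len == 0;
}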

By the way, I noticed you tried to handle the null case inside your gandiva function. Actually, for a null input, gandiva will directly return null since you specified the kResultNullIfNull attribute. If you would like to handle the null case in your own function, the kResultNullInternal attribute should be used instead. With this attribute, you have to add extra boolean argument(s) so that arrow can mark whether each input is null, and set *out_valid in your code logic to tell arrow whether the output is null. Please refer to Arrow PR#114.
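
A hypothetical sketch of the kResultNullInternal variant described above (the exact parameter layout is an assumption based on this description and should be checked against the gandiva registry and Arrow PR#114):

// Assumed convention: each input is followed by its validity flag, and the
// function sets *out_valid to report whether the result is null.
bool is_empty(gdv_int64 context, const char* input, gdv_int32 input_len,
              bool input_valid, bool* out_valid) {
  *out_valid = true;  // the boolean result itself is never null here
  if (!input_valid) {
    return true;      // e.g., treat a NULL string as empty
  }
  return input_len == 0;
}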

@zwx109473
Author

@PHILO-HE, thank you very much for your help! I have solved the problem.
