This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-394] Support ColumnarArrowEvalPython operator #395

Merged: 4 commits, Jul 8, 2021

Conversation

xuechendi (Collaborator)

The main functionality is working and a unit test has been added. Jupyter test script below:

import findspark
findspark.init()

import os
from pyspark.sql import *
from pyspark import *
import pyspark.sql.functions as F
from pyspark.sql.types import *

native_sql_path = "/mnt/nvme2/chendi/intel-bigdata/OAP/native-sql-engine/native-sql-engine/core/target/spark-columnar-core-1.2.0-snapshot-jar-with-dependencies.jar"
native_arrow_datasource_path = "/mnt/nvme2/chendi/intel-bigdata/OAP/native-sql-engine/arrow-data-source/standard/target/spark-arrow-datasource-standard-1.2.0-snapshot-jar-with-dependencies.jar"

spark = SparkSession.builder.master('yarn')\
        .appName("udf_test")\
        .config("spark.executorEnv.LD_LIBRARY_PATH", "/usr/local/lib64/")\
        .config("spark.driver.extraClassPath", 
                f"{native_sql_path}:{native_arrow_datasource_path}")\
        .config("spark.executor.extraClassPath",
                f"{native_sql_path}:{native_arrow_datasource_path}")\
        .config("spark.sql.extensions", "com.intel.oap.ColumnarPlugin")\
        .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")\
        .config("spark.executor.memory", "10g")\
        .config("spark.executor.memoryOverhead", "16g")\
        .config("spark.memory.offHeap.enabled", "true")\
        .config("spark.memory.offHeap.size", "12G")\
        .config("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=25G")\
        .getOrCreate()
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('long')
def pandas_plus_one(v):
    return (v + 1)

df = spark.read.format("arrow").load("/recsys2021_0608")
dict_df = spark.read.parquet("/recsys2021_0608_processed/recsys_dicts/language")
df = df.select("language", "tweet_timestamp")
df = df.join(dict_df.withColumnRenamed('dict_col', 'language'), 'language', 'left')
df = df.withColumn("processed_tweet_timestamp", pandas_plus_one(F.col("tweet_timestamp")))
#df = df.withColumn("processed_tweet_timestamp", pandas_plus_one(F.col("tweet_timestamp")+F.lit(1)))
%time df.show()
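For reference, the core logic of the `pandas_plus_one` UDF above can be checked standalone with plain pandas, no Spark cluster required. A minimal sketch (the sample data is made up for illustration); the point is that the function receives a whole pandas Series per Arrow batch and returns a Series, rather than being called once per row:

```python
import pandas as pd

# Same logic as the pandas_plus_one UDF above: vectorized over a whole
# Series (one Arrow record batch) instead of per-row invocation.
def pandas_plus_one(v: pd.Series) -> pd.Series:
    return v + 1

batch = pd.Series([10, 20, 30], dtype="int64")
print(pandas_plus_one(batch).tolist())  # -> [11, 21, 31]
```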

@github-actions bot commented Jul 6, 2021

#394

Commits:
- Support Arrow UDF: ColumnarArrowEvalPythonExec is basically runnable. Signed-off-by: Chendi Xue <chendi.xue@intel.com>
- Still need to re-walk for non-project input data retain and close. Signed-off-by: Chendi Xue <chendi.xue@intel.com>
- spark.oap.sql.columnar.arrowudf = false. Signed-off-by: Chendi Xue <chendi.xue@intel.com>
- Signed-off-by: Chendi Xue <chendi.xue@intel.com>
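Per the commit message above, the columnar Arrow UDF path appears to be gated behind a `spark.oap.sql.columnar.arrowudf` flag that defaults to false. A sketch of how enabling it would presumably look at session build time (flag name taken from the commit message; the app name is made up):

```python
from pyspark.sql import SparkSession

# Sketch only: opt in to the columnar Arrow UDF path added by this PR.
# The flag defaults to false per the commit message.
spark = (SparkSession.builder
         .appName("arrow_udf_enabled")
         .config("spark.oap.sql.columnar.arrowudf", "true")
         .getOrCreate())
```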
@xuechendi force-pushed the wip_arrow_udf_no_conversion branch from dc832f4 to 750db13 on July 6, 2021 08:41
xuechendi (Collaborator, Author)

String test:

from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('string')
def pd_get_first(v):
    return v.str.split('\t').str[0]

df = spark.read.format("arrow").load("/recsys2021_0608")
dict_df = spark.read.parquet("/recsys2021_0608_processed/recsys_dicts/language")
df = df.select("language", "present_domains")
df = df.join(dict_df.withColumnRenamed('dict_col', 'language'), 'language', 'left')
df = df.withColumn("processed_present_domains", pd_get_first(F.col("present_domains")))
%time df.select('processed_present_domains').distinct().show()
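The string UDF's logic can likewise be exercised with plain pandas outside Spark. A minimal sketch (sample data made up) showing the vectorized tab-split that keeps the first field of each value:

```python
import pandas as pd

# Same logic as the pd_get_first UDF above: split each string on tab
# and keep the first field; values without a tab pass through whole.
def pd_get_first(v: pd.Series) -> pd.Series:
    return v.str.split('\t').str[0]

domains = pd.Series(["a.com\tb.com\tc.com", "single.org"])
print(pd_get_first(domains).tolist())  # -> ['a.com', 'single.org']
```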

@zhouyuan zhouyuan merged commit 9c42e25 into oap-project:master Jul 8, 2021