From 8fc6f51e3fce3abf761e68e792e993dac4a96910 Mon Sep 17 00:00:00 2001
From: Xi Liu
Date: Thu, 15 May 2014 13:06:22 -0700
Subject: [PATCH 1/3] [SparkSQL] allow UDF on struct

---
 .../src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index d50e2c65b7b36..f4359f4cd6061 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -320,6 +320,9 @@ private[hive] trait HiveInspectors {
     case BinaryType => PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector
     case TimestampType => PrimitiveObjectInspectorFactory.javaTimestampObjectInspector
     case DecimalType => PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector
+    case StructType(fields) =>
+      ObjectInspectorFactory.getStandardStructObjectInspector(
+        fields.map(f => f.name), fields.map(f => toInspector(f.dataType)))
   }
 
   def inspectorToDataType(inspector: ObjectInspector): DataType = inspector match {
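Note on PATCH 1/3: toInspector is the direction Spark SQL uses when describing argument types to a Hive UDF, so this one extra case is what lets a UDF accept a struct-typed column. A minimal standalone sketch of the inspector the new case builds for a STRUCT<id: INT, value: INT> column (an illustration, not code from the patch; it uses explicit JavaConverters where hiveUdfs.scala relies on implicit Java/Scala collection conversions):

    import scala.collection.JavaConverters._

    import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

    // Equivalent of toInspector(StructType(Seq(StructField("id", IntegerType),
    // StructField("value", IntegerType)))): a standard struct inspector built
    // from the field names and the per-field inspectors.
    val structInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
      Seq("id", "value").asJava,
      Seq[ObjectInspector](
        PrimitiveObjectInspectorFactory.javaIntObjectInspector,
        PrimitiveObjectInspectorFactory.javaIntObjectInspector).asJava)

    assert(structInspector.getTypeName == "struct<id:int,value:int>")

Because the new case calls toInspector(f.dataType) for each field, nested struct types map recursively to correspondingly nested struct inspectors.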
From 354386aebfb2ce5928718b69e7a5ff458cbec849 Mon Sep 17 00:00:00 2001
From: Xi Liu
Date: Wed, 21 May 2014 18:16:25 -0700
Subject: [PATCH 2/3] [Spark SQL] add test suite for UDF on struct

---
 .../resources/data/files/testUdf/part-00000 | Bin 0 -> 153 bytes
 .../sql/hive/execution/HiveUdfSuite.scala   | 125 ++++++++++++++++++
 2 files changed, 125 insertions(+)
 create mode 100755 sql/hive/src/test/resources/data/files/testUdf/part-00000
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala

diff --git a/sql/hive/src/test/resources/data/files/testUdf/part-00000 b/sql/hive/src/test/resources/data/files/testUdf/part-00000
new file mode 100755
index 0000000000000000000000000000000000000000..240a5c1a63c5c4016d096cbd13ddc8b787aee8da
GIT binary patch
literal 153
zcmWG`4P;ZyFG|--EJ#ewNY%?oOv%qL(96u%^DE8C2`|blNleN~)j?8GT##6ltyf%_
zqnD9cma3Opk(yjul9`{U7m`|B5|Ef#!~h0IwrxAkdN!tuT_U@F&YjICfr1

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
new file mode 100644
index 0000000000000..e54de15188adf
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.{DataInput, DataOutput}
+import java.util
+import java.util.Properties
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
+import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats}
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
+import org.apache.hadoop.io.Writable
+
+import org.apache.spark.sql.hive.test.TestHive
+
+class HiveUdfSuite extends HiveComparisonTest {
+
+  TestHive.hql(
+    """
+      |CREATE EXTERNAL TABLE hiveUdfTestTable
+      |(
+      |   pair STRUCT<id: INT, value: INT>
+      |)
+      |PARTITIONED BY (partition STRING)
+      |ROW FORMAT SERDE '%s'
+      |STORED AS SEQUENCEFILE
+    """.stripMargin.format(classOf[PairSerDe].getName)
+  )
+
+  TestHive.hql(
+    "ALTER TABLE hiveUdfTestTable ADD IF NOT EXISTS PARTITION(partition='testUdf') LOCATION '%s'"
+      .format(this.getClass.getClassLoader.getResource("data/files/testUdf").getFile)
+  )
+
+  TestHive.hql("CREATE TEMPORARY FUNCTION testUdf AS '%s'".format(classOf[PairUdf].getName))
+
+  TestHive.hql("SELECT testUdf(pair) FROM hiveUdfTestTable")
+}
+
+class TestPair(x: Int, y: Int) extends Writable with Serializable {
+  def this() = this(0, 0)
+  var entry: (Int, Int) = (x, y)
+
+  override def write(output: DataOutput): Unit = {
+    output.writeInt(entry._1)
+    output.writeInt(entry._2)
+  }
+
+  override def readFields(input: DataInput): Unit = {
+    val x = input.readInt()
+    val y = input.readInt()
+    entry = (x, y)
+  }
+}
+
+class PairSerDe extends AbstractSerDe {
+  override def initialize(p1: Configuration, p2: Properties): Unit = {}
+
+  override def getObjectInspector: ObjectInspector = {
+    ObjectInspectorFactory
+      .getStandardStructObjectInspector(
+        Seq("pair"),
+        Seq(ObjectInspectorFactory.getStandardStructObjectInspector(
+          Seq("id", "value"),
+          Seq(PrimitiveObjectInspectorFactory.javaIntObjectInspector,
+            PrimitiveObjectInspectorFactory.javaIntObjectInspector))
+      ))
+  }
+
+  override def getSerializedClass: Class[_ <: Writable] = classOf[TestPair]
+
+  override def getSerDeStats: SerDeStats = null
+
+  override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = null
+
+  override def deserialize(value: Writable): AnyRef = {
+    val pair = value.asInstanceOf[TestPair]
+
+    val row = new util.ArrayList[util.ArrayList[AnyRef]]
+    row.add(new util.ArrayList[AnyRef](2))
+    row(0).add(Integer.valueOf(pair.entry._1))
+    row(0).add(Integer.valueOf(pair.entry._2))
+
+    row
+  }
+}
+
+class PairUdf extends GenericUDF {
+  override def initialize(p1: Array[ObjectInspector]): ObjectInspector =
+    ObjectInspectorFactory.getStandardStructObjectInspector(
+      Seq("id", "value"),
+      Seq(PrimitiveObjectInspectorFactory.javaIntObjectInspector,
+        PrimitiveObjectInspectorFactory.javaIntObjectInspector)
+    )
+
+  override def evaluate(args: Array[DeferredObject]): AnyRef = {
+    println("Type = %s".format(args(0).getClass.getName))
+    Integer.valueOf(args(0).get.asInstanceOf[TestPair].entry._2)
+  }
+
+  override def getDisplayString(p1: Array[String]): String = ""
+}
+
+

From 328dfc45ab8f1f5ea131b72f92e9b7b8223b3a7b Mon Sep 17 00:00:00 2001
From: Xi Liu
Date: Thu, 12 Jun 2014 09:45:03 -0700
Subject: [PATCH 3/3] [Spark SQL] remove a temporary function after test

---
 .../org/apache/spark/sql/hive/execution/HiveUdfSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index e54de15188adf..a9e3f42a3adfc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -56,6 +56,8 @@ class HiveUdfSuite extends HiveComparisonTest {
   TestHive.hql("CREATE TEMPORARY FUNCTION testUdf AS '%s'".format(classOf[PairUdf].getName))
 
   TestHive.hql("SELECT testUdf(pair) FROM hiveUdfTestTable")
+
+  TestHive.hql("DROP TEMPORARY FUNCTION IF EXISTS testUdf")
 }
 
 class TestPair(x: Int, y: Int) extends Writable with Serializable {
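Note on the test suite: PairUdf can get away with casting its argument to the concrete TestPair class because the suite controls the SerDe end to end. A general-purpose UDF over a struct column would instead resolve fields through the StructObjectInspector it receives in initialize(). A minimal sketch of that pattern (the class StructValueUdf and the field name "value" are illustrative, not part of the patches):

    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
    import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, StructObjectInspector}
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

    // Reads the "value" field of a struct argument through the inspector
    // rather than casting to a concrete Writable class.
    class StructValueUdf extends GenericUDF {
      private var structOI: StructObjectInspector = _

      override def initialize(args: Array[ObjectInspector]): ObjectInspector = {
        // The struct inspector is supplied by the caller; with PATCH 1/3
        // applied, Spark SQL builds it from the Catalyst StructType.
        structOI = args(0).asInstanceOf[StructObjectInspector]
        PrimitiveObjectInspectorFactory.javaIntObjectInspector
      }

      override def evaluate(args: Array[DeferredObject]): AnyRef = {
        // Look up the field by name, then extract its data from the struct.
        val field = structOI.getStructFieldRef("value")
        structOI.getStructFieldData(args(0).get, field)
      }

      override def getDisplayString(children: Array[String]): String =
        "struct_value(" + children.mkString(", ") + ")"
    }

Registered the same way as testUdf above (CREATE TEMPORARY FUNCTION ... AS the class name), such a UDF would return pair.value as an int for each row.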