[SPARK-19104][BACKPORT-2.1][SQL] Lambda variables in ExternalMapToCatalyst should be global

## What changes were proposed in this pull request?

This PR is backport of apache#18418 to Spark 2.1. [SPARK-21391](https://issues.apache.org/jira/browse/SPARK-21391) reported this problem in Spark 2.1.

The issue happens in `ExternalMapToCatalyst`. For example, the following code creates `ExternalMapToCatalyst` to convert a Scala `Map` to the Catalyst map format.

```
val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100))))
val ds = spark.createDataset(data)
```
The `valueConverter` in `ExternalMapToCatalyst` looks like:

```
if (isnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52,
    ObjectType(class org.apache.spark.sql.InnerData), true)))
  null
else
  named_struct(
    name,
    staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
      assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52,
        ObjectType(class org.apache.spark.sql.InnerData), true)).name, true),
    value,
    assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52,
      ObjectType(class org.apache.spark.sql.InnerData), true)).value)
```
There is a `CreateNamedStruct` expression (`named_struct`) that creates a row from `InnerData.name` and `InnerData.value`, both of which are referred to by `ExternalMapToCatalyst_value52`.

Because `ExternalMapToCatalyst_value52` is a local variable, when `CreateNamedStruct` splits its expressions into individual functions, the local variable can no longer be accessed.

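The fix promotes the lambda variables to mutable state on the generated class, so any split-out function can still reach them. Here is a minimal standalone sketch of that contrast; the class and method names (`SplitCodegen`, `buildStruct`, `processEntry`) are hypothetical stand-ins, not Spark's actual generated identifiers:

```scala
// Illustrative sketch only, not Spark's real generated code: a value that
// used to be a local variable of one big method is held as an instance
// field ("global" mutable state), so a split-out helper can still read it.
class SplitCodegen {
  // After the fix, the lambda variable lives on the class, visible to
  // every method. As a local of processEntry it would be invisible to
  // buildStruct below.
  private var value: Int = _
  private var valueIsNull: Boolean = _

  // Stand-in for a helper emitted when a large expression (e.g. the
  // CreateNamedStruct body) is split into its own function.
  private def buildStruct(): String =
    if (valueIsNull) "null" else s"named_struct(value, $value)"

  def processEntry(v: java.lang.Integer): String = {
    valueIsNull = v == null
    if (!valueIsNull) value = v // unboxed via Predef's Integer-to-Int conversion
    buildStruct()
  }
}
```

Had `value` and `valueIsNull` been locals of `processEntry`, `buildStruct` would not compile, which mirrors the compilation failure in the split generated code.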
## How was this patch tested?

Added a new test to `DatasetPrimitiveSuite`.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes apache#18627 from kiszk/SPARK-21391.
kiszk authored and cloud-fan committed Jul 18, 2017
1 parent ca4d2aa commit a9efce4
Showing 2 changed files with 20 additions and 6 deletions.
@@ -660,6 +660,12 @@ case class ExternalMapToCatalyst private(
val entry = ctx.freshName("entry")
val entries = ctx.freshName("entries")

+val keyElementJavaType = ctx.javaType(keyType)
+val valueElementJavaType = ctx.javaType(valueType)
+ctx.addMutableState(keyElementJavaType, key, "")
+ctx.addMutableState("boolean", valueIsNull, "")
+ctx.addMutableState(valueElementJavaType, value, "")

val (defineEntries, defineKeyValue) = child.dataType match {
case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) =>
val javaIteratorCls = classOf[java.util.Iterator[_]].getName
@@ -671,8 +677,8 @@
val defineKeyValue =
s"""
final $javaMapEntryCls $entry = ($javaMapEntryCls) $entries.next();
-${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)}) $entry.getKey();
-${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)}) $entry.getValue();
+$key = (${ctx.boxedType(keyType)}) $entry.getKey();
+$value = (${ctx.boxedType(valueType)}) $entry.getValue();
"""

defineEntries -> defineKeyValue
@@ -686,17 +692,17 @@
val defineKeyValue =
s"""
final $scalaMapEntryCls $entry = ($scalaMapEntryCls) $entries.next();
-${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)}) $entry._1();
-${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)}) $entry._2();
+$key = (${ctx.boxedType(keyType)}) $entry._1();
+$value = (${ctx.boxedType(valueType)}) $entry._2();
"""

defineEntries -> defineKeyValue
}

val valueNullCheck = if (ctx.isPrimitiveType(valueType)) {
-s"boolean $valueIsNull = false;"
+s"$valueIsNull = false;"
} else {
-s"boolean $valueIsNull = $value == null;"
+s"$valueIsNull = $value == null;"
}

val arrayCls = classOf[GenericArrayData].getName
@@ -21,6 +21,9 @@ import org.apache.spark.sql.test.SharedSQLContext

case class IntClass(value: Int)

+case class InnerData(name: String, value: Int)
+case class NestedData(id: Int, param: Map[String, InnerData])

package object packageobject {
case class PackageClass(value: Int)
}
@@ -135,4 +138,9 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1))
}

+test("SPARK-19104: Lambda variables in ExternalMapToCatalyst should be global") {
+  val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100))))
+  val ds = spark.createDataset(data)
+  checkDataset(ds, data: _*)
+}
}
