From 41d4522848610d3c8c7983157f0b4b7bded9dd94 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 13 Jun 2018 07:56:33 +0200
Subject: [PATCH 1/6] Support any types in schema DDL

---
 .../sql/catalyst/expressions/jsonExpressions.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 04a4eb0ffc032..db82c4e4488bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, CharArrayWriter, InputStreamReader, StringWriter}
 
+import scala.util.control.NonFatal
 import scala.util.parsing.combinator.RegexParsers
 
 import com.fasterxml.jackson.core._
@@ -747,8 +748,14 @@ case class StructsToJson(
 
 object JsonExprUtils {
 
-  def validateSchemaLiteral(exp: Expression): StructType = exp match {
-    case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
+  def validateSchemaLiteral(exp: Expression): DataType = exp match {
+    case Literal(s, StringType) =>
+      val schema = s.toString
+      try {
+        CatalystSqlParser.parseDataType(schema)
+      } catch {
+        case NonFatal(_) => CatalystSqlParser.parseTableSchema(schema)
+      }
     case e => throw new AnalysisException(s"Expected a string literal instead of $e")
   }
 

From f824f1651999f0ba8919d4b8d29329eb1f538237 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 13 Jun 2018 07:56:57 +0200
Subject: [PATCH 2/6] SQL tests for from_json

---
 .../sql-tests/inputs/json-functions.sql       |  4 ++++
 .../sql-tests/results/json-functions.sql.out  | 18 +++++++++++++++++-
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index fea069eac4d48..dc15d13cd1dd3 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -31,3 +31,7 @@ CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 1,
 SELECT json_tuple(jsonField, 'b', CAST(NULL AS STRING), a) FROM jsonTable;
 -- Clean up
 DROP VIEW IF EXISTS jsonTable;
+
+-- from_json - complex types
+select from_json('{"a":1, "b":2}', 'map<string, int>');
+select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>');
diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index 14a69128ffb41..2b3288dc5a137 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 26
+-- Number of queries: 28
 
 
 -- !query 0
@@ -258,3 +258,19 @@ DROP VIEW IF EXISTS jsonTable
 struct<>
 -- !query 25 output
 
+
+
+-- !query 26
+select from_json('{"a":1, "b":2}', 'map<string, int>')
+-- !query 26 schema
+struct<entries:map<string,int>>
+-- !query 26 output
+{"a":1,"b":2}
+
+
+-- !query 27
+select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>')
+-- !query 27 schema
+struct<jsontostructs({"a":1, "b":"2"}):struct<a:int,b:string>>
+-- !query 27 output
+{"a":1,"b":"2"}

From 08a01223354cf44174653996dae936aa09bf340d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 13 Jun 2018 08:47:46 +0200
Subject: [PATCH 3/6] Support any DataType as schema for from_json

---
 .../main/scala/org/apache/spark/sql/types/DataType.scala | 3 +++
 .../src/main/scala/org/apache/spark/sql/functions.scala  | 7 ++++++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 0bef11659fc9e..8a441dbb8e696 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -26,6 +26,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
@@ -110,6 +111,8 @@ abstract class DataType extends AbstractDataType {
 @InterfaceStability.Stable
 object DataType {
 
+  def fromDDL(ddl: String): DataType = CatalystSqlParser.parseDataType(ddl)
+
   def fromJson(json: String): DataType = parseDataType(parse(json))
 
   private val nonDecimalNameToType = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 87bd7b3b0f9c6..1f678f7e6f14f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3369,7 +3369,12 @@ object functions {
     val dataType = try {
       DataType.fromJson(schema)
     } catch {
-      case NonFatal(_) => StructType.fromDDL(schema)
+      case NonFatal(_) =>
+        try {
+          StructType.fromDDL(schema)
+        } catch {
+          case NonFatal(_) => DataType.fromDDL(schema)
+        }
     }
     from_json(e, dataType, options)
   }

From 41ad77ee74265a170191203bf0330a7c7b3b384d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 13 Jun 2018 08:48:40 +0200
Subject: [PATCH 4/6] Test for MapType in PySpark's from_json

---
 python/pyspark/sql/functions.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index a5e3384e802b8..e6346691fb1d4 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2168,8 +2168,7 @@ def from_json(col, schema, options={}):
     [Row(json=Row(a=1))]
     >>> df.select(from_json(df.value, "a INT").alias("json")).collect()
     [Row(json=Row(a=1))]
-    >>> schema = MapType(StringType(), IntegerType())
-    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    >>> df.select(from_json(df.value, "MAP<STRING,INT>").alias("json")).collect()
     [Row(json={u'a': 1})]
     >>> data = [(1, '''[{"a": 1}]''')]
     >>> schema = ArrayType(StructType([StructField("a", IntegerType())]))

From 5d53ec77f022a17a1ffb5c77937a32b3a32cea63 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 13 Jun 2018 08:53:35 +0200
Subject: [PATCH 5/6] Test for MapType in DDL as the root type for from_json

---
 .../test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 055e1fc5640f3..7bf17cbcd9c97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -354,8 +354,8 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
test("SPARK-24027: from_json - map>") { val in = Seq("""{"a": {"b": 1}}""").toDS() - val schema = MapType(StringType, MapType(StringType, IntegerType)) - val out = in.select(from_json($"value", schema)) + val schema = "map>" + val out = in.select(from_json($"value", schema, Map.empty[String, String])) checkAnswer(out, Row(Map("a" -> Map("b" -> 1)))) } From af946b8ada5af91428e7ab44478e920308846a59 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 14 Jun 2018 11:51:57 +0200 Subject: [PATCH 6/6] Moving parsing of DDL string as StructType and DataType in the same method. --- .../sql/catalyst/expressions/jsonExpressions.scala | 10 +--------- .../scala/org/apache/spark/sql/types/DataType.scala | 10 +++++++++- .../main/scala/org/apache/spark/sql/functions.scala | 7 +------ 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index db82c4e4488bd..f6d74f5b74c8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions import java.io.{ByteArrayInputStream, ByteArrayOutputStream, CharArrayWriter, InputStreamReader, StringWriter} -import scala.util.control.NonFatal import scala.util.parsing.combinator.RegexParsers import com.fasterxml.jackson.core._ @@ -29,7 +28,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.json._ -import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData, MapData} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -749,13 +747,7 @@ case class StructsToJson( object JsonExprUtils { def validateSchemaLiteral(exp: Expression): DataType = exp match { - case Literal(s, StringType) => - val schema = s.toString - try { - CatalystSqlParser.parseDataType(schema) - } catch { - case NonFatal(_) => CatalystSqlParser.parseTableSchema(schema) - } + case Literal(s, StringType) => DataType.fromDDL(s.toString) case e => throw new AnalysisException(s"Expected a string literal instead of $e") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 8a441dbb8e696..fd40741cfb5f1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.types import java.util.Locale +import scala.util.control.NonFatal + import org.json4s._ import org.json4s.JsonAST.JValue import org.json4s.JsonDSL._ @@ -111,7 +113,13 @@ abstract class DataType extends AbstractDataType { @InterfaceStability.Stable object DataType { - def fromDDL(ddl: String): DataType = CatalystSqlParser.parseDataType(ddl) + def fromDDL(ddl: String): DataType = { + try { + CatalystSqlParser.parseDataType(ddl) + } catch { + case NonFatal(_) => CatalystSqlParser.parseTableSchema(ddl) + } + } def fromJson(json: String): DataType = parseDataType(parse(json)) diff --git 
index 1f678f7e6f14f..8551058ec58ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3369,12 +3369,7 @@ object functions {
     val dataType = try {
       DataType.fromJson(schema)
     } catch {
-      case NonFatal(_) =>
-        try {
-          StructType.fromDDL(schema)
-        } catch {
-          case NonFatal(_) => DataType.fromDDL(schema)
-        }
+      case NonFatal(_) => DataType.fromDDL(schema)
    }
     from_json(e, dataType, options)
   }
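
Reviewer note (not part of the series): a minimal, self-contained sketch of
the behavior these patches enable, for trying the change locally. The object
name, the local SparkSession setup, and the sample JSON values below are
illustrative assumptions, not code from the patches.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.DataType

    object FromJsonDdlDemo {
      def main(args: Array[String]): Unit = {
        // Any existing SparkSession works; a local one is used for illustration.
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("from_json DDL schema demo")
          .getOrCreate()
        import spark.implicits._

        // DataType.fromDDL first tries to parse the string as a single type
        // (e.g. "map<string, int>") and falls back to parseTableSchema for
        // column-list strings such as "a INT, b STRING".
        assert(DataType.fromDDL("map<string, int>").typeName == "map")
        assert(DataType.fromDDL("a INT, b STRING").typeName == "struct")

        // After this series, from_json accepts a non-struct root type
        // expressed as a DDL string, not only a StructType.
        val df = Seq("""{"a": 1, "b": 2}""").toDF("value")
        df.select(from_json($"value", "map<string, int>", Map.empty[String, String]).as("m"))
          .show(false) // one row containing the parsed map: a -> 1, b -> 2

        spark.stop()
      }
    }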