Skip to content

Commit

Permalink
avoid multiple json generator created
Browse files Browse the repository at this point in the history
  • Loading branch information
chenghao-intel committed Feb 9, 2015
1 parent 4396dfb commit aeb7801
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 16 deletions.
24 changes: 22 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql

import java.io.CharArrayWriter

import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
Expand Down Expand Up @@ -409,8 +411,26 @@ private[sql] class DataFrameImpl protected[sql](
override def toJSON: RDD[String] = {
val rowSchema = this.schema
this.mapPartitions { iter =>
val jsonFactory = new JsonFactory()
iter.map(JsonRDD.rowToJSON(rowSchema, jsonFactory))
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)

new Iterator[String] {
override def hasNext() = iter.hasNext
override def next(): String = {
JsonRDD.rowToJSON(rowSchema, gen)(iter.next())
gen.flush()

val json = writer.toString
if (hasNext) {
writer.reset()
} else {
gen.close()
}

json
}
}
}
}

Expand Down
13 changes: 3 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import java.sql.{Date, Timestamp}
import scala.collection.Map
import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}

import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.{JsonGenerator, JsonProcessingException, JsonFactory}
import com.fasterxml.jackson.databind.ObjectMapper

import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -430,14 +429,11 @@ private[sql] object JsonRDD extends Logging {

/** Transforms a single Row to JSON using Jackson
*
* @param jsonFactory a JsonFactory object to construct a JsonGenerator
* @param rowSchema the schema object used for conversion
* @param gen a JsonGenerator object
* @param row The row to convert
*/
private[sql] def rowToJSON(rowSchema: StructType, jsonFactory: JsonFactory)(row: Row): String = {
val writer = new StringWriter()
val gen = jsonFactory.createGenerator(writer)

private[sql] def rowToJSON(rowSchema: StructType, gen: JsonGenerator)(row: Row) = {
def valWriter: (DataType, Any) => Unit = {
case (_, null) | (NullType, _) => gen.writeNull()
case (StringType, v: String) => gen.writeString(v)
Expand Down Expand Up @@ -479,8 +475,5 @@ private[sql] object JsonRDD extends Logging {
}

valWriter(rowSchema, row)
gen.close()
writer.toString
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -826,8 +826,8 @@ class JsonSuite extends QueryTest {
df1.registerTempTable("applySchema1")
val df2 = df1.toDataFrame
val result = df2.toJSON.collect()
assert(result(0) == "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
assert(result(3) == "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
assert(result(0) === "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
assert(result(3) === "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")

val schema2 = StructType(
StructField("f1", StructType(
Expand All @@ -848,8 +848,8 @@ class JsonSuite extends QueryTest {
val df4 = df3.toDataFrame
val result2 = df4.toJSON.collect()

assert(result2(1) == "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
assert(result2(3) == "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")

val jsonDF = jsonRDD(primitiveFieldAndType)
val primTable = jsonRDD(jsonDF.toJSON)
Expand Down

0 comments on commit aeb7801

Please sign in to comment.