Skip to content

Commit

Permalink
Merge branch 'json-encoding-line-sep' of github.com:MaxGekk/spark-1 i…
Browse files Browse the repository at this point in the history
…nto json-encoding-line-sep
  • Loading branch information
MaxGekk committed Apr 23, 2018
2 parents a0ab98b + 482b799 commit a7be182
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ object MultiLineJsonDataSource extends JsonDataSource {
parsedOptions: JSONOptions): StructType = {
val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
val parser = parsedOptions.encoding
.map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
.getOrElse(createParser(_: JsonFactory, _: PortableDataStream))

JsonInferSchema.infer[PortableDataStream](
sampled,
parsedOptions,
createParser(_, _, parsedOptions.encoding))
JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
}

private def createBaseRdd(
Expand All @@ -187,15 +187,18 @@ object MultiLineJsonDataSource extends JsonDataSource {
.values
}

private def createParser(
jsonFactory: JsonFactory,
record: PortableDataStream,
encoding: Option[String]): JsonParser = {
val path = new Path(record.getPath())
val is = CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path)
private def dataToInputStream(dataStream: PortableDataStream): InputStream = {
val path = new Path(dataStream.getPath())
CodecStreams.createInputStreamWithCloseResource(dataStream.getConfiguration, path)
}

private def createParser(jsonFactory: JsonFactory, stream: PortableDataStream): JsonParser = {
CreateJacksonParser.inputStream(jsonFactory, dataToInputStream(stream))
}

encoding.map(enc => CreateJacksonParser.inputStream(enc, jsonFactory, is))
.getOrElse(CreateJacksonParser.inputStream(jsonFactory, is))
private def createParser(enc: String, jsonFactory: JsonFactory,
stream: PortableDataStream): JsonParser = {
CreateJacksonParser.inputStream(enc, jsonFactory, dataToInputStream(stream))
}

override def readFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.datasources.text

import java.nio.charset.StandardCharsets
import java.nio.charset.{Charset, StandardCharsets}

import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}

Expand Down Expand Up @@ -51,10 +51,10 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti

// Note that the option 'lineSep' uses a different default value in read and write.
val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
lineSep.getBytes(encoding.getOrElse("UTF-8"))
lineSep.getBytes(encoding.map(Charset.forName(_)).getOrElse(StandardCharsets.UTF_8))
}
val lineSeparatorInWrite: Array[Byte] =
lineSeparatorInRead.getOrElse("\n".getBytes("UTF-8"))
lineSeparatorInRead.getOrElse("\n".getBytes(StandardCharsets.UTF_8))
}

private[datasources] object TextOptions {
Expand Down

0 comments on commit a7be182

Please sign in to comment.