[SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support custom encoding for json files #20937
Changes from 101 commits
File: python/pyspark/sql/readwriter.py

@@ -176,7 +176,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
-             multiLine=None, allowUnquotedControlChars=None, lineSep=None):
+             multiLine=None, allowUnquotedControlChars=None, encoding=None, lineSep=None):
         """
         Loads JSON files and returns the results as a :class:`DataFrame`.

@@ -237,6 +237,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
                                           characters (ASCII characters with value less than 32,
                                           including tab and line feed characters) or not.
+        :param encoding: standard encoding (charset) name, for example UTF-8, UTF-16LE and UTF-32BE.
+                         If None is set, the encoding of input JSON will be detected automatically
+                         when the multiLine option is set to ``true``.
         :param lineSep: defines the line separator that should be used for parsing. If None is
                         set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.

@@ -256,7 +259,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
-            allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep)
+            allowUnquotedControlChars=allowUnquotedControlChars, encoding=encoding, lineSep=lineSep)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:

@@ -749,7 +752,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)
     @since(1.4)
     def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
-             lineSep=None):
+             encoding=None, lineSep=None):
         """Saves the content of the :class:`DataFrame` in JSON format
         (`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
         specified path.

@@ -773,6 +776,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
                                 formats follow the formats at ``java.text.SimpleDateFormat``.
                                 This applies to timestamp type. If None is set, it uses the
                                 default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
+        :param encoding: specifies encoding (charset) of saved json files. If None is set,
+                         the default UTF-8 charset will be used.

Review comment: shall we mention that, if …
Reply: Yes, we can mention this in the comment but the user will get the error: https://github.com/MaxGekk/spark-1/blob/482b79969b9e0cc475e63b415051b32423facef4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala#L116-L117 if the …
Reply: Sorry, I didn't realize initially that the comment related to writing. For writing, if … Actually, the current implementation is more strict than it needs to be. It requires setting …

         :param lineSep: defines the line separator that should be used for writing. If None is
                         set, it uses the default value, ``\\n``.

@@ -781,7 +786,7 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
         self.mode(mode)
         self._set_opts(
             compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
-            lineSep=lineSep)
+            encoding=encoding, lineSep=lineSep)
         self._jwrite.json(path)

     @since(1.4)
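To make the new option concrete, here is a minimal usage sketch against the Scala API. The paths and the UTF-16LE charset are placeholder examples; per the review thread above, the current implementation requires an explicit lineSep when writing in a charset other than UTF-8:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Read: the charset is set explicitly here; if it were omitted and
    // multiLine were true, it would be detected automatically.
    val df = spark.read
      .option("encoding", "UTF-16LE")
      .option("multiLine", true)
      .json("/tmp/in.json")

    // Write: the thread above notes that a charset other than UTF-8
    // currently requires lineSep to be set explicitly.
    df.write
      .option("encoding", "UTF-16LE")
      .option("lineSep", "\n")
      .json("/tmp/out.json")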
File: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala

@@ -18,10 +18,14 @@
 package org.apache.spark.sql.catalyst.json

 import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
 import java.nio.channels.Channels
 import java.nio.charset.Charset

 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.hadoop.io.Text
 import sun.nio.cs.StreamDecoder

 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.unsafe.types.UTF8String

 private[sql] object CreateJacksonParser extends Serializable {

@@ -43,7 +47,38 @@
     jsonFactory.createParser(record.getBytes, 0, record.getLength)
   }

-  def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
-    jsonFactory.createParser(record)
+  def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = {

Review comment: nit: private?

+    val bais = new ByteArrayInputStream(in, 0, length)
+    val byteChannel = Channels.newChannel(bais)
+    val decodingBufferSize = Math.min(length, 8192)
+    val decoder = Charset.forName(enc).newDecoder()
+
+    StreamDecoder.forDecoder(byteChannel, decoder, decodingBufferSize)
+  }
+
+  def text(enc: String, jsonFactory: JsonFactory, record: Text): JsonParser = {
+    val sd = getStreamDecoder(enc, record.getBytes, record.getLength)
+    jsonFactory.createParser(sd)
+  }
+
+  def inputStream(jsonFactory: JsonFactory, is: InputStream): JsonParser = {
+    jsonFactory.createParser(is)
+  }
+
+  def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
+    jsonFactory.createParser(new InputStreamReader(is, enc))

Review comment: I think #20937 (comment) is a good investigation. It should be good to leave a small note that we should avoid this way if possible.
Reply: I added a comment above.

+  }
+
   def internalRow(jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
     val ba = row.getBinary(0)

     jsonFactory.createParser(ba, 0, ba.length)
   }
+
+  def internalRow(enc: String, jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
+    val binary = row.getBinary(0)
+    val sd = getStreamDecoder(enc, binary, binary.length)
+
+    jsonFactory.createParser(sd)
+  }
 }
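StreamDecoder is a JDK-internal class (sun.nio.cs), so for readers unfamiliar with it, the same decoding path can be sketched with public java.nio APIs only. This snippet is illustrative, not part of the patch; the charset and sample bytes are arbitrary:

    import java.io.ByteArrayInputStream
    import java.nio.channels.Channels
    import java.nio.charset.Charset

    // Channels.newReader plays the role of StreamDecoder.forDecoder here:
    // it wraps a byte channel with a charset decoder and a bounded buffer.
    val bytes = """{"a": 1}""".getBytes("UTF-16LE")
    val channel = Channels.newChannel(new ByteArrayInputStream(bytes, 0, bytes.length))
    val reader = Channels.newReader(channel, Charset.forName("UTF-16LE").newDecoder(), 8192)
    val buf = new Array[Char](64)
    val n = reader.read(buf)
    println(new String(buf, 0, n)) // prints {"a": 1}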
File: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

@@ -17,7 +17,7 @@
 package org.apache.spark.sql.catalyst.json

-import java.nio.charset.StandardCharsets
+import java.nio.charset.Charset
 import java.util.{Locale, TimeZone}

 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

@@ -86,14 +86,41 @@
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

   /**
    * A string between two consecutive JSON records.
    */
   val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
-    require(sep.nonEmpty, "'lineSep' cannot be an empty string.")

Review comment: Seems this nonempty requirement is removed. Did I miss something or was it a mistake?

     sep
   }
-  // Note that the option 'lineSep' uses a different default value in read and write.
-  val lineSeparatorInRead: Option[Array[Byte]] =
-    lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
-  // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.

+  /**
+   * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE.
+   * If the encoding is not specified (None), it will be detected automatically
+   * when the multiLine option is set to `true`.
+   */
+  val encoding: Option[String] = parameters.get("encoding")
+    .orElse(parameters.get("charset")).map { enc =>
+      // The following encodings are not supported in per-line mode (multiLine is false)
+      // because they cause problems in reading files with a BOM, which is supposed to
+      // be present in files with such encodings. After splitting input files by lines,
+      // only the first line will have the BOM, which makes it impossible to read the
+      // remaining lines. Besides that, the lineSep option would have to contain the BOM
+      // in such encodings, and a BOM can never be present between lines.
+      val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
+      val isBlacklisted = blacklist.contains(Charset.forName(enc))
+      require(multiLine || !isBlacklisted,
+        s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled:
+           | ${blacklist.mkString(", ")}""".stripMargin)

Review comment: let's add "when multiLine is disabled".

+
+      val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty)
+      require(forcingLineSep, s"The lineSep option must be specified for the $enc encoding")
+      enc
+    }

+  val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
+    lineSep.getBytes(encoding.getOrElse("UTF-8"))
+  }
+  val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")

   /** Sets config options on a Jackson [[JsonFactory]]. */
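A quick illustration of why UTF-16 and UTF-32 are blacklisted in per-line mode (an illustrative snippet, not part of the patch): Java's "UTF-16" charset, the one without an endianness suffix, emits a byte order mark once, at the start of the stream, so after splitting a file by lines only the first line carries it.

    // "UTF-16" (no endianness suffix) prepends a BOM to the encoded stream.
    val bytes = "{\"a\":1}\n{\"a\":2}".getBytes("UTF-16")
    println(bytes.take(2).map(b => f"${b & 0xFF}%02X").mkString(" ")) // FE FF, the BOM
    // Only the first line of a split file starts with FE FF; the remaining
    // lines carry no BOM, so decoding them with "UTF-16" falls back to the
    // default byte order rather than the one the file was written in.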
File: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

@@ -17,7 +17,7 @@
 package org.apache.spark.sql.catalyst.json

-import java.io.ByteArrayOutputStream
+import java.io.{ByteArrayOutputStream, CharConversionException}

 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try

@@ -361,6 +361,12 @@
       // For such records, all fields other than the field configured by
       // `columnNameOfCorruptRecord` are set to `null`.
       throw BadRecordException(() => recordLiteral(record), () => None, e)
+    case e: CharConversionException if options.encoding.isEmpty =>
+      val msg =
+        """JSON parser cannot handle a character in its input.
+          |Specifying encoding as an input option explicitly might help to resolve the issue.
+          |""".stripMargin + e.getMessage
+      throw new CharConversionException(msg)

Review comment: This will lose the original stack trace, we should do something like …
Review comment: BTW we should also follow the existing rule and wrap the exception with …

   }
 }
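One way to address both review comments: keep the original exception as the cause via initCause, and wrap the result in BadRecordException like the neighboring case does. This is a sketch of the suggested direction, not necessarily the code that was eventually merged:

    case e: CharConversionException if options.encoding.isEmpty =>
      val msg =
        """JSON parser cannot handle a character in its input.
          |Specifying encoding as an input option explicitly might help to resolve the issue.
          |""".stripMargin + e.getMessage
      val wrappedCharException = new CharConversionException(msg)
      wrappedCharException.initCause(e)  // preserve the original stack trace as the cause
      throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)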
File: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

@@ -366,6 +366,9 @@
    * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
    * <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
    * per file</li>
+   * <li>`encoding` (by default it is not set): allows to forcibly set one of standard basic
+   * or extended encoding for the JSON files. For example UTF-16BE, UTF-32LE. If the encoding
+   * is not specified and `multiLine` is set to `true`, it will be detected automatically.</li>

Review comment: Not a big deal but shall we match the description to Python side?
Reply: I updated python's comment to make it the same as here.

   * <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
   * that should be used for parsing.</li>
   * </ul>
File: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala

@@ -31,11 +31,11 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.TaskContext
 import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
-import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.text.{TextFileFormat, TextOptions}
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils

@@ -92,26 +92,30 @@ object TextInputJsonDataSource extends JsonDataSource {
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): StructType = {
-    val json: Dataset[String] = createBaseDataset(
-      sparkSession, inputPaths, parsedOptions.lineSeparator)
+    val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)
+
     inferFromDataset(json, parsedOptions)
   }

   def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
     val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
-    val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0))
-    JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
+    val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd
+    val rowParser = parsedOptions.encoding.map { enc =>
+      CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow)
+    }.getOrElse(CreateJacksonParser.internalRow(_: JsonFactory, _: InternalRow))
+
+    JsonInferSchema.infer(rdd, parsedOptions, rowParser)
   }

   private def createBaseDataset(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
-      lineSeparator: Option[String]): Dataset[String] = {
-    val textOptions = lineSeparator.map { lineSep =>
-      Map(TextOptions.LINE_SEPARATOR -> lineSep)
-    }.getOrElse(Map.empty[String, String])
+      parsedOptions: JSONOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
+    val textOptions = Map.empty[String, String] ++
+      parsedOptions.encoding.map("encoding" -> _) ++
+      parsedOptions.lineSeparator.map("lineSep" -> _)

     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,

@@ -129,8 +133,12 @@
       schema: StructType): Iterator[InternalRow] = {
     val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
+    val textParser = parser.options.encoding
+      .map(enc => CreateJacksonParser.text(enc, _: JsonFactory, _: Text))
+      .getOrElse(CreateJacksonParser.text(_: JsonFactory, _: Text))
+
     val safeParser = new FailureSafeParser[Text](
-      input => parser.parse(input, CreateJacksonParser.text, textToUTF8String),
+      input => parser.parse(input, textParser, textToUTF8String),
       parser.options.parseMode,
       schema,
       parser.options.columnNameOfCorruptRecord)

@@ -153,7 +161,11 @@ object MultiLineJsonDataSource extends JsonDataSource {
       parsedOptions: JSONOptions): StructType = {
     val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
     val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
-    JsonInferSchema.infer(sampled, parsedOptions, createParser)
+    val parser = parsedOptions.encoding
+      .map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
+      .getOrElse(createParser(_: JsonFactory, _: PortableDataStream))
+
+    JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
   }

   private def createBaseRdd(

@@ -175,11 +187,18 @@
       .values
   }

-  private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
-    val path = new Path(record.getPath())
-    CreateJacksonParser.inputStream(
-      jsonFactory,
-      CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
+  private def dataToInputStream(dataStream: PortableDataStream): InputStream = {
+    val path = new Path(dataStream.getPath())
+    CodecStreams.createInputStreamWithCloseResource(dataStream.getConfiguration, path)
+  }
+
+  private def createParser(jsonFactory: JsonFactory, stream: PortableDataStream): JsonParser = {
+    CreateJacksonParser.inputStream(jsonFactory, dataToInputStream(stream))
+  }
+
+  private def createParser(enc: String, jsonFactory: JsonFactory,
+      stream: PortableDataStream): JsonParser = {

Review comment: ditto for style.

+    CreateJacksonParser.inputStream(enc, jsonFactory, dataToInputStream(stream))
+  }

   override def readFile(

@@ -194,9 +213,12 @@
         UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
       }
     }
+    val streamParser = parser.options.encoding
+      .map(enc => CreateJacksonParser.inputStream(enc, _: JsonFactory, _: InputStream))
+      .getOrElse(CreateJacksonParser.inputStream(_: JsonFactory, _: InputStream))
+
     val safeParser = new FailureSafeParser[InputStream](
-      input => parser.parse(input, CreateJacksonParser.inputStream, partitionedFileString),
+      input => parser.parse[InputStream](input, streamParser, partitionedFileString),
       parser.options.parseMode,
       schema,
       parser.options.columnNameOfCorruptRecord)
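The same dispatch pattern recurs throughout this file: an Option[String] encoding selects between a charset-aware overload and the legacy one once, up front, producing a plain function value so the per-record hot path never touches the Option. A simplified, self-contained model of that pattern follows; the names here are illustrative stand-ins, not Spark APIs:

    object DispatchSketch {
      // Two overloads standing in for CreateJacksonParser.text and friends.
      def parse(record: String): Int = record.length
      def parse(enc: String, record: String): Int = {
        // A charset-aware variant would decode with `enc` first.
        new String(record.getBytes(enc), enc).length
      }

      def main(args: Array[String]): Unit = {
        val encoding: Option[String] = Some("UTF-16LE")
        // Partial application picks the overload once, up front.
        val parser: String => Int =
          encoding.map(enc => parse(enc, _: String)).getOrElse(parse(_: String))
        println(parser("{\"a\":1}")) // 7 with either overload here
      }
    }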
Review comment: Does it mean users have to set the encoding if multiLine is false?

Reply: No, it doesn't. If it had been true, it would break backward compatibility. In the comment, we just want to highlight that encoding auto-detection (meaning correct auto-detection in all cases) is officially supported in multiLine mode only. In per-line mode, the auto-detection mechanism (when encoding is not set) can fail in some cases, for example when the actual encoding of the json file is UTF-16 with a BOM, but in some cases it works (for example, when the file's encoding is UTF-8 and the actual line separator is \n). That's why @HyukjinKwon suggested mentioning only the working case.