[SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support custom encoding for json files #20937
Conversation
… only in the test
Test build #89691 has finished for PR 20937 at commit
python/pyspark/sql/readwriter.py (Outdated)
@@ -237,6 +237,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
    :param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
                                      characters (ASCII characters with value less than 32,
                                      including tab and line feed characters) or not.
    :param encoding: standard encoding (charset) name, for example UTF-8, UTF-16LE and UTF-32BE.
                     If None is set, the encoding of input JSON will be detected automatically
                     when the multiLine option is set to ``true``.
Does it mean users have to set the encoding if `multiLine` is false?
No, it doesn't. If it did, it would break backward compatibility. In the comment we just want to highlight that encoding auto-detection (meaning correct auto-detection in all cases) is officially supported in `multiLine` mode only.
In per-line mode, the auto-detection mechanism (when `encoding` is not set) can fail in some cases, for example if the actual encoding of the JSON file is `UTF-16` with BOM, but in some cases it works (the file's encoding is `UTF-8` and the actual line separator is `\n`, for example). That's why @HyukjinKwon suggested mentioning only the working case.
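For illustration, a per-line read of a non-UTF-8 file could look like the sketch below (the path is a placeholder; both options are set explicitly because auto-detection is only guaranteed in `multiLine` mode):

```scala
// Hypothetical example: per-line (multiLine = false) JSON stored in UTF-16LE.
// Both encoding and lineSep are set explicitly, since auto-detection is only
// officially supported when multiLine is enabled.
val df = spark.read
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")
  .json("/tmp/people_utf16le.json")
```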
@@ -773,6 +776,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
    formats follow the formats at ``java.text.SimpleDateFormat``.
    This applies to timestamp type. If None is set, it uses the
    default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
    :param encoding: specifies encoding (charset) of saved json files. If None is set,
                     the default UTF-8 charset will be used.
Shall we mention that, if `encoding` is set and is not UTF-8, `lineSep` also needs to be set when `multiLine` is false?
Yes, we can mention this in the comment, but the user will get the error https://github.com/MaxGekk/spark-1/blob/482b79969b9e0cc475e63b415051b32423facef4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala#L116-L117 if `lineSep` is not set.
Sorry, I didn't realize initially that the comment related to writing. For writing, if `lineSep` is not set by the user, it will be set to `\n` in any case: https://github.com/MaxGekk/spark-1/blob/482b79969b9e0cc475e63b415051b32423facef4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala#L124
Actually, the current implementation is stricter than it needs to be. It requires setting `lineSep` explicitly on write if `multiLine` is `false` and `encoding` is different from `UTF-8`.
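Under that restriction, a write in a non-UTF-8 charset would look roughly like the sketch below (the output path is a placeholder):

```scala
// Sketch: writing JSON in a non-UTF-8 charset. With the current implementation,
// lineSep has to be set explicitly in this case, even though it would default
// to \n for UTF-8 output.
df.write
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")
  .json("/tmp/output_utf16le")
```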
s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled: | ||
| ${blacklist.mkString(", ")}""".stripMargin) | ||
|
||
val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty) |
enc != "UTF-8"
, we should not compare string directly, but turn them into Charset
"""JSON parser cannot handle a character in its input. | ||
|Specifying encoding as an input option explicitly might help to resolve the issue. | ||
|""".stripMargin + e.getMessage | ||
throw new CharConversionException(msg) |
This will lose the original stack trace; we should do something like:
val newException = new CharConversionException(msg)
newException.setStackTrace(e.getStackTrace)
throw newException
BTW we should also follow the existing rule and wrap the exception with `BadRecordException`. See the code above.
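Combining both suggestions might look roughly like the sketch below; the `BadRecordException(record, partialResult, cause)` shape and the `recordLiteral` helper are assumed to match the surrounding `parse()` code:

```scala
// Sketch only: keep the original exception (and its stack trace) as the cause,
// and wrap everything in BadRecordException like the existing catch clauses do.
case e: CharConversionException =>
  val msg =
    """JSON parser cannot handle a character in its input.
      |Specifying encoding as an input option explicitly might help to resolve the issue.
      |""".stripMargin + e.getMessage
  val wrappedCharException = new CharConversionException(msg)
  wrappedCharException.initCause(e)
  throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
```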
LGTM except a few minor comments
…nto json-encoding-line-sep
Test build #89741 has finished for PR 20937 at commit
retest this please
Test build #89751 has finished for PR 20937 at commit
Seems fine, but please allow me to take another look, which I will do within this weekend.
Test build #89938 has finished for PR 20937 at commit
LGTM otherwise!
@@ -43,7 +47,38 @@ private[sql] object CreateJacksonParser extends Serializable {
    jsonFactory.createParser(record.getBytes, 0, record.getLength)
  }

  def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
    jsonFactory.createParser(record)

  def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = {
nit: private?
  }

  def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
    jsonFactory.createParser(new InputStreamReader(is, enc))
I think #20937 (comment) is a good investigation. It should be good to leave a small note that we should avoid this way if possible.
I added a comment above
s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled: | ||
| ${blacklist.mkString(", ")}""".stripMargin) | ||
|
||
val forcingLineSep = !(multiLine == false && |
`forcingLineSep` -> things like ... `isLineSepRequired`?
@@ -372,6 +372,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
   * <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
   * per file</li>
   * <li>`encoding` (by default it is not set): allows to forcibly set one of standard basic
Not a big deal but shall we match the description to Python side?
I updated the Python comment to make it the same as here.
    val benchmark = new Benchmark("JSON schema inferring", rowsNum)

    withTempPath { path =>
      // scalastyle:off
// scalastyle:off println
...
// scalastyle:on println
    }
  }

  test("SPARK-23094: invalid json with leading nulls - from dataset") {
let's set PERMISSIVE explicitly and add this fact to this test title too.
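For instance, the test read could make the mode explicit (a sketch, assuming `spark.implicits._` is in scope for `toDS`):

```scala
// Sketch: set the PERMISSIVE parse mode explicitly instead of relying on the default.
val df = spark.read
  .option("mode", "PERMISSIVE")
  .json(Seq(badJson).toDS())
```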
    }
  }

  test("SPARK-23094: invalid json with leading nulls - from file (multiLine=false)") {
ditto
  private val badJson = "\u0000\u0000\u0000A\u0001AAA"

  test("SPARK-23094: invalid json with leading nulls - from file (multiLine=true)") {
ditto
    assert(exception.getMessage == encoding)
  }

  test("SPARK-23723: read written json in UTF-16LE") {
Test title like ... read back or roundtrip in read and write?
    ).repartition(2)
    ds.write
      .options(options)
      .format("json").mode("overwrite")
ditto for overwrite
Test build #89962 has finished for PR 20937 at commit
(12, "===", "US-ASCII", false), | ||
(13, "$^+", "utf-32le", true) | ||
).foreach { | ||
case (testNum, sep, encoding, inferSchema) => checkReadJson(sep, encoding, inferSchema, testNum) |
foreach { case (testNum, sep, encoding, inferSchema) =>
...
}
This is actually a style - https://github.com/databricks/scala-style-guide#pattern-matching
not a big deal
  }

  def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
      expectedContent: String): Unit = {
I think it should be
def checkEncoding(
expectedEncoding: String,
pathToJsonFiles: String,
expectedContent: String): Unit = {
per https://github.com/databricks/scala-style-guide#spacing-and-indentation
or
def checkEncoding(
expectedEncoding: String, pathToJsonFiles: String, expectedContent: String): Unit = {
if it fits per databricks/scala-style-guide#58 (comment)
Not a big deal
import org.apache.spark.util.{Benchmark, Utils}

/**
 * The benchmarks aims to measure performance of JSON parsing when encoding is set and isn't.
I usually avoid abbreviation in the doc tho.
  }

  private def createParser(enc: String, jsonFactory: JsonFactory,
      stream: PortableDataStream): JsonParser = {
ditto for style
Merged to master !!!
It doesn't necessarily need a followup for the styles, but it would be good to remember those when we review related PRs next time. Thanks for bearing with me here.
What changes were proposed in this pull request?
I propose a new option for the JSON datasource which allows specifying the encoding (charset) of input and output files. Here is an example of using the option:
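(A minimal sketch of such a read; the path and `schema` are placeholders, not the PR's original snippet.)

```scala
// Hypothetical usage: read a JSON file encoded in UTF-16LE in multiLine mode.
val df = spark.read
  .schema(schema)
  .option("multiLine", true)
  .option("encoding", "UTF-16LE")
  .json("/path/to/file.json")
```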
If the option is not specified, the charset auto-detection mechanism is used by default.
The option can also be used for saving datasets to JSON files. Currently Spark is able to save datasets into JSON files in the `UTF-8` charset only. The changes allow saving data in any supported charset. Here is the approximate list of charsets supported by Oracle Java SE: https://docs.oracle.com/javase/8/docs/technotes/guides/intl/encoding.doc.html. A user can specify the charset of output JSON via the charset option, like `.option("charset", "UTF-16BE")`. By default the output charset is still `UTF-8` to keep backward compatibility.

The solution has the following restrictions for per-line mode (`multiline = false`):

- If the charset is different from UTF-8, the `lineSep` option must be specified. The option is required because Hadoop LineReader cannot detect the line separator correctly. Here is the ticket for solving the issue: https://issues.apache.org/jira/browse/SPARK-23725
- Encodings with BOM are not supported. For example, the `UTF-16` and `UTF-32` encodings are blacklisted. The problem can be solved by [SPARK-23723][SPARK-23724][SQL] Flexible format for the lineSep option of CSV datasource MaxGekk/spark#2

How was this patch tested?
I added the following tests:

- reading a JSON file in the `UTF-16LE` encoding with BOM in `multiline` mode
- reading back JSON written by Spark (`UTF-32BE` with BOM)
- reading back JSON written by Spark (`UTF-16LE`)
- saving in `UTF-32BE` and reading the result by the standard library (not by Spark)
- checking the default `UTF-8` encoding