
[SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support custom encoding for json files #20937

Closed
wants to merge 105 commits

Conversation

@MaxGekk (Member) commented Mar 29, 2018

What changes were proposed in this pull request?

I propose a new option for the JSON datasource which allows specifying the encoding (charset) of input and output files. Here is an example of using the option:

spark.read.schema(schema)
  .option("multiline", "true")
  .option("encoding", "UTF-16LE")
  .json(fileName)

If the option is not specified, the charset auto-detection mechanism is used by default.

The option can also be used for saving datasets to JSON files. Currently Spark can save datasets to JSON files in the UTF-8 charset only; the changes allow saving data in any supported charset. Here is the approximate list of charsets supported by Oracle Java SE: https://docs.oracle.com/javase/8/docs/technotes/guides/intl/encoding.doc.html . A user can specify the charset of the output JSON via the charset option, like .option("charset", "UTF-16BE"). By default the output charset is still UTF-8, to keep backward compatibility.
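
For example, a write could look like this (a minimal sketch; df and the output path are hypothetical):

df.write
  .option("charset", "UTF-16BE")
  .json("/tmp/json-in-utf16be")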

The solution has the following restrictions for per-line mode (multiline = false), illustrated by the sketch below:

  • if the encoding is set and differs from UTF-8, lineSep must also be set explicitly;
  • encodings from the blacklist are rejected while multiLine is disabled.
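A minimal sketch of a per-line read that satisfies both restrictions (the input path is hypothetical):

spark.read.schema(schema)
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")
  .json("/tmp/events-utf16le.json")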

How was this patch tested?

I added the following tests:

  • reads a JSON file in UTF-16LE encoding with BOM in multiline mode
  • reads a JSON file using charset auto-detection (UTF-32BE with BOM)
  • reads a JSON file using a user-specified charset (UTF-16LE)
  • saves in UTF-32BE and reads the result back with the standard library (not with Spark)
  • checks that the default charset is UTF-8
  • handles a wrong (unsupported) charset

MaxGekk added 30 commits March 17, 2018 12:39
@SparkQA commented Apr 22, 2018

Test build #89691 has finished for PR 20937 at commit 482b799.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -237,6 +237,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
characters (ASCII characters with value less than 32,
including tab and line feed characters) or not.
:param encoding: standard encoding (charset) name, for example UTF-8, UTF-16LE and UTF-32BE.
If None is set, the encoding of input JSON will be detected automatically
when the multiLine option is set to ``true``.

Contributor:

Does it mean users have to set the encoding if multiLine is false?

Member Author:

No, it doesn't. If it were true, it would break backward compatibility. In the comment, we just want to highlight that encoding auto-detection (meaning correct auto-detection in all cases) is officially supported in multiLine mode only.

In per-line mode, the auto-detection mechanism (used when encoding is not set) can fail in some cases, for example if the actual encoding of the JSON file is UTF-16 with BOM; but in some cases it works (for example, if the file's encoding is UTF-8 and the actual line separator is \n). That's why @HyukjinKwon suggested mentioning only the working case.
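
To illustrate the two cases above (paths hypothetical, not from the patch):

// Officially supported: auto-detection in multiLine mode (BOM makes it reliable).
val detected = spark.read
  .option("multiLine", "true")
  .json("/tmp/utf16-with-bom.json")

// Happens to work in per-line mode: UTF-8 content with '\n' separators.
val utf8PerLine = spark.read.json("/tmp/plain-utf8.json")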

@@ -773,6 +776,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
:param encoding: specifies encoding (charset) of saved json files. If None is set,
the default UTF-8 charset will be used.

@cloud-fan (Contributor), Apr 23, 2018:

shall we mention that, if encoding is set and is not UTF-8, lineSep also needs to be set when multiLine is false?

@MaxGekk (Member Author), Apr 23, 2018:

Sorry, I didn't realize initially that the comment related to writing. For writing, if lineSep is not set by the user, it will be set to \n in any case: https://github.com/MaxGekk/spark-1/blob/482b79969b9e0cc475e63b415051b32423facef4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala#L124

Actually, the current implementation is stricter than needed. It requires setting lineSep explicitly on write if multiLine is false and the encoding differs from UTF-8.
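
A sketch of the write-path fallback described above (names approximated from JSONOptions, not the exact merged code):

val lineSeparator: Option[String] = parameters.get("lineSep")
// Readers may need the separator spelled out, but writers always have a default:
val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")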

s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled:
| ${blacklist.mkString(", ")}""".stripMargin)

val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty)

Contributor:

enc != "UTF-8": we should not compare strings directly but turn them into Charsets.
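
A small sketch of why the string comparison is fragile (standalone, not from the patch):

import java.nio.charset.{Charset, StandardCharsets}

val enc = "utf8"                                  // a legal alias of UTF-8
enc != "UTF-8"                                    // true: the strings differ
Charset.forName(enc) != StandardCharsets.UTF_8    // false: the charsets are equal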

"""JSON parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
throw new CharConversionException(msg)

@cloud-fan (Contributor), Apr 23, 2018:

This will lose the original stack trace; we should do something like

val newException = new CharConversionException(msg)
newException.setStackTrace(e.getStackTrace)
throw newException

Contributor:

BTW we should also follow the existing rule and wrap the exception with BadRecordException. See the code above.
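
Combining both suggestions, the catch clause might end up roughly like this (a sketch; BadRecordException's and recordLiteral's shapes are assumed from the surrounding parser code):

case e: CharConversionException if options.encoding.isEmpty =>
  val msg =
    """JSON parser cannot handle a character in its input.
      |Specifying encoding as an input option explicitly might help to resolve the issue.
      |""".stripMargin + e.getMessage
  // Keep the original stack trace instead of throwing a bare exception.
  val wrappedCharException = new CharConversionException(msg)
  wrappedCharException.setStackTrace(e.getStackTrace)
  throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)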

@cloud-fan (Contributor):

LGTM except a few minor comments

@SparkQA commented Apr 23, 2018

Test build #89741 has finished for PR 20937 at commit a7be182.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

retest this please

@SparkQA commented Apr 24, 2018

Test build #89751 has finished for PR 20937 at commit a7be182.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member):

Seems fine, but please allow me to take another look, which I will do within this weekend.

@SparkQA commented Apr 27, 2018

Test build #89938 has finished for PR 20937 at commit e0cebf4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class HasCollectSubModels(Params):
  • class Summarizer(object):
  • class SummaryBuilder(JavaWrapper):
  • class CrossValidator(Estimator, ValidatorParams, HasParallelism, HasCollectSubModels,
  • class TrainValidationSplit(Estimator, ValidatorParams, HasParallelism, HasCollectSubModels,
  • case class Reverse(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
  • case class ArrayJoin(
  • case class ArrayMin(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
  • case class ArrayMax(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
  • case class ArrayPosition(left: Expression, right: Expression)
  • case class ElementAt(left: Expression, right: Expression) extends GetMapValueUtil
  • case class Concat(children: Seq[Expression]) extends Expression
  • case class Flatten(child: Expression) extends UnaryExpression
  • abstract class GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes
  • case class GetMapValue(child: Expression, key: Expression)
  • case class MonthsBetween(
  • trait QueryPlanConstraints extends ConstraintHelper
  • trait ConstraintHelper
  • class ArrayDataIndexedSeq[T](arrayData: ArrayData, dataType: DataType) extends IndexedSeq[T]
  • .doc("The class used to write checkpoint files atomically. This class must be a subclass " +
  • case class CachedRDDBuilder(
  • case class InMemoryRelation(
  • trait CheckpointFileManager
  • sealed trait RenameHelperMethods
  • abstract class CancellableFSDataOutputStream(protected val underlyingStream: OutputStream)
  • sealed class RenameBasedFSDataOutputStream(
  • class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
  • class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
  • case class WriteToContinuousDataSource(
  • case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
  • abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends BaseStreamingSource
  • class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
  • case class GetRecord(offset: ContinuousMemoryStreamPartitionOffset)
  • class ContinuousMemoryStreamDataReaderFactory(
  • class ContinuousMemoryStreamDataReader(
  • case class ContinuousMemoryStreamOffset(partitionNums: Map[Int, Int])
  • case class ContinuousMemoryStreamPartitionOffset(partition: Int, numProcessed: Int)

@HyukjinKwon (Member) left a comment:

LGTM otherwise!

@@ -43,7 +47,38 @@ private[sql] object CreateJacksonParser extends Serializable {
jsonFactory.createParser(record.getBytes, 0, record.getLength)
}

def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
jsonFactory.createParser(record)
def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = {

Member:

nit: private?

}

def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
jsonFactory.createParser(new InputStreamReader(is, enc))

Member:

I think #20937 (comment) is a good investigation. It would be good to leave a small note that we should avoid this approach if possible.

Member Author:

I added a comment above
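
The added note might read roughly like this (wording assumed, not quoted from the patch):

def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
  // Jackson's byte-based parsers are noticeably faster than parsing through an
  // InputStreamReader, so prefer the byte-array overloads where the encoding
  // permits it and fall back to a Reader only when strictly necessary.
  jsonFactory.createParser(new InputStreamReader(is, enc))
}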

s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled:
| ${blacklist.mkString(", ")}""".stripMargin)

val forcingLineSep = !(multiLine == false &&

Member:

forcingLineSep -> things like ... isLineSepRequired?

@@ -372,6 +372,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
* per file</li>
* <li>`encoding` (by default it is not set): allows to forcibly set one of standard basic

Member:

Not a big deal, but shall we match the description to the Python side?

Member Author:

I updated the Python comment to make it the same as here.

val benchmark = new Benchmark("JSON schema inferring", rowsNum)

withTempPath { path =>
// scalastyle:off

Member:

// scalastyle:off println
...
// scalastyle:on println

}
}

test("SPARK-23094: invalid json with leading nulls - from dataset") {

Member:

let's set PERMISSIVE explicitly and add this fact to the test title too.
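
A hypothetical version of the test following this suggestion (assumes the suite's testImplicits are in scope and badJson from the diff below):

test("SPARK-23094: permissively parse a dataset containing JSON with leading nulls") {
  checkAnswer(
    spark.read
      .option("mode", "PERMISSIVE")   // explicit, rather than relying on the default
      .json(Seq(badJson).toDS()),
    Row(badJson))
}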

}
}

test("SPARK-23094: invalid json with leading nulls - from file (multiLine=false)") {

Member:

ditto


private val badJson = "\u0000\u0000\u0000A\u0001AAA"

test("SPARK-23094: invalid json with leading nulls - from file (multiLine=true)") {

Member:

ditto

assert(exception.getMessage == encoding)
}

test("SPARK-23723: read written json in UTF-16LE") {

Member:

Test title like ... read back or roundtrip in read and write?
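
A sketch of the roundtrip the reviewer has in mind (data, options, and names hypothetical):

withTempPath { path =>
  val ds = Seq(("Dog", 42)).toDF("name", "n")
  val opts = Map("encoding" -> "UTF-16LE", "lineSep" -> "\n")
  ds.write.options(opts).json(path.getCanonicalPath)
  val readBack = spark.read
    .schema(ds.schema)
    .options(opts)
    .json(path.getCanonicalPath)
  checkAnswer(readBack, ds)   // the data survives the write/read roundtrip
}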

).repartition(2)
ds.write
.options(options)
.format("json").mode("overwrite")

Member:

ditto for overwrite

@SparkQA commented Apr 28, 2018

Test build #89962 has finished for PR 20937 at commit d3d28aa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

(12, "===", "US-ASCII", false),
(13, "$^+", "utf-32le", true)
).foreach {
case (testNum, sep, encoding, inferSchema) => checkReadJson(sep, encoding, inferSchema, testNum)

Member:

foreach { case (testNum, sep, encoding, inferSchema) =>
  ...
}

This is actually a style - https://github.com/databricks/scala-style-guide#pattern-matching

not a big deal

}

def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
expectedContent: String): Unit = {

Member:

I think it should be

def checkEncoding(
    expectedEncoding: String,
    pathToJsonFiles: String,
    expectedContent: String): Unit = {

per https://github.com/databricks/scala-style-guide#spacing-and-indentation

or

def checkEncoding(
    expectedEncoding: String, pathToJsonFiles: String, expectedContent: String): Unit = {

if it fits per databricks/scala-style-guide#58 (comment)

Not a big deal

import org.apache.spark.util.{Benchmark, Utils}

/**
* The benchmark aims to measure performance of JSON parsing when encoding is set and isn't.

Member:

I usually avoid abbreviation in the doc tho.

}

private def createParser(enc: String, jsonFactory: JsonFactory,
stream: PortableDataStream): JsonParser = {

Member:

ditto for style

@HyukjinKwon (Member):

Merged to master !!!

@HyukjinKwon (Member):

It doesn't necessarily need a follow-up for the styles, but it would be good to remember those when we review related PRs next time.

Thanks for bearing with me all here.
