[SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support custom encoding for json files #20937

Closed

Commits (105 total; changes shown from 101 commits)
b2e92b4
Test for reading json in UTF-16 with BOM
MaxGekk Feb 11, 2018
cb2f27b
Use user's charset or autodetect it if the charset is not specified
MaxGekk Feb 11, 2018
0d45fd3
Added a type and a comment for charset
MaxGekk Feb 13, 2018
1fb9b32
Replacing the monadic chaining by matching because it is more readable
MaxGekk Feb 13, 2018
c3b04ee
Keeping the old method for backward compatibility
MaxGekk Feb 13, 2018
93d3879
testFile is moved into the test to make it more local because it is used…
MaxGekk Feb 13, 2018
15798a1
Adding the charset as third parameter to the text method
MaxGekk Feb 13, 2018
cc05ce9
Removing whitespaces at the end of the line
MaxGekk Feb 13, 2018
74f2026
Fix the comment in javadoc style
MaxGekk Feb 13, 2018
4856b8e
Simplifying of the UTF-16 test
MaxGekk Feb 13, 2018
084f41f
A hint to the exception how to set the charset explicitly
MaxGekk Feb 15, 2018
31cd793
Fix for scala style checks
MaxGekk Feb 15, 2018
6eacd18
Run tests again
MaxGekk Feb 15, 2018
3b4a509
Improving of the exception message
MaxGekk Feb 15, 2018
cd1124e
Appended the original message to the exception
MaxGekk Feb 15, 2018
ebf5390
Multi-line reading of json file in utf-32
MaxGekk Feb 17, 2018
c5b6a35
Autodetect charset of jsons in the multiline mode
MaxGekk Feb 17, 2018
ef5e6c6
Test for reading a json in UTF-16LE in the multiline mode by using us…
MaxGekk Feb 17, 2018
f9b6ad1
Fix test: rename the test file - utf32be -> utf32BE
MaxGekk Feb 18, 2018
3b7714c
Fix code style
MaxGekk Feb 18, 2018
edb9167
Appending the create verb to the method for readability
MaxGekk Feb 18, 2018
5ba2881
Making the createParser as a separate private method
MaxGekk Feb 18, 2018
1509e10
Fix code style
MaxGekk Feb 18, 2018
e3184b3
Checks the charset option is supported
MaxGekk Feb 19, 2018
87d259c
Support charset as a parameter of the json method
MaxGekk Feb 19, 2018
76c1d08
Test for charset different from utf-8
MaxGekk Feb 19, 2018
88395b5
Description of the charset option of the json method
MaxGekk Feb 20, 2018
f2f8ae7
Minor changes in comments: added . at the end of a sentence
MaxGekk Feb 21, 2018
b451a03
Added a test for wrong charset name
MaxGekk Feb 21, 2018
c13c159
Testing that charset in any case is acceptable
MaxGekk Feb 21, 2018
1cb3ac0
Test: user specified wrong (but supported) charset
MaxGekk Feb 21, 2018
108e8e7
Set charset as an option
MaxGekk Feb 25, 2018
0d20cc6
Test: saving to json in UTF-32BE
MaxGekk Feb 23, 2018
54baf9f
Taking user's charset for saved json
MaxGekk Feb 23, 2018
1d50d94
Test: output charset is UTF-8 by default
MaxGekk Feb 23, 2018
bb53798
Changing the readJsonFiles method for readability
MaxGekk Mar 4, 2018
961b482
The test checks that json written by Spark can be read back
MaxGekk Mar 4, 2018
a794988
Adding the delimiter option encoded in base64
MaxGekk Feb 24, 2018
dccdaa2
Separator encoded as a sequence of bytes in hex
MaxGekk Feb 24, 2018
d0abab7
Refactoring: removed unused imports and renaming a parameter
MaxGekk Feb 24, 2018
6741796
The sep option is renamed to recordSeparator. The supported format is…
MaxGekk Mar 4, 2018
e4faae1
Renaming recordSeparator to recordDelimiter
MaxGekk Mar 18, 2018
01f4ef5
Comments for the recordDelimiter option
MaxGekk Mar 18, 2018
24cedb9
Support other formats of recordDelimiter
MaxGekk Mar 18, 2018
d40dda2
Checking different charsets and record delimiters
MaxGekk Mar 18, 2018
ad6496c
Renaming test's method to make it more readable
MaxGekk Mar 18, 2018
358863d
Test of reading json in different charsets and delimiters
MaxGekk Mar 18, 2018
7e5be5e
Fix inferring of csv schema for any charsets
MaxGekk Mar 18, 2018
d138d2d
Fix errors of scalastyle check
MaxGekk Mar 18, 2018
c26ef5d
Reserving format for regular expressions and concatenated json
MaxGekk Mar 22, 2018
5f0b069
Fix recordDelimiter tests
MaxGekk Mar 22, 2018
ef8248f
Additional cases are added to the delimiter test
MaxGekk Mar 22, 2018
2efac08
Renaming recordDelimiter to lineSeparator
MaxGekk Mar 22, 2018
b2020fa
Adding HyukjinKwon changes
MaxGekk Mar 22, 2018
f99c1e1
Revert lineSepInWrite back to lineSep
MaxGekk Mar 22, 2018
6d13d00
Merge remote-tracking branch 'origin/master' into json-line-sep
MaxGekk Mar 22, 2018
77112ef
Fix passing of the lineSeparator to HadoopFileLinesReader
MaxGekk Mar 22, 2018
d632706
Fix python style checking
MaxGekk Mar 23, 2018
bbff402
Fix text source tests and javadoc comments
MaxGekk Mar 23, 2018
3af996b
Merge branch 'json-charset' into json-charset-record-delimiter
MaxGekk Mar 27, 2018
8253811
Merge branch 'json-line-sep' into json-charset-record-delimiter
MaxGekk Mar 27, 2018
ab8210c
Getting UTF-8 as default charset for lineSep
MaxGekk Mar 27, 2018
7c6f115
Set charset different from UTF-8 in the test
MaxGekk Mar 27, 2018
f553b07
Fix for the charset test: charset wasn't specified
MaxGekk Mar 27, 2018
d6a07a1
Removing line left after merge
MaxGekk Mar 27, 2018
cb12ea3
Removing flexible format for lineSep
MaxGekk Mar 28, 2018
eb2965b
Adding ticket number to test titles
MaxGekk Mar 28, 2018
7a4865c
Making comments more precise
MaxGekk Mar 28, 2018
dbeb0c1
lineSep must be specified if charset is different from UTF-8
MaxGekk Mar 28, 2018
ac67020
Support encoding as a synonym for the charset option
MaxGekk Mar 29, 2018
d96b720
Merge remote-tracking branch 'origin/master' into json-encoding-line-sep
MaxGekk Mar 29, 2018
75f7bb6
Fix missing require and specifying field of internal row explicitly
MaxGekk Mar 29, 2018
d93dcdc
Making the doc generator happy
MaxGekk Mar 29, 2018
65b4b73
Making the encoding name as the primary name
MaxGekk Mar 31, 2018
6b52419
Blacklisting UTF-16 and UTF-32 in per-line mode
MaxGekk Mar 31, 2018
6116bac
Changes after code review
MaxGekk Mar 31, 2018
5383400
Renaming charset to encoding
MaxGekk Mar 31, 2018
1aeae3c
Changes requested by HyukjinKwon in the review
MaxGekk Apr 4, 2018
7e20891
Adding tests for SPARK-23094
MaxGekk Apr 4, 2018
0d3ed3c
Fix comments
MaxGekk Apr 5, 2018
5d5c295
Matching by encoding per each line is eliminated
MaxGekk Apr 6, 2018
e7be77d
Addressing Hyukjin's review comments
MaxGekk Apr 6, 2018
6bd841a
Fixes regarding coding style
MaxGekk Apr 6, 2018
6a62679
Making lineSep as opt string
MaxGekk Apr 6, 2018
3b30ce0
Removing option name in a test
MaxGekk Apr 6, 2018
fcd0a21
Merge branch 'master' into json-encoding-line-sep
MaxGekk Apr 8, 2018
af71324
Addressing HyukjinKwon's review comments
MaxGekk Apr 8, 2018
76dbbed
Merge branch 'json-encoding-line-sep' of github.com:MaxGekk/spark-1 i…
MaxGekk Apr 8, 2018
3207e59
Merge remote-tracking branch 'origin/master' into json-encoding-line-sep
MaxGekk Apr 8, 2018
b817184
Making Scala style checker and compiler happy
MaxGekk Apr 8, 2018
15df9af
Merge remote-tracking branch 'origin/master' into json-encoding-line-sep
MaxGekk Apr 13, 2018
36253f4
Addressing Hyukjin Kwon's review comments
MaxGekk Apr 14, 2018
aa69559
Adding benchmarks for json reads
MaxGekk Apr 15, 2018
c35d5d1
Making Scala style checker happy
MaxGekk Apr 15, 2018
58fc5c6
Eliminate unneeded wrapping by ByteArrayInputStream per-line during s…
MaxGekk Apr 15, 2018
63b5894
Adding benchmarks for wide lines
MaxGekk Apr 15, 2018
1ace082
Making comments shorter
MaxGekk Apr 15, 2018
6c0df03
Removing empty line between spark's imports
MaxGekk Apr 15, 2018
b4c0d38
Creating a stream decoder with specific buffer size
MaxGekk Apr 15, 2018
f2a259f
Enable all JSON benchmarks
MaxGekk Apr 15, 2018
482b799
Addressing Hyukjin Kwon's review comments
MaxGekk Apr 22, 2018
a0ab98b
Addressing Wenchen Fan's review comments
MaxGekk Apr 23, 2018
a7be182
Merge branch 'json-encoding-line-sep' of github.com:MaxGekk/spark-1 i…
MaxGekk Apr 23, 2018
e0cebf4
Merge remote-tracking branch 'origin/master' into json-encoding-line-sep
MaxGekk Apr 27, 2018
d3d28aa
Addressing Hyukjin Kwon's review comments
MaxGekk Apr 28, 2018
13 changes: 9 additions & 4 deletions python/pyspark/sql/readwriter.py
@@ -176,7 +176,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None):
multiLine=None, allowUnquotedControlChars=None, encoding=None, lineSep=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.

@@ -237,6 +237,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
characters (ASCII characters with value less than 32,
including tab and line feed characters) or not.
:param encoding: standard encoding (charset) name, for example UTF-8, UTF-16LE and UTF-32BE.
If None is set, the encoding of input JSON will be detected automatically
when the multiLine option is set to ``true``.

Contributor:
Does it mean users have to set the encoding if multiLine is false?

Member Author:
No, it doesn't. If that were the case, it would break backward compatibility. The comment just highlights that encoding auto-detection (meaning correct auto-detection in all cases) is officially supported in multiLine mode only.

In per-line mode, the auto-detection mechanism (used when encoding is not set) can fail in some cases, for example when the actual encoding of the JSON file is UTF-16 with a BOM; in other cases it works (for example, the file's encoding is UTF-8 and the actual line separator is \n). That's why @HyukjinKwon suggested mentioning only the working case.
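A minimal read-side sketch of the behaviour described above (the SparkSession setup and file paths are illustrative, not part of this PR): in multiLine mode the encoding can be set explicitly or detected automatically, while in per-line mode only UTF-8 input is reliably detected.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("json-encoding-example").getOrCreate()

// multiLine mode: set the encoding explicitly, or leave it unset to auto-detect it.
val multiLineDf = spark.read
  .option("multiLine", true)
  .option("encoding", "UTF-16LE")
  .json("/tmp/people_utf16le.json")   // hypothetical UTF-16LE file

// per-line mode: auto-detection is only reliable for UTF-8 input,
// so a plain UTF-8 file with \n separators needs no extra options.
val perLineDf = spark.read.json("/tmp/people_utf8.json")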

:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.

@@ -256,7 +259,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep)
allowUnquotedControlChars=allowUnquotedControlChars, encoding=encoding, lineSep=lineSep)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -749,7 +752,7 @@ def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)

@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None,
lineSep=None):
encoding=None, lineSep=None):
"""Saves the content of the :class:`DataFrame` in JSON format
(`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
specified path.
@@ -773,6 +776,8 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
formats follow the formats at ``java.text.SimpleDateFormat``.
This applies to timestamp type. If None is set, it uses the
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
:param encoding: specifies encoding (charset) of saved json files. If None is set,
the default UTF-8 charset will be used.

Contributor @cloud-fan (Apr 23, 2018):
Shall we mention that, if encoding is set and is not UTF-8, lineSep also needs to be set when multiLine is false?

Member Author @MaxGekk (Apr 23, 2018):
Sorry, I didn't realize initially that the comment relates to writing. For writing, if lineSep is not set by the user, it is set to \n in any case: https://github.com/MaxGekk/spark-1/blob/482b79969b9e0cc475e63b415051b32423facef4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala#L124

Actually, the current implementation is stricter than necessary. It requires lineSep to be set explicitly on write if multiLine is false and the encoding is different from UTF-8.
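A write-side sketch of the behaviour discussed in this thread (the DataFrame df and the output paths are illustrative): lineSep defaults to \n, but the current implementation requires it to be set explicitly when the encoding is not UTF-8 and multiLine is false.

// df is any existing DataFrame,
// e.g. spark.range(2).selectExpr("id AS age", "concat('name_', cast(id AS string)) AS name")
df.write
  .option("encoding", "UTF-16LE")   // non-default charset for the output files
  .option("lineSep", "\n")          // required here because the encoding is not UTF-8
  .json("/tmp/people_out_utf16le")

// With the default UTF-8 encoding, lineSep simply falls back to \n.
df.write.json("/tmp/people_out_utf8")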

:param lineSep: defines the line separator that should be used for writing. If None is
set, it uses the default value, ``\\n``.

@@ -781,7 +786,7 @@ def json(self, path, mode=None, compression=None, dateFormat=None, timestampForm
self.mode(mode)
self._set_opts(
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat,
lineSep=lineSep)
encoding=encoding, lineSep=lineSep)
self._jwrite.json(path)

@since(1.4)
7 changes: 7 additions & 0 deletions python/pyspark/sql/tests.py
@@ -685,6 +685,13 @@ def test_multiline_json(self):
multiLine=True)
self.assertEqual(people1.collect(), people_array.collect())

def test_encoding_json(self):
people_array = self.spark.read\
.json("python/test_support/sql/people_array_utf16le.json",
multiLine=True, encoding="UTF-16LE")
expected = [Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]
self.assertEqual(people_array.collect(), expected)

def test_linesep_json(self):
df = self.spark.read.json("python/test_support/sql/people.json", lineSep=",")
expected = [Row(_corrupt_record=None, name=u'Michael'),
Binary file added python/test_support/sql/people_array_utf16le.json
Binary file not shown.
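The added fixture is binary, so its content is not shown in the diff. Below is a hedged sketch of how an equivalent UTF-16LE file could be generated; the exact JSON layout is an assumption inferred from the rows expected by test_encoding_json above.

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Two records wrapped in a JSON array, written in UTF-16LE so that the
// multiLine=True, encoding="UTF-16LE" reader in the test above can decode them.
val json = """[{"name": "Andy", "age": 30}, {"name": "Justin", "age": 19}]"""
Files.write(Paths.get("/tmp/people_array_utf16le.json"), json.getBytes(StandardCharsets.UTF_16LE))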
@@ -18,10 +18,14 @@
package org.apache.spark.sql.catalyst.json

import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.channels.Channels
import java.nio.charset.Charset

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text
import sun.nio.cs.StreamDecoder

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

private[sql] object CreateJacksonParser extends Serializable {
@@ -43,7 +47,38 @@ private[sql] object CreateJacksonParser extends Serializable {
jsonFactory.createParser(record.getBytes, 0, record.getLength)
}

def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
jsonFactory.createParser(record)
def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = {

Member:
nit: private?

val bais = new ByteArrayInputStream(in, 0, length)
val byteChannel = Channels.newChannel(bais)
val decodingBufferSize = Math.min(length, 8192)
val decoder = Charset.forName(enc).newDecoder()

StreamDecoder.forDecoder(byteChannel, decoder, decodingBufferSize)
}

def text(enc: String, jsonFactory: JsonFactory, record: Text): JsonParser = {
val sd = getStreamDecoder(enc, record.getBytes, record.getLength)
jsonFactory.createParser(sd)
}

def inputStream(jsonFactory: JsonFactory, is: InputStream): JsonParser = {
jsonFactory.createParser(is)
}

def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
jsonFactory.createParser(new InputStreamReader(is, enc))

Member:
I think #20937 (comment) is a good investigation. It would be good to leave a small note that we should avoid this approach if possible.

Member Author:
I added a comment above

}

def internalRow(jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
val ba = row.getBinary(0)

jsonFactory.createParser(ba, 0, ba.length)
}

def internalRow(enc: String, jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
val binary = row.getBinary(0)
val sd = getStreamDecoder(enc, binary, binary.length)

jsonFactory.createParser(sd)
}
}
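A standalone sketch of the per-record decoding approach used by getStreamDecoder above. The sample record is illustrative, and the reasoning about buffer sizing is an assumption: capping the buffer at min(length, 8192) presumably avoids allocating a full 8 KB decoder buffer for every short record, and the reviewer's note above suggests preferring this path over InputStreamReader where possible.

import java.io.ByteArrayInputStream
import java.nio.channels.Channels
import java.nio.charset.Charset
import sun.nio.cs.StreamDecoder

// Decode a single UTF-16LE encoded record the same way getStreamDecoder does.
val record = "{\"name\": \"Andy\", \"age\": 30}".getBytes("UTF-16LE")
val channel = Channels.newChannel(new ByteArrayInputStream(record, 0, record.length))
val decoder = Charset.forName("UTF-16LE").newDecoder()
val reader = StreamDecoder.forDecoder(channel, decoder, math.min(record.length, 8192))

// A single read is enough for such a small record.
val buf = new Array[Char](record.length)
val n = reader.read(buf)
println(new String(buf, 0, math.max(n, 0)))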
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.json

import java.nio.charset.StandardCharsets
import java.nio.charset.Charset
import java.util.{Locale, TimeZone}

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
@@ -86,14 +86,41 @@ private[sql] class JSONOptions(

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

/**
* A string between two consecutive JSON records.
*/
val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
require(sep.nonEmpty, "'lineSep' cannot be an empty string.")

Member:
It seems this nonempty requirement was removed. Did I miss something, or was it a mistake?

sep
}
// Note that the option 'lineSep' uses a different default value in read and write.
val lineSeparatorInRead: Option[Array[Byte]] =
lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
// Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.

/**
* Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE.
* If the encoding is not specified (None), it will be detected automatically
* when the multiLine option is set to `true`.
*/
val encoding: Option[String] = parameters.get("encoding")
.orElse(parameters.get("charset")).map { enc =>
// The following encodings are not supported in per-line mode (multiLine is false)
// because they cause problems when reading files with a BOM, which is supposed to
// be present in files with such encodings. After splitting input files by lines,
// only the first line will have the BOM, which makes the remaining lines unreadable.
// Besides that, the lineSep option would have to contain the BOM in such encodings,
// and a BOM can never appear between lines.
val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
val isBlacklisted = blacklist.contains(Charset.forName(enc))
require(multiLine || !isBlacklisted,
s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled:
| ${blacklist.mkString(", ")}""".stripMargin)

Member:
let's add "when multiLine is disabled".


val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty)

Contributor:
enc != "UTF-8", we should not compare string directly, but turn them into Charset

require(forcingLineSep, s"The lineSep option must be specified for the $enc encoding")
enc
}

val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
lineSep.getBytes(encoding.getOrElse("UTF-8"))
}
val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")

/** Sets config options on a Jackson [[JsonFactory]]. */
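A small read-side sketch of how the validation introduced in this hunk plays out (the spark session and paths are illustrative): a non-UTF-8 encoding in per-line mode needs an explicit lineSep, and UTF-16/UTF-32 are rejected there outright because of the BOM issues described in the comment above.

// Per-line mode with a non-UTF-8 encoding: lineSep must be provided explicitly.
val perLine = spark.read
  .option("encoding", "UTF-16LE")
  .option("lineSep", "\n")
  .json("/tmp/logs_utf16le.json")

// Blacklisted in per-line mode (UTF-16/UTF-32 imply a BOM); expected to fail the require above:
// spark.read.option("encoding", "UTF-16").json("/tmp/logs_utf16.json")

// multiLine mode lifts both restrictions.
val multiLine = spark.read
  .option("multiLine", true)
  .option("encoding", "UTF-16")
  .json("/tmp/logs_utf16.json")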
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.json

import java.io.ByteArrayOutputStream
import java.io.{ByteArrayOutputStream, CharConversionException}

import scala.collection.mutable.ArrayBuffer
import scala.util.Try
@@ -361,6 +361,12 @@ class JacksonParser(
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), () => None, e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
throw new CharConversionException(msg)

Contributor @cloud-fan (Apr 23, 2018):
This will lose the original stack trace; we should do something like:

val newException = new CharConversionException(msg)
newException.setStackTrace(e.getStackTrace)
throw newException

Contributor:
BTW we should also follow the existing rule and wrap the exception with BadRecordException. See the code above.
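A hedged sketch of one way to address both comments: keep the clearer message while attaching the original exception as the cause, so its stack trace survives; the result could then be wrapped in BadRecordException like the other cases in this match.

import java.io.CharConversionException

// Rethrow with a hint about the encoding option, preserving the original
// exception as the cause instead of dropping its stack trace.
def rethrowWithEncodingHint(e: CharConversionException): Nothing = {
  val msg =
    """JSON parser cannot handle a character in its input.
      |Specifying encoding as an input option explicitly might help to resolve the issue.
      |""".stripMargin + e.getMessage
  val wrapped = new CharConversionException(msg)
  wrapped.initCause(e)
  throw wrapped
}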

}
}
}
@@ -366,6 +366,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
* per file</li>
* <li>`encoding` (by default it is not set): allows to forcibly set one of standard basic

Member:
Not a big deal, but shall we match the description to the Python side?

Member Author:
I updated the Python comment to make it the same as here.

* or extended encoding for the JSON files. For example UTF-16BE, UTF-32LE. If the encoding
* is not specified and `multiLine` is set to `true`, it will be detected automatically.</li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing.</li>
* </ul>
@@ -518,8 +518,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`lineSep` (default `\n`): defines the line separator that should
* be used for writing.</li>
* <li>`encoding` (by default it is not set): specifies encoding (charset) of saved json
* files. If it is not set, the UTF-8 charset will be used. </li>
* <li>`lineSep` (default `\n`): defines the line separator that should be used for writing.</li>
* </ul>
*
* @since 1.4.0
@@ -589,8 +590,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`compression` (default `null`): compression codec to use when saving to file. This can be
* one of the known case-insensitive shorten names (`none`, `bzip2`, `gzip`, `lz4`,
* `snappy` and `deflate`). </li>
* <li>`lineSep` (default `\n`): defines the line separator that should
* be used for writing.</li>
* <li>`lineSep` (default `\n`): defines the line separator that should be used for writing.</li>
* </ul>
*
* @since 1.6.0
@@ -31,11 +31,11 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.TaskContext
import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
import org.apache.spark.rdd.{BinaryFileRDD, RDD}
import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.{TextFileFormat, TextOptions}
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -92,26 +92,30 @@ object TextInputJsonDataSource extends JsonDataSource {
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): StructType = {
val json: Dataset[String] = createBaseDataset(
sparkSession, inputPaths, parsedOptions.lineSeparator)
val json: Dataset[String] = createBaseDataset(sparkSession, inputPaths, parsedOptions)

inferFromDataset(json, parsedOptions)
}

def inferFromDataset(json: Dataset[String], parsedOptions: JSONOptions): StructType = {
val sampled: Dataset[String] = JsonUtils.sample(json, parsedOptions)
val rdd: RDD[UTF8String] = sampled.queryExecution.toRdd.map(_.getUTF8String(0))
JsonInferSchema.infer(rdd, parsedOptions, CreateJacksonParser.utf8String)
val rdd: RDD[InternalRow] = sampled.queryExecution.toRdd
val rowParser = parsedOptions.encoding.map { enc =>
CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow)
}.getOrElse(CreateJacksonParser.internalRow(_: JsonFactory, _: InternalRow))

JsonInferSchema.infer(rdd, parsedOptions, rowParser)
}

private def createBaseDataset(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
lineSeparator: Option[String]): Dataset[String] = {
val textOptions = lineSeparator.map { lineSep =>
Map(TextOptions.LINE_SEPARATOR -> lineSep)
}.getOrElse(Map.empty[String, String])

parsedOptions: JSONOptions): Dataset[String] = {
val paths = inputPaths.map(_.getPath.toString)
val textOptions = Map.empty[String, String] ++
parsedOptions.encoding.map("encoding" -> _) ++
parsedOptions.lineSeparator.map("lineSep" -> _)

sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
@@ -129,8 +133,12 @@
schema: StructType): Iterator[InternalRow] = {
val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
val textParser = parser.options.encoding
.map(enc => CreateJacksonParser.text(enc, _: JsonFactory, _: Text))
.getOrElse(CreateJacksonParser.text(_: JsonFactory, _: Text))

val safeParser = new FailureSafeParser[Text](
input => parser.parse(input, CreateJacksonParser.text, textToUTF8String),
input => parser.parse(input, textParser, textToUTF8String),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)
@@ -153,7 +161,11 @@
parsedOptions: JSONOptions): StructType = {
val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
JsonInferSchema.infer(sampled, parsedOptions, createParser)
val parser = parsedOptions.encoding
.map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
.getOrElse(createParser(_: JsonFactory, _: PortableDataStream))

JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
}

private def createBaseRdd(
@@ -175,11 +187,18 @@
.values
}

private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
val path = new Path(record.getPath())
CreateJacksonParser.inputStream(
jsonFactory,
CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
private def dataToInputStream(dataStream: PortableDataStream): InputStream = {
val path = new Path(dataStream.getPath())
CodecStreams.createInputStreamWithCloseResource(dataStream.getConfiguration, path)
}

private def createParser(jsonFactory: JsonFactory, stream: PortableDataStream): JsonParser = {
CreateJacksonParser.inputStream(jsonFactory, dataToInputStream(stream))
}

private def createParser(enc: String, jsonFactory: JsonFactory,
stream: PortableDataStream): JsonParser = {

Member:
ditto for style

CreateJacksonParser.inputStream(enc, jsonFactory, dataToInputStream(stream))
}

override def readFile(
@@ -194,9 +213,12 @@ object MultiLineJsonDataSource extends JsonDataSource {
UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
}
}
val streamParser = parser.options.encoding
.map(enc => CreateJacksonParser.inputStream(enc, _: JsonFactory, _: InputStream))
.getOrElse(CreateJacksonParser.inputStream(_: JsonFactory, _: InputStream))

val safeParser = new FailureSafeParser[InputStream](
input => parser.parse(input, CreateJacksonParser.inputStream, partitionedFileString),
input => parser.parse[InputStream](input, streamParser, partitionedFileString),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.json

import java.nio.charset.{Charset, StandardCharsets}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -151,7 +153,13 @@ private[json] class JsonOutputWriter(
context: TaskAttemptContext)
extends OutputWriter with Logging {

private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
private val encoding = options.encoding match {
case Some(charsetName) => Charset.forName(charsetName)
case None => StandardCharsets.UTF_8
}

private val writer = CodecStreams.createOutputStreamWriter(
context, new Path(path), encoding)

// create the Generator without separator inserted between 2 records
private[this] val gen = new JacksonGenerator(dataSchema, writer, options)