Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23723][SPARK-23724][SQL] Flexible format for the lineSep option of CSV datasource #2

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,35 +85,59 @@ private[sql] class JSONOptions(

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

/**
* A sequence of bytes between two consecutive json records.
*/
val lineSeparator: Option[String] = parameters.get("lineSep")

/**
* Standard charset name. For example UTF-8, UTF-16LE and UTF-32BE.
* If the encoding is not specified (None), it will be detected automatically.
*/
val encoding: Option[String] = parameters.get("encoding")
.orElse(parameters.get("charset")).map { enc =>
val blacklist = List("UTF16", "UTF32")
val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", ""))
require(multiLine || !isBlacklisted,
s"""The ${enc} encoding must not be included in the blacklist:
| ${blacklist.mkString(", ")}""".stripMargin)

val forcingLineSep = !(multiLine == false && enc != "UTF-8" && lineSeparator.isEmpty)
require(forcingLineSep,
s"""The lineSep option must be specified for the $enc encoding.
|Example: .option("lineSep", "|^|")
|Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin)
enc
val blacklist = List("UTF16", "UTF32")
val isBlacklisted = blacklist.contains(enc.toUpperCase.replaceAll("-|_", ""))
require(multiLine || !isBlacklisted,
s"""The ${enc} encoding must not be included in the blacklist:
| ${blacklist.mkString(", ")}""".stripMargin)

enc
}

val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
lineSep.getBytes(encoding.getOrElse("UTF-8"))
/**
 * A sequence of bytes between two consecutive json objects.
 * Format of the option is:
 *   selector (1 char) + separator spec (any length) | sequence of chars
 *
 * Currently the following selectors are supported:
 * - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d".
 *   Hex pairs can be separated by any chars different from 0-9,A-F,a-f.
 *   The value must contain a non-zero, even number of hex digits, otherwise
 *   an IllegalArgumentException is thrown.
 * - '\' - reserved for a sequence of control chars like "\r\n"
 *   and unicode escape like "\u000D\u000A". It is not interpreted yet: such values
 *   are currently handled as a plain sequence of chars.
 * - 'r' and '/' - reserved for future use, a NotImplementedError is thrown for them.
 *
 * Any other non-empty value is converted to bytes using `encoding` (UTF-8 if it is not set).
 * If the option is not set, the result is None. That is accepted only in the multiLine mode,
 * when no encoding is specified, or when the encoding is UTF-8 (compared case-insensitively).
 */
val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").map {
  case hexs if hexs.startsWith("x") =>
    // Every char that is not a hex digit (including the selector itself) is only a delimiter.
    val digits = hexs.replaceAll("[^0-9A-Fa-f]", "")
    // Reject "x", "xyz" (no bytes at all) and dangling half-bytes like "x0a0".
    require(digits.nonEmpty && digits.length % 2 == 0,
      s"lineSep in hex format must contain a non-zero even number of hex digits: $hexs")
    digits.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
  case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
    throw new NotImplementedError(
      s"The '${reserved.head}' selector of lineSep is not supported yet: $reserved")
  case "" => throw new IllegalArgumentException("lineSep cannot be empty string")
  case lineSep => lineSep.getBytes(encoding.getOrElse("UTF-8"))
}.orElse {
  // No explicit separator: it can be detected automatically for UTF-8 only, so any other
  // explicitly requested encoding is rejected unless the multiLine mode is on.
  encoding.foreach { enc =>
    require(multiLine || enc.equalsIgnoreCase("UTF-8"),
      s"""The lineSep option must be specified for the $enc encoding.
         |Example: .option("lineSep", "|^|")
         |Note: lineSep can be detected automatically for UTF-8 only.""".stripMargin)
  }
  None
}
val lineSeparatorInWrite: String = lineSeparator.getOrElse("\n")

/**
* A sequence of bytes between two consecutive json objects used by JSON Reader to
* split input stream/text.
*/
val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator
/**
* A sequence of bytes that the JSON Writer puts between json objects in output stream/text.
*/
val lineSeparatorInWrite: Option[Array[Byte]] = lineSeparator

/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
Expand All @@ -128,7 +152,7 @@ private[sql] class JSONOptions(
}

def getTextOptions: Map[String, String] = {
Map[String, String]() ++
encoding.map("encoding" -> _) ++ lineSeparator.map("lineSep" -> _)
Map[String, String]() ++ encoding.map("encoding" -> _) ++
lineSeparator.map("lineSep" -> _.map("x%02x".format(_)).mkString)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.json

import java.io.Writer
import java.nio.charset.StandardCharsets
import java.nio.charset.Charset

import com.fasterxml.jackson.core._

Expand Down Expand Up @@ -75,7 +75,12 @@ private[sql] class JacksonGenerator(

private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)

private val lineSeparator: String = options.lineSeparatorInWrite
// Textual form of the record separator: the configured bytes (a single 0x0A byte when the
// option is not set) decoded with the configured encoding (UTF-8 when it is not set).
private val lineSeparator: String = {
  val separatorBytes: Array[Byte] =
    options.lineSeparatorInWrite.getOrElse(Array(0x0A.toByte))
  val charsetName: String = options.encoding.getOrElse("UTF-8")
  new String(separatorBytes, Charset.forName(charsetName))
}

private def makeWriter(dataType: DataType): ValueWriter = dataType match {
case NullType =>
Expand Down Expand Up @@ -255,7 +260,6 @@ private[sql] class JacksonGenerator(
}

/** Writes the `lineSeparator` string to the underlying generator as raw (unescaped) text. */
def writeLineEnding(): Unit = {
  // NOTE(review): the bytes that finally reach the output depend on the charset of the
  // writer this generator was created with, which is not visible here - confirm it matches
  // the encoding used to build `lineSeparator`.
  gen.writeRaw(lineSeparator)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
new TextOutputWriter(path, dataSchema, textOptions.lineSeparatorInWrite, context)
new TextOutputWriter(path, dataSchema, textOptions, context)
}

override def getFileExtension(context: TaskAttemptContext): String = {
Expand Down Expand Up @@ -149,10 +149,12 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
class TextOutputWriter(
path: String,
dataSchema: StructType,
lineSeparator: Array[Byte],
options: TextOptions,
context: TaskAttemptContext)
extends OutputWriter {

private val lineSeparator = options.lineSeparatorInWrite.getOrElse(Array(0x0A.toByte))

private val writer = CodecStreams.createOutputStream(context, new Path(path))

override def write(row: InternalRow): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,37 @@ private[text] class TextOptions(@transient private val parameters: CaseInsensiti

val encoding: Option[String] = parameters.get(ENCODING)

val lineSeparator: Option[Array[Byte]] = parameters.get(LINE_SEPARATOR).map { lineSep =>
require(lineSep.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty string.")

lineSep.getBytes(encoding.getOrElse("UTF-8"))
/**
 * A sequence of bytes between two consecutive lines in a text.
 * Format of the option is:
 *   selector (1 char) + separator spec (any length) | sequence of chars
 *
 * Currently the following selectors are supported:
 * - 'x' + sequence of bytes in hexadecimal format. For example: "x0a 0d".
 *   Hex pairs can be separated by any chars different from 0-9,A-F,a-f.
 *   The value must contain a non-zero, even number of hex digits, otherwise
 *   an IllegalArgumentException is thrown.
 * - '\' - reserved for a sequence of control chars like "\r\n"
 *   and unicode escape like "\u000D\u000A". It is not interpreted yet: such values
 *   are currently handled as a plain sequence of chars.
 * - 'r' and '/' - reserved for future use, a NotImplementedError is thrown for them.
 *
 * Any other non-empty value is converted to bytes using `encoding` (UTF-8 if it is not set).
 * If the option is not set, the result is None.
 */
val lineSeparator: Option[Array[Byte]] = parameters.get("lineSep").map {
  case hexs if hexs.startsWith("x") =>
    // Every char that is not a hex digit (including the selector itself) is only a delimiter.
    val digits = hexs.replaceAll("[^0-9A-Fa-f]", "")
    // Reject "x", "xyz" (no bytes at all) and dangling half-bytes like "x0a0".
    require(digits.nonEmpty && digits.length % 2 == 0,
      s"lineSep in hex format must contain a non-zero even number of hex digits: $hexs")
    digits.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
  case reserved if reserved.startsWith("r") || reserved.startsWith("/") =>
    throw new NotImplementedError(
      s"The '${reserved.head}' selector of lineSep is not supported yet: $reserved")
  case "" => throw new IllegalArgumentException("lineSep cannot be empty string")
  case lineSep => lineSep.getBytes(encoding.getOrElse("UTF-8"))
}

// Note that the option 'lineSep' uses a different default value in read and write.
/**
* A sequence of bytes between two consecutive lines used by Text Reader to
* split input stream/text.
*/
val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator
val lineSeparatorInWrite: Array[Byte] =
lineSeparatorInRead.getOrElse("\n".getBytes("UTF-8"))
/**
* A sequence of bytes that the Text Writer puts between lines in output stream/text.
*/
val lineSeparatorInWrite: Option[Array[Byte]] = lineSeparator
}

private[datasources] object TextOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2140,6 +2140,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.option("encoding", "UTF-16")
.json(testFile(fileName))

val a = jsonDF.collect()
checkAnswer(jsonDF, Seq(
Row("Chris", "Baird"), Row("Doug", "Rood")
))
Expand Down Expand Up @@ -2349,7 +2350,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
("\u000d\u000a", "encoding", "UTF-32BE", false),
("\u000a\u000d", "encoding", "UTF-8", true),
("===", "encoding", "US-ASCII", false),
("$^+", "encoding", "utf-32le", true)
("$^+", "encoding", "utf-32le", true),
("x00 0a 00 0d", "encoding", "UTF-16BE", false),
("x0a.00.00.00 0d.00.00.00", "encoding", "UTF-32LE", true)
).zipWithIndex.foreach{case ((d, o, c, s), i) => checkReadJson(d, o, c, s, i)}
// scalastyle:on nonascii

Expand Down