[SPARK-19949][SQL][FOLLOW-UP] Clean up parse modes and update related comments

## What changes were proposed in this pull request?

This PR proposes to make the `mode` option in both CSV and JSON use case objects, and fixes some comments related to the previous fix.

Also, this PR modifies some tests related to parse modes.

## How was this patch tested?

Modified unit tests in both `CSVSuite.scala` and `JsonSuite.scala`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17377 from HyukjinKwon/SPARK-19949.
HyukjinKwon authored and gatorsmile committed Mar 22, 2017
1 parent 0caade6 commit 4658183
Showing 14 changed files with 130 additions and 153 deletions.
6 changes: 2 additions & 4 deletions python/pyspark/sql/readwriter.py
@@ -369,10 +369,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
value being read. If None is set, it uses the default value,
``-1`` meaning unlimited length.
:param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will
log for each partition. Malformed records beyond this
number will be ignored. If None is set, it
uses the default value, ``10``.
:param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0.
If specified, it is ignored.
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
2 changes: 2 additions & 0 deletions python/pyspark/sql/streaming.py
@@ -625,6 +625,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param maxCharsPerColumn: defines the maximum number of characters allowed for any given
value being read. If None is set, it uses the default value,
``-1`` meaning unlimited length.
:param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0.
If specified, it is ignored.
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, GenericArrayData, ParseModes}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -548,7 +548,7 @@ case class JsonToStructs(
lazy val parser =
new JacksonParser(
rowSchema,
new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))

override def dataType: DataType = schema

@@ -65,7 +65,8 @@ private[sql] class JSONOptions(
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)

@@ -82,15 +83,6 @@

val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false)

// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
}

val failFast = ParseModes.isFailFastMode(parseMode)
val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
val permissive = ParseModes.isPermissiveMode(parseMode)

/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
@@ -24,7 +24,7 @@ import org.apache.spark.unsafe.types.UTF8String

class FailureSafeParser[IN](
rawParser: IN => Seq[InternalRow],
mode: String,
mode: ParseMode,
schema: StructType,
columnNameOfCorruptRecord: String) {

@@ -58,11 +58,14 @@ class FailureSafeParser[IN](
try {
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
} catch {
case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
Iterator(toResultRow(e.partialResult(), e.record))
case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
Iterator.empty
case e: BadRecordException => throw e.cause
case e: BadRecordException => mode match {
case PermissiveMode =>
Iterator(toResultRow(e.partialResult(), e.record))
case DropMalformedMode =>
Iterator.empty
case FailFastMode =>
throw e.cause
}
}
}
}
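For context, an editorial sketch (not part of this diff) of how the three modes surface to users when reading malformed JSON. It assumes a Spark 2.2 `SparkSession` named `spark` and the `json(Dataset[String])` overload; the sample records are hypothetical.

```scala
import spark.implicits._
import org.apache.spark.sql.types._

val schema = new StructType().add("a", IntegerType)
val jsonData = Seq("""{"a": 1}""", """{"a" 1}""").toDS() // second record is malformed

// PERMISSIVE (default): the malformed record becomes a row of nulls.
spark.read.schema(schema).json(jsonData).collect() // expect: Array([1], [null])

// DROPMALFORMED: the malformed record is silently dropped.
spark.read.schema(schema).option("mode", "DROPMALFORMED").json(jsonData).collect() // expect: Array([1])

// FAILFAST: the underlying cause is rethrown, failing the query.
// spark.read.schema(schema).option("mode", "FAILFAST").json(jsonData).collect()
```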
@@ -0,0 +1,56 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.util

import org.apache.spark.internal.Logging

sealed trait ParseMode {
  /**
   * String name of the parse mode.
   */
  def name: String
}

/**
 * This mode permissively parses the records.
 */
case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" }

/**
 * This mode ignores the whole corrupted records.
 */
case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" }

/**
 * This mode throws an exception when it meets corrupted records.
 */
case object FailFastMode extends ParseMode { val name = "FAILFAST" }

object ParseMode extends Logging {
  /**
   * Returns the parse mode from the given string.
   */
  def fromString(mode: String): ParseMode = mode.toUpperCase match {
    case PermissiveMode.name => PermissiveMode
    case DropMalformedMode.name => DropMalformedMode
    case FailFastMode.name => FailFastMode
    case _ =>
      logWarning(s"$mode is not a valid parse mode. Using ${PermissiveMode.name}.")
      PermissiveMode
  }
}
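As a quick usage illustration of the new API (editorial, not part of the commit): `fromString` is case-insensitive, and unknown modes fall back to the default with a warning, as the tests below also exercise.

```scala
import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}

// Case-insensitive lookup resolves to the matching case object.
assert(ParseMode.fromString("permissive") == PermissiveMode)
assert(ParseMode.fromString("DropMalformed") == DropMalformedMode)
assert(ParseMode.fromString("FAILFAST") == FailFastMode)

// Unrecognized strings log a warning and fall back to PermissiveMode.
assert(ParseMode.fromString("abcd") == PermissiveMode)
```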

This file was deleted.

@@ -21,7 +21,7 @@ import java.util.Calendar

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, ParseModes}
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -367,7 +367,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {

// Other modes should still return `null`.
checkEvaluation(
JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId),
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
null
)
}
@@ -510,10 +510,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
* <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
* Spark will log for each partition. Malformed records beyond this number will be ignored.</li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing.
* during parsing. It supports the following case-insensitive modes.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
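For illustration, an editorial sketch of the documented `PERMISSIVE` behavior (hypothetical path and column names; the corrupt-record field must be declared in the user-supplied schema for the malformed string to be kept):

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("a", IntegerType)
  .add("b", TimestampType)
  .add("_corrupt_record", StringType) // receives the raw malformed line

val df = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE") // case-insensitive; "permissive" also works
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("/path/to/data.csv") // hypothetical input path
```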
@@ -82,7 +82,8 @@ class CSVOptions(

val delimiter = CSVUtils.toChar(
parameters.getOrElse("sep", parameters.getOrElse("delimiter", ",")))
val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val parseMode: ParseMode =
parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode)
val charset = parameters.getOrElse("encoding",
parameters.getOrElse("charset", StandardCharsets.UTF_8.name()))

@@ -95,15 +96,6 @@
val ignoreLeadingWhiteSpaceFlag = getBool("ignoreLeadingWhiteSpace")
val ignoreTrailingWhiteSpaceFlag = getBool("ignoreTrailingWhiteSpace")

// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
}

val failFast = ParseModes.isFailFastMode(parseMode)
val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
val permissive = ParseModes.isPermissiveMode(parseMode)

val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)

@@ -139,8 +131,6 @@

val escapeQuotes = getBool("escapeQuotes", true)

val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10)

val quoteAll = getBool("quoteAll", false)

val inputBufferSize = 128
@@ -25,6 +25,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.util.PermissiveMode
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -40,7 +41,7 @@ private[sql] object JsonInferSchema {
json: RDD[T],
configOptions: JSONOptions,
createParser: (JsonFactory, T) => JsonParser): StructType = {
val shouldHandleCorruptRecord = configOptions.permissive
val shouldHandleCorruptRecord = configOptions.parseMode == PermissiveMode
val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord

// perform schema inference on each row and merge afterwards
@@ -260,7 +260,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing.
* during parsing. It supports the following case-insensitive modes.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
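The same `mode` option applies to streaming reads; a minimal editorial sketch (hypothetical input directory; file-based streaming sources require an explicit schema):

```scala
import org.apache.spark.sql.types._

val schema = new StructType().add("a", IntegerType).add("b", StringType)

val streamingDf = spark.readStream
  .schema(schema) // required for file-based streaming sources
  .option("mode", "DROPMALFORMED") // silently drop malformed rows
  .csv("/path/to/input/dir") // hypothetical directory watched for new files
```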
@@ -992,9 +992,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
Seq(false, true).foreach { wholeFile =>
val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
// We use `PERMISSIVE` mode by default if invalid string is given.
val df1 = spark
.read
.option("mode", "PERMISSIVE")
.option("mode", "abcd")
.option("wholeFile", wholeFile)
.schema(schema)
.csv(testFile(valueMalformedFile))
@@ -1008,7 +1009,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
val df2 = spark
.read
.option("mode", "PERMISSIVE")
.option("mode", "Permissive")
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
.option("wholeFile", wholeFile)
.schema(schemaWithCorrField1)
@@ -1025,7 +1026,7 @@
.add("b", TimestampType)
val df3 = spark
.read
.option("mode", "PERMISSIVE")
.option("mode", "permissive")
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
.option("wholeFile", wholeFile)
.schema(schemaWithCorrField2)