diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 122e17f2020f4..759c27507c397 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -369,10 +369,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param maxCharsPerColumn: defines the maximum number of characters allowed for any given value being read. If None is set, it uses the default value, ``-1`` meaning unlimited length. - :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will - log for each partition. Malformed records beyond this - number will be ignored. If None is set, it - uses the default value, ``10``. + :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0. + If specified, it is ignored. :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 288cc1e4f64dc..e227f9ceb5769 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -625,6 +625,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param maxCharsPerColumn: defines the maximum number of characters allowed for any given value being read. If None is set, it uses the default value, ``-1`` meaning unlimited length. + :param maxMalformedLogPerPartition: this parameter is no longer used since Spark 2.2.0. + If specified, it is ignored. :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 08af5522d822d..df4d406b84d60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json._ -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, GenericArrayData, ParseModes} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -548,7 +548,7 @@ case class JsonToStructs( lazy val parser = new JacksonParser( rowSchema, - new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get)) + new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) override def dataType: DataType = schema diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 355c26afa6f0d..c22b1ade4e64b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -65,7 +65,8 @@ private[sql] class JSONOptions( val allowBackslashEscapingAnyCharacter = parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false) val compressionCodec = 
parameters.get("compression").map(CompressionCodecs.getCodecClassName) - val parseMode = parameters.getOrElse("mode", "PERMISSIVE") + val parseMode: ParseMode = + parameters.get("mode").map(ParseMode.fromString).getOrElse(PermissiveMode) val columnNameOfCorruptRecord = parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord) @@ -82,15 +83,6 @@ private[sql] class JSONOptions( val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false) - // Parse mode flags - if (!ParseModes.isValidMode(parseMode)) { - logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.") - } - - val failFast = ParseModes.isFailFastMode(parseMode) - val dropMalformed = ParseModes.isDropMalformedMode(parseMode) - val permissive = ParseModes.isPermissiveMode(parseMode) - /** Sets config options on a Jackson [[JsonFactory]]. */ def setJacksonOptions(factory: JsonFactory): Unit = { factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala index e8da10d65ecb9..725e3015b3416 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala @@ -24,7 +24,7 @@ import org.apache.spark.unsafe.types.UTF8String class FailureSafeParser[IN]( rawParser: IN => Seq[InternalRow], - mode: String, + mode: ParseMode, schema: StructType, columnNameOfCorruptRecord: String) { @@ -58,11 +58,14 @@ class FailureSafeParser[IN]( try { rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null)) } catch { - case e: BadRecordException if ParseModes.isPermissiveMode(mode) => - Iterator(toResultRow(e.partialResult(), e.record)) - case _: BadRecordException if ParseModes.isDropMalformedMode(mode) => - Iterator.empty - case 
e: BadRecordException => throw e.cause + case e: BadRecordException => mode match { + case PermissiveMode => + Iterator(toResultRow(e.partialResult(), e.record)) + case DropMalformedMode => + Iterator.empty + case FailFastMode => + throw e.cause + } } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala new file mode 100644 index 0000000000000..4565dbde88c88 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseMode.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.spark.internal.Logging + +sealed trait ParseMode { + /** + * String name of the parse mode. + */ + def name: String +} + +/** + * This mode permissively parses the records. + */ +case object PermissiveMode extends ParseMode { val name = "PERMISSIVE" } + +/** + * This mode ignores the whole corrupted records. + */ +case object DropMalformedMode extends ParseMode { val name = "DROPMALFORMED" } + +/** + * This mode throws an exception when it meets corrupted records. 
+ */ +case object FailFastMode extends ParseMode { val name = "FAILFAST" } + +object ParseMode extends Logging { + /** + * Returns the parse mode for `mode` (case-insensitive); unknown values fall back to PERMISSIVE. + */ + def fromString(mode: String): ParseMode = mode.toUpperCase(java.util.Locale.ROOT) match { + case PermissiveMode.name => PermissiveMode + case DropMalformedMode.name => DropMalformedMode + case FailFastMode.name => FailFastMode + case _ => + logWarning(s"$mode is not a valid parse mode. Using ${PermissiveMode.name}.") + PermissiveMode + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala deleted file mode 100644 index 0e466962b4678..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.catalyst.util - -object ParseModes { - val PERMISSIVE_MODE = "PERMISSIVE" - val DROP_MALFORMED_MODE = "DROPMALFORMED" - val FAIL_FAST_MODE = "FAILFAST" - - val DEFAULT = PERMISSIVE_MODE - - def isValidMode(mode: String): Boolean = { - mode.toUpperCase match { - case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true - case _ => false - } - } - - def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE - def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE - def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) { - mode.toUpperCase == PERMISSIVE_MODE - } else { - true // We default to permissive is the mode string is not valid - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index e4698d44636b6..c5b72235e5db0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -21,7 +21,7 @@ import java.util.Calendar import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, ParseModes} +import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, GenericArrayData, PermissiveMode} import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -367,7 +367,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { // Other modes should still return `null`. 
checkEvaluation( - JsonToStructs(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData), gmtId), + JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId), null ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 767a636d70731..e39b4d91f1f6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -510,10 +510,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * a record can have. *
  • `maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed * for any given value being read. By default, it is -1 meaning unlimited length
  • - *
  • `maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows - * Spark will log for each partition. Malformed records beyond this number will be ignored.
  • *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records - * during parsing. + * during parsing. It supports the following case-insensitive modes. *