[SPARK-44695][PYTHON] Improve error message for DataFrame.toDF
### What changes were proposed in this pull request?

This PR proposes to improve the error message for `DataFrame.toDF`.

### Why are the changes needed?

The current error message is not helpful for identifying the actual problem: passing a non-string column name surfaces as an internal Spark error instead of pointing at the invalid argument.

### Does this PR introduce _any_ user-facing change?

Yes. Users now see a clearer error message than before.

**Before**
```python
>>> df = spark.createDataFrame([("John", 30), ("Alice", 25), ("Bob", 28)])
>>> cols = ['A', None]
>>> df.toDF(*cols)
Traceback (most recent call last):
...
py4j.protocol.Py4JJavaError: An error occurred while calling o54.toDF.
: org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
	at org.apache.spark.SparkException$.internalError(SparkException.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:519)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:531)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:858)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:92)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:858)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:90)
	at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:4318)
	at org.apache.spark.sql.Dataset.select(Dataset.scala:1541)
	at org.apache.spark.sql.Dataset.toDF(Dataset.scala:539)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveLateralColumnAlias$2(ColumnResolutionHelper.scala:308)
	at scala.collection.immutable.List.map(List.scala:297)
	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveLateralColumnAlias(ColumnResolutionHelper.scala:305)
	at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveLateralColumnAlias$(ColumnResolutionHelper.scala:260)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.resolveLateralColumnAlias(Analyzer.scala:1462)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1602)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1487)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1487)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1462)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:529)
	... 24 more
```

**After**
```python
>>> df = spark.createDataFrame([("John", 30), ("Alice", 25), ("Bob", 28)])
>>> cols = ['A', None]
>>> df.toDF(*cols)
Traceback (most recent call last):
...
    raise PySparkTypeError(
pyspark.errors.exceptions.base.PySparkTypeError: [NOT_LIST_OF_STR] Argument `cols` should be a list[str], got NoneType.
```
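
Since the validation now happens on the Python side before any JVM call, callers can catch the exception and inspect its error class programmatically. A minimal sketch (assuming PySpark's standard error API, `getErrorClass` / `getMessageParameters`, and an active `SparkSession`):

```python
from pyspark.sql import SparkSession
from pyspark.errors import PySparkTypeError

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("John", 30), ("Alice", 25), ("Bob", 28)])

try:
    df.toDF("A", None)  # a non-string column name now fails fast on the Python side
except PySparkTypeError as e:
    # The error class and message parameters are exposed on the exception itself.
    print(e.getErrorClass())         # NOT_LIST_OF_STR
    print(e.getMessageParameters())  # {'arg_name': 'cols', 'arg_type': 'NoneType'}
```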

### How was this patch tested?

Added a unit test.

Closes apache#42369 from itholic/improve_error_toDF.

Authored-by: itholic <haejoon.lee@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
itholic authored and vpolet committed Aug 24, 2023
1 parent b9dfe6a commit a6cbe6c
Showing 4 changed files with 32 additions and 0 deletions.
#### python/pyspark/sql/connect/dataframe.py (6 additions, 0 deletions)

```diff
@@ -1732,6 +1732,12 @@ def to(self, schema: StructType) -> "DataFrame":
     to.__doc__ = PySparkDataFrame.to.__doc__
 
     def toDF(self, *cols: str) -> "DataFrame":
+        for col_ in cols:
+            if not isinstance(col_, str):
+                raise PySparkTypeError(
+                    error_class="NOT_LIST_OF_STR",
+                    message_parameters={"arg_name": "cols", "arg_type": type(col_).__name__},
+                )
         return DataFrame.withPlan(plan.ToDF(self._plan, list(cols)), self._session)
 
     toDF.__doc__ = PySparkDataFrame.toDF.__doc__
```
#### python/pyspark/sql/dataframe.py (6 additions, 0 deletions)

```diff
@@ -5304,6 +5304,12 @@ def toDF(self, *cols: str) -> "DataFrame":
         | 16| Bob|
         +---+-----+
         """
+        for col in cols:
+            if not isinstance(col, str):
+                raise PySparkTypeError(
+                    error_class="NOT_LIST_OF_STR",
+                    message_parameters={"arg_name": "cols", "arg_type": type(col).__name__},
+                )
         jdf = self._jdf.toDF(self._jseq(cols))
         return DataFrame(jdf, self.sparkSession)
 
```
#### python/pyspark/sql/tests/connect/test_parity_dataframe.py (3 additions, 0 deletions)

```diff
@@ -84,6 +84,9 @@ def test_to_pandas_with_duplicated_column_names(self):
     def test_to_pandas_from_mixed_dataframe(self):
         self.check_to_pandas_from_mixed_dataframe()
 
+    def test_toDF_with_string(self):
+        super().test_toDF_with_string()
+
 
 if __name__ == "__main__":
     import unittest
```
#### python/pyspark/sql/tests/test_dataframe.py (17 additions, 0 deletions)

```diff
@@ -1008,6 +1008,23 @@ def test_sample(self):
             IllegalArgumentException, lambda: self.spark.range(1).sample(-1.0).count()
         )
 
+    def test_toDF_with_string(self):
+        df = self.spark.createDataFrame([("John", 30), ("Alice", 25), ("Bob", 28)])
+        data = [("John", 30), ("Alice", 25), ("Bob", 28)]
+
+        result = df.toDF("key", "value")
+        self.assertEqual(result.schema.simpleString(), "struct<key:string,value:bigint>")
+        self.assertEqual(result.collect(), data)
+
+        with self.assertRaises(PySparkTypeError) as pe:
+            df.toDF("key", None)
+
+        self.check_error(
+            exception=pe.exception,
+            error_class="NOT_LIST_OF_STR",
+            message_parameters={"arg_name": "cols", "arg_type": "NoneType"},
+        )
+
     def test_toDF_with_schema_string(self):
         data = [Row(key=i, value=str(i)) for i in range(100)]
         rdd = self.sc.parallelize(data, 5)
```
