
[BUG] [Java] CudfException on conversion of data between Arrow and Cudf #16794

Closed
mythrocks opened this issue Sep 11, 2024 · 10 comments
Assignees
Labels
bug Something isn't working

Comments

@mythrocks
Contributor

Description
The following exception is seen when CUDF JNI bindings are used to convert CUDF data to Arrow format, and then back to CUDF:

"Caused by: ai.rapids.cudf.CudfException: CUDF failure at:/home/jenkins/agent/workspace/jenkins-spark-rapids-jni_nightly-dev-856-cuda12/thirdparty/cudf/cpp/src/interop/arrow_utilities.cpp:74: Unsupported type_id conversion to cudf",

This was found during integration tests with https://github.com/NVIDIA/spark-rapids and https://github.com/NVIDIA/spark-rapids-jni.

I have narrowed it down to when #16590 was merged. Prior versions of CUDF that don't have this commit seem to work fine.

Repro
We don't yet have a narrow repro that uses only CUDF/JNI. I will include the pyspark repro here, and replace it with something smaller once we have it:

import sys
import awkward as ak
import numpy as np
from pathlib import Path
from pyspark.sql.types import *
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F

def udf_array_square(iterator):
    for batch in iterator:
        b = ak.from_arrow(batch)
        b2 = ak.zip({"doubleF": np.square(b["doubleF"])}, depth_limit=1)
        yield from ak.to_arrow_table(b2).to_batches()

if __name__ == '__main__':
    spark = SparkSession.builder.appName(f'{Path(__file__).stem}').getOrCreate()
    spark.conf.set("spark.rapids.sql.enabled","true")

    df = spark.sql("select Array(rand(),rand(),rand()) doubleF from range(1e6)")
    df.show(truncate=False)

    newdf = df.mapInArrow(udf_array_square, df.schema)
    newdf.show(truncate=False)
    newdf.collect()
    newdf.explain()

Running this Pyspark script causes the CudfException to occur, and the query to fail.

Expected behaviour
One would expect type conversions between CUDF and Arrow not to fail.

@mythrocks mythrocks added the bug Something isn't working label Sep 11, 2024
@mythrocks
Contributor Author

Here are my findings so far:

  1. The failure is the result of the Arrow type ARROW_LARGE_LIST not having a mapping for conversion to the corresponding CUDF type.
  2. It doesn't look like this has to do with a mismatch in NANOARROW types.
  3. It isn't yet clear where the "large list" type ends up being introduced:
    a. The input CUDF table consists of STRUCT< LIST< FLOAT >>.
    b. While tracing, during conversion to Arrow (Table::convertCudfToArrowTable()), one sees that the type ids are congruent.
    c. It is only on the conversion back to CUDF (Table::convertArrowTableToCudf()) that the LIST is deemed a LARGE_LIST. The reader seems to misinterpret that specific type.

I can narrow this repro down further with a Java test shortly.

@mythrocks mythrocks self-assigned this Sep 18, 2024
@mythrocks
Contributor Author

I can narrow this repro down further with a Java test shortly.

I should revise this: it wouldn't be productive to convert this into a standalone Java test. The problem arises specifically because the UDF in the repro returns an Arrow LARGE_LIST where it should return LIST.

I have done a fair bit of digging into the problem, and bisecting the changes. The crux of my findings concerns the effect of this part of the changes in #16590:

    - auto result = cudf::to_arrow(*tview, state->get_column_metadata(*tview));
    + auto got_arrow_schema = cudf::to_arrow_schema(*tview, state->get_column_metadata(*tview));
    + cudf::jni::set_nullable(got_arrow_schema.get());
    + auto got_arrow_array = cudf::to_arrow_host(*tview);
    + auto batch =
    +   arrow::ImportRecordBatch(&got_arrow_array->array, got_arrow_schema.get()).ValueOrDie();
    + auto result = arrow::Table::FromRecordBatches({batch}).ValueOrDie();

I can confirm that rolling back to using cudf::to_arrow() instead of cudf::to_arrow_host() / cudf::to_arrow_schema() allows the test to pass. Note that for the test to pass, I only rolled back the CUDF->Arrow conversion; I kept the new implementation of Arrow->CUDF.

It seems odd, but something in the way cudf::to_arrow_host() constructs the input Arrow table messes up the types in the output of the UDF.

On the face of it, the schemas of the tables constructed in both methods (i.e. cudf::to_arrow() and cudf::to_arrow_host()) seem to be identical. But I'm sure there's a subtle difference that precipitates the failure.

I'm still hopeful that we should be able to remedy this with a change in either TableJni::convertCudfToArrowTable() or cudf::to_arrow_schema(). I'd better consult @vyasr to confirm that I'm on the right track.

@mythrocks
Contributor Author

I think I have a way forward for this bug. It has to do with the nullability vector. I'll post a PR, once I've gotten to the bottom of how nullability was transmitted in the old implementation (i.e. with cudf::to_arrow()).

@mythrocks
Contributor Author

I'll raise a PR for what follows.

I'm not sure I've grokked the context for the following change from #16590:

void set_nullable(ArrowSchema* schema)
{
  schema->flags |= ARROW_FLAG_NULLABLE;
  for (int i = 0; i < schema->n_children; ++i) {
    set_nullable(schema->children[i]);
  }
}

@vyasr: This code sets all the columns to "nullable", unconditionally. Given that cudf::to_arrow_schema() already sets the nullability based on the nullability of the input table's columns, I'm wondering if this is strictly necessary.

I'm considering leaving the nullability as it is set from cudf::to_arrow_schema().

@mythrocks
Contributor Author

The breakdown

There are a couple of factors at play here. Here is the breakdown of what's happening:

The failing test uses the awkward-array library to test a PySpark UDF that processes Spark tables through Apache Arrow record batches.

One eccentricity of awkward-array is that processing a LIST<>(nullable=True) produces LARGE_LIST<> results. I haven't identified where in awkward-array this happens, but I've verified the behaviour empirically.
Note that CUDF does not support Arrow's LARGE_LIST data-type yet. (Neither does Apache Spark, incidentally.)

Why didn't it fail before?

Before #16590, the test above was passing because the cudf::to_arrow produced a LIST column that correctly had nullable=False.

Why fail now?

As part of #16590, all table columns (and child columns) had their nullability set to true. I've yet to fully understand the context of that change.

With the LIST column nullable, awkward-array does the awkward thing and returns LARGE_LIST, thus breaking libcudf and the test.

The remedy

Per the test, the LIST column ought not to be nullable (it has no null rows). I have verified the following:

  1. The ArrowSchema returned from cudf::to_arrow_schema() does have the right nullability set for LIST. For instance, if the input table contained only ARRAY( 4, null ), cudf::to_arrow_schema() returns:
+s (is_nullable=true)  with 1 children.
  +s (is_nullable=false)  with 1 children.
    +l (is_nullable=false)  with 1 children.  // <---- Not nullable:  No null ARRAY rows.
      i (is_nullable=true)  with 0 children.  // <---- Nullable INT child. The 2nd array element is null.
  2. Removing the forced (re)setting of nullability stops the crash.

@revans2
Contributor

revans2 commented Sep 20, 2024

#16590 (comment) explains the issue.

The problem is that the schema is generated once and only once, using the first batch of data. We probably need to regenerate the schema each time we send a batch. But the fact that awkward-array changes to a LARGE_LIST for a nullable array is a bit odd. Why would it do that when there is not enough data to warrant it?

@mythrocks
Contributor Author

mythrocks commented Sep 20, 2024

awkward-array changes to a LARGE_LIST... Why would it do that when there is not enough data to warrant it.

Baffling. It does so even with a single row, if it's a null row.

I have verified that this awkward-array behaviour is reproducible without CUDF in the mix:

    spark.conf.set("spark.rapids.sql.enabled","false")

    df = spark.sql("SELECT intI FROM values (ARRAY(3, 4)), (CAST(NULL AS ARRAY<INT>)) AS intI")
    df.show(truncate=False)

    newdf = df.mapInArrow(udf_array_square, df.schema)
    newdf.show(truncate=False)

With this two-row table, whose second row is null, and CUDF/JNI nowhere in sight, here's Apache Spark choking on the same thing:

Caused by: org.apache.spark.SparkUnsupportedOperationException: [UNSUPPORTED_ARROWTYPE] Unsupported arrow type LargeList. 
        at org.apache.spark.sql.errors.ExecutionErrors.unsupportedArrowTypeError(ExecutionErrors.scala:143) 
        at org.apache.spark.sql.errors.ExecutionErrors.unsupportedArrowTypeError$(ExecutionErrors.scala:140) 
        at org.apache.spark.sql.errors.ExecutionErrors$.unsupportedArrowTypeError(ExecutionErrors.scala:218) 
        at org.apache.spark.sql.util.ArrowUtils$.fromArrowType(ArrowUtils.scala:88) 
        at org.apache.spark.sql.util.ArrowUtils$.fromArrowField(ArrowUtils.scala:147) 
        at org.apache.spark.sql.util.ArrowUtils$.$anonfun$fromArrowField$1(ArrowUtils.scala:143)

So awkward-array does do this for null rows. And neither Apache Spark nor CUDF/JNI deal with Arrow's LARGE_LIST.

If removing the nullable reset isn't viable, then maybe we should consider xfailing the spark-rapids UDF test.

@revans2
Contributor

revans2 commented Sep 20, 2024

I think we have to fix the UDF test at least. xfail is okay in the short term, but long term I don't think that awkward is compatible with spark.

@vyasr
Contributor

vyasr commented Sep 20, 2024

#16621 seems relevant here, although probably not in the short term. We need to figure out how best to match Arrow's choice of nullability in libcudf, particularly when there are no nulls in a column (and in the case of Spark, when there are no nulls in one batch but future batches might have them). It's tricky because of some implementation details like the fact that nullability in Arrow is a property of the type (more precisely, the field) while nullability in libcudf is defined by the (non)existence of the null mask in the column. It seems that the change to the C Data Interface is uncovering a lot more of those subtle edge cases. Rather than changing the behavior of the arrow conversion APIs, perhaps we need to add more explicit set_null_mask calls in various places to address the case of combining columns.

@mythrocks
Contributor Author

Thank you for your input, @revans2, @vyasr. I will close this bug, and see what can be done to test PySpark UDFs without using awkward-array.

@mythrocks mythrocks closed this as not planned Sep 20, 2024