Skip to content

Commit

Permalink
[CELEBORN-1728] Fix NPE when failing to connect to celeborn worker
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Fix NPE. When failed to connect to celeborn worker, the currentReader might be `null`.

### Why are the changes needed?

I am testing #2921 in the celeborn cluster.

And set the `celeborn.data.io.connectionTimeout` to 30s for fetch failure testing, and it failed to connect to celeborn worker for 3 times, and then the currentReader was null.

<img width="1700" alt="image" src="https://github.com/user-attachments/assets/9473294d-2cca-4f8b-bc86-ab6f70f04cff">

https://github.com/turboFei/incubator-celeborn/blob/2be9682a34f97ff10b90f22f60d9fea2bc5b81b7/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java#L672

```
24/11/20 16:15:41 ERROR Executor: Exception in task 16238.0 in stage 9.0 (TID 108550)
java.lang.NullPointerException
	at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.fillBuffer(CelebornInputStream.java:672)
	at org.apache.celeborn.client.read.CelebornInputStream$CelebornInputStreamImpl.read(CelebornInputStream.java:515)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at org.sparkproject.guava.io.ByteStreams.read(ByteStreams.java:899)
	at org.sparkproject.guava.io.ByteStreams.readFully(ByteStreams.java:733)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:496)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:756)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
	at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
GA.

Closes #2933 from turboFei/npe_reader.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
(cherry picked from commit 094fe28)
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
  • Loading branch information
turboFei authored and FMX committed Nov 21, 2024
1 parent 42b3032 commit 5341de5
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ private boolean fillBuffer() throws IOException {
appShuffleId,
shuffleId,
partitionId,
currentReader.getLocation(),
Optional.ofNullable(currentReader).map(PartitionReader::getLocation).orElse(null),
e);
IOException ioe;
if (e instanceof IOException) {
Expand Down Expand Up @@ -691,7 +691,7 @@ private boolean fillBuffer() throws IOException {
appShuffleId,
shuffleId,
partitionId,
currentReader.getLocation(),
Optional.ofNullable(currentReader).map(PartitionReader::getLocation).orElse(null),
e);
throw e;
}
Expand Down

0 comments on commit 5341de5

Please sign in to comment.