Skip to content

Commit

Permalink
[SPARK-51440][SS] classify the NPE when null topic field value is in …
Browse files Browse the repository at this point in the history
…kafka message data and there is no topic option

### What changes were proposed in this pull request?

Currently, a raw `NullPointerException` is thrown when the Kafka message data contains a null topic field value and no `topic` option is set. This PR introduces a new classified Kafka error, `KAFKA_NULL_TOPIC_IN_DATA`, and throws that classified exception instead.

### Why are the changes needed?

Error classification: a classified error gives users a better experience than an unclassified `NullPointerException`.

### Does this PR introduce _any_ user-facing change?

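Yes. When the data contains a null topic and no `topic` option is set, the error is now a classified `KafkaIllegalStateException` with the condition `KAFKA_NULL_TOPIC_IN_DATA` instead of a `NullPointerException`.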

### How was this patch tested?

Modified the existing unit test to expect the new classified exception.

### Was this patch authored or co-authored using generative AI tooling?

N/A

Closes apache#50214 from huanliwang-db/kafka-error.

Authored-by: huanliwang-db <huanli.wang@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
huanliwang-db authored and HeartSaVioR committed Mar 11, 2025
1 parent 76a1413 commit a7b864f
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
"Specified: <specifiedPartitions> Assigned: <assignedPartitions>"
]
},
"KAFKA_NULL_TOPIC_IN_DATA": {
"message" : [
"The Kafka message data sent to the producer contains a null topic. Use the `topic` option for setting a default topic."
]
},
"KAFKA_DATA_LOSS" : {
"message" : [
"Some data may have been lost because they are not available in Kafka any more;",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ object KafkaExceptions {
"specifiedPartitions" -> specifiedPartitions.toString,
"assignedPartitions" -> assignedPartitions.toString))
}

/**
 * Creates the classified error used when a row to be written carries a null topic.
 *
 * The error condition takes no message parameters.
 *
 * @return a [[KafkaIllegalStateException]] tagged with `KAFKA_NULL_TOPIC_IN_DATA`
 */
def nullTopicInData(): KafkaIllegalStateException = {
  // Name of the error condition this exception is classified under.
  val errorCondition = "KAFKA_NULL_TOPIC_IN_DATA"
  new KafkaIllegalStateException(errorClass = errorCondition, messageParameters = Map.empty)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.kafka.common.header.internals.RecordHeader

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, UnsafeProjection}
import org.apache.spark.sql.kafka010.KafkaExceptions.nullTopicInData
import org.apache.spark.sql.kafka010.producer.{CachedKafkaProducer, InternalKafkaProducerPool}
import org.apache.spark.sql.types.BinaryType

Expand Down Expand Up @@ -95,8 +96,7 @@ private[kafka010] abstract class KafkaRowWriter(
val key = projectedRow.getBinary(1)
val value = projectedRow.getBinary(2)
if (topic == null) {
throw new NullPointerException(s"null topic present in the data. Use the " +
s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
throw nullTopicInData()
}
val partition: Integer =
if (projectedRow.isNullAt(4)) null else projectedRow.getInt(4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.kafka.common.Cluster
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.time.SpanSugar._

import org.apache.spark.{SparkConf, SparkContext, SparkException, TestUtils}
import org.apache.spark.{SparkConf, SparkContext, TestUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamBase}
Expand Down Expand Up @@ -491,14 +491,17 @@ abstract class KafkaSinkBatchSuiteBase extends KafkaSinkSuiteBase {

test("batch - null topic field value, and no topic option") {
val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
val ex = intercept[SparkException] {
val ex = intercept[KafkaIllegalStateException] {
df.write
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.mode("append")
.save()
}
TestUtils.assertExceptionMsg(ex, "null topic present in the data")
checkError(
exception = ex,
condition = "KAFKA_NULL_TOPIC_IN_DATA"
)
}

protected def testUnsupportedSaveModes(msg: (SaveMode) => Seq[String]): Unit = {
Expand Down

0 comments on commit a7b864f

Please sign in to comment.