Skip to content

Commit

Permalink
Fix: Java class class java.util.Date (#58)
Browse files Browse the repository at this point in the history
* Fix: Java class class java.util.Date does not have corresponding schema type.\n\tat org.apache.kafka.connect.json.JsonConverter.convertToJson

When the payload handed over to the sink in the form of a java.util.Map[_, _], and one value in the Map is of java.util.Date the conenctor fails with the exception above.

To fix the issue, the code relies on Jackson to serialise the code.

* Support the Map with Struct as part of the JSON conversion.

---------

Co-authored-by: stheppi <jenkins@lenses.io>
  • Loading branch information
stheppi and stheppi authored Jul 26, 2024
1 parent bb4e450 commit 4837412
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ object ToJsonDataConverter {
data match {
case data: PrimitiveSinkData => converter.fromConnectData(topic.value, data.schema().orNull, data.safeValue)
case StructSinkData(structVal) => converter.fromConnectData(topic.value, data.schema().orNull, structVal)
case MapSinkData(map, schema) => converter.fromConnectData(topic.value, schema.orNull, map)
case MapSinkData(map, schema) =>
//In case of building a Map with Struct Jackson won't know how to serialise it
if (hasStructValues(map)) {
converter.fromConnectData(topic.value, schema.orNull, map)
} else jacksonJson.writeValueAsString(map).getBytes()

case ArraySinkData(array, _) if isPojo(array) =>
val json = jacksonJson.writeValueAsString(array)
json.getBytes()
Expand All @@ -61,6 +66,12 @@ object ToJsonDataConverter {
case data => data.value
}

private def hasStructValues(map: java.util.Map[_, _]) =
map.values().asScala.exists {
case _: Struct => true
case _ => false
}

/**
* This is a workaround to help some of the customers who use Kafka Connect SMT ignoring the best practices
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import org.scalatest.matchers.should.Matchers
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.GZIP

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream

import java.io.ByteArrayInputStream
import io.lenses.streamreactor.connect.cloud.common.formats.reader.TextStreamReader

import java.text.SimpleDateFormat
import java.time.Instant
import java.util.TimeZone
import scala.jdk.CollectionConverters.MapHasAsJava
import scala.jdk.CollectionConverters.SeqHasAsJava

Expand Down Expand Up @@ -281,6 +283,39 @@ class JsonFormatWriterTest extends AnyFlatSpec with Matchers {
treeLine1.get("wasps").textValue() should be("sting for fun")
}

"convert" should "write map to json containing java.util.date" in {

val outputStream = new CloudByteArrayOutputStream()
val jsonFormatWriter = new JsonFormatWriter(outputStream)

val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
val date = dateFormat.parse("2024-07-01")
jsonFormatWriter.write(
MessageDetail(
NullSinkData(None),
MapSinkData(
Map(
"bees" -> "sting when scared",
"wasps" -> "sting for fun",
"date" -> date,
).asJava,
),
Map.empty,
Some(Instant.now()),
topic,
0,
Offset(0),
),
)

val lines = outputStream.toString().split(System.lineSeparator())
val treeLine1 = new ObjectMapper().readTree(lines(0))
treeLine1.get("bees").textValue() should be("sting when scared")
treeLine1.get("wasps").textValue() should be("sting for fun")
treeLine1.get("date").numberValue().longValue() should be(1719792000000L)
}

"convert" should "write maps containing nulls as null in json" in {

val outputStream = new CloudByteArrayOutputStream()
Expand Down

0 comments on commit 4837412

Please sign in to comment.