Skip to content

Commit

Permalink
post merge fixes and review takeaways
Browse files Browse the repository at this point in the history
  • Loading branch information
ABMC831 committed Oct 23, 2024
1 parent 3bb76fa commit 56ff6ed
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,23 @@ object KafkaCase {

def main(args: Array[String]): Unit = {
val writerConfig = config.getConfig("writer")
val readerConfig = config.getConfig("reader")
val topicName = config.getString("topicName")
println("Sample how to use writer with manual resource handling")
WriterManualResourceHandling(writerConfig, topicName, sampleMessageToWrite)
println("Sample how to use writer with custom resource handling")
WriterCustomResourceHandling(writerConfig, topicName, sampleMessageToWrite)
println("Sample how to use writer with Usings in scala 3")
WriterUsingsResourceHandling(writerConfig, topicName, sampleMessageToWrite)
println("Sample how to use writer write-once fashion")
WriterWriteOnce(writerConfig, topicName, sampleMessageToWrite)
val readerConfig = config.getConfig("reader")
println("Sample how to use reader with manual resource handling")
ReaderManualResourceHandling[EdlaChange](readerConfig, topicName)
println("Sample how to use reader with custom resource handling")
ReaderCustomResourceHandling[EdlaChange](readerConfig, topicName)
println("Sample how to use reader with Usings in scala 3")
ReaderUsingsResourceHandling[EdlaChange](readerConfig, topicName)
println("Sample how to use reader read-once fashion")
ReaderReadOnce[EdlaChange](readerConfig, topicName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package za.co.absa.kafkacase.examples.reader

import com.typesafe.config.Config
import io.circe.Decoder
import za.co.absa.kafkacase.reader.Reader

import java.util.Properties

object ReaderReadOnce {
def apply[T: Decoder](readerProps: Properties, topicName: String): Unit =
Reader.readOnce[T](readerProps, topicName, println)
def apply[T: Decoder](readerConf: Config, topicName: String): Unit =
Reader.readOnce[T](readerConf, topicName, println)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package za.co.absa.kafkacase.examples.writer

import com.typesafe.config.Config
import io.circe.Encoder
import za.co.absa.kafkacase.writer.Writer

import java.util.Properties

object WriterWriteOnce {
def apply[T: Encoder](writerProps: Properties, topicName: String, sampleMessageToWrite: T): Unit =
Writer.writeOnce(writerProps, topicName, "sampleKey", sampleMessageToWrite)
def apply[T: Encoder](writerConf: Config, topicName: String, sampleMessageToWrite: T): Unit =
Writer.writeOnce(writerConf, topicName, "sampleKey", sampleMessageToWrite)
}
11 changes: 11 additions & 0 deletions reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.kafkacase.reader

import com.typesafe.config.Config
import io.circe.Decoder

import java.util.Properties
Expand All @@ -32,4 +33,14 @@ object Reader {
reader.close()
}
}

def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = {
val reader = new ReaderImpl[T](readerConf, topicName, neverEnding = false)
try {
for (item <- reader)
work(item)
} finally {
reader.close()
}
}
}
10 changes: 10 additions & 0 deletions writer/src/main/scala/za/co/absa/kafkacase/writer/Writer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.kafkacase.writer

import com.typesafe.config.Config
import io.circe.Encoder

import java.util.Properties
Expand All @@ -39,4 +40,13 @@ object Writer {
writer.close()
}
}

def writeOnce[T: Encoder](writerConf: Config, topicName: String, messageKey: String, sampleMessageToWrite: T): Unit = {
val writer = new WriterImpl[T](writerConf, topicName)
try {
writer.write(messageKey, sampleMessageToWrite)
} finally {
writer.close()
}
}
}

0 comments on commit 56ff6ed

Please sign in to comment.