Skip to content

Commit

Permalink
fix after replacing auxiliary constructors for apply method
Browse files Browse the repository at this point in the history
  • Loading branch information
ABMC831 committed Oct 24, 2024
1 parent 786bab2 commit 3246624
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import za.co.absa.kafkacase.reader.ReaderImpl

object ReaderCustomResourceHandling {
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
withResource(new ReaderImpl[T](readerConf, topicName, neverEnding = false))(reader => {
withResource(ReaderImpl[T](readerConf, topicName))(reader => {
for (item <- reader)
println(item)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import za.co.absa.kafkacase.reader.ReaderImpl

object ReaderManualResourceHandling {
def apply[T: Decoder](readerConf: Config, topicName: String): Unit = {
val reader = new ReaderImpl[T](readerConf, topicName, neverEnding = false)
val reader = ReaderImpl[T](readerConf, topicName)
try {
for (item <- reader)
println(item)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ trait Reader[TType] extends Iterator[(String, Either[String, TType])] with AutoC

object Reader {
def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = {
val reader = new ReaderImpl[T](readerProps, topicName, neverEnding = false)
val reader = ReaderImpl[T](readerProps, topicName)
try {
for (item <- reader)
work(item)
Expand All @@ -35,7 +35,7 @@ object Reader {
}

def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = {
val reader = new ReaderImpl[T](readerConf, topicName, neverEnding = false)
val reader = ReaderImpl[T](readerConf, topicName)
try {
for (item <- reader)
work(item)
Expand Down
34 changes: 14 additions & 20 deletions reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import java.util
import java.util.Properties

class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Duration, neverEnding: Boolean) extends Reader[TType] {

private val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(util.Arrays.asList(topic))
private var singlePollIterator = fetchNextBatch()
Expand Down Expand Up @@ -62,39 +61,34 @@ class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Dura

object ReaderImpl {
private val DEFAULT_TIMEOUT: Duration = Duration.ofSeconds(3)
private val DEFAULT_NEVER_ENDING: Boolean = true
private val log = LoggerFactory.getLogger(this.getClass)
private val DEFAULT_NEVERENDING: Boolean = true

// note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary constructors
// Primary method that contains default arguments
def apply[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = true): ReaderImpl[TType] = {
// note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary apply methods
// Primary method that contains default arguments
def apply[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = DEFAULT_NEVER_ENDING): ReaderImpl[TType] = {
new ReaderImpl[TType](props, topic, timeout, neverEnding)
}

// Overloaded method without timeout
def apply[TType: Decoder](props: Properties, topic: String, neverEnding: Boolean): ReaderImpl[TType] = {
apply[TType](props, topic, DEFAULT_TIMEOUT, neverEnding)
}

// Overloaded method without timeout and neverEnding
def apply[TType: Decoder](props: Properties, topic: String): ReaderImpl[TType] = {
apply[TType](props, topic, DEFAULT_TIMEOUT, DEFAULT_NEVERENDING)
}

// Overloaded method with Config (converts Config to Properties)
def apply[TType: Decoder](config: Config, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = true): ReaderImpl[TType] = {
// Overloaded method with Config and all optional arguments
def apply[TType: Decoder](config: Config, topic: String, timeout: Duration, neverEnding: Boolean): ReaderImpl[TType] = {
val props = convertConfigToProperties(config)
apply[TType](props, topic, timeout, neverEnding)
}

// Overloaded method with Config and neverEnding
// Overloaded method with Config and neverEnding optional argument
def apply[TType: Decoder](config: Config, topic: String, neverEnding: Boolean): ReaderImpl[TType] = {
apply[TType](config, topic, DEFAULT_TIMEOUT, neverEnding)
}

// Overloaded method with Config only
// Overloaded method with Config and timeout optional argument
def apply[TType: Decoder](config: Config, topic: String, timeout: Duration): ReaderImpl[TType] = {
apply[TType](config, topic, timeout, DEFAULT_NEVER_ENDING)
}

// Overloaded method with Config and none of optional arguments
def apply[TType: Decoder](config: Config, topic: String): ReaderImpl[TType] = {
apply[TType](config, topic, DEFAULT_TIMEOUT, DEFAULT_NEVERENDING)
apply[TType](config, topic, DEFAULT_TIMEOUT, DEFAULT_NEVER_ENDING)
}

private def convertConfigToProperties(config: Config): Properties = {
Expand Down

0 comments on commit 3246624

Please sign in to comment.