diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderCustomResourceHandling.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderCustomResourceHandling.scala index 48bbc0f..13183f7 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderCustomResourceHandling.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderCustomResourceHandling.scala @@ -23,7 +23,7 @@ import za.co.absa.kafkacase.reader.ReaderImpl object ReaderCustomResourceHandling { def apply[T: Decoder](readerConf: Config, topicName: String): Unit = { - withResource(new ReaderImpl[T](readerConf, topicName, neverEnding = false))(reader => { + withResource(ReaderImpl[T](readerConf, topicName))(reader => { for (item <- reader) println(item) }) diff --git a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderManualResourceHandling.scala b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderManualResourceHandling.scala index 8cbd112..af381e1 100644 --- a/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderManualResourceHandling.scala +++ b/examples/src/main/scala/za/co/absa/kafkacase/examples/reader/ReaderManualResourceHandling.scala @@ -22,7 +22,7 @@ import za.co.absa.kafkacase.reader.ReaderImpl object ReaderManualResourceHandling { def apply[T: Decoder](readerConf: Config, topicName: String): Unit = { - val reader = new ReaderImpl[T](readerConf, topicName, neverEnding = false) + val reader = ReaderImpl[T](readerConf, topicName) try { for (item <- reader) println(item) diff --git a/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala b/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala index 5b5f5fb..d14f5ab 100644 --- a/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala +++ b/reader/src/main/scala/za/co/absa/kafkacase/reader/Reader.scala @@ -25,7 +25,7 @@ trait Reader[TType] extends Iterator[(String, Either[String, TType])] with AutoC object Reader { def readOnce[T: Decoder](readerProps: Properties, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = { - val reader = new ReaderImpl[T](readerProps, topicName, neverEnding = false) + val reader = ReaderImpl[T](readerProps, topicName) try { for (item <- reader) work(item) @@ -35,7 +35,7 @@ object Reader { } def readOnce[T: Decoder](readerConf: Config, topicName: String, work: ((String, Either[String, T])) => Unit): Unit = { - val reader = new ReaderImpl[T](readerConf, topicName, neverEnding = false) + val reader = ReaderImpl[T](readerConf, topicName) try { for (item <- reader) work(item) diff --git a/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala b/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala index dd9eca1..3471b9e 100644 --- a/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala +++ b/reader/src/main/scala/za/co/absa/kafkacase/reader/ReaderImpl.scala @@ -28,7 +28,6 @@ import java.util import java.util.Properties class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Duration, neverEnding: Boolean) extends Reader[TType] { - private val consumer = new KafkaConsumer[String, String](props) consumer.subscribe(util.Arrays.asList(topic)) private var singlePollIterator = fetchNextBatch() @@ -62,39 +61,34 @@ class ReaderImpl[TType: Decoder](props: Properties, topic: String, timeout: Dura object ReaderImpl { private val DEFAULT_TIMEOUT: Duration = Duration.ofSeconds(3) + private val DEFAULT_NEVER_ENDING: Boolean = true private val log = LoggerFactory.getLogger(this.getClass) -private val DEFAULT_NEVERENDING: Boolean = true -// note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary constructors -// Primary method that contains default arguments - def apply[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = true): ReaderImpl[TType] = { + // note: scala can't handle default parameters together with overloading.... hence slightly exponential number of auxiliary apply methods + // Primary method that contains default arguments + def apply[TType: Decoder](props: Properties, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = DEFAULT_NEVER_ENDING): ReaderImpl[TType] = { new ReaderImpl[TType](props, topic, timeout, neverEnding) } - // Overloaded method without timeout - def apply[TType: Decoder](props: Properties, topic: String, neverEnding: Boolean): ReaderImpl[TType] = { - apply[TType](props, topic, DEFAULT_TIMEOUT, neverEnding) - } - - // Overloaded method without timeout and neverEnding - def apply[TType: Decoder](props: Properties, topic: String): ReaderImpl[TType] = { - apply[TType](props, topic, DEFAULT_TIMEOUT, DEFAULT_NEVERENDING) - } - - // Overloaded method with Config (converts Config to Properties) - def apply[TType: Decoder](config: Config, topic: String, timeout: Duration = DEFAULT_TIMEOUT, neverEnding: Boolean = true): ReaderImpl[TType] = { + // Overloaded method with Config and all optional arguments + def apply[TType: Decoder](config: Config, topic: String, timeout: Duration, neverEnding: Boolean): ReaderImpl[TType] = { val props = convertConfigToProperties(config) apply[TType](props, topic, timeout, neverEnding) } - // Overloaded method with Config and neverEnding + // Overloaded method with Config and neverEnding optional argument def apply[TType: Decoder](config: Config, topic: String, neverEnding: Boolean): ReaderImpl[TType] = { apply[TType](config, topic, DEFAULT_TIMEOUT, neverEnding) } - // Overloaded method with Config only + // Overloaded method with Config and timeout optional argument + def apply[TType: Decoder](config: Config, topic: String, timeout: Duration): ReaderImpl[TType] = { + apply[TType](config, topic, timeout, DEFAULT_NEVER_ENDING) + } + + // Overloaded method with Config and none of optional arguments def apply[TType: Decoder](config: Config, topic: String): ReaderImpl[TType] = { - apply[TType](config, topic, DEFAULT_TIMEOUT, DEFAULT_NEVERENDING) + apply[TType](config, topic, DEFAULT_TIMEOUT, DEFAULT_NEVER_ENDING) } private def convertConfigToProperties(config: Config): Properties = {