From 9fad37a83c772005225a4a4042121a6de1f9b6f5 Mon Sep 17 00:00:00 2001 From: David Sloan Date: Tue, 7 Nov 2023 19:48:00 +0000 Subject: [PATCH] Post rebase edits --- .../connect/aws/s3/sink/S3SinkTask.scala | 3 +-- .../connect/aws/s3/source/S3SourceTask.scala | 4 +-- .../aws/s3/source/config/S3SourceConfig.scala | 3 +-- .../aws/s3/source/state/S3SourceBuilder.scala | 3 +-- .../config/S3SinkConfigDefBuilderTest.scala | 27 +++++++++---------- .../aws/s3/sink/config/S3SinkConfigTest.scala | 14 +++++----- .../s3/source/config/S3SourceConfigTest.scala | 4 +-- .../cloud/common/sink/CloudSinkTask.scala | 5 ++-- .../common/config/ConnectorTaskIdTest.scala | 26 +++++++++--------- .../sink/config/TestConfigDefBuilder.scala | 8 ++---- project/Dependencies.scala | 10 +++---- 11 files changed, 47 insertions(+), 60 deletions(-) diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala index ccb2c87972..132c411c43 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/sink/S3SinkTask.scala @@ -27,7 +27,6 @@ import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata import io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager -import scala.jdk.CollectionConverters.MapHasAsJava import scala.util.Try object S3SinkTask {} @@ -43,7 +42,7 @@ class S3SinkTask def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[S3FileMetadata]] = for { - config <- S3SinkConfig.fromProps(props.asJava) + config <- S3SinkConfig.fromProps(props) s3Client <- AwsS3ClientCreator.make(config.s3Config) storageInterface = new AwsS3StorageInterface(connectorTaskId, s3Client, config.batchDelete) _ <- Try(setErrorRetryInterval(config.s3Config)).toEither diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala index 3746532df8..3732f8622e 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/S3SourceTask.scala @@ -64,9 +64,9 @@ class S3SourceTask extends SourceTask with LazyLogging { logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties") val contextProperties = Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty) - val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap).asJava + val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap) (for { - result <- S3SourceState.make(mergedProperties.asScala.toMap, contextOffsetFn) + result <- S3SourceState.make(mergedProperties, contextOffsetFn) fiber <- result.partitionDiscoveryLoop.start } yield { s3SourceTaskState = result.state.some diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala index 62e4107c2c..850f48b6e8 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfig.scala @@ -40,13 +40,12 @@ import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties -import java.util import scala.util.Try object S3SourceConfig { def fromProps( - props: util.Map[String, String], + props: Map[String, String], ): Either[Throwable, S3SourceConfig] = S3SourceConfig(S3SourceConfigDefBuilder(props)) diff --git a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/state/S3SourceBuilder.scala b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/state/S3SourceBuilder.scala index bdd2e99bc5..20469e9c9d 100644 --- a/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/state/S3SourceBuilder.scala +++ b/kafka-connect-aws-s3/src/main/scala/io/lenses/streamreactor/connect/aws/s3/source/state/S3SourceBuilder.scala @@ -31,12 +31,11 @@ import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManagerState import io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState -import java.util import scala.jdk.CollectionConverters.IteratorHasAsScala object S3SourceState extends StrictLogging { def make( - props: util.Map[String, String], + props: Map[String, String], contextOffsetFn: CloudLocation => Option[CloudLocation], )( implicit diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala index 29bc0b6fff..8e18741398 100644 --- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigDefBuilderTest.scala @@ -30,7 +30,6 @@ import org.scalatest.matchers.should.Matchers import scala.concurrent.duration.DurationInt import scala.jdk.CollectionConverters.IteratorHasAsScala -import scala.jdk.CollectionConverters.MapHasAsJava class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matchers with EitherValues { @@ -46,7 +45,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName PARTITIONBY _key STOREAS `CSV` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1", ) - val kcql = SinkConfigDefBuilder(props.asJava).getKCQL + val kcql = SinkConfigDefBuilder(props).getKCQL kcql should have size 1 val element = kcql.head @@ -63,7 +62,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS CSV WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(value) => fail(value.toString) case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.Default)) } @@ -74,7 +73,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `JSON` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(value) => fail(value.toString) case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.enabled)) } @@ -85,7 +84,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `PARQUET` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(value) => fail(value.toString) case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false))) @@ -114,7 +113,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc |""".stripMargin, ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(value) => fail(value.toString) case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false), @@ -129,7 +128,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc ) val commitPolicy = - SinkConfigDefBuilder(props.asJava).commitPolicy(SinkConfigDefBuilder(props.asJava).getKCQL.head) + SinkConfigDefBuilder(props).commitPolicy(SinkConfigDefBuilder(props).getKCQL.head) commitPolicy.conditions should be( Seq( @@ -147,7 +146,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc ) val commitPolicy = - SinkConfigDefBuilder(props.asJava).commitPolicy(SinkConfigDefBuilder(props.asJava).getKCQL.head) + SinkConfigDefBuilder(props).commitPolicy(SinkConfigDefBuilder(props).getKCQL.head) commitPolicy.conditions should be( Seq( @@ -163,7 +162,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc ) val commitPolicy = - SinkConfigDefBuilder(props.asJava).commitPolicy(SinkConfigDefBuilder(props.asJava).getKCQL.head) + SinkConfigDefBuilder(props).commitPolicy(SinkConfigDefBuilder(props).getKCQL.head) commitPolicy.conditions should be( Seq( @@ -179,7 +178,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName BATCH = 150 STOREAS `CSV` LIMIT 550", ) - val kcql = SinkConfigDefBuilder(props.asJava).getKCQL + val kcql = SinkConfigDefBuilder(props).getKCQL kcql.head.getBatchSize should be(150) kcql.head.getLimit should be(550) @@ -190,7 +189,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(value) => fail(value.toString) case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true, @@ -207,7 +206,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(value) => fail(value.toString) case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true, @@ -224,7 +223,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `BYTES_VALUEONLY` WITH_FLUSH_COUNT = 1", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)).left.value.getMessage should startWith( + SinkBucketOptions(SinkConfigDefBuilder(props)).left.value.getMessage should startWith( "Unsupported format - BYTES_VALUEONLY. Please note", ) } @@ -234,7 +233,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc "connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `BYTES` WITH_FLUSH_COUNT = 3", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)).left.value.getMessage should startWith( + SinkBucketOptions(SinkConfigDefBuilder(props)).left.value.getMessage should startWith( "FLUSH_COUNT > 1 is not allowed for BYTES", ) } diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigTest.scala index 19de40690a..0a4de7c964 100644 --- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/sink/config/S3SinkConfigTest.scala @@ -22,8 +22,6 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import scala.jdk.CollectionConverters.MapHasAsJava - class S3SinkConfigTest extends AnyFunSuite with Matchers { private implicit val connectorTaskId = ConnectorTaskId("connector", 1, 0) private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator @@ -32,7 +30,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers { "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `CSV` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(value) => value.getMessage shouldBe "Envelope is not supported for format CSV." case Right(_) => fail("Should fail since envelope and CSV storage is not allowed") } @@ -43,7 +41,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers { "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `Parquet` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(error) => fail("Should not fail since envelope and Parquet storage is allowed", error) case Right(_) => succeed } @@ -53,7 +51,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers { "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `Avro` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(error) => fail("Should not fail since envelope and Avro storage is allowed", error) case Right(_) => succeed } @@ -63,7 +61,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers { "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `Json` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(error) => fail("Should not fail since envelope and Json storage is allowed", error) case Right(_) => succeed } @@ -73,7 +71,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers { "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `Text` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(value) => value.getMessage shouldBe "Envelope is not supported for format TEXT." case Right(_) => fail("Should fail since text and envelope storage is not allowed") } @@ -83,7 +81,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers { "connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `Bytes` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)", ) - SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match { + SinkBucketOptions(SinkConfigDefBuilder(props)) match { case Left(value) => value.getMessage shouldBe "Envelope is not supported for format BYTES." case Right(_) => fail("Should fail since envelope and bytes storage is not allowed") } diff --git a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigTest.scala b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigTest.scala index 7709f215f5..4c25bfe459 100644 --- a/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigTest.scala +++ b/kafka-connect-aws-s3/src/test/scala/io/lenses/streamreactor/connect/aws/s3/source/config/S3SourceConfigTest.scala @@ -21,8 +21,6 @@ import io.lenses.streamreactor.connect.cloud.common.config.TaskIndexKey import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import scala.jdk.CollectionConverters.MapHasAsJava - class S3SourceConfigTest extends AnyFunSuite with Matchers with TaskIndexKey with SourcePartitionSearcherSettingsKeys { private val Identity: String = "identity" private val Credential: String = "credential" @@ -55,7 +53,7 @@ class S3SourceConfigTest extends AnyFunSuite with Matchers with TaskIndexKey wit "connect.s3.partition.search.recurse.levels" -> "0", ) - S3SourceConfig(S3SourceConfigDefBuilder(props.asJava)) match { + S3SourceConfig(S3SourceConfigDefBuilder(props)) match { case Left(value) => fail(value.toString) case Right(config) => config.bucketOptions.size shouldBe 3 diff --git a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala index c0c1efd712..c7455bfdb1 100644 --- a/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala +++ b/kafka-connect-cloud-common/src/main/scala/io/lenses/streamreactor/connect/cloud/common/sink/CloudSinkTask.scala @@ -62,7 +62,8 @@ abstract class CloudSinkTask[SM <: FileMetadata]( printAsciiHeader(manifest, sinkAsciiArtResource) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(fallbackProps) match { + val scalaFallbackProps = fallbackProps.asScala.toMap + new ConnectorTaskIdCreator(connectorPrefix).fromProps(scalaFallbackProps) match { case Left(value) => throw new IllegalArgumentException(value) case Right(value) => connectorTaskId = value } @@ -70,7 +71,7 @@ abstract class CloudSinkTask[SM <: FileMetadata]( logger.debug(s"[{}] CloudSinkTask.start", connectorTaskId.show) val contextProps = Option(context).flatMap(c => Option(c.configs())).map(_.asScala.toMap).getOrElse(Map.empty) - val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap) + val props = MapUtils.mergeProps(contextProps, scalaFallbackProps) val errOrWriterMan = createWriterMan(props) errOrWriterMan.leftMap(throw _).foreach(writerManager = _) diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/config/ConnectorTaskIdTest.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/config/ConnectorTaskIdTest.scala index 45b37714e4..30675843fc 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/config/ConnectorTaskIdTest.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/config/ConnectorTaskIdTest.scala @@ -19,21 +19,19 @@ import cats.implicits.catsSyntaxEitherId import io.lenses.streamreactor.connect.cloud.common.source.config.distribution.PartitionHasher import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec - -import scala.jdk.CollectionConverters._ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { private val connectorName = "connectorName" "ConnectorTaskId" should { "create the instance" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:2", "name" -> connectorName) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) shouldBe ConnectorTaskId(connectorName, + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) shouldBe ConnectorTaskId(connectorName, 2, 0, ).asRight[String] } "fail if max tasks is not valid integer" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:2a", "name" -> connectorName) - val actual = new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) + val actual = new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) actual match { case Left(e) => e.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting an integer but found:2a" case Right(_) => fail("Should have failed") @@ -41,14 +39,14 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { } "fail if task number is not a valid integer" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0a:2", "name" -> connectorName) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match { + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match { case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting an integer but found:0a" case Right(_) => fail("Should have failed") } } "fail if task number < 0" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "-1:2", "name" -> connectorName) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match { + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match { case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:-1" case Right(value) => fail(s"Should have failed but got $value") } @@ -56,14 +54,14 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { } "fail if max tasks is zero" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:0", "name" -> connectorName) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match { + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match { case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:0" case Right(value) => fail(s"Should have failed but got $value") } } "fail if max tasks is negative" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:-1", "name" -> connectorName) - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava) match { + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from) match { case Left(value) => value.getMessage shouldBe s"Invalid $TASK_INDEX. Expecting a positive integer but found:-1" case Right(value) => fail(s"Should have failed but got $value") } @@ -72,7 +70,7 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { "own the partitions when max task is 1" in { val from = Map("a" -> "1", "b" -> "2", TASK_INDEX -> "0:1", "name" -> connectorName) val actual = - new ConnectorTaskIdCreator(connectorPrefix).fromProps(from.asJava).getOrElse(fail("Should be valid")) + new ConnectorTaskIdCreator(connectorPrefix).fromProps(from).getOrElse(fail("Should be valid")) Seq("/myTopic/", "/anotherTopic/", "/thirdTopic/") .flatMap { value => @@ -89,12 +87,12 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { "b" -> "2", TASK_INDEX -> "0:2", "name" -> connectorName, - ).asJava).getOrElse(fail("Should be valid")) + )).getOrElse(fail("Should be valid")) val two = new ConnectorTaskIdCreator(connectorPrefix).fromProps(Map("a" -> "1", "b" -> "2", TASK_INDEX -> "1:2", "name" -> connectorName, - ).asJava).getOrElse(fail("Should be valid")) + )).getOrElse(fail("Should be valid")) PartitionHasher.hash(2, "1") shouldBe 1 one.ownsDir("1") shouldBe false @@ -111,17 +109,17 @@ class ConnectorTaskIdTest extends AnyWordSpec with Matchers with TaskIndexKey { "b" -> "2", TASK_INDEX -> "0:3", "name" -> connectorName, - ).asJava).getOrElse(fail("Should be valid")) + )).getOrElse(fail("Should be valid")) val two = new ConnectorTaskIdCreator(connectorPrefix).fromProps(Map("a" -> "1", "b" -> "2", TASK_INDEX -> "1:3", "name" -> connectorName, - ).asJava).getOrElse(fail("Should be valid")) + )).getOrElse(fail("Should be valid")) val three = new ConnectorTaskIdCreator(connectorPrefix).fromProps(Map("a" -> "1", "b" -> "2", TASK_INDEX -> "2:3", "name" -> connectorName, - ).asJava).getOrElse(fail("Should be valid")) + )).getOrElse(fail("Should be valid")) PartitionHasher.hash(3, "1") shouldBe 1 one.ownsDir("1") shouldBe false diff --git a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/TestConfigDefBuilder.scala b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/TestConfigDefBuilder.scala index 4bc8da6ef7..8cc6f2e627 100644 --- a/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/TestConfigDefBuilder.scala +++ b/kafka-connect-cloud-common/src/test/scala/io/lenses/streamreactor/connect/cloud/common/sink/config/TestConfigDefBuilder.scala @@ -18,15 +18,11 @@ package io.lenses.streamreactor.connect.cloud.common.sink.config import com.datamountaineer.streamreactor.common.config.base.traits.BaseConfig import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategyConfigKeys import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategySettings -import io.lenses.streamreactor.connect.cloud.common.sink.config.LocalStagingAreaConfigKeys -import io.lenses.streamreactor.connect.cloud.common.sink.config.LocalStagingAreaSettings import org.apache.kafka.common.config.ConfigDef -import java.util -import scala.jdk.CollectionConverters.MapHasAsJava import scala.jdk.CollectionConverters.MapHasAsScala -case class TestConfigDefBuilder(configDef: ConfigDef, props: util.Map[String, String]) +case class TestConfigDefBuilder(configDef: ConfigDef, props: Map[String, String]) extends BaseConfig("connect.testing", configDef, props) with PaddingStrategySettings with LocalStagingAreaSettings { @@ -51,7 +47,7 @@ object TestConfig { val newMap = map + { "connect.s3.kcql" -> "dummy value" } - TestConfigDefBuilder(defineProps(new ConfigDef()), newMap.asJava) + TestConfigDefBuilder(defineProps(new ConfigDef()), newMap) } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index e339e2921b..c36b74f455 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,6 +1,6 @@ -import Dependencies._ +import Dependencies.* import KafkaVersionAxis.kafkaVersionAxis -import sbt._ +import sbt.* import sbt.librarymanagement.InclExclRule object Dependencies { @@ -80,7 +80,7 @@ object Dependencies { val jerseyCommonVersion = "3.1.1" val calciteVersion = "1.34.0" - val awsSdkVersion = "2.20.153" + val awsSdkVersion = "2.20.153" val azureDataLakeVersion = "12.17.0" val azureIdentityVersion = "1.8.1" @@ -176,7 +176,7 @@ object Dependencies { } } - import Versions._ + import Versions.* // functional libraries val catsEffectKernel = "org.typelevel" %% "cats-effect-kernel" % catsEffectVersion @@ -457,7 +457,7 @@ object Dependencies { trait Dependencies { - import Versions._ + import Versions.* val loggingDeps: Seq[ModuleID] = Seq( "org.apache.logging.log4j" % "log4j-api" % "2.20.0",