
Commit 9fad37a
Post rebase edits
davidsloan committed Nov 8, 2023
1 parent 5f64d48 commit 9fad37a
Showing 11 changed files with 47 additions and 60 deletions.
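The common thread across these files is that the configuration builders now accept a Scala `Map[String, String]` instead of a `java.util.Map`, so call sites drop their `.asJava` conversions and the Connect-facing entry points convert the framework-supplied Java properties once at the boundary. A minimal sketch of that pattern follows; `ExampleConfig` and `ExampleTask` are illustrative stand-ins, not code from this repository, though they mirror the shape of `S3SinkConfig.fromProps` after this commit.

```scala
import scala.jdk.CollectionConverters.MapHasAsScala

// Hypothetical config type for illustration only.
final case class ExampleConfig(props: Map[String, String])

object ExampleConfig {
  // Accepts a Scala Map directly, so callers no longer need .asJava.
  def fromProps(props: Map[String, String]): Either[Throwable, ExampleConfig] =
    Either.cond(
      props.contains("connect.s3.kcql"),
      ExampleConfig(props),
      new IllegalArgumentException("missing connect.s3.kcql")
    )
}

object ExampleTask {
  // A Connect task receives a java.util.Map from the framework; convert it
  // once here and work with Scala collections from then on.
  def start(javaProps: java.util.Map[String, String]): Either[Throwable, ExampleConfig] =
    ExampleConfig.fromProps(javaProps.asScala.toMap)
}
```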
@@ -27,7 +27,6 @@ import io.lenses.streamreactor.connect.aws.s3.storage.S3FileMetadata
import io.lenses.streamreactor.connect.cloud.common.sink.CloudSinkTask
import io.lenses.streamreactor.connect.cloud.common.sink.writer.WriterManager

- import scala.jdk.CollectionConverters.MapHasAsJava
import scala.util.Try

object S3SinkTask {}
@@ -43,7 +42,7 @@ class S3SinkTask

def createWriterMan(props: Map[String, String]): Either[Throwable, WriterManager[S3FileMetadata]] =
for {
- config <- S3SinkConfig.fromProps(props.asJava)
+ config <- S3SinkConfig.fromProps(props)
s3Client <- AwsS3ClientCreator.make(config.s3Config)
storageInterface = new AwsS3StorageInterface(connectorTaskId, s3Client, config.batchDelete)
_ <- Try(setErrorRetryInterval(config.s3Config)).toEither
@@ -64,9 +64,9 @@ class S3SourceTask extends SourceTask with LazyLogging {
logger.debug(s"Received call to S3SourceTask.start with ${props.size()} properties")

val contextProperties = Option(context).flatMap(c => Option(c.configs()).map(_.asScala.toMap)).getOrElse(Map.empty)
- val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap).asJava
+ val mergedProperties = MapUtils.mergeProps(contextProperties, props.asScala.toMap)
(for {
- result <- S3SourceState.make(mergedProperties.asScala.toMap, contextOffsetFn)
+ result <- S3SourceState.make(mergedProperties, contextOffsetFn)
fiber <- result.partitionDiscoveryLoop.start
} yield {
s3SourceTaskState = result.state.some
@@ -40,13 +40,12 @@ import io.lenses.streamreactor.connect.cloud.common.storage.ListOfKeysResponse
import io.lenses.streamreactor.connect.cloud.common.storage.StorageInterface
import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties

- import java.util
import scala.util.Try

object S3SourceConfig {

def fromProps(
- props: util.Map[String, String],
+ props: Map[String, String],
): Either[Throwable, S3SourceConfig] =
S3SourceConfig(S3SourceConfigDefBuilder(props))

@@ -31,12 +31,11 @@ import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager
import io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManagerState
import io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState

- import java.util
import scala.jdk.CollectionConverters.IteratorHasAsScala

object S3SourceState extends StrictLogging {
def make(
- props: util.Map[String, String],
+ props: Map[String, String],
contextOffsetFn: CloudLocation => Option[CloudLocation],
)(
implicit
@@ -30,7 +30,6 @@ import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration.DurationInt
import scala.jdk.CollectionConverters.IteratorHasAsScala
- import scala.jdk.CollectionConverters.MapHasAsJava

class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matchers with EitherValues {

@@ -46,7 +45,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName PARTITIONBY _key STOREAS `CSV` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1",
)

- val kcql = SinkConfigDefBuilder(props.asJava).getKCQL
+ val kcql = SinkConfigDefBuilder(props).getKCQL
kcql should have size 1

val element = kcql.head
@@ -63,7 +62,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS CSV WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.Default))
}
@@ -74,7 +73,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `JSON` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) => value.map(_.dataStorage) should be(List(DataStorageSettings.enabled))
}
@@ -85,7 +84,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from $TopicName PARTITIONBY _key STOREAS `PARQUET` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false)))
@@ -114,7 +113,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
|""".stripMargin,
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(true, true, true, false, false),
@@ -129,7 +128,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
)

val commitPolicy =
- SinkConfigDefBuilder(props.asJava).commitPolicy(SinkConfigDefBuilder(props.asJava).getKCQL.head)
+ SinkConfigDefBuilder(props).commitPolicy(SinkConfigDefBuilder(props).getKCQL.head)

commitPolicy.conditions should be(
Seq(
@@ -147,7 +146,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
)

val commitPolicy =
- SinkConfigDefBuilder(props.asJava).commitPolicy(SinkConfigDefBuilder(props.asJava).getKCQL.head)
+ SinkConfigDefBuilder(props).commitPolicy(SinkConfigDefBuilder(props).getKCQL.head)

commitPolicy.conditions should be(
Seq(
@@ -163,7 +162,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
)

val commitPolicy =
- SinkConfigDefBuilder(props.asJava).commitPolicy(SinkConfigDefBuilder(props.asJava).getKCQL.head)
+ SinkConfigDefBuilder(props).commitPolicy(SinkConfigDefBuilder(props).getKCQL.head)

commitPolicy.conditions should be(
Seq(
@@ -179,7 +178,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName BATCH = 150 STOREAS `CSV` LIMIT 550",
)

- val kcql = SinkConfigDefBuilder(props.asJava).getKCQL
+ val kcql = SinkConfigDefBuilder(props).getKCQL

kcql.head.getBatchSize should be(150)
kcql.head.getLimit should be(550)
@@ -190,7 +189,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true,
@@ -207,7 +206,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `JSON` WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true, '${DataStorageSettings.StoreKeyKey}'=true, '${DataStorageSettings.StoreValueKey}'=true, '${DataStorageSettings.StoreMetadataKey}'=false, '${DataStorageSettings.StoreHeadersKey}'=false)",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(value) =>
value.map(_.dataStorage) should be(List(DataStorageSettings(envelope = true,
@@ -224,7 +223,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `BYTES_VALUEONLY` WITH_FLUSH_COUNT = 1",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)).left.value.getMessage should startWith(
+ SinkBucketOptions(SinkConfigDefBuilder(props)).left.value.getMessage should startWith(
"Unsupported format - BYTES_VALUEONLY. Please note",
)
}
@@ -234,7 +233,7 @@ class S3SinkConfigDefBuilderTest extends AnyFlatSpec with MockitoSugar with Matc
"connect.s3.kcql" -> s"insert into $BucketName:$PrefixName select * from $TopicName STOREAS `BYTES` WITH_FLUSH_COUNT = 3",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)).left.value.getMessage should startWith(
+ SinkBucketOptions(SinkConfigDefBuilder(props)).left.value.getMessage should startWith(
"FLUSH_COUNT > 1 is not allowed for BYTES",
)
}
@@ -22,8 +22,6 @@ import io.lenses.streamreactor.connect.cloud.common.model.location.CloudLocation
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

- import scala.jdk.CollectionConverters.MapHasAsJava
-
class S3SinkConfigTest extends AnyFunSuite with Matchers {
private implicit val connectorTaskId = ConnectorTaskId("connector", 1, 0)
private implicit val cloudLocationValidator: CloudLocationValidator = S3LocationValidator
@@ -32,7 +30,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers {
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `CSV` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(value) => value.getMessage shouldBe "Envelope is not supported for format CSV."
case Right(_) => fail("Should fail since envelope and CSV storage is not allowed")
}
@@ -43,7 +41,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers {
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `Parquet` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(error) => fail("Should not fail since envelope and Parquet storage is allowed", error)
case Right(_) => succeed
}
@@ -53,7 +51,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers {
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `Avro` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(error) => fail("Should not fail since envelope and Avro storage is allowed", error)
case Right(_) => succeed
}
@@ -63,7 +61,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers {
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `Json` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(error) => fail("Should not fail since envelope and Json storage is allowed", error)
case Right(_) => succeed
}
@@ -73,7 +71,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers {
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `Text` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(value) => value.getMessage shouldBe "Envelope is not supported for format TEXT."
case Right(_) => fail("Should fail since text and envelope storage is not allowed")
}
@@ -83,7 +81,7 @@ class S3SinkConfigTest extends AnyFunSuite with Matchers {
"connect.s3.kcql" -> s"insert into mybucket:myprefix select * from TopicName PARTITIONBY _key STOREAS `Bytes` WITHPARTITIONER=Values WITH_FLUSH_COUNT = 1 PROPERTIES('${DataStorageSettings.StoreEnvelopeKey}'=true)",
)

- SinkBucketOptions(SinkConfigDefBuilder(props.asJava)) match {
+ SinkBucketOptions(SinkConfigDefBuilder(props)) match {
case Left(value) => value.getMessage shouldBe "Envelope is not supported for format BYTES."
case Right(_) => fail("Should fail since envelope and bytes storage is not allowed")
}
@@ -21,8 +21,6 @@ import io.lenses.streamreactor.connect.cloud.common.config.TaskIndexKey
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

- import scala.jdk.CollectionConverters.MapHasAsJava
-
class S3SourceConfigTest extends AnyFunSuite with Matchers with TaskIndexKey with SourcePartitionSearcherSettingsKeys {
private val Identity: String = "identity"
private val Credential: String = "credential"
@@ -55,7 +53,7 @@ class S3SourceConfigTest extends AnyFunSuite with Matchers with TaskIndexKey wit
"connect.s3.partition.search.recurse.levels" -> "0",
)

- S3SourceConfig(S3SourceConfigDefBuilder(props.asJava)) match {
+ S3SourceConfig(S3SourceConfigDefBuilder(props)) match {
case Left(value) => fail(value.toString)
case Right(config) =>
config.bucketOptions.size shouldBe 3
@@ -62,15 +62,16 @@ abstract class CloudSinkTask[SM <: FileMetadata](

printAsciiHeader(manifest, sinkAsciiArtResource)

- new ConnectorTaskIdCreator(connectorPrefix).fromProps(fallbackProps) match {
+ val scalaFallbackProps = fallbackProps.asScala.toMap
+ new ConnectorTaskIdCreator(connectorPrefix).fromProps(scalaFallbackProps) match {
case Left(value) => throw new IllegalArgumentException(value)
case Right(value) => connectorTaskId = value
}

logger.debug(s"[{}] CloudSinkTask.start", connectorTaskId.show)

val contextProps = Option(context).flatMap(c => Option(c.configs())).map(_.asScala.toMap).getOrElse(Map.empty)
- val props = MapUtils.mergeProps(contextProps, fallbackProps.asScala.toMap)
+ val props = MapUtils.mergeProps(contextProps, scalaFallbackProps)
val errOrWriterMan = createWriterMan(props)

errOrWriterMan.leftMap(throw _).foreach(writerManager = _)
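In `CloudSinkTask` the framework-supplied `fallbackProps` is now converted to a Scala map once (`scalaFallbackProps`) and reused both for deriving the connector task id and for merging with the context configs, instead of calling `.asScala.toMap` at each use site. A rough sketch of that "convert once, reuse" shape follows; `ConvertOnceSketch`, `resolveProps`, and the local `mergeProps` are stand-ins, and the real `MapUtils.mergeProps` with its precedence rules lives elsewhere in the repository.

```scala
import scala.jdk.CollectionConverters.MapHasAsScala

object ConvertOnceSketch {
  // Stand-in for the repository's MapUtils.mergeProps; the real helper may
  // apply different precedence. Here, entries in `primary` win on key clashes.
  private def mergeProps(primary: Map[String, String], fallback: Map[String, String]): Map[String, String] =
    fallback ++ primary

  def resolveProps(
    fallbackProps:  java.util.Map[String, String],
    contextConfigs: Option[java.util.Map[String, String]]
  ): Map[String, String] = {
    // Convert the framework-supplied Java map once, then reuse the result below.
    val scalaFallbackProps = fallbackProps.asScala.toMap
    val contextProps       = contextConfigs.map(_.asScala.toMap).getOrElse(Map.empty[String, String])
    mergeProps(contextProps, scalaFallbackProps)
  }
}
```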
(Diff for the remaining changed files not loaded.)
