Refactor S3 components into common module for reuse (#995)
* Unit testing
  - Remaining transformers moved into common; unit testing.
  - Removed the Scala testcontainers lib from S3, reusing the test-common S3 test containers.
  - Extracted shared code to cloud-common.
  - Renamed some common things which shouldn't have S3 in their name.
  - Cleaned up, reduced tech debt, removed duplicated functionality, avoided weak string referencing.
  - Updated references to S3 in the common module.

* Fix odd class name, remove netty hack

* Fix file name and remove comments
davidsloan authored Nov 7, 2023
1 parent dd85a36 commit 5b598c4
Showing 145 changed files with 2,175 additions and 2,090 deletions.
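
The renames in this diff (S3KeyNamer to CloudKeyNamer, OffsetS3FileNamer to OffsetFileNamer, and so on) all follow one pattern: provider-neutral abstractions move into the cloud-common module, and only S3-specific wiring stays in kafka-connect-aws-s3, so another storage backend can reuse the same code later. A minimal, hypothetical sketch of that split — every name below is illustrative, not the connector's actual API:

    // Hypothetical sketch of the cloud-common / aws-s3 split; all names are illustrative.
    package example.cloud.common {
      // The provider-neutral abstraction lives in cloud-common: no "S3" in the name or signature.
      trait CloudFileNamer {
        def fileName(topic: String, partition: Int, offset: Long, extension: String): String
      }

      class OffsetBasedFileNamer extends CloudFileNamer {
        def fileName(topic: String, partition: Int, offset: Long, extension: String): String =
          s"$topic($partition)_$offset.$extension"
      }
    }

    package example.aws.s3 {
      // Only the S3-specific pieces (bucket layout, auth, client wiring) stay in aws-s3.
      class S3KeyBuilder(namer: example.cloud.common.CloudFileNamer, prefix: String) {
        def keyFor(topic: String, partition: Int, offset: Long): String =
          s"$prefix/" + namer.fileName(topic, partition, offset, "avro")
      }
    }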
10 changes: 5 additions & 5 deletions build.sbt
@@ -95,14 +95,14 @@ lazy val `cloud-common` = (project in file("kafka-connect-cloud-common"))

lazy val `aws-s3` = (project in file("kafka-connect-aws-s3"))
.dependsOn(common)
.dependsOn(`cloud-common` % "compile->compile;test->test")
.dependsOn(`test-common` % "fun->compile")
.dependsOn(`cloud-common` % "compile->compile;test->test;it->it")
.dependsOn(`test-common` % "fun->compile;it->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-aws-s3",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectS3Deps,
libraryDependencies ++= baseDeps ++ kafkaConnectCloudCommonDeps ++ kafkaConnectS3Deps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
@@ -446,11 +446,11 @@ lazy val `test-common` = (project in file("test-common"))

addCommandAlias(
"validateAll",
";headerCheck;test:headerCheck;it:headerCheck;fun:headerCheck;scalafmtCheckAll;",
"headerCheck;test:headerCheck;it:headerCheck;fun:headerCheck;scalafmtCheckAll;",
)
addCommandAlias(
"formatAll",
";headerCreateAll;scalafmtAll;scalafmtSbt;",
"headerCreateAll;scalafmtAll;scalafmtSbt;",
)
addCommandAlias("fullTest", ";test;it:test;fun:test")
addCommandAlias("fullCoverageTest", ";coverage;test;it:test;coverageReport;coverageAggregate")
@@ -16,28 +16,28 @@

package io.lenses.streamreactor.connect.aws.s3.formats

import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.checkRecord
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer
import io.lenses.streamreactor.connect.cloud.common.formats.writer.AvroFormatWriter
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.location.FileUtils.toBufferedOutputStream
import io.lenses.streamreactor.connect.cloud.common.stream.BuildLocalOutputStream
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.checkRecord
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {
import helper._
class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest with LazyLogging {

val avroFormatReader = new AvroFormatReader()

@@ -48,13 +48,13 @@ class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyC
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))

val avroFormatWriter = new AvroFormatWriter(blobStream)
avroFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
avroFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
))
avroFormatWriter.complete() should be(Right(()))
val bytes = localFileAsBytes(localFile)
@@ -110,7 +110,7 @@ class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyC
val avroFormatWriter = new AvroFormatWriter(blobStream)
firstUsers.foreach(u =>
avroFormatWriter.write(
writer.MessageDetail(NullSinkData(None), StructSinkData(u), Map.empty, None, topic, 1, Offset(2)),
MessageDetail(NullSinkData(None), StructSinkData(u), Map.empty, None, topic, 1, Offset(2)),
) should be(
Right(()),
),
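
Taken together, the hunks above exercise one simple writer contract: wrap a local output stream, write MessageDetail values, then complete(). A condensed sketch of that flow, using only calls that appear in this diff — localFile, users, topic and the implicit CompressionCodec are test fixtures supplied by S3ProxyContainerTest and SampleData, so this is illustrative rather than a drop-in program:

    // Condensed from the test above; fixtures (localFile, users, topic, implicit codec) assumed.
    val blobStream = new BuildLocalOutputStream(
      toBufferedOutputStream(localFile),           // buffer writes into a local temp file
      Topic("testTopic").withPartition(1),
    )
    val avroWriter = new AvroFormatWriter(blobStream)
    avroWriter.write(
      MessageDetail(NullSinkData(None), StructSinkData(users.head), Map.empty, None, topic, 1, Offset(1)),
    )
    avroWriter.complete()                          // Right(()) when the Avro file is finalised
    // The test then reads the bytes back with AvroFormatReader and checks the record contents.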
@@ -16,14 +16,14 @@

package io.lenses.streamreactor.connect.aws.s3.formats

import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.checkRecord
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
import io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer
import io.lenses.streamreactor.connect.cloud.common.formats.writer._
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.Offset
@@ -43,8 +43,12 @@ import org.scalatest.matchers.should.Matchers
import scala.jdk.CollectionConverters.MapHasAsJava
import scala.jdk.CollectionConverters.SeqHasAsJava

class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest with EitherValues {
import helper._
class ParquetFormatWriterStreamTest
extends AnyFlatSpec
with Matchers
with S3ProxyContainerTest
with EitherValues
with LazyLogging {

implicit val compressionCodec: CompressionCodec = UNCOMPRESSED.toCodec()

@@ -56,31 +60,31 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro

val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
))
parquetFormatWriter.getPointer should be(21)
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users(1)),
Map.empty,
None,
topic,
1,
Offset(2),
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users(1)),
Map.empty,
None,
topic,
1,
Offset(2),
))
parquetFormatWriter.getPointer should be(44)
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users(2)),
Map.empty,
None,
topic,
1,
Offset(3),
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users(2)),
Map.empty,
None,
topic,
1,
Offset(3),
))
parquetFormatWriter.getPointer should be(59)
parquetFormatWriter.complete() should be(Right(()))
Expand All @@ -107,7 +111,7 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(
writer.MessageDetail(
MessageDetail(
NullSinkData(None),
ArraySinkData(
Seq(
@@ -132,7 +136,7 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(
writer.MessageDetail(
MessageDetail(
NullSinkData(None),
MapSinkData(
Map(
@@ -185,13 +189,13 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro

val parquetFormatWriter = new ParquetFormatWriter(blobStream)
firstUsers.foreach(u =>
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(u),
Map.empty,
None,
topic,
1,
Offset(1),
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(u),
Map.empty,
None,
topic,
1,
Offset(1),
)) should be(
Right(()),
),
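
The Parquet test follows the same write/complete contract as the Avro one, with one extra observable: getPointer reports the number of bytes written so far (21, 44 and 59 after the three records above), which is the kind of signal a size-based commit policy can act on. A condensed sketch of that loop, again reusing only calls visible in this diff and assuming the test fixtures (localFile, users, topic, implicit codec):

    // Condensed from the test above; write each record, watch the byte pointer grow, then complete.
    val blobStream    = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
    val parquetWriter = new ParquetFormatWriter(blobStream)

    users.zipWithIndex.foreach { case (user, i) =>
      parquetWriter.write(
        MessageDetail(NullSinkData(None), StructSinkData(user), Map.empty, None, topic, 1, Offset((i + 1).toLong)),
      )
      // Bytes written so far (the assertions above expect 21, 44 and 59 for the sample users).
      println(s"pointer after record ${i + 1}: ${parquetWriter.getPointer}")
    }
    parquetWriter.complete()  // Right(()) when the file is finalised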
@@ -25,10 +25,8 @@ import io.lenses.streamreactor.connect.aws.s3.sink.config.SinkBucketOptions
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
@@ -47,8 +45,8 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadP
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.NoOpPaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.S3KeyNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
@@ -58,22 +56,19 @@ import org.scalatest.matchers.should.Matchers

class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {

import helper._
private val compressionCodec = UNCOMPRESSED.toCodec()

private val TopicName = "myTopic"
private val PathPrefix = "streamReactorBackups"
private val avroFormatReader = new AvroFormatReader

private implicit val connectorTaskId: ConnectorTaskId = ConnectorTaskId("sinkName", 1, 1)

private implicit val cloudLocationValidator = S3LocationValidator
private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
private def avroConfig = S3SinkConfig(
S3Config(
None,
Some(Identity),
Some(Credential),
Some(container.identity.identity),
Some(container.identity.credential),
AuthMode.Credentials,
),
bucketOptions = Seq(
@@ -82,10 +77,10 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(2)),
formatSelection = AvroFormatSelection,
keyNamer = new S3KeyNamer(
keyNamer = new CloudKeyNamer(
AvroFormatSelection,
defaultPartitionSelection(Values),
new OffsetS3FileNamer(
new OffsetFileNamer(
identity[String],
AvroFormatSelection.extension,
),
@@ -154,13 +149,13 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
case (user, index) =>
sink.write(
TopicPartitionOffset(Topic(TopicName), 1, Offset((index + 1).toLong)),
writer.MessageDetail(NullSinkData(None),
StructSinkData(user),
Map.empty[String, SinkData],
None,
Topic(TopicName),
1,
Offset((index + 1).toLong),
MessageDetail(NullSinkData(None),
StructSinkData(user),
Map.empty[String, SinkData],
None,
Topic(TopicName),
1,
Offset((index + 1).toLong),
),
)
}
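
A recurring change in these writer-manager tests is that the hard-coded Identity and Credential constants are replaced by values exposed by the shared S3 proxy test container from test-common, so the same container fixture now serves both modules (matching the "reusing the test-common S3 test containers" note in the commit message). The auth wiring as it appears in the diff — container is supplied by the S3ProxyContainerTest trait, and anything about its type beyond the two fields shown here is an assumption:

    // Sketch of the auth portion of the sink config built above, taken from this diff.
    val s3Config = S3Config(
      None,                                 // first parameter left as None, exactly as in the test above
      Some(container.identity.identity),    // access key published by the shared test container
      Some(container.identity.credential),  // secret key published by the shared test container
      AuthMode.Credentials,
    )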
@@ -26,7 +26,6 @@ import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstU
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
@@ -47,21 +46,18 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadP
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.NoOpPaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.S3KeyNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
import org.apache.kafka.connect.data.Struct
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {

import helper._

private val compressionCodec = UNCOMPRESSED.toCodec()

private val TopicName = "myTopic"
private val PathPrefix = "streamReactorBackups"
private implicit val connectorTaskId: ConnectorTaskId = ConnectorTaskId("sinkName", 1, 1)
private implicit val cloudLocationValidator: S3LocationValidator.type = S3LocationValidator

"json sink" should "write single json record" in {
@@ -70,8 +66,8 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
val config = S3SinkConfig(
S3Config(
None,
Some(Identity),
Some(Credential),
Some(container.identity.identity),
Some(container.identity.credential),
AuthMode.Credentials,
),
bucketOptions = Seq(
@@ -80,10 +76,10 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(1)),
formatSelection = JsonFormatSelection,
keyNamer = new S3KeyNamer(
keyNamer = new CloudKeyNamer(
JsonFormatSelection,
defaultPartitionSelection(Values),
new OffsetS3FileNamer(
new OffsetFileNamer(
identity[String],
JsonFormatSelection.extension,
),
Expand Down Expand Up @@ -124,8 +120,8 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
val config = S3SinkConfig(
S3Config(
None,
Some(Identity),
Some(Credential),
Some(container.identity.identity),
Some(container.identity.credential),
AuthMode.Credentials,
),
bucketOptions = Seq(
@@ -134,10 +130,10 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(3)),
formatSelection = JsonFormatSelection,
keyNamer = new S3KeyNamer(
keyNamer = new CloudKeyNamer(
AvroFormatSelection,
defaultPartitionSelection(Values),
new OffsetS3FileNamer(
new OffsetFileNamer(
identity[String],
JsonFormatSelection.extension,
),
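
Both writer-manager tests build their key namers the same way: an OffsetFileNamer (formerly OffsetS3FileNamer) wrapped in a CloudKeyNamer together with a format selection and a partition selection. The identity[String] argument appears to be a hook applied to the offset portion of the generated file name — the padding imports above suggest it is where a padding strategy plugs in, but that role is inferred rather than shown in this diff. A small sketch under that assumption; the CloudKeyNamer constructor is not reproduced here because its remaining arguments are collapsed in the hunk:

    // Assumption: the String => String argument of OffsetFileNamer transforms the offset
    // before it is embedded in the file name; identity[String] therefore means "no padding".
    val noPadding: String => String  = identity[String]
    val zeroPadded: String => String = offset => offset.reverse.padTo(12, '0').reverse  // "42" -> "000000000042"

    val plainNamer  = new OffsetFileNamer(noPadding, JsonFormatSelection.extension)
    val paddedNamer = new OffsetFileNamer(zeroPadded, JsonFormatSelection.extension)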