Refactor S3 components into common module for reuse #995

Merged 3 commits on Nov 7, 2023
build.sbt (10 changes: 5 additions & 5 deletions)
@@ -95,14 +95,14 @@ lazy val `cloud-common` = (project in file("kafka-connect-cloud-common"))

lazy val `aws-s3` = (project in file("kafka-connect-aws-s3"))
.dependsOn(common)
.dependsOn(`cloud-common` % "compile->compile;test->test")
.dependsOn(`test-common` % "fun->compile")
.dependsOn(`cloud-common` % "compile->compile;test->test;it->it")
.dependsOn(`test-common` % "fun->compile;it->compile")
.settings(
settings ++
Seq(
name := "kafka-connect-aws-s3",
description := "Kafka Connect compatible connectors to move data between Kafka and popular data stores",
libraryDependencies ++= baseDeps ++ kafkaConnectS3Deps,
libraryDependencies ++= baseDeps ++ kafkaConnectCloudCommonDeps ++ kafkaConnectS3Deps,
publish / skip := true,
packExcludeJars := Seq(
"scala-.*\\.jar",
@@ -446,11 +446,11 @@ lazy val `test-common` = (project in file("test-common"))

addCommandAlias(
"validateAll",
";headerCheck;test:headerCheck;it:headerCheck;fun:headerCheck;scalafmtCheckAll;",
"headerCheck;test:headerCheck;it:headerCheck;fun:headerCheck;scalafmtCheckAll;",
)
addCommandAlias(
"formatAll",
";headerCreateAll;scalafmtAll;scalafmtSbt;",
"headerCreateAll;scalafmtAll;scalafmtSbt;",
)
addCommandAlias("fullTest", ";test;it:test;fun:test")
addCommandAlias("fullCoverageTest", ";coverage;test;it:test;coverageReport;coverageAggregate")
AvroFormatWriterStreamTest.scala
@@ -16,28 +16,28 @@

package io.lenses.streamreactor.connect.aws.s3.formats

import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.checkRecord
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer
import io.lenses.streamreactor.connect.cloud.common.formats.writer.AvroFormatWriter
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.StructSinkData
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodecName.UNCOMPRESSED
import io.lenses.streamreactor.connect.cloud.common.model.Offset
import io.lenses.streamreactor.connect.cloud.common.model.Topic
import io.lenses.streamreactor.connect.cloud.common.model.location.FileUtils.toBufferedOutputStream
import io.lenses.streamreactor.connect.cloud.common.stream.BuildLocalOutputStream
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.checkRecord
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {
import helper._
class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest with LazyLogging {

val avroFormatReader = new AvroFormatReader()

@@ -48,13 +48,13 @@ class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyC
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))

val avroFormatWriter = new AvroFormatWriter(blobStream)
avroFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
avroFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
))
avroFormatWriter.complete() should be(Right(()))
val bytes = localFileAsBytes(localFile)
@@ -110,7 +110,7 @@ class AvroFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyC
val avroFormatWriter = new AvroFormatWriter(blobStream)
firstUsers.foreach(u =>
avroFormatWriter.write(
writer.MessageDetail(NullSinkData(None), StructSinkData(u), Map.empty, None, topic, 1, Offset(2)),
MessageDetail(NullSinkData(None), StructSinkData(u), Map.empty, None, topic, 1, Offset(2)),
) should be(
Right(()),
),
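
A pattern that recurs throughout these test diffs is the call-site change from writer.MessageDetail(...) to MessageDetail(...). It follows from importing the MessageDetail class directly instead of its enclosing writer package, as in this minimal sketch (package and object names below are hypothetical, chosen only to mirror the import change):

// Hypothetical names; illustrates the package-import vs class-import styles.
package example {
  package writer {
    case class MessageDetail(value: String)
  }
}

object ViaPackageImport {
  import example.writer                 // package import: qualify at the call site
  val detail = writer.MessageDetail("before")
}

object ViaClassImport {
  import example.writer.MessageDetail   // class import: use the name unqualified
  val detail = MessageDetail("after")
}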
ParquetFormatWriterStreamTest.scala
@@ -16,14 +16,14 @@

package io.lenses.streamreactor.connect.aws.s3.formats

import com.typesafe.scalalogging.LazyLogging
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.checkRecord
import io.lenses.streamreactor.connect.cloud.common.utils.SampleData.topic
import io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer
import io.lenses.streamreactor.connect.cloud.common.formats.writer._
import io.lenses.streamreactor.connect.cloud.common.model.CompressionCodec
import io.lenses.streamreactor.connect.cloud.common.model.Offset
@@ -43,8 +43,12 @@ import org.scalatest.matchers.should.Matchers
import scala.jdk.CollectionConverters.MapHasAsJava
import scala.jdk.CollectionConverters.SeqHasAsJava

class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest with EitherValues {
import helper._
class ParquetFormatWriterStreamTest
extends AnyFlatSpec
with Matchers
with S3ProxyContainerTest
with EitherValues
with LazyLogging {

implicit val compressionCodec: CompressionCodec = UNCOMPRESSED.toCodec()

@@ -56,31 +60,31 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro

val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users.head),
Map.empty,
None,
topic,
1,
Offset(1),
))
parquetFormatWriter.getPointer should be(21)
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users(1)),
Map.empty,
None,
topic,
1,
Offset(2),
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users(1)),
Map.empty,
None,
topic,
1,
Offset(2),
))
parquetFormatWriter.getPointer should be(44)
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(users(2)),
Map.empty,
None,
topic,
1,
Offset(3),
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(users(2)),
Map.empty,
None,
topic,
1,
Offset(3),
))
parquetFormatWriter.getPointer should be(59)
parquetFormatWriter.complete() should be(Right(()))
@@ -107,7 +111,7 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(
writer.MessageDetail(
MessageDetail(
NullSinkData(None),
ArraySinkData(
Seq(
@@ -132,7 +136,7 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro
val blobStream = new BuildLocalOutputStream(toBufferedOutputStream(localFile), Topic("testTopic").withPartition(1))
val parquetFormatWriter = new ParquetFormatWriter(blobStream)
parquetFormatWriter.write(
writer.MessageDetail(
MessageDetail(
NullSinkData(None),
MapSinkData(
Map(
@@ -185,13 +189,13 @@ class ParquetFormatWriterStreamTest extends AnyFlatSpec with Matchers with S3Pro

val parquetFormatWriter = new ParquetFormatWriter(blobStream)
firstUsers.foreach(u =>
parquetFormatWriter.write(writer.MessageDetail(NullSinkData(None),
StructSinkData(u),
Map.empty,
None,
topic,
1,
Offset(1),
parquetFormatWriter.write(MessageDetail(NullSinkData(None),
StructSinkData(u),
Map.empty,
None,
topic,
1,
Offset(1),
)) should be(
Right(()),
),
S3AvroWriterManagerTest.scala
@@ -25,10 +25,8 @@ import io.lenses.streamreactor.connect.aws.s3.sink.config.SinkBucketOptions
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstUsers
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.formats.AvroFormatReader
import io.lenses.streamreactor.connect.cloud.common.formats.writer
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
import io.lenses.streamreactor.connect.cloud.common.formats.writer.NullSinkData
import io.lenses.streamreactor.connect.cloud.common.formats.writer.SinkData
@@ -47,8 +45,8 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadP
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.NoOpPaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.S3KeyNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.connect.data.Schema
import org.apache.kafka.connect.data.SchemaBuilder
@@ -58,22 +56,19 @@ import org.scalatest.matchers.should.Matchers

class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {

import helper._
private val compressionCodec = UNCOMPRESSED.toCodec()

private val TopicName = "myTopic"
private val PathPrefix = "streamReactorBackups"
private val avroFormatReader = new AvroFormatReader

private implicit val connectorTaskId: ConnectorTaskId = ConnectorTaskId("sinkName", 1, 1)

private implicit val cloudLocationValidator = S3LocationValidator
private val bucketAndPrefix = CloudLocation(BucketName, PathPrefix.some)
private def avroConfig = S3SinkConfig(
S3Config(
None,
Some(Identity),
Some(Credential),
Some(container.identity.identity),
Some(container.identity.credential),
AuthMode.Credentials,
),
bucketOptions = Seq(
@@ -82,10 +77,10 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(2)),
formatSelection = AvroFormatSelection,
keyNamer = new S3KeyNamer(
keyNamer = new CloudKeyNamer(
AvroFormatSelection,
defaultPartitionSelection(Values),
new OffsetS3FileNamer(
new OffsetFileNamer(
identity[String],
AvroFormatSelection.extension,
),
@@ -154,13 +149,13 @@ class S3AvroWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
case (user, index) =>
sink.write(
TopicPartitionOffset(Topic(TopicName), 1, Offset((index + 1).toLong)),
writer.MessageDetail(NullSinkData(None),
StructSinkData(user),
Map.empty[String, SinkData],
None,
Topic(TopicName),
1,
Offset((index + 1).toLong),
MessageDetail(NullSinkData(None),
StructSinkData(user),
Map.empty[String, SinkData],
None,
Topic(TopicName),
1,
Offset((index + 1).toLong),
),
)
}
S3JsonWriterManagerTest.scala
@@ -26,7 +26,6 @@ import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.firstU
import io.lenses.streamreactor.connect.aws.s3.utils.ITSampleSchemaAndData.users
import io.lenses.streamreactor.connect.aws.s3.utils.S3ProxyContainerTest
import io.lenses.streamreactor.connect.cloud.common.config.AvroFormatSelection
import io.lenses.streamreactor.connect.cloud.common.config.ConnectorTaskId
import io.lenses.streamreactor.connect.cloud.common.config.DataStorageSettings
import io.lenses.streamreactor.connect.cloud.common.config.JsonFormatSelection
import io.lenses.streamreactor.connect.cloud.common.formats.writer.MessageDetail
@@ -47,21 +46,18 @@ import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.LeftPadP
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.NoOpPaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingService
import io.lenses.streamreactor.connect.cloud.common.sink.config.padding.PaddingStrategy
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetS3FileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.S3KeyNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.OffsetFileNamer
import io.lenses.streamreactor.connect.cloud.common.sink.naming.CloudKeyNamer
import org.apache.kafka.connect.data.Struct
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyContainerTest {

import helper._

private val compressionCodec = UNCOMPRESSED.toCodec()

private val TopicName = "myTopic"
private val PathPrefix = "streamReactorBackups"
private implicit val connectorTaskId: ConnectorTaskId = ConnectorTaskId("sinkName", 1, 1)
private implicit val cloudLocationValidator: S3LocationValidator.type = S3LocationValidator

"json sink" should "write single json record" in {
@@ -70,8 +66,8 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
val config = S3SinkConfig(
S3Config(
None,
Some(Identity),
Some(Credential),
Some(container.identity.identity),
Some(container.identity.credential),
AuthMode.Credentials,
),
bucketOptions = Seq(
@@ -80,10 +76,10 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(1)),
formatSelection = JsonFormatSelection,
keyNamer = new S3KeyNamer(
keyNamer = new CloudKeyNamer(
JsonFormatSelection,
defaultPartitionSelection(Values),
new OffsetS3FileNamer(
new OffsetFileNamer(
identity[String],
JsonFormatSelection.extension,
),
@@ -124,8 +120,8 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
val config = S3SinkConfig(
S3Config(
None,
Some(Identity),
Some(Credential),
Some(container.identity.identity),
Some(container.identity.credential),
AuthMode.Credentials,
),
bucketOptions = Seq(
@@ -134,10 +130,10 @@ class S3JsonWriterManagerTest extends AnyFlatSpec with Matchers with S3ProxyCont
bucketAndPrefix,
commitPolicy = CommitPolicy(Count(3)),
formatSelection = JsonFormatSelection,
keyNamer = new S3KeyNamer(
keyNamer = new CloudKeyNamer(
AvroFormatSelection,
defaultPartitionSelection(Values),
new OffsetS3FileNamer(
new OffsetFileNamer(
identity[String],
JsonFormatSelection.extension,
),