Skip to content

Commit

Permalink
No Partition Support for Cloud Datalake Sink Connectors (#80)
Browse files Browse the repository at this point in the history
Data Lakes - Enable storing files without partitioning
  • Loading branch information
davidsloan authored Aug 27, 2024
1 parent c59c130 commit 8a6819c
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2094,6 +2094,50 @@ abstract class CoreSinkTaskTestCases[

}

// Starts a sink task with a KCQL statement that uses NOPARTITION and a flush count of 1,
// feeds it `headerPartitionedRecords`, then checks that six objects were written under
// "streamReactorBackups/" and that each one contains the expected JSON document.
unitUnderTest should "use custom partitioning without any partitions" in {

  val sinkTask = createSinkTask()

  val kcqlStatement =
    s"insert into $BucketName:$PrefixName select * from $TopicName NOPARTITION PROPERTIES('${FlushCount.entryName}'=1)"

  // A fresh collection is built on every use, exactly as the task would receive it from Connect.
  def assignedPartitions = Seq(new TopicPartition(TopicName, 1)).asJava

  sinkTask.start((defaultProps + (s"$prefix.kcql" -> kcqlStatement)).asJava)
  sinkTask.open(assignedPartitions)
  sinkTask.put(headerPartitionedRecords.asJava)
  sinkTask.close(assignedPartitions)
  sinkTask.stop()

  listBucketPath(BucketName, "streamReactorBackups/").size should be(6)

  // Object key -> expected content, in offset order.
  val expectedObjects = Seq(
    "streamReactorBackups/myTopic(1_000000000000).json" -> """{"name":"first","title":"primary","salary":null}""",
    "streamReactorBackups/myTopic(1_000000000001).json" -> """{"name":"second","title":"secondary","salary":100.0}""",
    "streamReactorBackups/myTopic(1_000000000002).json" -> """{"name":"third","title":"primary","salary":100.0}""",
    "streamReactorBackups/myTopic(1_000000000003).json" -> """{"name":"first","title":null,"salary":200.0}""",
    "streamReactorBackups/myTopic(1_000000000004).json" -> """{"name":"second","title":null,"salary":100.0}""",
    "streamReactorBackups/myTopic(1_000000000005).json" -> """{"name":"third","title":null,"salary":100.0}""",
  )

  expectedObjects.foreach {
    case (objectKey, expectedContent) =>
      remoteFileAsString(BucketName, objectKey) should be(expectedContent)
  }

}

private def createSinkRecord(partition: Int, valueStruct: Struct, offset: Int, headers: lang.Iterable[Header]) =
new SinkRecord(TopicName,
partition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ class PartitionDisplayTest extends AnyFlatSpec with MockitoSugar with Matchers w
PartitionIncludeKeys.entryName -> includeKeys.toString,
),
)
PartitionDisplay(kcql, keyValueProp(true), Values) should be(KeysAndValues)
PartitionDisplay(kcql, keyValueProp(false), Values) should be(Values)
PartitionDisplay(keyValueProp(true), Values) should be(KeysAndValues)
PartitionDisplay(keyValueProp(false), Values) should be(Values)
}

"apply" should "default to specified default when no partitioner specified in kcql" in {
PartitionDisplay(kcql, emptyProps, KeysAndValues) should be(KeysAndValues)
PartitionDisplay(kcql, emptyProps, Values) should be(Values)
PartitionDisplay(emptyProps, KeysAndValues) should be(KeysAndValues)
PartitionDisplay(emptyProps, Values) should be(Values)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,18 @@ object CloudSinkBucketOptions extends LazyLogging {
): Either[Throwable, Seq[CloudSinkBucketOptions]] =
config.getKCQL.map { kcql: Kcql =>
for {
_ <- validateWithFlush(kcql)
_ <- validateNoMoreWithPartitioner(kcql)
formatSelection <- FormatSelection.fromKcql(kcql, SinkPropsSchema.schema)
fileExtension = FileExtensionNamer.fileExtension(config.getCompressionCodec(), formatSelection)
sinkProps = CloudSinkProps.fromKcql(kcql)
partitionSelection = PartitionSelection(kcql, sinkProps)
paddingService <- PaddingService.fromConfig(config, sinkProps)
storageSettings <- DataStorageSettings.from(sinkProps)
fileNamer <- getFileNamer(storageSettings, fileExtension, partitionSelection, paddingService)
_ = println("File Namer: " + fileNamer)
keyNamer = CloudKeyNamer(formatSelection, partitionSelection, fileNamer, paddingService)
stagingArea <- config.getLocalStagingArea()(connectorTaskId)
target <- CloudLocation.splitAndValidate(kcql.getTarget)
_ <- validateWithFlush(kcql)
_ <- validateNoMoreWithPartitioner(kcql)
formatSelection <- FormatSelection.fromKcql(kcql, SinkPropsSchema.schema)
fileExtension = FileExtensionNamer.fileExtension(config.getCompressionCodec(), formatSelection)
sinkProps = CloudSinkProps.fromKcql(kcql)
partitionSelection <- PartitionSelection(kcql, sinkProps)
paddingService <- PaddingService.fromConfig(config, sinkProps)
storageSettings <- DataStorageSettings.from(sinkProps)
fileNamer <- getFileNamer(storageSettings, fileExtension, partitionSelection, paddingService)
keyNamer = CloudKeyNamer(formatSelection, partitionSelection, fileNamer, paddingService)
stagingArea <- config.getLocalStagingArea()(connectorTaskId)
target <- CloudLocation.splitAndValidate(kcql.getTarget)

_ <- validateEnvelopeAndFormat(formatSelection, storageSettings)
commitPolicy = config.commitPolicy(kcql)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.lenses.streamreactor.connect.cloud.common.sink.config

import io.lenses.kcql.Kcql
import enumeratum.Enum
import enumeratum.EnumEntry
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum.PartitionIncludeKeys
Expand All @@ -36,7 +35,6 @@ object PartitionDisplay extends Enum[PartitionDisplay] {
case object Values extends PartitionDisplay

def apply(
kcql: Kcql,
props: KcqlProperties[PropsKeyEntry, PropsKeyEnum.type],
default: PartitionDisplay,
): PartitionDisplay = fromProps(props).getOrElse(default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@
* limitations under the License.
*/
package io.lenses.streamreactor.connect.cloud.common.sink.config

import io.lenses.kcql.Kcql
import cats.implicits._
import cats.implicits.catsSyntaxEitherId
import io.lenses.kcql.partitions.Partitions

import java.time.format.DateTimeFormatter
import java.util.TimeZone
import scala.jdk.CollectionConverters.IteratorHasAsScala
import scala.jdk.CollectionConverters.ListHasAsScala

/**
* The `PartitionField` trait represents a field that can be used for partitioning data.
* It provides a method to get the name of the field and a flag to indicate if the field supports padding.
* Each kind of partition field is represented as a case class or case object extending this sealed trait.
*/
sealed trait PartitionField {
def name(): String

Expand All @@ -29,44 +35,75 @@ sealed trait PartitionField {

object PartitionField {

def apply(kcql: Kcql): Seq[PartitionField] =
Option(kcql.getPartitionBy)
.map(_.asScala)
.getOrElse(Nil)
.map { name =>
val split: Seq[String] = PartitionFieldSplitter.split(name)
PartitionSpecifier.withNameOption(split.head).fold(PartitionField(split))(hd =>
if (split.tail.isEmpty) PartitionField(hd) else PartitionField(hd, split.tail),
)
}.toSeq

def apply(valuePartitionPath: Seq[String]): PartitionField =
ValuePartitionField(PartitionNamePath(valuePartitionPath: _*))

def apply(partitionSpecifier: PartitionSpecifier): PartitionField =
/**
* Creates a sequence of `PartitionField` instances based on the provided `Partitions` instance.
*
* @param partitions The `Partitions` instance containing the partition specifications.
* @return Either a `Throwable` if an error occurred during the operation, or a `Seq[PartitionField]` containing the created `PartitionField` instances.
*/
def apply(partitions: Partitions): Either[Throwable, Seq[PartitionField]] =
partitions.getPartitionBy.asScala.toSeq
.map {
spec =>
val split: Seq[String] = PartitionFieldSplitter.split(spec)
// if a PartitionSpecifier keyword is found, then use that with the tail of the list - otherwise default to the 'Value' keyword with the entirety of the list.
val (pSpec, pPath) = PartitionSpecifier.withNameOption(split.head) match {
case Some(partitionSpecifier) =>
partitionSpecifier -> split.tail
case None =>
PartitionSpecifier.Value -> split
}

if (pPath.isEmpty) PartitionField(pSpec) else PartitionField(pSpec, pPath)

}.sequence.leftMap(new IllegalArgumentException(_))

/**
* Creates a `ValuePartitionField` for a value partition path.
*
* @param valuePartitionPath The sequence of strings representing the value partition path.
* @return Always a `Right` containing the `PartitionField`; the `Either` return type mirrors the other `apply` overloads.
*/
def apply(valuePartitionPath: Seq[String]): Either[String, PartitionField] =
ValuePartitionField(PartitionNamePath(valuePartitionPath: _*)).asRight

/**
* Creates a `PartitionField` instance based on the provided `PartitionSpecifier` alone (no path or format).
*
* @param partitionSpecifier The `PartitionSpecifier` to use when creating the `PartitionField`.
* @return A `PartitionField` instance for `Key`, `Topic` and `Partition`; a `String` error message for `Header` and `Value` (which require a path) and for `Date` (which requires a format).
*/
def apply(partitionSpecifier: PartitionSpecifier): Either[String, PartitionField] =
partitionSpecifier match {
case PartitionSpecifier.Key => WholeKeyPartitionField
case PartitionSpecifier.Topic => TopicPartitionField
case PartitionSpecifier.Partition => PartitionPartitionField
case PartitionSpecifier.Key => WholeKeyPartitionField.asRight
case PartitionSpecifier.Topic => TopicPartitionField.asRight
case PartitionSpecifier.Partition => PartitionPartitionField.asRight
case PartitionSpecifier.Header =>
throw new IllegalArgumentException("cannot partition by Header partition field without path")
"cannot partition by Header partition field without path".asLeft
case PartitionSpecifier.Value =>
throw new IllegalArgumentException("cannot partition by Value partition field without path")
"cannot partition by Value partition field without path".asLeft
case PartitionSpecifier.Date =>
throw new IllegalArgumentException("cannot partition by Date partition field without format")
"cannot partition by Date partition field without format".asLeft
}

def apply(partitionSpecifier: PartitionSpecifier, path: Seq[String]): PartitionField =
/**
* Creates a `PartitionField` instance based on the provided `PartitionSpecifier` and path.
*
* @param partitionSpecifier The `PartitionSpecifier` to use when creating the `PartitionField`.
* @param path The sequence of strings representing the path (for `Date`, the single format string).
* @return A `PartitionField` instance for `Key`, `Value` and `Header`, and for `Date` when exactly one format is given; a `String` error message for `Topic` and `Partition` (which accept no path) and for `Date` with any other number of path elements.
*/
def apply(partitionSpecifier: PartitionSpecifier, path: Seq[String]): Either[String, PartitionField] =
partitionSpecifier match {
case PartitionSpecifier.Key => KeyPartitionField(PartitionNamePath(path: _*))
case PartitionSpecifier.Value => ValuePartitionField(PartitionNamePath(path: _*))
case PartitionSpecifier.Header => HeaderPartitionField(PartitionNamePath(path: _*))
case PartitionSpecifier.Topic => throw new IllegalArgumentException("partitioning by topic requires no path")
case PartitionSpecifier.Key => KeyPartitionField(PartitionNamePath(path: _*)).asRight
case PartitionSpecifier.Value => ValuePartitionField(PartitionNamePath(path: _*)).asRight
case PartitionSpecifier.Header => HeaderPartitionField(PartitionNamePath(path: _*)).asRight
case PartitionSpecifier.Topic => "partitioning by topic requires no path".asLeft
case PartitionSpecifier.Partition =>
throw new IllegalArgumentException("partitioning by partition requires no path")
"partitioning by partition requires no path".asLeft
case PartitionSpecifier.Date =>
if (path.size == 1) DatePartitionField(path.head)
else throw new IllegalArgumentException("only one format should be provided for date")
if (path.size == 1) DatePartitionField(path.head).asRight
else "only one format should be provided for date".asLeft
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@ import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnt
import io.lenses.streamreactor.connect.cloud.common.config.kcqlprops.PropsKeyEnum
import PartitionDisplay.KeysAndValues
import PartitionDisplay.Values
import cats.implicits.catsSyntaxEitherId
import io.lenses.kcql.partitions.NoPartitions
import io.lenses.kcql.partitions.Partitions
import io.lenses.streamreactor.connect.config.kcqlprops.KcqlProperties

/**
* The `PartitionSelection` case class describes the partitioning to be applied by the sink.
* It records whether the partitioning is custom, the `PartitionField`s to partition by, and the `PartitionDisplay` to use.
*/
case class PartitionSelection(
isCustom: Boolean,
partitions: Seq[PartitionField],
Expand All @@ -37,20 +44,30 @@ object PartitionSelection {
def apply(
kcql: Kcql,
props: KcqlProperties[PropsKeyEntry, PropsKeyEnum.type],
): PartitionSelection = {
val fields: Seq[PartitionField] = PartitionField(kcql)
if (fields.isEmpty) {
defaultPartitionSelection(
PartitionDisplay(kcql, props, Values),
)
} else {
PartitionSelection(
isCustom = true,
fields,
PartitionDisplay(kcql, props, KeysAndValues),
)
): Either[Throwable, PartitionSelection] =
kcql.getPartitions match {
case _: NoPartitions =>
PartitionSelection(
isCustom = true,
Seq.empty,
PartitionDisplay(props, Values),
).asRight
case partitions: Partitions =>
for {
fields: Seq[PartitionField] <- PartitionField(partitions)
} yield {
fields match {
case partitions if partitions.nonEmpty => PartitionSelection(
isCustom = true,
partitions,
PartitionDisplay(props, KeysAndValues),
)
case _ =>
defaultPartitionSelection(
PartitionDisplay(props, Values),
)
}
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import enumeratum.EnumEntry

import scala.collection.immutable

/**
* The `PartitionSpecifier` object is an enumeration used to specify the type of partitioning to be applied.
* It provides different options for partitioning such as by Key, Value, Header, Topic, Partition, and Date.
* Generally this is used transitively as the specifier will be mapped to a specific type of 'PartitionField' along with the value.
*/
sealed abstract class PartitionSpecifier(override val entryName: String) extends EnumEntry

object PartitionSpecifier extends Enum[PartitionSpecifier] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,30 @@
*/
package io.lenses.streamreactor.connect.cloud.common.model

import io.lenses.kcql.Kcql
import io.lenses.kcql.partitions.Partitions
import io.lenses.streamreactor.connect.cloud.common.sink.config._
import org.mockito.MockitoSugar
import org.scalatest.EitherValues
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util.Collections
import scala.jdk.CollectionConverters.IteratorHasAsJava
import java.util

class PartitionFieldTest extends AnyFlatSpec with MockitoSugar with Matchers {
class PartitionFieldTest extends AnyFlatSpec with MockitoSugar with Matchers with EitherValues {

val kcql: Kcql = mock[Kcql]

"partitionField.apply" should "return empty Seq if null kcql partitionBy supplied" in {
when(kcql.getPartitionBy).thenReturn(null)
PartitionField(kcql) should be(Seq.empty[PartitionField])
}

"partitionField.apply" should "return empty Seq if no kcql partitionBy supplied" in {
when(kcql.getPartitionBy).thenReturn(Collections.emptyIterator())
PartitionField(kcql) should be(Seq.empty[PartitionField])
"partitionField.apply" should "return empty Seq if empty kcql partitionBy supplied" in {
val partitions = new Partitions(util.List.of());
PartitionField(partitions).value should be(Seq.empty[PartitionField])
}

"partitionField.apply" should "parse partitions by whole key" in {
when(kcql.getPartitionBy).thenReturn(Seq("_key").iterator.asJava)
PartitionField(kcql) should be(Seq(WholeKeyPartitionField))
val partitions = new Partitions(util.List.of("_key"));
PartitionField(partitions).value should be(Seq(WholeKeyPartitionField))
}

"partitionField.apply" should "parse partitions by keys" in {
when(kcql.getPartitionBy).thenReturn(Seq("_key.fieldA", "_key.fieldB", "_key.field_c").iterator.asJava)
PartitionField(kcql) should be(
val partitions = new Partitions(util.List.of("_key.fieldA", "_key.fieldB", "_key.field_c"))
PartitionField(partitions).value should be(
Seq(
KeyPartitionField(PartitionNamePath("fieldA")),
KeyPartitionField(PartitionNamePath("fieldB")),
Expand All @@ -55,8 +48,8 @@ class PartitionFieldTest extends AnyFlatSpec with MockitoSugar with Matchers {
}

"partitionField.apply" should "parse partitions by values by default" in {
when(kcql.getPartitionBy).thenReturn(Seq("fieldA", "fieldB", "field_c").iterator.asJava)
PartitionField(kcql) should be(
val partitions = new Partitions(util.List.of("fieldA", "fieldB", "field_c"))
PartitionField(partitions).value should be(
Seq(
ValuePartitionField(PartitionNamePath("fieldA")),
ValuePartitionField(PartitionNamePath("fieldB")),
Expand All @@ -66,15 +59,15 @@ class PartitionFieldTest extends AnyFlatSpec with MockitoSugar with Matchers {
}

"partitionField.apply" should "parse partitions by values" in {
when(kcql.getPartitionBy).thenReturn(Seq("_value.fieldA", "_value.fieldB").iterator.asJava)
PartitionField(kcql) should be(Seq(ValuePartitionField(PartitionNamePath("fieldA")),
ValuePartitionField(PartitionNamePath("fieldB")),
val partitions = new Partitions(util.List.of("_value.fieldA", "_value.fieldB"))
PartitionField(partitions).value should be(Seq(ValuePartitionField(PartitionNamePath("fieldA")),
ValuePartitionField(PartitionNamePath("fieldB")),
))
}

"partitionField.apply" should "parse nested partitions" in {
when(kcql.getPartitionBy).thenReturn(Seq("_value.userDetails.address.houseNumber", "_value.fieldB").iterator.asJava)
PartitionField(kcql) should be(
val partitions = new Partitions(util.List.of("_value.userDetails.address.houseNumber", "_value.fieldB"))
PartitionField(partitions).value should be(
Seq(
ValuePartitionField(PartitionNamePath("userDetails", "address", "houseNumber")),
ValuePartitionField(PartitionNamePath("fieldB")),
Expand Down
Loading

0 comments on commit 8a6819c

Please sign in to comment.