Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Managed + Iceberg IO #5494

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ jobs:

- name: Make target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: mkdir -p scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target project/target
run: mkdir -p scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target scio-managed/target project/target

- name: Compress target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: tar cf targets.tar scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target project/target
run: tar cf targets.tar scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target scio-managed/target project/target

- name: Upload target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
Expand Down
46 changes: 38 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ val elasticsearch8Version = "8.15.1"
val fansiVersion = "0.5.0"
val featranVersion = "0.8.0"
val httpAsyncClientVersion = "4.1.5"
val icebergVersion = "1.4.2"
val jakartaJsonVersion = "2.1.3"
val javaLshVersion = "0.12"
val jedisVersion = "5.1.5"
Expand All @@ -100,7 +101,7 @@ val kantanCodecsVersion = "0.5.3"
val kantanCsvVersion = "0.7.0"
val kryoVersion = "4.0.3"
val magnoliaVersion = "1.1.10"
val magnolifyVersion = "0.7.4"
val magnolifyVersion = "0.7.4-34-a3708ba-SNAPSHOT"
val metricsVersion = "4.2.27"
val munitVersion = "1.0.1"
val neo4jDriverVersion = "4.4.18"
Expand Down Expand Up @@ -699,6 +700,7 @@ lazy val `scio-bom` = project
`scio-grpc`,
`scio-jdbc`,
`scio-macros`,
`scio-managed`,
`scio-neo4j`,
`scio-parquet`,
`scio-redis`,
Expand Down Expand Up @@ -1171,6 +1173,23 @@ lazy val `scio-grpc` = project
)
)

lazy val `scio-managed` = project
.in(file("scio-managed"))
.dependsOn(
`scio-core` % "compile;test->test"
)
.settings(commonSettings)
.settings(
description := "Scio add-on for Beam's managed transforms",
libraryDependencies ++= Seq(
// compile
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-managed" % beamVersion,
"com.spotify" %% "magnolify-beam" % magnolifyVersion
// test
)
)

lazy val `scio-jdbc` = project
.in(file("scio-jdbc"))
.dependsOn(
Expand Down Expand Up @@ -1340,17 +1359,18 @@ lazy val `scio-examples` = project
.enablePlugins(NoPublishPlugin)
.disablePlugins(ScalafixPlugin)
.dependsOn(
`scio-core` % "compile->test",
`scio-avro` % "compile->test",
`scio-core` % "compile->test",
`scio-elasticsearch8`,
`scio-extra`,
`scio-google-cloud-platform`,
`scio-jdbc`,
`scio-extra`,
`scio-elasticsearch8`,
`scio-managed`,
`scio-neo4j`,
`scio-tensorflow`,
`scio-smb`,
`scio-parquet`,
`scio-redis`,
`scio-parquet`
`scio-smb`,
`scio-tensorflow`
)
.settings(commonSettings)
.settings(soccoSettings)
Expand Down Expand Up @@ -1405,6 +1425,7 @@ lazy val `scio-examples` = project
"com.mysql" % "mysql-connector-j" % "9.0.0",
"com.softwaremill.magnolia1_2" %% "magnolia" % magnoliaVersion,
"com.spotify" %% "magnolify-avro" % magnolifyVersion,
"com.spotify" %% "magnolify-beam" % magnolifyVersion,
"com.spotify" %% "magnolify-bigtable" % magnolifyVersion,
"com.spotify" %% "magnolify-datastore" % magnolifyVersion,
"com.spotify" %% "magnolify-guava" % magnolifyVersion,
Expand All @@ -1422,6 +1443,7 @@ lazy val `scio-examples` = project
"org.apache.beam" % "beam-sdks-java-extensions-sql" % beamVersion,
"org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion,
"org.apache.beam" % "beam-sdks-java-io-jdbc" % beamVersion,
"org.apache.beam" % "beam-sdks-java-managed" % beamVersion,
"org.apache.hadoop" % "hadoop-common" % hadoopVersion,
"org.apache.httpcomponents" % "httpcore" % httpCoreVersion,
"org.apache.parquet" % "parquet-column" % parquetVersion,
Expand Down Expand Up @@ -1690,6 +1712,7 @@ lazy val integration = project
`scio-extra` % "test->test",
`scio-google-cloud-platform` % "compile;test->test",
`scio-jdbc` % "compile;test->test",
`scio-managed` % "test->test",
`scio-neo4j` % "test->test",
`scio-smb` % "test->provided,test"
)
Expand Down Expand Up @@ -1736,7 +1759,13 @@ lazy val integration = project
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion % Test,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion % Test,
"com.spotify" %% "magnolify-datastore" % magnolifyVersion % Test,
"org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Test
"org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Test,
"org.apache.beam" % "beam-sdks-java-io-iceberg" % beamVersion % Test,
"org.apache.iceberg" % "iceberg-common" % icebergVersion % Test,
"org.apache.iceberg" % "iceberg-core" % icebergVersion % Test,
"org.apache.iceberg" % "iceberg-parquet" % icebergVersion % Test,
"org.apache.parquet" % "parquet-common" % parquetVersion % Test,
"org.apache.parquet" % "parquet-column" % parquetVersion % Test
)
)

Expand Down Expand Up @@ -1764,6 +1793,7 @@ lazy val site = project
`scio-grpc` % "compile->test",
`scio-jdbc`,
`scio-macros`,
`scio-managed`,
`scio-neo4j`,
`scio-parquet`,
`scio-redis`,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.iceberg

import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
import com.spotify.scio.testing.PipelineSpec
import magnolify.beam._
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
import org.apache.iceberg.rest.RESTCatalog
import org.apache.iceberg.types.Types.{IntegerType, NestedField, StringType}
import org.apache.iceberg.{CatalogProperties, CatalogUtil, PartitionSpec}
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy

import java.time.Duration
import java.io.File
import java.nio.file.Files
import scala.jdk.CollectionConverters._

case class IcebergIOITRecord(a: Int, b: String)
object IcebergIOITRecord {
implicit val icebergIOITRecordRowType: RowType[IcebergIOITRecord] = RowType[IcebergIOITRecord]
}

class IcebergIOIT extends PipelineSpec with ForAllTestContainer {
val ContainerPort = 8181
val CatalogName = "iceberg_it"
val NamespaceName = "iceberg_it_ns"
val TableName = s"${NamespaceName}.iceberg_records"

lazy val tempDir: File = {
val t = Files.createTempDirectory("iceberg-it").toFile
t.deleteOnExit()
t
}

override val container: GenericContainer =
GenericContainer(
GenericContainer.stringToDockerImage("tabulario/iceberg-rest:1.6.0"),
exposedPorts = Seq(ContainerPort),
waitStrategy = new HostPortWaitStrategy()
.forPorts(ContainerPort)
.withStartupTimeout(Duration.ofSeconds(180))
)

lazy val uri = s"http://${container.containerIpAddress}:${container.mappedPort(ContainerPort)}"

override def afterStart(): Unit = {
val cat = new RESTCatalog()
cat.initialize(CatalogName, Map("uri" -> uri).asJava)

import org.apache.iceberg.Schema
cat.createNamespace(Namespace.of(NamespaceName))
cat.createTable(
TableIdentifier.parse(TableName),
new Schema(
NestedField.required(0, "a", IntegerType.get()),
NestedField.required(1, "b", StringType.get())
),
Comment on lines +68 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this be given by the RowType ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is an Iceberg schema rather than the Beam schema. The lack of create-on-write does raise the question of whether we also need to derive the iceberg schemas

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe RowType could also offer a def icebergSchema? (Similar to how magnolify-parquet has both def schema and def avroSchema...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would mean pulling in iceberg deps into the beam module fyi

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could make it provided, I guess, but point taken

Seems like Beam should have a utility function for converting between Beam/Icerberg Schemas. They have similar stuff for BQ/Avro/BeamSchema interop. Maybe we could contribute there

Copy link
Contributor Author

@kellen kellen Sep 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beam does have this but it introduces a dep on the iceberg part of the sdk that in theory should be managed.

I could use it in the integration test directly but that wouldn't help users at all.

IcebergUtils.beamSchemaToIcebergSchema(rowType.schema)

OTOH ... the class has this comment so 🤷

  // This is made public for users convenience, as many may have more experience working with
  // Iceberg types.

PartitionSpec.unpartitioned()
)
}

"IcebergIO" should "work" in {
val catalogProperties = Map(
CatalogUtil.ICEBERG_CATALOG_TYPE -> CatalogUtil.ICEBERG_CATALOG_TYPE_REST,
CatalogProperties.URI -> uri
)
val elements = 1.to(10).map(i => IcebergIOITRecord(i, s"$i"))

runWithRealContext() { sc =>
sc.parallelize(elements)
.saveAsIceberg(TableName, catalogProperties = catalogProperties)
}

runWithRealContext() { sc =>
sc.iceberg[IcebergIOITRecord](
TableName,
catalogProperties = catalogProperties
) should containInAnyOrder(elements)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions
import org.apache.beam.sdk.io.fs.ResourceId
import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, PaneInfo}

@FunctionalInterface
trait FilenamePolicySupplier {
def apply(path: String, suffix: String): FilenamePolicy
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.examples.extra

import com.spotify.scio.ContextAndArgs
import com.spotify.scio.iceberg._
import magnolify.beam._

// Example: Apache Iceberg read/write Example

// Usage:

// `sbt "runMain com.spotify.scio.examples.extra.IcebergExample
// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME]
// --inputTable=[INPUT TABLE] --catalogName=[CATALOG NAME]
// --catalogType=[CATALOG TYPE] --catalogUri=[CATALOG URI]
// --catalogWarehouse=[CATALOG WAREHOUSE] --outputTable=[OUTPUT TABLE]"`
object IcebergExample {

case class Record(a: Int, b: String)

def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)

// Catalog configuration
val catalogConfig = Map(
"type" -> args("catalogType"),
"uri" -> args("catalogUri"),
"warehouse" -> args("catalogWarehouse")
)

// Derive a conversion between Record and Beam Row
implicit val rt: RowType[Record] = RowType[Record]

sc
// Read Records from Iceberg
.iceberg[Record](
args("inputTable"),
args.optional("catalogName").orNull,
catalogConfig
)
.map(r => r.copy(a = r.a + 1))
// Write Records to Iceberg
.saveAsIceberg(
args("outputTable"),
args.optional("catalogName").orNull,
catalogConfig
)

sc.run()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2024 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.spotify.scio.examples.extra

import com.spotify.scio.ContextAndArgs
import com.spotify.scio.coders.Coder
import com.spotify.scio.managed._
import com.spotify.scio.values.SCollection
import magnolify.beam._
import org.apache.beam.sdk.managed.Managed
import org.apache.beam.sdk.values.Row

// Example: Beam's Managed IO

// Usage:

// `sbt "runMain com.spotify.scio.examples.extra.ManagedExample
// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME]
// --table=[TABLE] --catalogName=[CATALOG] --catalogType=[CATALOG TYPE]
// --catalogUri=[CATALOG URI] --catalogWarehouse=[CATALOG WAREHOUSE]
// --output=[OUTPUT PATH]"`
object ManagedExample {

case class Record(a: Int, b: String)

def main(cmdlineArgs: Array[String]): Unit = {
val (sc, args) = ContextAndArgs(cmdlineArgs)

val config: Map[String, Object] = Map(
"table" -> args("table"),
"catalog_name" -> args("catalogName"),
"catalog_properties" ->
Map(
"type" -> args("catalogType"),
"uri" -> args("catalogUri"),
"warehouse" -> args("catalogWarehouse")
)
)

val rt = RowType[Record]
// Provide an implicit coder for Row with the schema derived from Record case class
implicit val recordRowCoder: Coder[Row] = Coder.row(rt.schema)

// Read beam Row instances from iceberg
val records: SCollection[Record] = sc
.managed(
Managed.ICEBERG,
// Schema derived from the Record case class
rt.schema,
config
)
// Convert the Row instance to a Record
.map(rt.apply)

records
.map(r => r.copy(a = r.a + 1))
// Convert the Record to a Row
.map(rt.apply)
// Save Row instances to Iceberg
.saveAsManaged(Managed.ICEBERG, config)

sc.run()
}
}
Loading