From 2207f5dfc9864af85dca5a54387529733cf37505 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Fri, 6 Sep 2024 15:50:32 -0400 Subject: [PATCH 01/13] wip --- build.sbt | 64 +++++++++++++++-- .../scio/util/FilenamePolicySupplier.scala | 1 + .../scio/examples/extra/IcebergExample.scala | 66 +++++++++++++++++ .../scio/examples/extra/ManagedExample.scala | 62 ++++++++++++++++ .../com/spotify/scio/iceberg/IcebergIO.scala | 49 +++++++++++++ .../com/spotify/scio/iceberg/package.scala | 28 ++++++++ .../syntax/IcebergSCollectionSyntax.scala | 34 +++++++++ .../syntax/IcebergScioContextSyntax.scala | 49 +++++++++++++ .../com/spotify/scio/managed/ManagedIO.scala | 70 +++++++++++++++++++ .../com/spotify/scio/managed/package.scala | 28 ++++++++ .../syntax/ManagedSCollectionSyntax.scala | 32 +++++++++ .../syntax/ManagedScioContextSyntax.scala | 33 +++++++++ site/src/main/paradox/io/Iceberg.md | 3 + site/src/main/paradox/io/Managed.md | 48 +++++++++++++ site/src/main/paradox/io/Neo4J.md | 2 +- site/src/main/paradox/io/index.md | 4 +- 16 files changed, 564 insertions(+), 9 deletions(-) create mode 100644 scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala create mode 100644 scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala create mode 100644 scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala create mode 100644 scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala create mode 100644 scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala create mode 100644 scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala create mode 100644 scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala create mode 100644 scio-iceberg/src/main/scala/com/spotify/scio/managed/package.scala create mode 100644 scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala create mode 100644 scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala create mode 100644 site/src/main/paradox/io/Iceberg.md create mode 100644 site/src/main/paradox/io/Managed.md diff --git a/build.sbt b/build.sbt index 6bd12044c7..153ee67270 100644 --- a/build.sbt +++ b/build.sbt @@ -100,7 +100,7 @@ val kantanCodecsVersion = "0.5.3" val kantanCsvVersion = "0.7.0" val kryoVersion = "4.0.3" val magnoliaVersion = "1.1.10" -val magnolifyVersion = "0.7.4" +val magnolifyVersion = "0.7.4-47-aa9f05b-20240903T173705Z-SNAPSHOT" val metricsVersion = "4.2.27" val munitVersion = "1.0.1" val neo4jDriverVersion = "4.4.18" @@ -697,6 +697,7 @@ lazy val `scio-bom` = project `scio-extra`, `scio-google-cloud-platform`, `scio-grpc`, + `scio-iceberg`, `scio-jdbc`, `scio-macros`, `scio-neo4j`, @@ -1171,6 +1172,25 @@ lazy val `scio-grpc` = project ) ) +lazy val `scio-iceberg` = project + .in(file("scio-iceberg")) + .dependsOn( + `scio-core` % "compile;test->test" + ) + .settings(commonSettings) + .settings( + description := "Scio add-on for Iceberg", + libraryDependencies ++= Seq( + // compile + "org.apache.beam" % "beam-sdks-java-core" % beamVersion, + "org.apache.beam" % "beam-sdks-java-managed" % beamVersion, + // TODO add iceberg as test source + "org.apache.beam" % "beam-sdks-java-io-iceberg" % beamVersion, + "com.spotify" %% "magnolify-beam" % magnolifyVersion, + // test + ) + ) + lazy val `scio-jdbc` = project .in(file("scio-jdbc")) .dependsOn( @@ -1340,17 +1360,18 @@ lazy val `scio-examples` = project .enablePlugins(NoPublishPlugin) .disablePlugins(ScalafixPlugin) .dependsOn( - `scio-core` % "compile->test", `scio-avro` % "compile->test", + `scio-core` % "compile->test", + `scio-elasticsearch8`, + `scio-extra`, `scio-google-cloud-platform`, + `scio-iceberg`, `scio-jdbc`, - `scio-extra`, - `scio-elasticsearch8`, `scio-neo4j`, - `scio-tensorflow`, - `scio-smb`, + `scio-parquet`, `scio-redis`, - `scio-parquet` + `scio-smb`, + `scio-tensorflow` ) .settings(commonSettings) .settings(soccoSettings) @@ -1422,6 +1443,35 @@ lazy val `scio-examples` = project "org.apache.beam" % "beam-sdks-java-extensions-sql" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-jdbc" % beamVersion, + "org.apache.beam" % "beam-sdks-java-io-iceberg" % beamVersion, + // no + "org.apache.iceberg" % "iceberg-hive-metastore" % "1.4.2", + "org.apache.hive.hcatalog" % "hive-hcatalog-core" % "3.1.3", + /* + def hive_version = "3.1.3" +def iceberg_version = "1.4.2" + testRuntimeOnly library.java.snake_yaml + testRuntimeOnly library.java.bigdataoss_gcs_connector + testRuntimeOnly library.java.hadoop_client + + // needed to set up the test environment + testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version" + testImplementation "org.apache.iceberg:iceberg-core:$iceberg_version" + testImplementation "org.assertj:assertj-core:3.11.1" + testImplementation library.java.junit + + // needed to set up test Hive Metastore and run tests + testImplementation ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version") + testImplementation project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow") + testRuntimeOnly ("org.apache.hive.hcatalog:hive-hcatalog-core:$hive_version") { + exclude group: "org.apache.hive", module: "hive-exec" + exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle" + } + testImplementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" + testImplementation "org.apache.parquet:parquet-column:1.12.0" + */ + + // no "org.apache.hadoop" % "hadoop-common" % hadoopVersion, "org.apache.httpcomponents" % "httpcore" % httpCoreVersion, "org.apache.parquet" % "parquet-column" % parquetVersion, diff --git a/scio-core/src/main/scala/com/spotify/scio/util/FilenamePolicySupplier.scala b/scio-core/src/main/scala/com/spotify/scio/util/FilenamePolicySupplier.scala index d66e1f3ec2..decf98bf41 100644 --- a/scio-core/src/main/scala/com/spotify/scio/util/FilenamePolicySupplier.scala +++ b/scio-core/src/main/scala/com/spotify/scio/util/FilenamePolicySupplier.scala @@ -23,6 +23,7 @@ import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions import org.apache.beam.sdk.io.fs.ResourceId import org.apache.beam.sdk.transforms.windowing.{BoundedWindow, PaneInfo} +@FunctionalInterface trait FilenamePolicySupplier { def apply(path: String, suffix: String): FilenamePolicy } diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala new file mode 100644 index 0000000000..a71503f7cc --- /dev/null +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.examples.extra + +import com.spotify.scio.ContextAndArgs +import com.spotify.scio.iceberg._ +import magnolify.beam._ +import magnolify.beam.logical.nanos._ +import org.joda.time.Instant + +// ## Iceberg IO example + +// Usage: + +// `sbt "runMain com.spotify.scio.examples.extra.IcebergExample +// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] +// FIXME +object IcebergExample { + /* + ---------------------------------------------------------------------------------------------> + partition | row(__PARTITIONTIME timestamp(6)) > + record_count | BigDecimal > + file_count | BigDecimal > + total_size | BigDecimal > + data | row(timestamp row(min timestamp(6), max timestamp(6), null_count BigDecimal, nan_count BigDecimal), country_code row(min varchar, max varchar, null_count BigDecimal, nan_count BigDecimal), url row(min varchar, max varchar, null_count BigDecimal, nan_count BigDecimal), project row(min varchar, max varchar, null_count BigDecimal, nan_count BigDecimal), tls_protocol + */ + case class FileDownloads( + record_count: BigDecimal, + file_count: BigDecimal, + total_size: BigDecimal, + data: Data + ) + case class Data(timestamp: Timestamp) + case class Timestamp(min: Instant, max: Instant, null_count: BigDecimal, nan_count: BigDecimal) + + def main(cmdlineArgs: Array[String]): Unit = { + val (sc, args) = ContextAndArgs(cmdlineArgs) + + sc.iceberg[FileDownloads]( + // TODO quoted things don't work via iceberg/hive + "", + "", + Map( + "type" -> "hive", + "uri" -> "", + "warehouse" -> "" + ) + ).debug(prefix = "FileDownload: ") + + sc.run() + } +} diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala new file mode 100644 index 0000000000..8657144d9c --- /dev/null +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala @@ -0,0 +1,62 @@ +package com.spotify.scio.examples.extra + +import com.spotify.scio.ContextAndArgs +import com.spotify.scio.coders.Coder +import com.spotify.scio.managed._ +import com.spotify.scio.values.SCollection +import magnolify.beam._ +import org.apache.beam.sdk.managed.Managed +import org.apache.beam.sdk.values.Row + +// ## Managed IO example + +// Usage: + +// `sbt "runMain com.spotify.scio.examples.extra.ManagedExample +// --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] +// --table=[TABLE] --catalogName=[CATALOG] --catalogType=[CATALOG TYPE] +// --catalogUri=[CATALOG URI] --catalogWarehouse=[CATALOG WAREHOUSE] +// --output=[OUTPUT PATH] +object ManagedExample { + + case class Record(a: Int, b: String) + + def main(cmdlineArgs: Array[String]): Unit = { + val (sc, args) = ContextAndArgs(cmdlineArgs) + + val config: Map[String, Object] = Map( + "table" -> args("table"), + "catalog_name" -> args("catalogName"), + "catalog_properties" -> + Map( + "type" -> args("catalogType"), + "uri" -> args("catalogUri"), + "warehouse" -> args("catalogWarehouse") + ) + ) + + val rt = RowType[Record] + // provide implicit coder for Row with the schema derived from Record case class + implicit val recordRowCoder: Coder[Row] = Coder.row(rt.schema) + + // read beam Row instances from iceberg + val records: SCollection[Record] = sc + .managed( + Managed.ICEBERG, + // schema derived from the Record case class + rt.schema, + config + ) + // convert the Row instance to a Record + .map(rt.apply) + + records + .map(r => r.copy(a = r.a + 1)) + // convert the Record to a Row + .map(rt.apply) + // save Row instances to Iceberg + .saveAsManaged(Managed.ICEBERG, config) + + sc.run() + } +} diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala new file mode 100644 index 0000000000..48e5875894 --- /dev/null +++ b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.iceberg + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder +import com.spotify.scio.io.{EmptyTapOf, ScioIO, Tap, TapT} +import com.spotify.scio.values.SCollection +import magnolify.beam.RowType +import org.apache.beam.sdk.managed.Managed +import com.spotify.scio.managed.ManagedIO +import org.apache.beam.sdk.values.Row + +final case class IcebergIO[T : RowType : Coder](config: Map[String, AnyRef]) extends ScioIO[T] { + override type ReadP = IcebergIO.ReadParam + override type WriteP = IcebergIO.WriteParam + override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] + + private lazy val rowType: RowType[T] = implicitly + private lazy val underlying: ManagedIO = ManagedIO(Managed.ICEBERG, config) + private lazy implicit val rowCoder: Coder[Row] = Coder.row(rowType.schema) + + override protected def read(sc: ScioContext, params: IcebergIO.ReadParam): SCollection[T] = + underlying.readWithContext(sc, params).map(rowType.from) + + override protected def write(data: SCollection[T], params: IcebergIO.WriteParam): Tap[tapT.T] = + underlying.writeWithContext(data.transform(_.map(rowType.to)), params).underlying + + override def tap(read: IcebergIO.ReadParam): Tap[tapT.T] = underlying.tap(read) +} + +object IcebergIO { + type ReadParam = ManagedIO.ReadParam + type WriteParam = ManagedIO.WriteParam +} diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala new file mode 100644 index 0000000000..ce39cf310a --- /dev/null +++ b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio + +import com.spotify.scio.iceberg.syntax.{ScioContextSyntax, SCollectionSyntax} + +/** + * Iceberg IO APIs. Import all. + * + * {{{ + * import com.spotify.scio.iceberg._ + * }}} + */ +package object iceberg extends ScioContextSyntax with SCollectionSyntax diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala new file mode 100644 index 0000000000..cd686827ec --- /dev/null +++ b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.iceberg.syntax + +import com.spotify.scio.coders.Coder +import com.spotify.scio.iceberg.IcebergIO +import com.spotify.scio.io.ClosedTap +import com.spotify.scio.managed.ManagedIO +import com.spotify.scio.values.SCollection +import magnolify.beam.RowType + +class IcebergSCollectionSyntax[T : RowType : Coder](self: SCollection[T]) { + def saveAsIceberg(config: Map[String, AnyRef]): ClosedTap[Nothing] = + self.write(IcebergIO(config))(ManagedIO.WriteParam()) +} + +trait SCollectionSyntax { + implicit def icebergSCollectionSyntax[T : RowType : Coder](self: SCollection[T]): IcebergSCollectionSyntax[T] = + new IcebergSCollectionSyntax(self) +} diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala new file mode 100644 index 0000000000..fcb37eedcd --- /dev/null +++ b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.iceberg.syntax + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder +import com.spotify.scio.iceberg.IcebergIO +import com.spotify.scio.managed.ManagedIO +import com.spotify.scio.values.SCollection +import magnolify.beam.RowType + + +class IcebergScioContextSyntax(self: ScioContext) { + def iceberg[T : Coder](config: Map[String, AnyRef])(implicit rt: RowType[T]): SCollection[T] = + self.read(IcebergIO(config))(ManagedIO.ReadParam(rt.schema)) + + /** + * @see [[org.apache.beam.sdk.io.iceberg.SchemaTransformConfiguration SchemaTransformConfiguration]] + */ + def iceberg[T : Coder](table: String, catalogName: String = null, catalogProperties: Map[String, String] = null, configProperties: Map[String, String] = null)(implicit rt: RowType[T]): SCollection[T] = { + // see + val config = Map[String, AnyRef]( + "table" -> table, + "catalog_name" -> catalogName, + "catalog_properties" -> catalogProperties, + "config_properties" -> configProperties + ).filter(_._2 != null) + iceberg(config) + } +} + +trait ScioContextSyntax { + implicit def icebergScioContextSyntax(self: ScioContext): IcebergScioContextSyntax = + new IcebergScioContextSyntax(self) +} diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala b/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala new file mode 100644 index 0000000000..67463f1352 --- /dev/null +++ b/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.managed + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT} +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.managed.Managed +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.values.{PCollectionRowTuple, Row} +import scala.jdk.CollectionConverters._ + +final case class ManagedIO(ioName: String, config: Map[String, Object]) extends ScioIO[Row] { + override type ReadP = ManagedIO.ReadParam + override type WriteP = ManagedIO.WriteParam + override val tapT: TapT.Aux[Row, Nothing] = EmptyTapOf[Row] + + private lazy val _config: java.util.Map[String, Object] = { + // recursively convert this yaml-compatible nested scala map to java map + // we either do this or the user has to create nested java maps in scala code + // both are bad + def _convert(a: Object): Object = { + a match { + case m: Map[_, _] => m.asInstanceOf[Map[_, Object]].map { case (k, v) => k -> _convert(v) }.asJava + case i: Iterable[_] => i.map(o => _convert(o.asInstanceOf[Object])).asJava + case _ => a + } + } + config.map { case (k, v) => k -> _convert(v) }.asJava + } + + // not-ideal IO naming, but we have no identifier except the config map + override def testId: String = s"ManagedIO($ioName, ${config.toString})" + override protected def read(sc: ScioContext, params: ManagedIO.ReadParam): SCollection[Row] = { + sc.wrap( + sc.applyInternal[PCollectionRowTuple]( + Managed.read(ioName).withConfig(_config) + ).getSinglePCollection + ) + .setCoder(CoderMaterializer.beam(sc, Coder.row(params.schema))) + } + + override protected def write(data: SCollection[Row], params: ManagedIO.WriteParam): Tap[tapT.T] = { + val t = Managed.write(ioName).withConfig(_config) + data.applyInternal(t) + EmptyTap + } + + override def tap(read: ManagedIO.ReadParam): Tap[tapT.T] = EmptyTap +} + +object ManagedIO { + final case class ReadParam(schema: Schema) + final case class WriteParam() +} diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/package.scala b/scio-iceberg/src/main/scala/com/spotify/scio/managed/package.scala new file mode 100644 index 0000000000..963be7e1b8 --- /dev/null +++ b/scio-iceberg/src/main/scala/com/spotify/scio/managed/package.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio + +import com.spotify.scio.managed.syntax.{ScioContextSyntax, SCollectionSyntax} + +/** + * Managed IO APIs. Import all. + * + * {{{ + * import com.spotify.scio.managed._ + * }}} + */ +package object managed extends ScioContextSyntax with SCollectionSyntax diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala b/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala new file mode 100644 index 0000000000..7186e5b644 --- /dev/null +++ b/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.managed.syntax + +import com.spotify.scio.io.ClosedTap +import com.spotify.scio.managed.ManagedIO +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.values.Row + +class ManagedSCollectionSyntax(self: SCollection[Row]) { + def saveAsManaged(sink: String, config: Map[String, AnyRef] = Map.empty): ClosedTap[Nothing] = + self.write(ManagedIO(sink, config))(ManagedIO.WriteParam()) +} + +trait SCollectionSyntax { + implicit def managedSCollectionSyntax(self: SCollection[Row]): ManagedSCollectionSyntax = + new ManagedSCollectionSyntax(self) +} diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala b/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala new file mode 100644 index 0000000000..ee09b69d29 --- /dev/null +++ b/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.managed.syntax + +import com.spotify.scio.ScioContext +import com.spotify.scio.managed.ManagedIO +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.values.Row + +class ManagedScioContextSyntax(self: ScioContext) { + def managed(source: String, schema: Schema, config: Map[String, Object] = Map.empty): SCollection[Row] = + self.read[Row](ManagedIO(source, config))(ManagedIO.ReadParam(schema)) +} + +trait ScioContextSyntax { + implicit def managedScioContextSyntax(self: ScioContext): ManagedScioContextSyntax = + new ManagedScioContextSyntax(self) +} diff --git a/site/src/main/paradox/io/Iceberg.md b/site/src/main/paradox/io/Iceberg.md new file mode 100644 index 0000000000..4548243b39 --- /dev/null +++ b/site/src/main/paradox/io/Iceberg.md @@ -0,0 +1,3 @@ +# Iceberg + +TODO \ No newline at end of file diff --git a/site/src/main/paradox/io/Managed.md b/site/src/main/paradox/io/Managed.md new file mode 100644 index 0000000000..66b6dffd4d --- /dev/null +++ b/site/src/main/paradox/io/Managed.md @@ -0,0 +1,48 @@ +# Managed IO + +Beam's Managed transforms move responsibility for the creation of transform classes from user code to the runner, allowing runner-specific optimizations like hot-swapping an instance of a transform with an updated one. +Beam currently supports Iceberg and Kafka managed transforms. +See also [Dataflow's supported transforms](https://cloud.google.com/dataflow/docs/guides/managed-io). + +A Scio @ref:[Coder](../internals/Coders.md) must be defined for the Beam @javadoc[Row](org.apache.beam.sdk.values.Row), derived from the Beam @javadoc[Schema](org.apache.beam.sdk.schemas.Schema) expected from the datasource. +If you have more than one type of data being read into Beam Rows, you will need to provide the coders explicitly instead of implicitly. + +The source and sink parameters should be imported from Beam's @javadoc[Managed](org.apache.beam.sdk.managed.Managed). + +```scala mdoc:compile-only +import com.spotify.scio.ScioContext +import com.spotify.scio.managed._ +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.managed.Managed +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.values.Row + +val sc: ScioContext = ??? + +val rowSchema: Schema = ??? +implicit val rowCoder: Coder[Row] = Coder.row(rowSchema) + +val config: Map[String, Object] = ??? +val rows: SCollection[Row] = sc.managed(Managed.ICEBERG, schema, config) +``` + +Saving data to a Managed IO is similar: + +```scala mdoc:compile-only +import com.spotify.scio.ScioContext +import com.spotify.scio.managed._ +import com.spotify.scio.values.SCollection +import org.apache.beam.sdk.managed.Managed +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.values.Row + +val rows: SCollection[Row] = ??? +val config: Map[String, Object] = ??? + +rows.saveAsManaged(Managed.ICEBERG, config) +``` + +[Magnolify's](https://github.com/spotify/magnolify) `RowType` (available as part of the `magnolify-beam` artifact) provides automatically-derived mappings between Beam's `Row` and scala case classes. + +See [full documentation here](https://github.com/spotify/magnolify/blob/main/docs/beam.md) and [an example usage here](https://spotify.github.io/scio/examples/extra/ManagedExample.scala.html). + diff --git a/site/src/main/paradox/io/Neo4J.md b/site/src/main/paradox/io/Neo4J.md index 8a932965fc..40d4977d46 100644 --- a/site/src/main/paradox/io/Neo4J.md +++ b/site/src/main/paradox/io/Neo4J.md @@ -1,6 +1,6 @@ # Neo4J -Scio provides support [Neo4J](https://neo4j.com/) in the `scio-neo4j` artifact. +Scio provides support for [Neo4J](https://neo4j.com/) in the `scio-neo4j` artifact. Scio uses [magnolify's](https://github.com/spotify/magnolify) `magnolify-neo4j` to convert to and from Neo4J types. diff --git a/site/src/main/paradox/io/index.md b/site/src/main/paradox/io/index.md index 72a47d427a..422f8a6023 100644 --- a/site/src/main/paradox/io/index.md +++ b/site/src/main/paradox/io/index.md @@ -11,10 +11,12 @@ * @ref:[Cassandra](Cassandra.md) * @ref:[CSV](Csv.md) * @ref:[Datastore](Datastore.md) -* @ref:[Grpc](Grpc.md) * @ref:[Elasticsearch](Elasticsearch.md) +* @ref:[Grpc](Grpc.md) +* @ref:[Iceberg](Iceberg.md) * @ref:[JDBC](Jdbc.md) * @ref:[Json](Json.md) +* @ref:[Managed](Managed.md) * @ref:[Neo4J](Neo4J.md) * @ref:[Object](Object.md) * @ref:[Parquet](Parquet.md) From 5ed2f0483b3a5da1a3ad0443d83ded9f2ffa2304 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Wed, 11 Sep 2024 11:07:51 -0400 Subject: [PATCH 02/13] wip --- build.sbt | 1 + site/src/main/paradox/io/Managed.md | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 153ee67270..5df113aa3d 100644 --- a/build.sbt +++ b/build.sbt @@ -1812,6 +1812,7 @@ lazy val site = project `scio-extra`, `scio-google-cloud-platform`, `scio-grpc` % "compile->test", + `scio-iceberg`, `scio-jdbc`, `scio-macros`, `scio-neo4j`, diff --git a/site/src/main/paradox/io/Managed.md b/site/src/main/paradox/io/Managed.md index 66b6dffd4d..92f23346b6 100644 --- a/site/src/main/paradox/io/Managed.md +++ b/site/src/main/paradox/io/Managed.md @@ -11,6 +11,7 @@ The source and sink parameters should be imported from Beam's @javadoc[Managed]( ```scala mdoc:compile-only import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder import com.spotify.scio.managed._ import com.spotify.scio.values.SCollection import org.apache.beam.sdk.managed.Managed @@ -23,13 +24,14 @@ val rowSchema: Schema = ??? implicit val rowCoder: Coder[Row] = Coder.row(rowSchema) val config: Map[String, Object] = ??? -val rows: SCollection[Row] = sc.managed(Managed.ICEBERG, schema, config) +val rows: SCollection[Row] = sc.managed(Managed.ICEBERG, rowSchema, config) ``` Saving data to a Managed IO is similar: ```scala mdoc:compile-only import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder import com.spotify.scio.managed._ import com.spotify.scio.values.SCollection import org.apache.beam.sdk.managed.Managed From d0a01e85b8e43171b7cc210d1a034938ea7711ca Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Wed, 11 Sep 2024 18:10:53 -0400 Subject: [PATCH 03/13] wip --- .../scio/examples/extra/IcebergExample.scala | 58 +++++++++---------- .../scio/examples/extra/ManagedExample.scala | 16 ++--- .../scio/examples/extra/RedisExamples.scala | 6 +- .../com/spotify/scio/iceberg/IcebergIO.scala | 42 +++++++++++--- .../com/spotify/scio/iceberg/package.scala | 2 +- .../syntax/IcebergSCollectionSyntax.scala | 18 ++++-- .../syntax/IcebergScioContextSyntax.scala | 24 ++++---- .../com/spotify/scio/managed/ManagedIO.scala | 19 +++--- .../com/spotify/scio/managed/package.scala | 2 +- .../syntax/ManagedScioContextSyntax.scala | 6 +- site/src/main/paradox/io/Iceberg.md | 46 ++++++++++++++- site/src/main/paradox/io/Managed.md | 36 ++++++++++-- 12 files changed, 189 insertions(+), 86 deletions(-) diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala index a71503f7cc..b3ffc386e5 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/IcebergExample.scala @@ -19,47 +19,47 @@ package com.spotify.scio.examples.extra import com.spotify.scio.ContextAndArgs import com.spotify.scio.iceberg._ import magnolify.beam._ -import magnolify.beam.logical.nanos._ -import org.joda.time.Instant -// ## Iceberg IO example +// Example: Apache Iceberg read/write Example // Usage: // `sbt "runMain com.spotify.scio.examples.extra.IcebergExample // --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] -// FIXME +// --inputTable=[INPUT TABLE] --catalogName=[CATALOG NAME] +// --catalogType=[CATALOG TYPE] --catalogUri=[CATALOG URI] +// --catalogWarehouse=[CATALOG WAREHOUSE] --outputTable=[OUTPUT TABLE]"` object IcebergExample { - /* - ---------------------------------------------------------------------------------------------> - partition | row(__PARTITIONTIME timestamp(6)) > - record_count | BigDecimal > - file_count | BigDecimal > - total_size | BigDecimal > - data | row(timestamp row(min timestamp(6), max timestamp(6), null_count BigDecimal, nan_count BigDecimal), country_code row(min varchar, max varchar, null_count BigDecimal, nan_count BigDecimal), url row(min varchar, max varchar, null_count BigDecimal, nan_count BigDecimal), project row(min varchar, max varchar, null_count BigDecimal, nan_count BigDecimal), tls_protocol - */ - case class FileDownloads( - record_count: BigDecimal, - file_count: BigDecimal, - total_size: BigDecimal, - data: Data - ) - case class Data(timestamp: Timestamp) - case class Timestamp(min: Instant, max: Instant, null_count: BigDecimal, nan_count: BigDecimal) + + case class Record(a: Int, b: String) def main(cmdlineArgs: Array[String]): Unit = { val (sc, args) = ContextAndArgs(cmdlineArgs) - sc.iceberg[FileDownloads]( - // TODO quoted things don't work via iceberg/hive - "", - "", - Map( - "type" -> "hive", - "uri" -> "", - "warehouse" -> "" + // Catalog configuration + val catalogConfig = Map( + "type" -> args("catalogType"), + "uri" -> args("catalogUri"), + "warehouse" -> args("catalogWarehouse") + ) + + // Derive a conversion between Record and Beam Row + implicit val rt: RowType[Record] = RowType[Record] + + sc + // Read Records from Iceberg + .iceberg[Record]( + args("inputTable"), + args.optional("catalogName").orNull, + catalogConfig + ) + .map(r => r.copy(a = r.a + 1)) + // Write Records to Iceberg + .saveAsIceberg( + args("outputTable"), + args.optional("catalogName").orNull, + catalogConfig ) - ).debug(prefix = "FileDownload: ") sc.run() } diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala index 8657144d9c..5651c48564 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala @@ -8,7 +8,7 @@ import magnolify.beam._ import org.apache.beam.sdk.managed.Managed import org.apache.beam.sdk.values.Row -// ## Managed IO example +// Example: Beam's Managed IO // Usage: @@ -16,7 +16,7 @@ import org.apache.beam.sdk.values.Row // --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] // --table=[TABLE] --catalogName=[CATALOG] --catalogType=[CATALOG TYPE] // --catalogUri=[CATALOG URI] --catalogWarehouse=[CATALOG WAREHOUSE] -// --output=[OUTPUT PATH] +// --output=[OUTPUT PATH]"` object ManagedExample { case class Record(a: Int, b: String) @@ -36,25 +36,25 @@ object ManagedExample { ) val rt = RowType[Record] - // provide implicit coder for Row with the schema derived from Record case class + // Provide an implicit coder for Row with the schema derived from Record case class implicit val recordRowCoder: Coder[Row] = Coder.row(rt.schema) - // read beam Row instances from iceberg + // Read beam Row instances from iceberg val records: SCollection[Record] = sc .managed( Managed.ICEBERG, - // schema derived from the Record case class + // Schema derived from the Record case class rt.schema, config ) - // convert the Row instance to a Record + // Convert the Row instance to a Record .map(rt.apply) records .map(r => r.copy(a = r.a + 1)) - // convert the Record to a Row + // Convert the Record to a Row .map(rt.apply) - // save Row instances to Iceberg + // Save Row instances to Iceberg .saveAsManaged(Managed.ICEBERG, config) sc.run() diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala index 68fced35b6..b50f22d58c 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/RedisExamples.scala @@ -66,7 +66,7 @@ object RedisReadStringsExample { // `sbt "runMain com.spotify.scio.examples.extra.RedisWriteBatchExample // --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] // --redisHost=[REDIS_HOST] -// --redisPort=[REDIS_PORT]` +// --redisPort=[REDIS_PORT]"` object RedisWriteBatchExample { def main(cmdlineArgs: Array[String]): Unit = { @@ -98,7 +98,7 @@ object RedisWriteBatchExample { // --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] // --subscription=[PUBSUB_SUBSCRIPTION] // --redisHost=[REDIS_HOST] -// --redisPort=[REDIS_PORT]` +// --redisPort=[REDIS_PORT]"` object RedisWriteStreamingExample { def main(cmdlineArgs: Array[String]): Unit = { @@ -135,7 +135,7 @@ object RedisWriteStreamingExample { // `sbt "runMain com.spotify.scio.examples.extra.RedisLookUpStringsExample // --project=[PROJECT] --runner=DataflowRunner --region=[REGION NAME] // --redisHost=[REDIS_HOST] -// --redisPort=[REDIS_PORT] +// --redisPort=[REDIS_PORT]"` object RedisLookUpStringsExample { def main(cmdlineArgs: Array[String]): Unit = { diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala index 48e5875894..c458ed15b7 100644 --- a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala +++ b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala @@ -18,32 +18,56 @@ package com.spotify.scio.iceberg import com.spotify.scio.ScioContext import com.spotify.scio.coders.Coder -import com.spotify.scio.io.{EmptyTapOf, ScioIO, Tap, TapT} +import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT} import com.spotify.scio.values.SCollection import magnolify.beam.RowType import org.apache.beam.sdk.managed.Managed import com.spotify.scio.managed.ManagedIO import org.apache.beam.sdk.values.Row -final case class IcebergIO[T : RowType : Coder](config: Map[String, AnyRef]) extends ScioIO[T] { +final case class IcebergIO[T: RowType: Coder](table: String, catalogName: Option[String]) + extends ScioIO[T] { override type ReadP = IcebergIO.ReadParam override type WriteP = IcebergIO.WriteParam override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] private lazy val rowType: RowType[T] = implicitly - private lazy val underlying: ManagedIO = ManagedIO(Managed.ICEBERG, config) - private lazy implicit val rowCoder: Coder[Row] = Coder.row(rowType.schema) + implicit private lazy val rowCoder: Coder[Row] = Coder.row(rowType.schema) + + override def testId: String = s"IcebergIO($table, $catalogName)" + + private def config( + catalogProperties: Map[String, String], + configProperties: Map[String, String] + ): Map[String, AnyRef] = { + Map[String, AnyRef]( + "table" -> table, + "catalog_name" -> catalogName.orNull, + "catalog_properties" -> catalogProperties, + "config_properties" -> configProperties + ).filter(_._2 != null) + } override protected def read(sc: ScioContext, params: IcebergIO.ReadParam): SCollection[T] = - underlying.readWithContext(sc, params).map(rowType.from) + ManagedIO(Managed.ICEBERG, config(params.catalogProperties, params.configProperties)) + .readWithContext(sc, ManagedIO.ReadParam(rowType.schema)) + .map(rowType.from) override protected def write(data: SCollection[T], params: IcebergIO.WriteParam): Tap[tapT.T] = - underlying.writeWithContext(data.transform(_.map(rowType.to)), params).underlying + ManagedIO(Managed.ICEBERG, config(params.catalogProperties, params.configProperties)) + .writeWithContext(data.transform(_.map(rowType.to)), ManagedIO.WriteParam()) + .underlying - override def tap(read: IcebergIO.ReadParam): Tap[tapT.T] = underlying.tap(read) + override def tap(read: IcebergIO.ReadParam): Tap[tapT.T] = EmptyTap } object IcebergIO { - type ReadParam = ManagedIO.ReadParam - type WriteParam = ManagedIO.WriteParam + case class ReadParam private ( + catalogProperties: Map[String, String] = null, + configProperties: Map[String, String] = null + ) + case class WriteParam private ( + catalogProperties: Map[String, String] = null, + configProperties: Map[String, String] = null + ) } diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala index ce39cf310a..579f505a1a 100644 --- a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala +++ b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala @@ -16,7 +16,7 @@ package com.spotify.scio -import com.spotify.scio.iceberg.syntax.{ScioContextSyntax, SCollectionSyntax} +import com.spotify.scio.iceberg.syntax.{SCollectionSyntax, ScioContextSyntax} /** * Iceberg IO APIs. Import all. diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala index cd686827ec..c3e010ac15 100644 --- a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala +++ b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala @@ -19,16 +19,24 @@ package com.spotify.scio.iceberg.syntax import com.spotify.scio.coders.Coder import com.spotify.scio.iceberg.IcebergIO import com.spotify.scio.io.ClosedTap -import com.spotify.scio.managed.ManagedIO import com.spotify.scio.values.SCollection import magnolify.beam.RowType -class IcebergSCollectionSyntax[T : RowType : Coder](self: SCollection[T]) { - def saveAsIceberg(config: Map[String, AnyRef]): ClosedTap[Nothing] = - self.write(IcebergIO(config))(ManagedIO.WriteParam()) +class IcebergSCollectionSyntax[T: RowType: Coder](self: SCollection[T]) { + def saveAsIceberg( + table: String, + catalogName: String = null, + catalogProperties: Map[String, String] = null, + configProperties: Map[String, String] = null + ): ClosedTap[Nothing] = { + val params = IcebergIO.WriteParam(catalogProperties, configProperties) + self.write(IcebergIO(table, Option(catalogName)))(params) + } } trait SCollectionSyntax { - implicit def icebergSCollectionSyntax[T : RowType : Coder](self: SCollection[T]): IcebergSCollectionSyntax[T] = + implicit def icebergSCollectionSyntax[T: RowType: Coder]( + self: SCollection[T] + ): IcebergSCollectionSyntax[T] = new IcebergSCollectionSyntax(self) } diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala index fcb37eedcd..b587ef413e 100644 --- a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala +++ b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala @@ -19,27 +19,23 @@ package com.spotify.scio.iceberg.syntax import com.spotify.scio.ScioContext import com.spotify.scio.coders.Coder import com.spotify.scio.iceberg.IcebergIO -import com.spotify.scio.managed.ManagedIO import com.spotify.scio.values.SCollection import magnolify.beam.RowType - class IcebergScioContextSyntax(self: ScioContext) { - def iceberg[T : Coder](config: Map[String, AnyRef])(implicit rt: RowType[T]): SCollection[T] = - self.read(IcebergIO(config))(ManagedIO.ReadParam(rt.schema)) /** - * @see [[org.apache.beam.sdk.io.iceberg.SchemaTransformConfiguration SchemaTransformConfiguration]] + * @see + * [[org.apache.beam.sdk.io.iceberg.SchemaTransformConfiguration SchemaTransformConfiguration]] */ - def iceberg[T : Coder](table: String, catalogName: String = null, catalogProperties: Map[String, String] = null, configProperties: Map[String, String] = null)(implicit rt: RowType[T]): SCollection[T] = { - // see - val config = Map[String, AnyRef]( - "table" -> table, - "catalog_name" -> catalogName, - "catalog_properties" -> catalogProperties, - "config_properties" -> configProperties - ).filter(_._2 != null) - iceberg(config) + def iceberg[T: Coder]( + table: String, + catalogName: String = null, + catalogProperties: Map[String, String] = null, + configProperties: Map[String, String] = null + )(implicit rt: RowType[T]): SCollection[T] = { + val params = IcebergIO.ReadParam(catalogProperties, configProperties) + self.read(IcebergIO(table, Option(catalogName)))(params) } } diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala b/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala index 67463f1352..02686a9448 100644 --- a/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala +++ b/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala @@ -36,9 +36,10 @@ final case class ManagedIO(ioName: String, config: Map[String, Object]) extends // both are bad def _convert(a: Object): Object = { a match { - case m: Map[_, _] => m.asInstanceOf[Map[_, Object]].map { case (k, v) => k -> _convert(v) }.asJava + case m: Map[_, _] => + m.asInstanceOf[Map[_, Object]].map { case (k, v) => k -> _convert(v) }.asJava case i: Iterable[_] => i.map(o => _convert(o.asInstanceOf[Object])).asJava - case _ => a + case _ => a } } config.map { case (k, v) => k -> _convert(v) }.asJava @@ -48,14 +49,16 @@ final case class ManagedIO(ioName: String, config: Map[String, Object]) extends override def testId: String = s"ManagedIO($ioName, ${config.toString})" override protected def read(sc: ScioContext, params: ManagedIO.ReadParam): SCollection[Row] = { sc.wrap( - sc.applyInternal[PCollectionRowTuple]( - Managed.read(ioName).withConfig(_config) - ).getSinglePCollection - ) - .setCoder(CoderMaterializer.beam(sc, Coder.row(params.schema))) + sc.applyInternal[PCollectionRowTuple]( + Managed.read(ioName).withConfig(_config) + ).getSinglePCollection + ).setCoder(CoderMaterializer.beam(sc, Coder.row(params.schema))) } - override protected def write(data: SCollection[Row], params: ManagedIO.WriteParam): Tap[tapT.T] = { + override protected def write( + data: SCollection[Row], + params: ManagedIO.WriteParam + ): Tap[tapT.T] = { val t = Managed.write(ioName).withConfig(_config) data.applyInternal(t) EmptyTap diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/package.scala b/scio-iceberg/src/main/scala/com/spotify/scio/managed/package.scala index 963be7e1b8..dd0def4dd3 100644 --- a/scio-iceberg/src/main/scala/com/spotify/scio/managed/package.scala +++ b/scio-iceberg/src/main/scala/com/spotify/scio/managed/package.scala @@ -16,7 +16,7 @@ package com.spotify.scio -import com.spotify.scio.managed.syntax.{ScioContextSyntax, SCollectionSyntax} +import com.spotify.scio.managed.syntax.{SCollectionSyntax, ScioContextSyntax} /** * Managed IO APIs. Import all. diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala b/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala index ee09b69d29..a00b7f45b2 100644 --- a/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala +++ b/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala @@ -23,7 +23,11 @@ import org.apache.beam.sdk.schemas.Schema import org.apache.beam.sdk.values.Row class ManagedScioContextSyntax(self: ScioContext) { - def managed(source: String, schema: Schema, config: Map[String, Object] = Map.empty): SCollection[Row] = + def managed( + source: String, + schema: Schema, + config: Map[String, Object] = Map.empty + ): SCollection[Row] = self.read[Row](ManagedIO(source, config))(ManagedIO.ReadParam(schema)) } diff --git a/site/src/main/paradox/io/Iceberg.md b/site/src/main/paradox/io/Iceberg.md index 4548243b39..c01adcd865 100644 --- a/site/src/main/paradox/io/Iceberg.md +++ b/site/src/main/paradox/io/Iceberg.md @@ -1,3 +1,47 @@ # Iceberg -TODO \ No newline at end of file +Scio supports reading from and writing to [Apache Iceberg](https://iceberg.apache.org/) via Beam's @ref[Managed transforms](Managed.md). +[Magnolify's](https://github.com/spotify/magnolify) `RowType` (available as part of the `magnolify-beam` artifact) provides automatically-derived mappings between scala case classes and Beam's `Row`, used by the underlying managed transform. See [full documentation here](https://github.com/spotify/magnolify/blob/main/docs/beam.md). + +To read: + +```scala mdoc:compile-only +import com.spotify.scio.ScioContext +import com.spotify.scio.iceberg._ +import com.spotify.scio.values.SCollection +import magnolify.beam._ + +case class Record(a: Int, b: String) +implicit val rt: RowType[Record] = RowType[Record] + +val sc: ScioContext = ??? +val table: String = ??? +val catalogName: String = ??? +val catalogConfig: Map[String, String] = ??? + +val records: SCollection[Record] = sc.iceberg[Record]( + table, + catalogName, + catalogConfig +) +``` + +To write: + +```scala mdoc:invisible +import com.spotify.scio.iceberg._ +import com.spotify.scio.values.SCollection +import magnolify.beam._ +case class Record(a: Int, b: String) +implicit val rt: RowType[Record] = RowType[Record] +``` + +```scala mdoc:compile-only +val records: SCollection[Record] = ??? + +val table: String = ??? +val catalogName: String = ??? +val catalogConfig: Map[String, String] = ??? + +records.saveAsIceberg(table, catalogName, catalogConfig) +``` diff --git a/site/src/main/paradox/io/Managed.md b/site/src/main/paradox/io/Managed.md index 92f23346b6..c6f2f6275a 100644 --- a/site/src/main/paradox/io/Managed.md +++ b/site/src/main/paradox/io/Managed.md @@ -28,23 +28,47 @@ val rows: SCollection[Row] = sc.managed(Managed.ICEBERG, rowSchema, config) ``` Saving data to a Managed IO is similar: - -```scala mdoc:compile-only -import com.spotify.scio.ScioContext -import com.spotify.scio.coders.Coder +```scala mdoc:invisible import com.spotify.scio.managed._ +import com.spotify.scio.coders.Coder import com.spotify.scio.values.SCollection import org.apache.beam.sdk.managed.Managed import org.apache.beam.sdk.schemas.Schema import org.apache.beam.sdk.values.Row +``` +```scala mdoc:compile-only val rows: SCollection[Row] = ??? val config: Map[String, Object] = ??? rows.saveAsManaged(Managed.ICEBERG, config) ``` -[Magnolify's](https://github.com/spotify/magnolify) `RowType` (available as part of the `magnolify-beam` artifact) provides automatically-derived mappings between Beam's `Row` and scala case classes. +[Magnolify's](https://github.com/spotify/magnolify) `RowType` (available as part of the `magnolify-beam` artifact) provides automatically-derived mappings between Beam's `Row` and scala case classes. See [full documentation here](https://github.com/spotify/magnolify/blob/main/docs/beam.md). + +```scala mdoc:invisible +import com.spotify.scio.ScioContext +import com.spotify.scio.managed._ +import org.apache.beam.sdk.managed.Managed +import org.apache.beam.sdk.schemas.Schema +import org.apache.beam.sdk.values.Row +``` + +```scala mdoc:compile-only +import magnolify.beam._ + +val config: Map[String, Object] = ??? -See [full documentation here](https://github.com/spotify/magnolify/blob/main/docs/beam.md) and [an example usage here](https://spotify.github.io/scio/examples/extra/ManagedExample.scala.html). +case class Record(a: Int, b: String) +val rt = RowType[Record] +implicit val recordRowCoder: Coder[Row] = Coder.row(rt.schema) +val sc: ScioContext = ??? +sc.managed(Managed.ICEBERG, rt.schema, config) + // convert the Row instance to a Record + .map(rt.apply) + .map(r => r.copy(a = r.a + 1)) + // convert the Record to a Row + .map(rt.apply) + .saveAsManaged(Managed.ICEBERG, config) +``` From 05d1311956dcca502841e1cc940652d1bf0aa208 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Mon, 16 Sep 2024 15:43:38 -0400 Subject: [PATCH 04/13] wip --- .../spotify/scio/iceberg/IcebergIOIT.scala | 180 ++++++++++++++++++ .../com/spotify/scio/iceberg/IcebergIO.scala | 12 +- .../com/spotify/scio/managed/ManagedIO.scala | 2 + 3 files changed, 191 insertions(+), 3 deletions(-) create mode 100644 integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala diff --git a/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala new file mode 100644 index 0000000000..3ed3286ef0 --- /dev/null +++ b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala @@ -0,0 +1,180 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.spotify.scio.iceberg + +import com.dimafeng.testcontainers.GenericContainer.FileSystemBind +import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer} +//import com.google.common.collect.ImmutableMap +import com.spotify.scio.testing.PipelineSpec + +import java.sql.DriverManager +//import org.scalatest.concurrent.Eventually +import magnolify.beam._ +//import org.apache.hadoop.conf.Configuration +//import org.apache.hadoop.hive.metastore.HiveMetaStoreClient +//import org.apache.hadoop.hive.metastore.api.{Catalog, Database, Table} +//import org.apache.iceberg.PartitionSpec +//import org.apache.iceberg.catalog.{Namespace, TableIdentifier} +//import org.apache.iceberg.hive.HiveCatalog +//import org.apache.iceberg.types.Types.{IntegerType, NestedField, StringType} + +import java.time.Duration +//import org.scalatest.BeforeAndAfterAll +import org.testcontainers.containers.BindMode +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy + +import java.io.File +import java.nio.file.Files + +case class IcebergIOITRecord(a: Int, b: String, i: Int) +object IcebergIOITRecord { + implicit val icebergIOITRecordRowType: RowType[IcebergIOITRecord] = RowType[IcebergIOITRecord] +} + +class IcebergIOIT extends PipelineSpec with ForAllTestContainer { //with BeforeAndAfterAll with Eventually { // { + val warehouseLocalPath: String = "/opt/hive/data/warehouse/iceberg-it/" + lazy val tempDir: File = { + val t = Files.createTempDirectory("iceberg-it").toFile + t.deleteOnExit() + t + } + + // docker run -d -p 9083:9083 -v /Users/kellend/tmp/iceberg-it:/opt/hive/data/warehouse/iceberg-it/ --env SERVICE_NAME=metastore apache/hive:3.1.3 + override val container: GenericContainer = + GenericContainer( + GenericContainer.stringToDockerImage("apache/hive:4.0.0"), + env = Map( + "SERVICE_NAME" -> "hiveserver2", + "VERBOSE" -> "true", + ), + exposedPorts = Seq(10000, 10002), + fileSystemBind = Seq( + FileSystemBind(s"$tempDir", "/warehouse", BindMode.READ_WRITE) + ), + waitStrategy = new HostPortWaitStrategy() + .forPorts(10000, 10002) + // hive metastore + .withStartupTimeout(Duration.ofSeconds(180)) + ) + + override def afterStart(): Unit = { +// val conf = new Configuration(false) +// configMap.foreach { case (k, v) => conf.set(k, v) } +// val client = new HiveMetaStoreClient(conf) +// +// client.createCatalog( +// new Catalog( +// "iceberg_it", +// s"file://${warehouseLocalPath}" +// ) +// ) +// +// client.createDatabase( +// new Database( +// "iceberg_it", +// "iceberg test records", +// s"file://${warehouseLocalPath}/iceberg_it.db/", +// ImmutableMap.of[String, String]() +// ) +// ) +// +// client.createTable( +// new Table( +// "iceberg_records", +// "iceberg_it", +// "iceberg_owner", +// 0, +// 0, +// Int.MaxValue, +// /* +// private String tableName; // required +// private String dbName; // required +// private String owner; // required +// private int createTime; // required +// private int lastAccessTime; // required +// private int retention; // required +// +// private StorageDescriptor sd; // required +// private List partitionKeys; // required +// private Map parameters; // required +// private String viewOriginalText; // required +// private String viewExpandedText; // required +// private String tableType; // required +// +// private PrincipalPrivilegeSet privileges; // optional +// private boolean temporary; // optional +// private boolean rewriteEnabled; // optional +// private CreationMetadata creationMetadata; // optional +// private String catName; // optional +// private PrincipalType ownerType; // optional +// */ +// +// ) +// ) + +// // Note: this operates directly on the tempdir +// new File(tempDir + File.separator + "iceberg_it.db" + "iceberg_records").mkdirs() +// val cat = new HiveCatalog() +// cat.initialize( +// "iceberg_it", +// ImmutableMap.of[String, String]( +// "type", "hive", +// "uri", s"thrift://${container.containerIpAddress}:${container.mappedPort(9083)}", +//// "uri", s"thrift://0.0.0.0:9083", +// "warehouse", s"file://${tempDir}" +// ) +// ) +// +// import org.apache.iceberg.Schema +// cat.dropNamespace(Namespace.of("iceberg_it")) +// cat.createNamespace(Namespace.of("iceberg_it")) +// cat.createTable( +// TableIdentifier.parse("iceberg_it.iceberg_records"), +// new Schema( +// NestedField.required(0, "a", IntegerType.get()), +// NestedField.required(1, "b", StringType.get()), +// ), +// PartitionSpec.unpartitioned() +// ) + + Class.forName("org.apache.hive.jdbc.HiveDriver") + val conStr = s"jdbc:hive2://${container.containerIpAddress}:${container.mappedPort(10000)}/default" + val con = DriverManager.getConnection(conStr, "", "") + val stmt = con.createStatement + stmt.execute("CREATE DATABASE iceberg_it") + stmt.execute("CREATE TABLE iceberg_it.iceberg_records(a INT, b STRING) PARTITIONED BY (i int) STORED BY ICEBERG") + stmt.execute("INSERT INTO iceberg_it.iceberg_records VALUES(1, '1', 1)") + con.close() + } + + "IcebergIO" should "foo" in { + runWithRealContext() { sc => + val elements = 1.to(10).map(i => IcebergIOITRecord(i, s"$i", i)) + sc.parallelize(elements) + .saveAsIceberg( + "iceberg_it.iceberg_records", +// "iceberg_it", + catalogProperties = Map( + "type" -> "hive", + "uri" -> s"thrift://${container.containerIpAddress}:${container.mappedPort(10000)}", +// "warehouse" -> s"file://${warehouseLocalPath}" + ) + ) + } + } +} + diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala index c458ed15b7..5df642cb09 100644 --- a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala +++ b/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala @@ -23,6 +23,7 @@ import com.spotify.scio.values.SCollection import magnolify.beam.RowType import org.apache.beam.sdk.managed.Managed import com.spotify.scio.managed.ManagedIO +import org.apache.beam.sdk.coders.RowCoder import org.apache.beam.sdk.values.Row final case class IcebergIO[T: RowType: Coder](table: String, catalogName: Option[String]) @@ -32,7 +33,8 @@ final case class IcebergIO[T: RowType: Coder](table: String, catalogName: Option override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] private lazy val rowType: RowType[T] = implicitly - implicit private lazy val rowCoder: Coder[Row] = Coder.row(rowType.schema) + private lazy val beamRowCoder: RowCoder = RowCoder.of(rowType.schema) + implicit private lazy val rowCoder: Coder[Row] = Coder.beam(beamRowCoder) override def testId: String = s"IcebergIO($table, $catalogName)" @@ -53,10 +55,14 @@ final case class IcebergIO[T: RowType: Coder](table: String, catalogName: Option .readWithContext(sc, ManagedIO.ReadParam(rowType.schema)) .map(rowType.from) - override protected def write(data: SCollection[T], params: IcebergIO.WriteParam): Tap[tapT.T] = + override protected def write(data: SCollection[T], params: IcebergIO.WriteParam): Tap[tapT.T] = { + val tx = data.transform { + _.map(rowType.to).setCoder(beamRowCoder) + } ManagedIO(Managed.ICEBERG, config(params.catalogProperties, params.configProperties)) - .writeWithContext(data.transform(_.map(rowType.to)), ManagedIO.WriteParam()) + .writeWithContext(tx, ManagedIO.WriteParam()) .underlying +} override def tap(read: IcebergIO.ReadParam): Tap[tapT.T] = EmptyTap } diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala b/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala index 02686a9448..73c11c5760 100644 --- a/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala +++ b/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala @@ -61,6 +61,8 @@ final case class ManagedIO(ioName: String, config: Map[String, Object]) extends ): Tap[tapT.T] = { val t = Managed.write(ioName).withConfig(_config) data.applyInternal(t) + .getSinglePCollection + .setCoder(data.internal.getCoder) EmptyTap } From 85a87081e14d6ee47783ce5cd6b4081c33e82f17 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Tue, 17 Sep 2024 11:09:13 -0400 Subject: [PATCH 05/13] wip --- .../spotify/scio/iceberg/IcebergIOIT.scala | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala index 3ed3286ef0..a7511f0c12 100644 --- a/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala +++ b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala @@ -16,8 +16,9 @@ package com.spotify.scio.iceberg -import com.dimafeng.testcontainers.GenericContainer.FileSystemBind +//import com.dimafeng.testcontainers.GenericContainer.FileSystemBind import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer} +import org.apache.iceberg.{CatalogProperties, CatalogUtil} //import com.google.common.collect.ImmutableMap import com.spotify.scio.testing.PipelineSpec @@ -34,7 +35,7 @@ import magnolify.beam._ import java.time.Duration //import org.scalatest.BeforeAndAfterAll -import org.testcontainers.containers.BindMode +//import org.testcontainers.containers.BindMode import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy import java.io.File @@ -54,17 +55,20 @@ class IcebergIOIT extends PipelineSpec with ForAllTestContainer { //with Before } // docker run -d -p 9083:9083 -v /Users/kellend/tmp/iceberg-it:/opt/hive/data/warehouse/iceberg-it/ --env SERVICE_NAME=metastore apache/hive:3.1.3 + // docker run -d -p 10000:10000 -p 10002:10002 --env SERVICE_NAME=hiveserver2 --env VERBOSE=true apache/hive:3.1.3 override val container: GenericContainer = GenericContainer( GenericContainer.stringToDockerImage("apache/hive:4.0.0"), env = Map( "SERVICE_NAME" -> "hiveserver2", "VERBOSE" -> "true", + "HIVE_SERVER2_THRIFT_PORT" -> "10000", + "SERVICE_OPTS" -> "-Dhive.metastore.uris=thrift://0.0.0.0:9083'" ), exposedPorts = Seq(10000, 10002), - fileSystemBind = Seq( - FileSystemBind(s"$tempDir", "/warehouse", BindMode.READ_WRITE) - ), +// fileSystemBind = Seq( +// FileSystemBind(s"$tempDir", "/warehouse", BindMode.READ_WRITE) +// ), waitStrategy = new HostPortWaitStrategy() .forPorts(10000, 10002) // hive metastore @@ -155,9 +159,10 @@ class IcebergIOIT extends PipelineSpec with ForAllTestContainer { //with Before val conStr = s"jdbc:hive2://${container.containerIpAddress}:${container.mappedPort(10000)}/default" val con = DriverManager.getConnection(conStr, "", "") val stmt = con.createStatement - stmt.execute("CREATE DATABASE iceberg_it") - stmt.execute("CREATE TABLE iceberg_it.iceberg_records(a INT, b STRING) PARTITIONED BY (i int) STORED BY ICEBERG") - stmt.execute("INSERT INTO iceberg_it.iceberg_records VALUES(1, '1', 1)") + stmt.execute("CREATE DATABASE iceberg_it_db") +// stmt.execute("CREATE NAMESPACE iceberg_it_ns") + stmt.execute("CREATE TABLE iceberg_it_db.iceberg_records(a INT, b STRING) PARTITIONED BY (i int) STORED BY ICEBERG") + stmt.execute("INSERT INTO iceberg_it_db.iceberg_records VALUES(1, '1', 1)") con.close() } @@ -166,12 +171,17 @@ class IcebergIOIT extends PipelineSpec with ForAllTestContainer { //with Before val elements = 1.to(10).map(i => IcebergIOITRecord(i, s"$i", i)) sc.parallelize(elements) .saveAsIceberg( - "iceberg_it.iceberg_records", -// "iceberg_it", + "iceberg_it_db.iceberg_records", + "iceberg_it_db", catalogProperties = Map( - "type" -> "hive", - "uri" -> s"thrift://${container.containerIpAddress}:${container.mappedPort(10000)}", + CatalogUtil.ICEBERG_CATALOG_TYPE -> CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, + CatalogProperties.URI -> s"thrift://${container.containerIpAddress}:${container.mappedPort(10000)}", // "warehouse" -> s"file://${warehouseLocalPath}" + ), + configProperties = Map( +// "hive.metastore.sasl.enabled" -> "true", + "hive.security.authorization.enabled" -> "true", + "hive.metastore.username" -> "", ) ) } From aeed6977c7f7604eeb2c187e91e867307cd53846 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Tue, 17 Sep 2024 15:16:34 -0400 Subject: [PATCH 06/13] wip --- build.sbt | 55 ++---- .../spotify/scio/iceberg/IcebergIOIT.scala | 184 +++++------------- .../scio/examples/extra/ManagedExample.scala | 16 ++ .../com/spotify/scio/iceberg/IcebergIO.scala | 0 .../com/spotify/scio/iceberg/package.scala | 0 .../syntax/IcebergSCollectionSyntax.scala | 0 .../syntax/IcebergScioContextSyntax.scala | 0 .../com/spotify/scio/managed/ManagedIO.scala | 8 +- .../com/spotify/scio/managed/package.scala | 0 .../syntax/ManagedSCollectionSyntax.scala | 0 .../syntax/ManagedScioContextSyntax.scala | 0 11 files changed, 80 insertions(+), 183 deletions(-) rename {scio-iceberg => scio-managed}/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala (100%) rename {scio-iceberg => scio-managed}/src/main/scala/com/spotify/scio/iceberg/package.scala (100%) rename {scio-iceberg => scio-managed}/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala (100%) rename {scio-iceberg => scio-managed}/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala (100%) rename {scio-iceberg => scio-managed}/src/main/scala/com/spotify/scio/managed/ManagedIO.scala (90%) rename {scio-iceberg => scio-managed}/src/main/scala/com/spotify/scio/managed/package.scala (100%) rename {scio-iceberg => scio-managed}/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala (100%) rename {scio-iceberg => scio-managed}/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala (100%) diff --git a/build.sbt b/build.sbt index 057103229c..02e2b65a53 100644 --- a/build.sbt +++ b/build.sbt @@ -90,6 +90,7 @@ val elasticsearch8Version = "8.15.1" val fansiVersion = "0.5.0" val featranVersion = "0.8.0" val httpAsyncClientVersion = "4.1.5" +val icebergVersion = "1.4.2" val jakartaJsonVersion = "2.1.3" val javaLshVersion = "0.12" val jedisVersion = "5.1.5" @@ -697,9 +698,9 @@ lazy val `scio-bom` = project `scio-extra`, `scio-google-cloud-platform`, `scio-grpc`, - `scio-iceberg`, `scio-jdbc`, `scio-macros`, + `scio-managed`, `scio-neo4j`, `scio-parquet`, `scio-redis`, @@ -1172,21 +1173,19 @@ lazy val `scio-grpc` = project ) ) -lazy val `scio-iceberg` = project - .in(file("scio-iceberg")) +lazy val `scio-managed` = project + .in(file("scio-managed")) .dependsOn( `scio-core` % "compile;test->test" ) .settings(commonSettings) .settings( - description := "Scio add-on for Iceberg", + description := "Scio add-on for Beam's managed transforms", libraryDependencies ++= Seq( // compile "org.apache.beam" % "beam-sdks-java-core" % beamVersion, "org.apache.beam" % "beam-sdks-java-managed" % beamVersion, - // TODO add iceberg as test source - "org.apache.beam" % "beam-sdks-java-io-iceberg" % beamVersion, - "com.spotify" %% "magnolify-beam" % magnolifyVersion, + "com.spotify" %% "magnolify-beam" % magnolifyVersion // test ) ) @@ -1365,8 +1364,8 @@ lazy val `scio-examples` = project `scio-elasticsearch8`, `scio-extra`, `scio-google-cloud-platform`, - `scio-iceberg`, `scio-jdbc`, + `scio-managed`, `scio-neo4j`, `scio-parquet`, `scio-redis`, @@ -1444,34 +1443,7 @@ lazy val `scio-examples` = project "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-jdbc" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-iceberg" % beamVersion, - // no - "org.apache.iceberg" % "iceberg-hive-metastore" % "1.4.2", - "org.apache.hive.hcatalog" % "hive-hcatalog-core" % "3.1.3", - /* - def hive_version = "3.1.3" -def iceberg_version = "1.4.2" - testRuntimeOnly library.java.snake_yaml - testRuntimeOnly library.java.bigdataoss_gcs_connector - testRuntimeOnly library.java.hadoop_client - - // needed to set up the test environment - testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version" - testImplementation "org.apache.iceberg:iceberg-core:$iceberg_version" - testImplementation "org.assertj:assertj-core:3.11.1" - testImplementation library.java.junit - - // needed to set up test Hive Metastore and run tests - testImplementation ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version") - testImplementation project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow") - testRuntimeOnly ("org.apache.hive.hcatalog:hive-hcatalog-core:$hive_version") { - exclude group: "org.apache.hive", module: "hive-exec" - exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle" - } - testImplementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" - testImplementation "org.apache.parquet:parquet-column:1.12.0" - */ - - // no + "org.apache.beam" % "beam-sdks-java-managed" % beamVersion, "org.apache.hadoop" % "hadoop-common" % hadoopVersion, "org.apache.httpcomponents" % "httpcore" % httpCoreVersion, "org.apache.parquet" % "parquet-column" % parquetVersion, @@ -1740,6 +1712,7 @@ lazy val integration = project `scio-extra` % "test->test", `scio-google-cloud-platform` % "compile;test->test", `scio-jdbc` % "compile;test->test", + `scio-managed` % "test->test", `scio-neo4j` % "test->test", `scio-smb` % "test->provided,test" ) @@ -1786,7 +1759,13 @@ lazy val integration = project "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion % Test, "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion % Test, "com.spotify" %% "magnolify-datastore" % magnolifyVersion % Test, - "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Test + "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Test, + "org.apache.beam" % "beam-sdks-java-io-iceberg" % beamVersion % Test, + "org.apache.iceberg" % "iceberg-common" % icebergVersion % Test, + "org.apache.iceberg" % "iceberg-core" % icebergVersion % Test, + "org.apache.iceberg" % "iceberg-parquet" % icebergVersion % Test, + "org.apache.parquet" % "parquet-common" % parquetVersion % Test, + "org.apache.parquet" % "parquet-column" % parquetVersion % Test ) ) @@ -1812,9 +1791,9 @@ lazy val site = project `scio-extra`, `scio-google-cloud-platform`, `scio-grpc` % "compile->test", - `scio-iceberg`, `scio-jdbc`, `scio-macros`, + `scio-managed`, `scio-neo4j`, `scio-parquet`, `scio-redis`, diff --git a/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala index a7511f0c12..315ddecc65 100644 --- a/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala +++ b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala @@ -16,175 +16,81 @@ package com.spotify.scio.iceberg -//import com.dimafeng.testcontainers.GenericContainer.FileSystemBind import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer} -import org.apache.iceberg.{CatalogProperties, CatalogUtil} -//import com.google.common.collect.ImmutableMap import com.spotify.scio.testing.PipelineSpec - -import java.sql.DriverManager -//import org.scalatest.concurrent.Eventually import magnolify.beam._ -//import org.apache.hadoop.conf.Configuration -//import org.apache.hadoop.hive.metastore.HiveMetaStoreClient -//import org.apache.hadoop.hive.metastore.api.{Catalog, Database, Table} -//import org.apache.iceberg.PartitionSpec -//import org.apache.iceberg.catalog.{Namespace, TableIdentifier} -//import org.apache.iceberg.hive.HiveCatalog -//import org.apache.iceberg.types.Types.{IntegerType, NestedField, StringType} - -import java.time.Duration -//import org.scalatest.BeforeAndAfterAll -//import org.testcontainers.containers.BindMode +import org.apache.iceberg.catalog.{Namespace, TableIdentifier} +import org.apache.iceberg.rest.RESTCatalog +import org.apache.iceberg.types.Types.{IntegerType, NestedField, StringType} +import org.apache.iceberg.{CatalogProperties, CatalogUtil, PartitionSpec} import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy +import java.time.Duration import java.io.File import java.nio.file.Files +import scala.jdk.CollectionConverters._ -case class IcebergIOITRecord(a: Int, b: String, i: Int) +case class IcebergIOITRecord(a: Int, b: String) object IcebergIOITRecord { implicit val icebergIOITRecordRowType: RowType[IcebergIOITRecord] = RowType[IcebergIOITRecord] } -class IcebergIOIT extends PipelineSpec with ForAllTestContainer { //with BeforeAndAfterAll with Eventually { // { - val warehouseLocalPath: String = "/opt/hive/data/warehouse/iceberg-it/" +class IcebergIOIT extends PipelineSpec with ForAllTestContainer { // with BeforeAndAfterAll with Eventually { // { + val ContainerPort = 8181 + val CatalogName = "iceberg_it" + val NamespaceName = "iceberg_it_ns" + val TableName = s"${NamespaceName}.iceberg_records" + lazy val tempDir: File = { val t = Files.createTempDirectory("iceberg-it").toFile t.deleteOnExit() t } - // docker run -d -p 9083:9083 -v /Users/kellend/tmp/iceberg-it:/opt/hive/data/warehouse/iceberg-it/ --env SERVICE_NAME=metastore apache/hive:3.1.3 - // docker run -d -p 10000:10000 -p 10002:10002 --env SERVICE_NAME=hiveserver2 --env VERBOSE=true apache/hive:3.1.3 override val container: GenericContainer = GenericContainer( - GenericContainer.stringToDockerImage("apache/hive:4.0.0"), - env = Map( - "SERVICE_NAME" -> "hiveserver2", - "VERBOSE" -> "true", - "HIVE_SERVER2_THRIFT_PORT" -> "10000", - "SERVICE_OPTS" -> "-Dhive.metastore.uris=thrift://0.0.0.0:9083'" - ), - exposedPorts = Seq(10000, 10002), -// fileSystemBind = Seq( -// FileSystemBind(s"$tempDir", "/warehouse", BindMode.READ_WRITE) -// ), + GenericContainer.stringToDockerImage("tabulario/iceberg-rest:1.6.0"), + exposedPorts = Seq(ContainerPort), waitStrategy = new HostPortWaitStrategy() - .forPorts(10000, 10002) - // hive metastore + .forPorts(ContainerPort) .withStartupTimeout(Duration.ofSeconds(180)) ) - override def afterStart(): Unit = { -// val conf = new Configuration(false) -// configMap.foreach { case (k, v) => conf.set(k, v) } -// val client = new HiveMetaStoreClient(conf) -// -// client.createCatalog( -// new Catalog( -// "iceberg_it", -// s"file://${warehouseLocalPath}" -// ) -// ) -// -// client.createDatabase( -// new Database( -// "iceberg_it", -// "iceberg test records", -// s"file://${warehouseLocalPath}/iceberg_it.db/", -// ImmutableMap.of[String, String]() -// ) -// ) -// -// client.createTable( -// new Table( -// "iceberg_records", -// "iceberg_it", -// "iceberg_owner", -// 0, -// 0, -// Int.MaxValue, -// /* -// private String tableName; // required -// private String dbName; // required -// private String owner; // required -// private int createTime; // required -// private int lastAccessTime; // required -// private int retention; // required -// -// private StorageDescriptor sd; // required -// private List partitionKeys; // required -// private Map parameters; // required -// private String viewOriginalText; // required -// private String viewExpandedText; // required -// private String tableType; // required -// -// private PrincipalPrivilegeSet privileges; // optional -// private boolean temporary; // optional -// private boolean rewriteEnabled; // optional -// private CreationMetadata creationMetadata; // optional -// private String catName; // optional -// private PrincipalType ownerType; // optional -// */ -// -// ) -// ) + lazy val uri = s"http://${container.containerIpAddress}:${container.mappedPort(ContainerPort)}" -// // Note: this operates directly on the tempdir -// new File(tempDir + File.separator + "iceberg_it.db" + "iceberg_records").mkdirs() -// val cat = new HiveCatalog() -// cat.initialize( -// "iceberg_it", -// ImmutableMap.of[String, String]( -// "type", "hive", -// "uri", s"thrift://${container.containerIpAddress}:${container.mappedPort(9083)}", -//// "uri", s"thrift://0.0.0.0:9083", -// "warehouse", s"file://${tempDir}" -// ) -// ) -// -// import org.apache.iceberg.Schema -// cat.dropNamespace(Namespace.of("iceberg_it")) -// cat.createNamespace(Namespace.of("iceberg_it")) -// cat.createTable( -// TableIdentifier.parse("iceberg_it.iceberg_records"), -// new Schema( -// NestedField.required(0, "a", IntegerType.get()), -// NestedField.required(1, "b", StringType.get()), -// ), -// PartitionSpec.unpartitioned() -// ) + override def afterStart(): Unit = { + val cat = new RESTCatalog() + cat.initialize(CatalogName, Map("uri" -> uri).asJava) - Class.forName("org.apache.hive.jdbc.HiveDriver") - val conStr = s"jdbc:hive2://${container.containerIpAddress}:${container.mappedPort(10000)}/default" - val con = DriverManager.getConnection(conStr, "", "") - val stmt = con.createStatement - stmt.execute("CREATE DATABASE iceberg_it_db") -// stmt.execute("CREATE NAMESPACE iceberg_it_ns") - stmt.execute("CREATE TABLE iceberg_it_db.iceberg_records(a INT, b STRING) PARTITIONED BY (i int) STORED BY ICEBERG") - stmt.execute("INSERT INTO iceberg_it_db.iceberg_records VALUES(1, '1', 1)") - con.close() + import org.apache.iceberg.Schema + cat.createNamespace(Namespace.of(NamespaceName)) + cat.createTable( + TableIdentifier.parse(TableName), + new Schema( + NestedField.required(0, "a", IntegerType.get()), + NestedField.required(1, "b", StringType.get()) + ), + PartitionSpec.unpartitioned() + ) } - "IcebergIO" should "foo" in { + "IcebergIO" should "work" in { + val catalogProperties = Map( + CatalogUtil.ICEBERG_CATALOG_TYPE -> CatalogUtil.ICEBERG_CATALOG_TYPE_REST, + CatalogProperties.URI -> uri + ) + val elements = 1.to(10).map(i => IcebergIOITRecord(i, s"$i")) + runWithRealContext() { sc => - val elements = 1.to(10).map(i => IcebergIOITRecord(i, s"$i", i)) sc.parallelize(elements) - .saveAsIceberg( - "iceberg_it_db.iceberg_records", - "iceberg_it_db", - catalogProperties = Map( - CatalogUtil.ICEBERG_CATALOG_TYPE -> CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, - CatalogProperties.URI -> s"thrift://${container.containerIpAddress}:${container.mappedPort(10000)}", -// "warehouse" -> s"file://${warehouseLocalPath}" - ), - configProperties = Map( -// "hive.metastore.sasl.enabled" -> "true", - "hive.security.authorization.enabled" -> "true", - "hive.metastore.username" -> "", - ) - ) + .saveAsIceberg(TableName, catalogProperties = catalogProperties) + } + + runWithRealContext() { sc => + sc.iceberg[IcebergIOITRecord]( + TableName, + catalogProperties = catalogProperties + ) should containInAnyOrder(elements) } } } - diff --git a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala index 5651c48564..efe825e617 100644 --- a/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala +++ b/scio-examples/src/main/scala/com/spotify/scio/examples/extra/ManagedExample.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2024 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.spotify.scio.examples.extra import com.spotify.scio.ContextAndArgs diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala similarity index 100% rename from scio-iceberg/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala rename to scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/package.scala similarity index 100% rename from scio-iceberg/src/main/scala/com/spotify/scio/iceberg/package.scala rename to scio-managed/src/main/scala/com/spotify/scio/iceberg/package.scala diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala similarity index 100% rename from scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala rename to scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala similarity index 100% rename from scio-iceberg/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala rename to scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala b/scio-managed/src/main/scala/com/spotify/scio/managed/ManagedIO.scala similarity index 90% rename from scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala rename to scio-managed/src/main/scala/com/spotify/scio/managed/ManagedIO.scala index 73c11c5760..9c63ed6ddb 100644 --- a/scio-iceberg/src/main/scala/com/spotify/scio/managed/ManagedIO.scala +++ b/scio-managed/src/main/scala/com/spotify/scio/managed/ManagedIO.scala @@ -17,7 +17,6 @@ package com.spotify.scio.managed import com.spotify.scio.ScioContext -import com.spotify.scio.coders.{Coder, CoderMaterializer} import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT} import com.spotify.scio.values.SCollection import org.apache.beam.sdk.managed.Managed @@ -52,17 +51,14 @@ final case class ManagedIO(ioName: String, config: Map[String, Object]) extends sc.applyInternal[PCollectionRowTuple]( Managed.read(ioName).withConfig(_config) ).getSinglePCollection - ).setCoder(CoderMaterializer.beam(sc, Coder.row(params.schema))) + ) } override protected def write( data: SCollection[Row], params: ManagedIO.WriteParam ): Tap[tapT.T] = { - val t = Managed.write(ioName).withConfig(_config) - data.applyInternal(t) - .getSinglePCollection - .setCoder(data.internal.getCoder) + data.applyInternal(Managed.write(ioName).withConfig(_config)) EmptyTap } diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/package.scala b/scio-managed/src/main/scala/com/spotify/scio/managed/package.scala similarity index 100% rename from scio-iceberg/src/main/scala/com/spotify/scio/managed/package.scala rename to scio-managed/src/main/scala/com/spotify/scio/managed/package.scala diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala similarity index 100% rename from scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala rename to scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala diff --git a/scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala similarity index 100% rename from scio-iceberg/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala rename to scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedScioContextSyntax.scala From eb933e3db02c33f23fdc9b70147987e84eb5d377 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Tue, 17 Sep 2024 15:32:28 -0400 Subject: [PATCH 07/13] wip --- build.sbt | 1 - 1 file changed, 1 deletion(-) diff --git a/build.sbt b/build.sbt index 02e2b65a53..92355d4a65 100644 --- a/build.sbt +++ b/build.sbt @@ -1442,7 +1442,6 @@ lazy val `scio-examples` = project "org.apache.beam" % "beam-sdks-java-extensions-sql" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % beamVersion, "org.apache.beam" % "beam-sdks-java-io-jdbc" % beamVersion, - "org.apache.beam" % "beam-sdks-java-io-iceberg" % beamVersion, "org.apache.beam" % "beam-sdks-java-managed" % beamVersion, "org.apache.hadoop" % "hadoop-common" % hadoopVersion, "org.apache.httpcomponents" % "httpcore" % httpCoreVersion, From 34fa5a9cae13d01c2a2a2ce890f963d57b2a0058 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Tue, 17 Sep 2024 16:18:12 -0400 Subject: [PATCH 08/13] wip --- build.sbt | 2 +- .../src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 92355d4a65..8e61279f4f 100644 --- a/build.sbt +++ b/build.sbt @@ -101,7 +101,7 @@ val kantanCodecsVersion = "0.5.3" val kantanCsvVersion = "0.7.0" val kryoVersion = "4.0.3" val magnoliaVersion = "1.1.10" -val magnolifyVersion = "0.7.4-47-aa9f05b-20240903T173705Z-SNAPSHOT" +val magnolifyVersion = "0.7.4-34-a3708ba-SNAPSHOT" val metricsVersion = "4.2.27" val munitVersion = "1.0.1" val neo4jDriverVersion = "4.4.18" diff --git a/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala index 315ddecc65..8a0af1afcc 100644 --- a/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala +++ b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala @@ -35,7 +35,7 @@ object IcebergIOITRecord { implicit val icebergIOITRecordRowType: RowType[IcebergIOITRecord] = RowType[IcebergIOITRecord] } -class IcebergIOIT extends PipelineSpec with ForAllTestContainer { // with BeforeAndAfterAll with Eventually { // { +class IcebergIOIT extends PipelineSpec with ForAllTestContainer { val ContainerPort = 8181 val CatalogName = "iceberg_it" val NamespaceName = "iceberg_it_ns" From aa376704213ea823af6ba1ee91b0cc5eea96e895 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Tue, 17 Sep 2024 16:39:51 -0400 Subject: [PATCH 09/13] wip --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 477c4964e8..bd0921661a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -120,11 +120,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target project/target + run: mkdir -p scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target scio-managed/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target project/target + run: tar cf targets.tar scio-bom/target scio-tensorflow/target site/target scio-cassandra/cassandra3/target scio-elasticsearch/es8/target scio-jdbc/target scio-macros/target scio-grpc/target scio-elasticsearch/common/target scio-test/target scio-avro/target scio-elasticsearch/es7/target scio-redis/target scio-extra/target scio-test/parquet/target scio-test/core/target scio-google-cloud-platform/target scio-smb/target scio-test/google-cloud-platform/target scio-neo4j/target scio-parquet/target scio-core/target scio-repl/target scio-managed/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') From 55ed82884312ebac8d8dc665a21da9dde97af711 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Tue, 17 Sep 2024 17:09:30 -0400 Subject: [PATCH 10/13] wip --- build.sbt | 1 + 1 file changed, 1 insertion(+) diff --git a/build.sbt b/build.sbt index 8e61279f4f..f81948f050 100644 --- a/build.sbt +++ b/build.sbt @@ -1425,6 +1425,7 @@ lazy val `scio-examples` = project "com.mysql" % "mysql-connector-j" % "9.0.0", "com.softwaremill.magnolia1_2" %% "magnolia" % magnoliaVersion, "com.spotify" %% "magnolify-avro" % magnolifyVersion, + "com.spotify" %% "magnolify-beam" % magnolifyVersion, "com.spotify" %% "magnolify-bigtable" % magnolifyVersion, "com.spotify" %% "magnolify-datastore" % magnolifyVersion, "com.spotify" %% "magnolify-guava" % magnolifyVersion, From eb57321e075363e1303639966c0128413f6984a6 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Wed, 18 Sep 2024 06:23:32 -0400 Subject: [PATCH 11/13] wip --- README.md | 1 + .../spotify/scio/iceberg/IcebergIOIT.scala | 3 +- .../com/spotify/scio/values/SCollection.scala | 8 ++-- .../com/spotify/scio/iceberg/IcebergIO.scala | 48 ++++++++++--------- .../syntax/IcebergSCollectionSyntax.scala | 4 +- .../syntax/IcebergScioContextSyntax.scala | 4 +- .../com/spotify/scio/managed/ManagedIO.scala | 2 +- .../syntax/ManagedSCollectionSyntax.scala | 2 +- 8 files changed, 38 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index c13c9467fe..58ffdf0a41 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ Scio includes the following artifacts: - `scio-google-cloud-platform`: add-on for Google Cloud IO's: BigQuery, Bigtable, Pub/Sub, Datastore, Spanner - `scio-grpc`: add-on for gRPC service calls - `scio-jdbc`: add-on for JDBC IO +- `scio-managed`: add-on for Beam's managed transforms. Includes Iceberg - `scio-neo4j`: add-on for Neo4J IO - `scio-parquet`: add-on for Parquet - `scio-redis`: add-on for Redis diff --git a/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala index 8a0af1afcc..f2f7fa2c0f 100644 --- a/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala +++ b/integration/src/test/scala/com/spotify/scio/iceberg/IcebergIOIT.scala @@ -22,7 +22,7 @@ import magnolify.beam._ import org.apache.iceberg.catalog.{Namespace, TableIdentifier} import org.apache.iceberg.rest.RESTCatalog import org.apache.iceberg.types.Types.{IntegerType, NestedField, StringType} -import org.apache.iceberg.{CatalogProperties, CatalogUtil, PartitionSpec} +import org.apache.iceberg.{CatalogProperties, CatalogUtil, PartitionSpec, Schema} import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy import java.time.Duration @@ -62,7 +62,6 @@ class IcebergIOIT extends PipelineSpec with ForAllTestContainer { val cat = new RESTCatalog() cat.initialize(CatalogName, Map("uri" -> uri).asJava) - import org.apache.iceberg.Schema cat.createNamespace(Namespace.of(NamespaceName)) cat.createTable( TableIdentifier.parse(TableName), diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index f7f3221493..0f11c67ed2 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -1737,15 +1737,15 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { } /** - * Generic write method for all `ScioIO[T]` implementations, if it is test pipeline this will + * Generic write method for all `ScioIO[T]` implementations, if it is a test pipeline this will * evaluate pre-registered output IO implementation which match for the passing `ScioIO[T]` - * implementation. if not this will invoke [[com.spotify.scio.io.ScioIO[T]#write]] method along - * with write configurations passed by. + * implementation. If not, this will invoke [[com.spotify.scio.io.ScioIO[T]#write]] with the + * passed write configuration. * * @param io * an implementation of `ScioIO[T]` trait * @param params - * configurations need to pass to perform underline write implementation + * configurations need to pass to perform underlying write implementation */ def write(io: ScioIO[T])(params: io.WriteP): ClosedTap[io.tapT.T] = io.writeWithContext(this, params) diff --git a/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala index 5df642cb09..81c17bf662 100644 --- a/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala +++ b/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala @@ -36,44 +36,48 @@ final case class IcebergIO[T: RowType: Coder](table: String, catalogName: Option private lazy val beamRowCoder: RowCoder = RowCoder.of(rowType.schema) implicit private lazy val rowCoder: Coder[Row] = Coder.beam(beamRowCoder) - override def testId: String = s"IcebergIO($table, $catalogName)" + override def testId: String = s"IcebergIO(${(Some(table) ++ catalogName).mkString(", ")})" private def config( catalogProperties: Map[String, String], configProperties: Map[String, String] ): Map[String, AnyRef] = { - Map[String, AnyRef]( - "table" -> table, - "catalog_name" -> catalogName.orNull, - "catalog_properties" -> catalogProperties, - "config_properties" -> configProperties - ).filter(_._2 != null) + val b = Map.newBuilder[String, AnyRef] + b.addOne("table" -> table) + catalogName.foreach(name => b.addOne("catalog_name" -> name)) + Option(catalogProperties).foreach(p => b.addOne("catalog_properties" -> p)) + Option(configProperties).foreach(p => b.addOne("config_properties" -> p)) + b.result() } - override protected def read(sc: ScioContext, params: IcebergIO.ReadParam): SCollection[T] = - ManagedIO(Managed.ICEBERG, config(params.catalogProperties, params.configProperties)) - .readWithContext(sc, ManagedIO.ReadParam(rowType.schema)) - .map(rowType.from) + override protected def read(sc: ScioContext, params: IcebergIO.ReadParam): SCollection[T] = { + val io = ManagedIO(Managed.ICEBERG, config(params.catalogProperties, params.configProperties)) + sc.transform(_.read(io)(ManagedIO.ReadParam(rowType.schema)).map(rowType.from)) + } override protected def write(data: SCollection[T], params: IcebergIO.WriteParam): Tap[tapT.T] = { - val tx = data.transform { - _.map(rowType.to).setCoder(beamRowCoder) - } - ManagedIO(Managed.ICEBERG, config(params.catalogProperties, params.configProperties)) - .writeWithContext(tx, ManagedIO.WriteParam()) - .underlying -} + val io = ManagedIO(Managed.ICEBERG, config(params.catalogProperties, params.configProperties)) + data.map(rowType.to).setCoder(beamRowCoder).write(io).underlying + } override def tap(read: IcebergIO.ReadParam): Tap[tapT.T] = EmptyTap } object IcebergIO { case class ReadParam private ( - catalogProperties: Map[String, String] = null, - configProperties: Map[String, String] = null + catalogProperties: Map[String, String] = ReadParam.DefaultCatalogProperties, + configProperties: Map[String, String] = ReadParam.DefaultConfigProperties ) + object ReadParam { + val DefaultCatalogProperties: Map[String, String] = null + val DefaultConfigProperties: Map[String, String] = null + } case class WriteParam private ( - catalogProperties: Map[String, String] = null, - configProperties: Map[String, String] = null + catalogProperties: Map[String, String] = WriteParam.DefaultCatalogProperties, + configProperties: Map[String, String] = WriteParam.DefaultConfigProperties ) + object WriteParam { + val DefaultCatalogProperties: Map[String, String] = null + val DefaultConfigProperties: Map[String, String] = null + } } diff --git a/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala index c3e010ac15..86e3c6e007 100644 --- a/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala +++ b/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergSCollectionSyntax.scala @@ -26,8 +26,8 @@ class IcebergSCollectionSyntax[T: RowType: Coder](self: SCollection[T]) { def saveAsIceberg( table: String, catalogName: String = null, - catalogProperties: Map[String, String] = null, - configProperties: Map[String, String] = null + catalogProperties: Map[String, String] = IcebergIO.WriteParam.DefaultCatalogProperties, + configProperties: Map[String, String] = IcebergIO.WriteParam.DefaultConfigProperties ): ClosedTap[Nothing] = { val params = IcebergIO.WriteParam(catalogProperties, configProperties) self.write(IcebergIO(table, Option(catalogName)))(params) diff --git a/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala index b587ef413e..21ce3b698e 100644 --- a/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala +++ b/scio-managed/src/main/scala/com/spotify/scio/iceberg/syntax/IcebergScioContextSyntax.scala @@ -31,8 +31,8 @@ class IcebergScioContextSyntax(self: ScioContext) { def iceberg[T: Coder]( table: String, catalogName: String = null, - catalogProperties: Map[String, String] = null, - configProperties: Map[String, String] = null + catalogProperties: Map[String, String] = IcebergIO.ReadParam.DefaultCatalogProperties, + configProperties: Map[String, String] = IcebergIO.ReadParam.DefaultConfigProperties )(implicit rt: RowType[T]): SCollection[T] = { val params = IcebergIO.ReadParam(catalogProperties, configProperties) self.read(IcebergIO(table, Option(catalogName)))(params) diff --git a/scio-managed/src/main/scala/com/spotify/scio/managed/ManagedIO.scala b/scio-managed/src/main/scala/com/spotify/scio/managed/ManagedIO.scala index 9c63ed6ddb..557bed16f2 100644 --- a/scio-managed/src/main/scala/com/spotify/scio/managed/ManagedIO.scala +++ b/scio-managed/src/main/scala/com/spotify/scio/managed/ManagedIO.scala @@ -67,5 +67,5 @@ final case class ManagedIO(ioName: String, config: Map[String, Object]) extends object ManagedIO { final case class ReadParam(schema: Schema) - final case class WriteParam() + type WriteParam = Unit } diff --git a/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala b/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala index 7186e5b644..076c112c03 100644 --- a/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala +++ b/scio-managed/src/main/scala/com/spotify/scio/managed/syntax/ManagedSCollectionSyntax.scala @@ -23,7 +23,7 @@ import org.apache.beam.sdk.values.Row class ManagedSCollectionSyntax(self: SCollection[Row]) { def saveAsManaged(sink: String, config: Map[String, AnyRef] = Map.empty): ClosedTap[Nothing] = - self.write(ManagedIO(sink, config))(ManagedIO.WriteParam()) + self.write(ManagedIO(sink, config)) } trait SCollectionSyntax { From cd76b91e9dc081ad25b0a17729cc266851089247 Mon Sep 17 00:00:00 2001 From: Kellen Dye Date: Wed, 18 Sep 2024 08:44:51 -0400 Subject: [PATCH 12/13] wip --- .../main/scala/com/spotify/scio/iceberg/IcebergIO.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala b/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala index 81c17bf662..4343cd3244 100644 --- a/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala +++ b/scio-managed/src/main/scala/com/spotify/scio/iceberg/IcebergIO.scala @@ -43,10 +43,10 @@ final case class IcebergIO[T: RowType: Coder](table: String, catalogName: Option configProperties: Map[String, String] ): Map[String, AnyRef] = { val b = Map.newBuilder[String, AnyRef] - b.addOne("table" -> table) - catalogName.foreach(name => b.addOne("catalog_name" -> name)) - Option(catalogProperties).foreach(p => b.addOne("catalog_properties" -> p)) - Option(configProperties).foreach(p => b.addOne("config_properties" -> p)) + b += ("table" -> table) + catalogName.foreach(name => b += ("catalog_name" -> name)) + Option(catalogProperties).foreach(p => b += ("catalog_properties" -> p)) + Option(configProperties).foreach(p => b += ("config_properties" -> p)) b.result() } From d3b67dc77c3c5e3dff8f3dc9c299f5ed941e742a Mon Sep 17 00:00:00 2001 From: kellen Date: Thu, 19 Sep 2024 09:36:02 -0400 Subject: [PATCH 13/13] Update scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala Co-authored-by: Claire McGinty --- .../src/main/scala/com/spotify/scio/values/SCollection.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala index 0f11c67ed2..ab03774f07 100644 --- a/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala +++ b/scio-core/src/main/scala/com/spotify/scio/values/SCollection.scala @@ -1740,7 +1740,7 @@ sealed trait SCollection[T] extends PCollectionWrapper[T] { * Generic write method for all `ScioIO[T]` implementations, if it is a test pipeline this will * evaluate pre-registered output IO implementation which match for the passing `ScioIO[T]` * implementation. If not, this will invoke [[com.spotify.scio.io.ScioIO[T]#write]] with the - * passed write configuration. + * provided write configuration. * * @param io * an implementation of `ScioIO[T]` trait