From dda5487517d89809fbecf1efe3eaa0363b42c9ed Mon Sep 17 00:00:00 2001 From: "sveyrie@luminatedata.com" Date: Wed, 25 Sep 2024 15:42:05 +0200 Subject: [PATCH] Support Snowflake (#5500) --- build.sbt | 17 +++ .../spotify/scio/snowflake/SnowflakeIO.scala | 134 ++++++++++++++++++ .../scio/snowflake/SnowflakeOptions.scala | 100 +++++++++++++ .../com/spotify/scio/snowflake/package.scala | 29 ++++ .../scio/snowflake/syntax/AllSyntax.scala | 20 +++ .../snowflake/syntax/SCollectionSyntax.scala | 129 +++++++++++++++++ .../snowflake/syntax/ScioContextSyntax.scala | 47 ++++++ 7 files changed, 476 insertions(+) create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/package.scala create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/AllSyntax.scala create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala create mode 100644 scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala diff --git a/build.sbt b/build.sbt index 19be3a0f04..dd3051855d 100644 --- a/build.sbt +++ b/build.sbt @@ -671,6 +671,7 @@ lazy val scio = project `scio-redis`, `scio-repl`, `scio-smb`, + `scio-snowflake`, `scio-tensorflow`, `scio-test-core`, `scio-test-google-cloud-platform`, @@ -1265,6 +1266,22 @@ lazy val `scio-parquet` = project ) ) +lazy val `scio-snowflake` = project + .in(file("scio-snowflake")) + .dependsOn( + `scio-core` % "compile;test->test" + ) + .settings(commonSettings) + .settings( + description := "Scio add-on for Neo4J", + libraryDependencies ++= Seq( + // compile + "org.apache.beam" % "beam-sdks-java-core" % beamVersion, + "org.apache.beam" % "beam-sdks-java-io-snowflake" % beamVersion, + "com.nrinaudo" %% "kantan.csv" % kantanCsvVersion, + ) + ) + val tensorFlowMetadataSourcesDir = settingKey[File]("Directory containing TensorFlow metadata proto files") val tensorFlowMetadata = taskKey[Seq[File]]("Retrieve TensorFlow metadata proto files") diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala new file mode 100644 index 0000000000..5984910857 --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeIO.scala @@ -0,0 +1,134 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.snowflake + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.{Coder, CoderMaterializer} +import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT} +import com.spotify.scio.values.SCollection +import kantan.csv.{RowDecoder, RowEncoder} +import org.apache.beam.sdk.io.snowflake.SnowflakeIO.{CsvMapper, UserDataMapper} +import org.apache.beam.sdk.io.{snowflake => beam} + +object SnowflakeIO { + + private[snowflake] def dataSourceConfiguration(connectionOptions: SnowflakeConnectionOptions) = { + + val datasourceInitial = beam.SnowflakeIO.DataSourceConfiguration + .create() + + val datasourceWithAuthent = connectionOptions.authenticationOptions match { + case SnowflakeUsernamePasswordAuthenticationOptions(username, password) => + datasourceInitial.withUsernamePasswordAuth(username, password) + case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, None) => + datasourceInitial.withKeyPairPathAuth(username, privateKeyPath) + case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, Some(passphrase)) => + datasourceInitial.withKeyPairPathAuth(username, privateKeyPath, passphrase) + case SnowflakeOAuthTokenAuthenticationOptions(token) => + datasourceInitial.withOAuth(token) + } + + val datasourceBeforeSchema = datasourceWithAuthent + .withServerName(connectionOptions.serverName) + .withDatabase(connectionOptions.database) + .withRole(connectionOptions.role) + .withWarehouse(connectionOptions.warehouse) + + connectionOptions.schema + .map(schema => datasourceBeforeSchema.withSchema(schema)) + .getOrElse(datasourceBeforeSchema) + } + + private[snowflake] def buildCsvMapper[T](rowDecoder: RowDecoder[T]): CsvMapper[T] = + new CsvMapper[T] { + override def mapRow(parts: Array[String]): T = { + val unsnowedParts = parts.map { + case "\\N" => "" // needs to be mapped to an Option + case other => other + }.toSeq + rowDecoder.unsafeDecode(unsnowedParts) + } + } + + private[snowflake] def prepareRead[T]( + snowflakeOptions: SnowflakeOptions, + sc: ScioContext + )(implicit rowDecoder: RowDecoder[T], coder: Coder[T]): beam.SnowflakeIO.Read[T] = + beam.SnowflakeIO + .read() + .withDataSourceConfiguration( + SnowflakeIO.dataSourceConfiguration(snowflakeOptions.connectionOptions) + ) + .withStagingBucketName(snowflakeOptions.stagingBucketName) + .withStorageIntegrationName(snowflakeOptions.storageIntegrationName) + .withCsvMapper(buildCsvMapper(rowDecoder)) + .withCoder(CoderMaterializer.beam(sc, coder)) +} + +sealed trait SnowflakeIO[T] extends ScioIO[T] + +final case class SnowflakeSelect[T](snowflakeOptions: SnowflakeOptions, select: String)(implicit + rowDecoder: RowDecoder[T], + coder: Coder[T] +) extends SnowflakeIO[T] { + + override type ReadP = Unit + override type WriteP = Unit + override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] + + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = + sc.applyTransform(SnowflakeIO.prepareRead(snowflakeOptions, sc).fromQuery(select)) + + override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] = + throw new UnsupportedOperationException("SnowflakeSelect is read-only") + + override def tap(params: ReadP): Tap[Nothing] = EmptyTap +} + +final case class SnowflakeTable[T](snowflakeOptions: SnowflakeOptions, table: String)(implicit + rowDecoder: RowDecoder[T], + rowEncoder: RowEncoder[T], + coder: Coder[T] +) extends SnowflakeIO[T] { + + override type ReadP = Unit + override type WriteP = Unit + override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T] + + override protected def read(sc: ScioContext, params: ReadP): SCollection[T] = + sc.applyTransform(SnowflakeIO.prepareRead(snowflakeOptions, sc).fromTable(table)) + + override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] = { + data.applyInternal( + beam.SnowflakeIO + .write[T]() + .withDataSourceConfiguration( + SnowflakeIO.dataSourceConfiguration(snowflakeOptions.connectionOptions) + ) + .to(table) + .withStagingBucketName(snowflakeOptions.stagingBucketName) + .withStorageIntegrationName(snowflakeOptions.storageIntegrationName) + .withUserDataMapper(new UserDataMapper[T] { + override def mapRow(element: T): Array[AnyRef] = rowEncoder.encode(element).toArray + }) + ) + EmptyTap + } + + override def tap(params: ReadP): Tap[Nothing] = EmptyTap +} diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala new file mode 100644 index 0000000000..4ee8aaaefb --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/SnowflakeOptions.scala @@ -0,0 +1,100 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.snowflake + +trait SnowflakeAuthenticationOptions + +/** + * Options for a Snowflake username/password authentication. + * + * @param username + * username + * @param password + * password + */ +final case class SnowflakeUsernamePasswordAuthenticationOptions( + username: String, + password: String +) extends SnowflakeAuthenticationOptions + +/** + * Options for a Snowflake key pair authentication. + * + * @param username + * username + * @param privateKeyPath + * path to the private key + * @param privateKeyPassphrase + * passphrase for the private key (optional) + */ +final case class SnowflakeKeyPairAuthenticationOptions( + username: String, + privateKeyPath: String, + privateKeyPassphrase: Option[String] +) extends SnowflakeAuthenticationOptions + +/** + * Options for a Snowflake OAuth token authentication. + * + * @param token + * OAuth token + */ +final case class SnowflakeOAuthTokenAuthenticationOptions( + token: String +) extends SnowflakeAuthenticationOptions + +/** + * Options for a Snowflake connection. + * + * @param authenticationOptions + * authentication options + * @param serverName + * server name (e.g. "account.region.snowflakecomputing.com") + * @param database + * database name + * @param role + * role name + * @param warehouse + * warehouse name + * @param schema + * schema name (optional) + */ +final case class SnowflakeConnectionOptions( + authenticationOptions: SnowflakeAuthenticationOptions, + serverName: String, + database: String, + role: String, + warehouse: String, + schema: Option[String] +) + +/** + * Options for configuring a Neo4J driver. + * + * @param connectionOptions + * connection options + * @param stagingBucketName + * Snowflake staging bucket name where CSV files will be stored + * @param storageIntegrationName + * Storage integration name as created in Snowflake + */ +final case class SnowflakeOptions( + connectionOptions: SnowflakeConnectionOptions, + stagingBucketName: String, + storageIntegrationName: String +) diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/package.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/package.scala new file mode 100644 index 0000000000..2c80313e7a --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/package.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio + +import com.spotify.scio.snowflake.syntax.AllSyntax + +/** + * Main package for Snowflake APIs. Import all. + * + * {{{ + * import com.spotify.scio.snowflake._ + * }}} + */ +package object snowflake extends AllSyntax diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/AllSyntax.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/AllSyntax.scala new file mode 100644 index 0000000000..3ce04e0130 --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/AllSyntax.scala @@ -0,0 +1,20 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.snowflake.syntax + +trait AllSyntax extends ScioContextSyntax with SCollectionSyntax diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala new file mode 100644 index 0000000000..1ff2c3f235 --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/SCollectionSyntax.scala @@ -0,0 +1,129 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.snowflake.syntax + +import com.spotify.scio.coders.Coder +import com.spotify.scio.io.{EmptyTap, Tap} +import com.spotify.scio.snowflake.SnowflakeOptions +import com.spotify.scio.values.SCollection +import kantan.csv.{RowDecoder, RowEncoder} +import org.apache.beam.sdk.io.snowflake.SnowflakeIO.UserDataMapper +import org.apache.beam.sdk.io.{snowflake => beam} + +/** + * Enhanced version of [[com.spotify.scio.values.SCollection SCollection]] with Snowflake methods. + */ +final class SnowflakeSCollectionOps[T](private val self: SCollection[T]) extends AnyVal { + + import com.spotify.scio.snowflake.SnowflakeIO._ + + /** + * Execute the provided SQL query in Snowflake, COPYing the result in CSV format to the provided + * bucket, and return an [[SCollection]] of provided type, reading this bucket. + * + * The [[SCollection]] is generated using [[kantan.csv.RowDecoded]]. [[SCollection]] type + * properties must then match the order of the columns of the SELECT, that will be copied to the + * bucket. + * + * @see + * ''Reading from Snowflake'' in the + * [[https://beam.apache.org/documentation/io/built-in/snowflake/ Beam `SnowflakeIO` documentation]] + * @param snowflakeConf + * options for configuring a Snowflake integration + * @param query + * SQL select query + * @return + * [[SCollection]] containing the query results as parsed from the CSV bucket copied from + * Snowflake + */ + def snowflakeSelect[U]( + snowflakeConf: SnowflakeOptions, + query: String + )(implicit + rowDecoder: RowDecoder[U], + coder: Coder[U] + ): SCollection[U] = + self.context.applyTransform(prepareRead(snowflakeConf, self.context).fromQuery(query)) + + /** + * Copy the provided Snowflake table in CSV format to the provided bucket, and * return an + * [[SCollection]] of provided type, reading this bucket. + * + * The [[SCollection]] is generated using [[kantan.csv.RowDecoded]]. [[SCollection]] type + * properties must then match the order of the columns of the table, that will be copied to the + * bucket. + * + * @see + * ''Reading from Snowflake'' in the + * [[https://beam.apache.org/documentation/io/built-in/snowflake/ Beam `SnowflakeIO` documentation]] + * @param snowflakeConf + * options for configuring a Snowflake integration + * @param table + * table + * @return + * [[SCollection]] containing the table elements as parsed from the CSV bucket copied from + * Snowflake table + */ + def snowflakeTable[U]( + snowflakeConf: SnowflakeOptions, + table: String + )(implicit + rowDecoder: RowDecoder[U], + coder: Coder[U] + ): SCollection[U] = + self.context.applyTransform(prepareRead(snowflakeConf, self.context).fromTable(table)) + + /** + * Save this SCollection as a Snowflake database table. The [[SCollection]] is written to CSV + * files in a bucket, using the provided [[kantan.csv.RowEncoder]] to encode each element as a CSV + * row. The bucket is then COPYied to the Snowflake table. + * + * @see + * ''Writing to Snowflake tables'' in the + * [[https://beam.apache.org/documentation/io/built-in/snowflake/ Beam `SnowflakeIO` documentation]] + * + * @param snowflakeOptions + * options for configuring a Snowflake connexion + * @param table + * Snowflake table + */ + def saveAsSnowflakeTable( + snowflakeOptions: SnowflakeOptions, + table: String + )(implicit rowEncoder: RowEncoder[T], coder: Coder[T]): Tap[Nothing] = { + self.applyInternal( + beam.SnowflakeIO + .write[T]() + .withDataSourceConfiguration( + dataSourceConfiguration(snowflakeOptions.connectionOptions) + ) + .to(table) + .withStagingBucketName(snowflakeOptions.stagingBucketName) + .withStorageIntegrationName(snowflakeOptions.storageIntegrationName) + .withUserDataMapper(new UserDataMapper[T] { + override def mapRow(element: T): Array[AnyRef] = rowEncoder.encode(element).toArray + }) + ) + EmptyTap + } +} + +trait SCollectionSyntax { + implicit def snowflakeSCollectionOps[T](sc: SCollection[T]): SnowflakeSCollectionOps[T] = + new SnowflakeSCollectionOps(sc) +} diff --git a/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala new file mode 100644 index 0000000000..846f437da2 --- /dev/null +++ b/scio-snowflake/src/main/scala/com/spotify/scio/snowflake/syntax/ScioContextSyntax.scala @@ -0,0 +1,47 @@ +/* + * Copyright 2024 Spotify AB. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.spotify.scio.snowflake.syntax + +import com.spotify.scio.ScioContext +import com.spotify.scio.coders.Coder +import com.spotify.scio.snowflake.{SnowflakeOptions, SnowflakeSelect} +import com.spotify.scio.values.SCollection +import kantan.csv.RowDecoder + +/** Enhanced version of [[ScioContext]] with Snowflake methods. */ +final class SnowflakeScioContextOps(private val self: ScioContext) extends AnyVal { + + /** + * Get an SCollection for a Snowflake SQL query + * + * @param snowflakeOptions + * options for configuring a Snowflake connexion + * @param query + * Snowflake SQL select query + */ + def snowflakeQuery[T: RowDecoder: Coder]( + snowflakeOptions: SnowflakeOptions, + query: String + ): SCollection[T] = + self.read(SnowflakeSelect(snowflakeOptions, query)) + +} +trait ScioContextSyntax { + implicit def snowflakeScioContextOps(sc: ScioContext): SnowflakeScioContextOps = + new SnowflakeScioContextOps(sc) +}