Skip to content

Commit

Permalink
Support Snowflake (#5500)
Browse files Browse the repository at this point in the history
  • Loading branch information
turb committed Sep 25, 2024
1 parent b2c4ff1 commit dda5487
Show file tree
Hide file tree
Showing 7 changed files with 476 additions and 0 deletions.
17 changes: 17 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ lazy val scio = project
`scio-redis`,
`scio-repl`,
`scio-smb`,
`scio-snowflake`,
`scio-tensorflow`,
`scio-test-core`,
`scio-test-google-cloud-platform`,
Expand Down Expand Up @@ -1265,6 +1266,22 @@ lazy val `scio-parquet` = project
)
)

lazy val `scio-snowflake` = project
.in(file("scio-snowflake"))
.dependsOn(
`scio-core` % "compile;test->test"
)
.settings(commonSettings)
.settings(
description := "Scio add-on for Neo4J",
libraryDependencies ++= Seq(
// compile
"org.apache.beam" % "beam-sdks-java-core" % beamVersion,
"org.apache.beam" % "beam-sdks-java-io-snowflake" % beamVersion,
"com.nrinaudo" %% "kantan.csv" % kantanCsvVersion,
)
)

val tensorFlowMetadataSourcesDir =
settingKey[File]("Directory containing TensorFlow metadata proto files")
val tensorFlowMetadata = taskKey[Seq[File]]("Retrieve TensorFlow metadata proto files")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright 2024 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.snowflake

import com.spotify.scio.ScioContext
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import com.spotify.scio.io.{EmptyTap, EmptyTapOf, ScioIO, Tap, TapT}
import com.spotify.scio.values.SCollection
import kantan.csv.{RowDecoder, RowEncoder}
import org.apache.beam.sdk.io.snowflake.SnowflakeIO.{CsvMapper, UserDataMapper}
import org.apache.beam.sdk.io.{snowflake => beam}

object SnowflakeIO {

private[snowflake] def dataSourceConfiguration(connectionOptions: SnowflakeConnectionOptions) = {

val datasourceInitial = beam.SnowflakeIO.DataSourceConfiguration
.create()

val datasourceWithAuthent = connectionOptions.authenticationOptions match {
case SnowflakeUsernamePasswordAuthenticationOptions(username, password) =>
datasourceInitial.withUsernamePasswordAuth(username, password)
case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, None) =>
datasourceInitial.withKeyPairPathAuth(username, privateKeyPath)
case SnowflakeKeyPairAuthenticationOptions(username, privateKeyPath, Some(passphrase)) =>
datasourceInitial.withKeyPairPathAuth(username, privateKeyPath, passphrase)
case SnowflakeOAuthTokenAuthenticationOptions(token) =>
datasourceInitial.withOAuth(token)
}

val datasourceBeforeSchema = datasourceWithAuthent
.withServerName(connectionOptions.serverName)
.withDatabase(connectionOptions.database)
.withRole(connectionOptions.role)
.withWarehouse(connectionOptions.warehouse)

connectionOptions.schema
.map(schema => datasourceBeforeSchema.withSchema(schema))
.getOrElse(datasourceBeforeSchema)
}

private[snowflake] def buildCsvMapper[T](rowDecoder: RowDecoder[T]): CsvMapper[T] =
new CsvMapper[T] {
override def mapRow(parts: Array[String]): T = {
val unsnowedParts = parts.map {
case "\\N" => "" // needs to be mapped to an Option
case other => other
}.toSeq
rowDecoder.unsafeDecode(unsnowedParts)
}
}

private[snowflake] def prepareRead[T](
snowflakeOptions: SnowflakeOptions,
sc: ScioContext
)(implicit rowDecoder: RowDecoder[T], coder: Coder[T]): beam.SnowflakeIO.Read[T] =
beam.SnowflakeIO
.read()
.withDataSourceConfiguration(
SnowflakeIO.dataSourceConfiguration(snowflakeOptions.connectionOptions)
)
.withStagingBucketName(snowflakeOptions.stagingBucketName)
.withStorageIntegrationName(snowflakeOptions.storageIntegrationName)
.withCsvMapper(buildCsvMapper(rowDecoder))
.withCoder(CoderMaterializer.beam(sc, coder))
}

sealed trait SnowflakeIO[T] extends ScioIO[T]

final case class SnowflakeSelect[T](snowflakeOptions: SnowflakeOptions, select: String)(implicit
rowDecoder: RowDecoder[T],
coder: Coder[T]
) extends SnowflakeIO[T] {

override type ReadP = Unit
override type WriteP = Unit
override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T]

override protected def read(sc: ScioContext, params: ReadP): SCollection[T] =
sc.applyTransform(SnowflakeIO.prepareRead(snowflakeOptions, sc).fromQuery(select))

override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] =
throw new UnsupportedOperationException("SnowflakeSelect is read-only")

override def tap(params: ReadP): Tap[Nothing] = EmptyTap
}

final case class SnowflakeTable[T](snowflakeOptions: SnowflakeOptions, table: String)(implicit
rowDecoder: RowDecoder[T],
rowEncoder: RowEncoder[T],
coder: Coder[T]
) extends SnowflakeIO[T] {

override type ReadP = Unit
override type WriteP = Unit
override val tapT: TapT.Aux[T, Nothing] = EmptyTapOf[T]

override protected def read(sc: ScioContext, params: ReadP): SCollection[T] =
sc.applyTransform(SnowflakeIO.prepareRead(snowflakeOptions, sc).fromTable(table))

override protected def write(data: SCollection[T], params: WriteP): Tap[Nothing] = {
data.applyInternal(
beam.SnowflakeIO
.write[T]()
.withDataSourceConfiguration(
SnowflakeIO.dataSourceConfiguration(snowflakeOptions.connectionOptions)
)
.to(table)
.withStagingBucketName(snowflakeOptions.stagingBucketName)
.withStorageIntegrationName(snowflakeOptions.storageIntegrationName)
.withUserDataMapper(new UserDataMapper[T] {
override def mapRow(element: T): Array[AnyRef] = rowEncoder.encode(element).toArray
})
)
EmptyTap
}

override def tap(params: ReadP): Tap[Nothing] = EmptyTap
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2024 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.snowflake

trait SnowflakeAuthenticationOptions

/**
* Options for a Snowflake username/password authentication.
*
* @param username
* username
* @param password
* password
*/
final case class SnowflakeUsernamePasswordAuthenticationOptions(
username: String,
password: String
) extends SnowflakeAuthenticationOptions

/**
* Options for a Snowflake key pair authentication.
*
* @param username
* username
* @param privateKeyPath
* path to the private key
* @param privateKeyPassphrase
* passphrase for the private key (optional)
*/
final case class SnowflakeKeyPairAuthenticationOptions(
username: String,
privateKeyPath: String,
privateKeyPassphrase: Option[String]
) extends SnowflakeAuthenticationOptions

/**
* Options for a Snowflake OAuth token authentication.
*
* @param token
* OAuth token
*/
final case class SnowflakeOAuthTokenAuthenticationOptions(
token: String
) extends SnowflakeAuthenticationOptions

/**
* Options for a Snowflake connection.
*
* @param authenticationOptions
* authentication options
* @param serverName
* server name (e.g. "account.region.snowflakecomputing.com")
* @param database
* database name
* @param role
* role name
* @param warehouse
* warehouse name
* @param schema
* schema name (optional)
*/
final case class SnowflakeConnectionOptions(
authenticationOptions: SnowflakeAuthenticationOptions,
serverName: String,
database: String,
role: String,
warehouse: String,
schema: Option[String]
)

/**
* Options for configuring a Neo4J driver.
*
* @param connectionOptions
* connection options
* @param stagingBucketName
* Snowflake staging bucket name where CSV files will be stored
* @param storageIntegrationName
* Storage integration name as created in Snowflake
*/
final case class SnowflakeOptions(
connectionOptions: SnowflakeConnectionOptions,
stagingBucketName: String,
storageIntegrationName: String
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2024 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio

import com.spotify.scio.snowflake.syntax.AllSyntax

/**
* Main package for Snowflake APIs. Import all.
*
* {{{
* import com.spotify.scio.snowflake._
* }}}
*/
package object snowflake extends AllSyntax
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2024 Spotify AB.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.spotify.scio.snowflake.syntax

trait AllSyntax extends ScioContextSyntax with SCollectionSyntax
Loading

0 comments on commit dda5487

Please sign in to comment.