Support Snowflake (#5500) #5502

Open: turb wants to merge 3 commits into main

turb (Contributor) commented Sep 25, 2024

First draft of SnowflakeIO for scio, using the one from Beam.

It has:

  • SnowflakeSelect (read-only), which reads the result of a SELECT query
  • SnowflakeTable (read/write), which reads from or writes to a table
  • ScioContext and SCollection implicits

Notes:

codecov bot commented Sep 25, 2024

Codecov Report

Attention: Patch coverage is 0% with 50 lines in your changes missing coverage. Please review.

Project coverage is 61.16%. Comparing base (14c7077) to head (25c5a76).
Report is 6 commits behind head on main.

Files with missing lines                                 Patch %   Lines
...scala/com/spotify/scio/snowflake/SnowflakeIO.scala    0.00%     38 Missing ⚠️
...tify/scio/snowflake/syntax/SCollectionSyntax.scala    0.00%     10 Missing ⚠️
...tify/scio/snowflake/syntax/ScioContextSyntax.scala    0.00%     2 Missing ⚠️

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5502      +/-   ##
==========================================
- Coverage   61.41%   61.16%   -0.25%     
==========================================
  Files         312      315       +3     
  Lines       11106    11153      +47     
  Branches      771      771              
==========================================
+ Hits         6821     6822       +1     
- Misses       4285     4331      +46     


@turb turb force-pushed the snowflake branch 2 times, most recently from d179ff2 to 9f6010d Compare September 25, 2024 16:37
turb (Contributor Author) commented Sep 25, 2024

The build wants me to declare "com.nrinaudo" %% "kantan.csv" % kantanCsvVersion as a runtime dependency, but then the module no longer compiles. I thought a compile-scope dependency was also supposed to be available at runtime (at least in the Maven model).

@turb turb force-pushed the snowflake branch 3 times, most recently from 8c434ed to 74d15ae Compare October 16, 2024 12:11
turb (Contributor Author) commented Oct 16, 2024

@RustedBones I finally had the time to fix this build :)

RustedBones (Contributor) commented

Sorry for the delay. Will look at it this week!

build.sbt Outdated
)
.settings(commonSettings)
.settings(
description := "Scio add-on for Neo4J",
Contributor

Suggested change
- description := "Scio add-on for Neo4J",
+ description := "Scio add-on for Snowflake",

Contributor Author

fixup 42a4674


package com.spotify.scio.snowflake

trait SnowflakeAuthenticationOptions
Contributor

Suggested change
- trait SnowflakeAuthenticationOptions
+ sealed trait SnowflakeAuthenticationOptions

This avoids the "match may not be exhaustive" compiler warning.
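
As a minimal sketch of why sealing helps: once the trait is sealed, the compiler knows every subtype and can check a pattern match for completeness. The two subtypes and the describe helper below are hypothetical, chosen only for illustration; the PR's actual cases are not shown in this thread.

```scala
// Hypothetical subtypes, for illustration only.
sealed trait SnowflakeAuthenticationOptions
final case class UsernamePassword(username: String, password: String)
    extends SnowflakeAuthenticationOptions
final case class KeyPair(username: String, privateKeyPath: String)
    extends SnowflakeAuthenticationOptions

object AuthDescription {
  // Because the trait is sealed, all subtypes live in this file, so the
  // compiler can warn if a case is missing from this match.
  def describe(auth: SnowflakeAuthenticationOptions): String = auth match {
    case UsernamePassword(user, _) => s"password auth for $user"
    case KeyPair(user, _)          => s"key-pair auth for $user"
  }
}
```

Removing one of the two cases from the match should then produce a compile-time warning rather than a possible MatchError at runtime.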

Comment on lines 50 to 54
.withWarehouse(connectionOptions.warehouse)

connectionOptions.schema
.map(schema => datasourceBeforeSchema.withSchema(schema))
.getOrElse(datasourceBeforeSchema)
Contributor

Nit: to avoid the intermediate variable, you can use the pipe syntactic sugar from scala.util.chaining._ (this can be done above too).

Suggested change
- .withWarehouse(connectionOptions.warehouse)
- connectionOptions.schema
- .map(schema => datasourceBeforeSchema.withSchema(schema))
- .getOrElse(datasourceBeforeSchema)
+ .withWarehouse(connectionOptions.warehouse)
+ .pipe(ds => connectionOptions.schema.fold(ds)(ds.withSchema))
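
For context, a self-contained sketch of the same pattern, assuming Scala 2.13 or later (where scala.util.chaining is available). DataSourceConfig is a hypothetical stand-in for the Beam datasource configuration, not the real class.

```scala
import scala.util.chaining._

// Hypothetical stand-in for the datasource configuration builder.
final case class DataSourceConfig(warehouse: String = "", schema: Option[String] = None) {
  def withWarehouse(w: String): DataSourceConfig = copy(warehouse = w)
  def withSchema(s: String): DataSourceConfig = copy(schema = Some(s))
}

object PipeExample {
  def configure(warehouse: String, schema: Option[String]): DataSourceConfig =
    DataSourceConfig()
      .withWarehouse(warehouse)
      // fold returns ds unchanged when schema is None,
      // otherwise it applies ds.withSchema to the contained value.
      .pipe(ds => schema.fold(ds)(ds.withSchema))
}
```

The builder chain stays a single expression, so no intermediate val is needed for the optional step.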

Contributor Author

TIL
fixup 25c5a76

new CsvMapper[T] {
override def mapRow(parts: Array[String]): T = {
val unsnowedParts = parts.map {
case "\\N" => "" // needs to be mapped to an Option
Contributor

I have never used Snowflake. Can you give some context for this case?

turb (Contributor Author) commented Nov 4, 2024

SnowflakeIO from Apache Beam uses Snowflake's COPY, which simply exports to CSV files that are then read from storage. This COPY uses \N to represent null.

Since kantan maps an empty string to None, we replace \N with an empty string.
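
A minimal sketch of that substitution in isolation, without the kantan dependency. The object and method names are hypothetical; only the "\\N" to "" mapping comes from the PR.

```scala
object SnowflakeNulls {
  // The exported CSV represents SQL NULL as the two characters backslash and N.
  final val NullMarker = "\\N"

  // Replace the marker with an empty string, so that a CSV decoder which
  // treats empty cells as None sees a missing value for Option fields.
  def unsnow(parts: Array[String]): Array[String] =
    parts.map {
      case NullMarker => ""
      case other      => other
    }
}
```

One consequence of this design is that a genuine empty string and a NULL become indistinguishable after the mapping.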

Comment on lines +96 to +100
final case class SnowflakeOptions(
connectionOptions: SnowflakeConnectionOptions,
stagingBucketName: String,
storageIntegrationName: String
)
Contributor

We need to make a distinction between Options and Params:

  • Options, passed in the IO constructor, are used to identify distinct sources.
  • Params of an IO are tuning parameters for consuming the IO.

We need to split the models here to reflect that.
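
To illustrate the distinction only, here is a rough sketch with entirely hypothetical names (ExampleSourceOptions, ExampleReadParam, ExampleIO, fetchSize). It is not Scio's actual API and does not decide which Snowflake fields belong on which side.

```scala
// Options: identify which source is being read.
final case class ExampleSourceOptions(url: String, database: String)

// Params: tune how that source is consumed, without changing its identity.
final case class ExampleReadParam(fetchSize: Int = 1000)

final case class ExampleIO(options: ExampleSourceOptions) {
  // Two IOs built from equal options denote the same source,
  // whatever params are later used to read from them.
  def id: String = s"ExampleIO(${options.url}, ${options.database})"

  def describeRead(param: ExampleReadParam): String =
    s"$id fetchSize=${param.fetchSize}"
}
```

In this sketch, changing fetchSize changes how the read behaves but leaves the IO's id untouched, which is the property that separates a Param from an Option.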

Contributor Author

I am unsure where each should stand:

  • connectionOptions are, well, connection options.
  • stagingBucketName is a bucket used by the IO to store the CSV files used by COPY, so it is temporary storage that the user does not access.
  • storageIntegrationName gives the name of a Snowflake abstraction linking to this storage.

I have the feeling that all of these should go in Options, since they are linked to potentially distinct sources and are not tuning parameters, but maybe I'm mistaken?

override def mapRow(element: T): Array[AnyRef] = rowEncoder.encode(element).toArray
})
)
EmptyTap
Contributor

Isn't it possible to create a tap?

Contributor Author

TBH I know little about taps. The IOs I have played with (JDBC and Neo4J) only use EmptyTap. If I understand correctly, maybe it could be mapped to the stored CSV files produced by COPY?

2 participants