Support Snowflake (#5500) #5502
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5502      +/-   ##
==========================================
- Coverage   61.41%   61.16%   -0.25%
==========================================
  Files         312      315       +3
  Lines       11106    11153      +47
  Branches      771      771
==========================================
+ Hits         6821     6822       +1
- Misses       4285     4331      +46
The build wants me to have
@RustedBones I finally had the time to fix this build :)
Sorry for the delay. Will look at it this week!
build.sbt
  )
  .settings(commonSettings)
  .settings(
    description := "Scio add-on for Neo4J",
Suggested change:
-    description := "Scio add-on for Neo4J",
+    description := "Scio add-on for Snowflake",
fixup 42a4674
package com.spotify.scio.snowflake

trait SnowflakeAuthenticationOptions
Suggested change:
- trait SnowflakeAuthenticationOptions
+ sealed trait SnowflakeAuthenticationOptions
To avoid the `match may not be exhaustive` warning.
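To illustrate the point about `sealed`, here is a minimal sketch. The two subtypes (`UserPassword`, `KeyPair`) and the `AuthDescriber` helper are hypothetical names invented for this example, not the PR's actual classes. Because the trait is sealed, the compiler knows every subtype and can check that the match covers them all.

```scala
// Hypothetical subtypes, for illustration only; the PR's real case classes may differ.
sealed trait SnowflakeAuthenticationOptions
final case class UserPassword(user: String, password: String)
    extends SnowflakeAuthenticationOptions
final case class KeyPair(user: String, privateKeyPath: String)
    extends SnowflakeAuthenticationOptions

object AuthDescriber {
  // With a sealed parent, the compiler can verify this match is exhaustive.
  // Removing one of the cases would produce a "match may not be exhaustive" warning.
  def describe(auth: SnowflakeAuthenticationOptions): String = auth match {
    case UserPassword(user, _) => s"password auth for $user"
    case KeyPair(user, path)   => s"key-pair auth for $user ($path)"
  }
}
```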
  .withWarehouse(connectionOptions.warehouse)

connectionOptions.schema
  .map(schema => datasourceBeforeSchema.withSchema(schema))
  .getOrElse(datasourceBeforeSchema)
Nit: to avoid using an intermediate variable, you can use the syntactic sugar from `scala.util.chaining._` (can be done above too).
Suggested change:
-   .withWarehouse(connectionOptions.warehouse)
- connectionOptions.schema
-   .map(schema => datasourceBeforeSchema.withSchema(schema))
-   .getOrElse(datasourceBeforeSchema)
+   .withWarehouse(connectionOptions.warehouse)
+   .pipe(ds => connectionOptions.schema.fold(ds)(ds.withSchema))
TIL
fixup 25c5a76
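For readers unfamiliar with `pipe`, the suggestion in this thread can be tried in isolation. This sketch assumes Scala 2.13 or later (where `scala.util.chaining` is available) and uses a hypothetical `DataSourceConfig` case class as a stand-in for the Beam data source configuration, which is not reproduced here.

```scala
import scala.util.chaining._

// Hypothetical stand-in for the builder-style data source configuration.
final case class DataSourceConfig(warehouse: String = "", schema: Option[String] = None) {
  def withWarehouse(w: String): DataSourceConfig = copy(warehouse = w)
  def withSchema(s: String): DataSourceConfig = copy(schema = Some(s))
}

object PipeExample {
  // Applies the optional schema without naming an intermediate variable:
  // fold returns ds unchanged when schema is None, else ds.withSchema(value).
  def configure(warehouse: String, schema: Option[String]): DataSourceConfig =
    DataSourceConfig()
      .withWarehouse(warehouse)
      .pipe(ds => schema.fold(ds)(ds.withSchema))
}
```

The `fold` keeps the builder chain intact for both the `Some` and `None` cases, which is what removes the need for `datasourceBeforeSchema`.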
new CsvMapper[T] {
  override def mapRow(parts: Array[String]): T = {
    val unsnowedParts = parts.map {
      case "\\N" => "" // needs to be mapped to an Option
I have never used Snowflake. Can you give some context for this case?
`SnowflakeIO` from Apache Beam uses `COPY` from Snowflake, which simply exports to CSV that is then read from storage. This `COPY` uses `\N` to represent `null`.
The thing is, kantan maps the empty string to `None`, so we provide it.
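The mapping described above can be sketched without any CSV library. `SnowflakeNulls`, `unsnow`, and `toOption` are hypothetical names for this example, and `toOption` is only a hand-rolled stand-in for the empty-string-to-`None` behaviour attributed to kantan in this thread, not kantan's actual API.

```scala
object SnowflakeNulls {
  // The exported CSV represents SQL NULL as the two characters backslash + N.
  val NullMarker: String = "\\N"

  // Replace the NULL marker with an empty string, leaving other cells untouched.
  def unsnow(parts: Array[String]): Array[String] =
    parts.map {
      case NullMarker => ""
      case other      => other
    }

  // Stand-in for a decoder that treats an empty cell as a missing value.
  def toOption(cell: String): Option[String] =
    if (cell.isEmpty) None else Some(cell)
}
```

One consequence of this approach is that a genuinely empty string and a `null` become indistinguishable after the mapping.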
final case class SnowflakeOptions(
  connectionOptions: SnowflakeConnectionOptions,
  stagingBucketName: String,
  storageIntegrationName: String
)
We need to make a distinction between `Options` and `Params`:
- `Options`, passed in the IO constructor, are used to identify distinct sources
- `Params` of an IO are some tuning parameters to consume the IO.
We need to split the models here to reflect that.
I am unsure where each should stand:
- `connectionOptions` are, well, connection options
- `stagingBucketName` is a bucket used by the IO to drop/store the CSV files used by `COPY`, so that's temp storage not accessed by the user.
- `storageIntegrationName` gives the name of a Snowflake abstraction linking to this storage
I have the feeling all of that should go in `Options`, since they are linked to potentially distinct sources and are not tuning parameters, but maybe I'm mistaken?
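To make the `Options` versus `Params` distinction concrete, here is a rough sketch of the general shape. Every name in it (`ExampleOptions`, `ExampleReadParam`, `ExampleIO`, `quotationMark`) is hypothetical and chosen only for illustration; it does not settle where the Snowflake fields discussed above belong.

```scala
// Hypothetical: values that identify which source is being addressed.
final case class ExampleOptions(connectionUrl: String, stagingBucketName: String)

// Hypothetical: a tuning knob that changes how the source is consumed,
// but not which source it is.
final case class ExampleReadParam(quotationMark: String = "'")

final case class ExampleIO(options: ExampleOptions) {
  // Identity is derived from the Options only.
  def ioId: String =
    s"ExampleIO(${options.connectionUrl}, ${options.stagingBucketName})"

  // Params are supplied at read time and do not affect identity.
  def describeRead(params: ExampleReadParam): String =
    s"$ioId read with quote ${params.quotationMark}"
}
```

Under this shape, two reads with different params still refer to the same IO, while changing any option yields a different one.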
      override def mapRow(element: T): Array[AnyRef] = rowEncoder.encode(element).toArray
    })
)
EmptyTap
Isn't it possible to create a tap?
TBH I know little about taps. The IOs I played with (JDBC and Neo4J) only use `EmptyTap`. If I understand correctly, maybe it is possible to map it to the stored `COPY`ed CSV file?
First draft of `SnowflakeIO` for scio, using the one from Beam.
It has:
- `ScioContext` and `SCollection` implicits
Notes: