Managed + Iceberg IO #5494
base: main
Conversation
Codecov Report: all modified and coverable lines are covered by tests ✅

@@            Coverage Diff             @@
##             main    #5494      +/-   ##
==========================================
- Coverage   61.32%   61.32%   -0.01%
==========================================
  Files         312      312
  Lines       11080    11082       +2
  Branches      770      728      -42
==========================================
+ Hits         6795     6796       +1
- Misses       4285     4286       +1

☔ View full report in Codecov by Sentry.
Can you add the new module to the README?
new Schema(
  NestedField.required(0, "a", IntegerType.get()),
  NestedField.required(1, "b", StringType.get())
),
Can't this be given by the RowType?
No, this is an Iceberg schema rather than the Beam schema. The lack of create-on-write does raise the question of whether we also need to derive the Iceberg schemas.
Maybe RowType could also offer a def icebergSchema? (Similar to how magnolify-parquet has both def schema and def avroSchema ...)
FYI, that would mean pulling Iceberg deps into the Beam module.
Could make it provided, I guess, but point taken.
Seems like Beam should have a utility function for converting between Beam/Iceberg schemas. They have similar stuff for BQ/Avro/Beam schema interop. Maybe we could contribute there.
Beam does have this, but it introduces a dep on the Iceberg part of the SDK that in theory should be managed.
I could use it in the integration test directly, but that wouldn't help users at all.
IcebergUtils.beamSchemaToIcebergSchema(rowType.schema)
OTOH ... the class has this comment so 🤷
// This is made public for users convenience, as many may have more experience working with
// Iceberg types.
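For reference, a minimal sketch of that conversion path, tying the magnolify-derived Beam schema to the IcebergUtils helper mentioned above. The magnolify.beam.RowType package, the Record case class, and the summoning syntax are assumptions for illustration; only rowType.schema and IcebergUtils.beamSchemaToIcebergSchema come from the discussion.

// Sketch only: derive an Iceberg Schema from a magnolify-derived Beam schema.
// Package and derivation details for RowType are assumptions, not the PR's API.
import magnolify.beam.RowType
import org.apache.beam.sdk.io.iceberg.IcebergUtils

case class Record(a: Int, b: String)

val rowType: RowType[Record] = RowType[Record]
// Beam schema derived from the case class, converted to an Iceberg schema
val icebergSchema: org.apache.iceberg.Schema =
  IcebergUtils.beamSchemaToIcebergSchema(rowType.schema)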
private lazy val _config: java.util.Map[String, Object] = {
  // recursively convert this yaml-compatible nested scala map to java map
  // we either do this or the user has to create nested java maps in scala code
  // both are bad
  def _convert(a: Object): Object = {
    a match {
      case m: Map[_, _] =>
        m.asInstanceOf[Map[_, Object]].map { case (k, v) => k -> _convert(v) }.asJava
      case i: Iterable[_] => i.map(o => _convert(o.asInstanceOf[Object])).asJava
      case _ => a
    }
  }
  config.map { case (k, v) => k -> _convert(v) }.asJava
}
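For illustration, the kind of nested Scala config this conversion is meant to handle might look like the sketch below; the keys are hypothetical and only loosely modeled on managed Iceberg options, not a confirmed schema.

// Hypothetical config; the keys are illustrative only.
val config: Map[String, Object] = Map(
  "table" -> "db.table",
  "catalog_properties" -> Map(
    "type"      -> "hadoop",
    "warehouse" -> "gs://bucket/warehouse"
  )
)
// After _convert, the "catalog_properties" value is a java.util.Map rather than
// a Scala Map, matching the nested java.util.Map shape the YAML-based config expects.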
I'm wondering if we should introduce either a config AST to ensure what is passed is YAML-compatible, or maybe use Lightbend Config.
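For what it's worth, a hedged sketch of the Lightbend Config route: HOCON parses into nested java.util.Maps directly via unwrapped(), which is roughly the shape _convert produces above. The keys are illustrative, not a confirmed Managed config schema.

import com.typesafe.config.ConfigFactory

// Parse a HOCON snippet; keys are illustrative only.
val parsed = ConfigFactory.parseString(
  """
    |table = "db.table"
    |catalog_properties {
    |  type = "hadoop"
    |  warehouse = "gs://bucket/warehouse"
    |}
    |""".stripMargin
)

// unwrapped() yields nested java.util.Maps / Lists / primitives.
val javaConfig: java.util.Map[String, Object] = parsed.root().unwrapped()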
The API can also take a YAML file location, e.g. classpath://foo.yaml, if we wanted to support that.
Converted to draft until
Overall looks good to me! The amount of testing + examples is impressive.
import org.apache.beam.sdk.values.{PCollectionRowTuple, Row}
import scala.jdk.CollectionConverters._

final case class ManagedIO(ioName: String, config: Map[String, Object]) extends ScioIO[Row] {
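For context, a minimal sketch of how a ScioIO like this might delegate to Beam's Managed API; this is not the PR's actual implementation, and the empty-tuple / getSinglePCollection wiring is an assumption.

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.managed.Managed
import org.apache.beam.sdk.values.{PCollection, PCollectionRowTuple, Row}

// Sketch only, not the PR's implementation: read Rows through Beam's Managed transform.
def readSketch(pipeline: Pipeline, ioName: String, config: java.util.Map[String, Object]): PCollection[Row] =
  PCollectionRowTuple
    .empty(pipeline)
    .apply(Managed.read(ioName).withConfig(config))
    .getSinglePCollection()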
I guess once this is merged, we can add a ManagedTypedIO to the 0.15.x branch? Maybe let's file a ticket to track the Magnolify API work...
I don't think we need to support Managed very extensively, so I decided to just skip any typed variant for Managed in favor of doing it downstream in our IO-specific APIs.
Adds support for Beam's managed transforms and for Iceberg, which is implemented as a managed transform.
Note: this is built on a snapshot of Magnolify and will need a release before merge.
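As a rough end-to-end illustration of "Iceberg implemented as a managed transform", the write side can be sketched directly against Beam's Managed API, bypassing the Scio wrappers added here; the "input" tag and the config contents are assumptions.

import org.apache.beam.sdk.managed.Managed
import org.apache.beam.sdk.values.{PCollection, PCollectionRowTuple, Row}

// Sketch only: write a PCollection[Row] to Iceberg via the Managed API.
def writeSketch(rows: PCollection[Row], config: java.util.Map[String, Object]): PCollectionRowTuple =
  PCollectionRowTuple
    .of("input", rows)
    .apply(Managed.write(Managed.ICEBERG).withConfig(config))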