-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[query] Stage (matrix)table writes locally #12773
Conversation
cb += os1.invoke[Unit]("close") | ||
cb += os2.invoke[Unit]("close") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All OutputBuffer
implementations seem to close their underlying output stream so there's no need to maintain a reference to them here
val filename1 = mb.newLocal[String]("filename1") | ||
val os1 = mb.newLocal[ByteTrackingOutputStream]("write_os1") | ||
val ob1 = mb.newLocal[OutputBuffer]("write_ob1") | ||
val filename2 = mb.newLocal[String]("filename2") | ||
val os2 = mb.newLocal[ByteTrackingOutputStream]("write_os2") | ||
val ob2 = mb.newLocal[OutputBuffer]("write_ob2") | ||
val n = mb.newLocal[Long]("partition_count") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code duplication was crying out for parameterisation. Since I'm still using locals, there shouldn't be too much difference in the generated code
val iAnnotationType = PCanonicalStruct(required = true, "entries_offset" -> PInt64()) | ||
val mb = cb.emb | ||
|
||
val indexWriter = ifIndexed { StagedIndexWriter.withDefaults(index.get._2, mb.ecb, annotationType = iAnnotationType, | ||
branchingFactor = Option(mb.ctx.getFlag("index_branching_factor")).map(_.toInt).getOrElse(4096)) } | ||
val writeIndexInfo = index.map { case (name, ktype) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm allergic to null
as have been bitten by them too many times. Keeping things within the Option
allows us to avoid things like null refs and ifIndexed
buffers.foreach { buff => | ||
cb += buff.writeByte(0.asInstanceOf[Byte]) | ||
cb += buff.flush() | ||
cb += buff.close() | ||
} | ||
|
||
cb += ob1.writeByte(0.asInstanceOf[Byte]) | ||
cb += ob2.writeByte(0.asInstanceOf[Byte]) | ||
if (hasIndex) | ||
indexWriter.close(cb) | ||
cb += ob1.flush() | ||
cb += ob2.flush() | ||
cb += os1.invoke[Unit]("close") | ||
cb += os2.invoke[Unit]("close") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a slight difference in ordering with respect to the index writer. I didn't think that mattered as the the output buffers and index writer seem to be quite separate.
) { | ||
private[this] val ctx = _ctx.asString |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private[this]
was unnecessary. sorry for the noise
@@ -654,7 +671,7 @@ case class TableNativeFanoutWriter( | |||
keyFields, | |||
s"$targetPath/rows/parts/", | |||
Some(s"$targetPath/index/" -> keyPType), | |||
if (stageLocally) Some(ctx.localTmpdir) else None | |||
if (stageLocally) Some(Path.of(ctx.localTmpdir, s"hail_staging_tmp_${UUID.randomUUID()}", "rows", "parts")) else None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've noticed we defer to String
s for files and paths and so on. I have been bitten by this too, hence prefer to use stronger types to encode at the type level that this is indeed a path, not a generic string. Let me know your thoughts, realise this goes against the grain somewhat but if you like it perhaps we can adopt this stuff more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Honestly, I prefer this. We need to take some amount of care for the semantics of cloud storage object names vs filesystem paths, but that is again a way that types come in handy. Thanks for this.
This comment was marked as resolved.
This comment was marked as resolved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great to finally have this!
@@ -654,7 +671,7 @@ case class TableNativeFanoutWriter( | |||
keyFields, | |||
s"$targetPath/rows/parts/", | |||
Some(s"$targetPath/index/" -> keyPType), | |||
if (stageLocally) Some(ctx.localTmpdir) else None | |||
if (stageLocally) Some(Path.of(ctx.localTmpdir, s"hail_staging_tmp_${UUID.randomUUID()}", "rows", "parts")) else None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Honestly, I prefer this. We need to take some amount of care for the semantics of cloud storage object names vs filesystem paths, but that is again a way that types come in handy. Thanks for this.
No description provided.