-
Notifications
You must be signed in to change notification settings - Fork 319
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose DataStreamWriter.Foreach API #387
Conversation
src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs
Show resolved
Hide resolved
src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs
Outdated
Show resolved
Hide resolved
Co-Authored-By: Niharika Dutta <nidutta@microsoft.com>
src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs
Outdated
Show resolved
Hide resolved
src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs
Outdated
Show resolved
Hide resolved
src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs
Outdated
Show resolved
Hide resolved
src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs
Outdated
Show resolved
Hide resolved
// Mark the StreamWriter as ThreadStatic. If there are multiple Tasks | ||
// running this ForeachWriter, on the same Worker, and in parallel, then | ||
// it may be possible that they may use the same StreamWriter object. This | ||
// may cause an unintended side effect of a Task writing the output to a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just remove this side effect? You can use locks such that there is always only one StreamWriter
per file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also see this comment A single instance of this class is responsible of all the data generated by a single task
. Do we need to handle thread issue here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Originally I this a static field because the StreamWriter
was not marked as [Serializable]
. But since we are creating the StreamWriter
object in the open
call, marking the field as [NonSerialized]
addresses the issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also see this comment
A single instance of this class is responsible of all the data generated by a single task
. Do we need to handle thread issue here?
If all the fields of the ForeachWriter are non static then I don't think there will be issues, but if there are static fields then I think it'll be up to the user to determine how to handle them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have few questions / comments, but generally looks good to me.
src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very minor comment / question.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @suhsteve!
@elvaliuliuliu we need to update the breaking change section in release notes with this change. |
This PR exposes the
DataStreamWriter.Foreach
API.#208
Users can use this API by creating a
[Serializable]
class that implements the following interface:This user-defined class will be wrapped in a Wrapper that will call its respective methods according to the spark ForeachWriter lifecycle specifications. The lifecycle of the methods are as follows:
An example of a IForeachWriter can be something like the following: