Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose DataStreamWriter.Foreach API #387

Merged
merged 28 commits into from
Jan 30, 2020
Merged

Conversation

suhsteve
Copy link
Member

@suhsteve suhsteve commented Jan 8, 2020

This PR exposes the DataStreamWriter.Foreach API.

#208

Users can use this API by creating a [Serializable] class that implements the following interface:

    public interface IForeachWriter
    {
        bool Open(long partitionId, long epochId);

        void Process(Row row);

        void Close(Exception errorOrNull);
    }

This user-defined class will be wrapped in a Wrapper that will call its respective methods according to the spark ForeachWriter lifecycle specifications. The lifecycle of the methods are as follows:

For each partition with partitionId:
    For each batch/epoch of streaming data(if its streaming query) with epochId:
        Method Open(partitionId, epochId) is called.
        If Open returns true:
            For each row in the partition and batch/epoch, method Process(row) is called.
        Method Close(errorOrNull) is called with error(if any) seen while processing rows.

An example of a IForeachWriter can be something like the following:

        [Serializable]
        private class TestForeachWriter : IForeachWriter
        {
            [NonSerialized]
            private StreamWriter _streamWriter;

            private readonly string _writePath;

            public TestForeachWriter(string writePath)
            {
                _writePath = writePath;
            }

            public void Close(Exception errorOrNull)
            {
                _streamWriter?.Dispose();
            }

            public bool Open(long partitionId, long epochId)
            {
                try
                {
                    _streamWriter= new StreamWriter(
                        Path.Combine(
                            _writePath,
                            $"sink-foreachWriter-{Guid.NewGuid()}.csv"));
                    return true;
                }
                catch
                {
                    return false;
                }
            }

            public void Process(Row value)
            {
                _streamWriter.WriteLine(string.Join(",", value.Values));
            }

@suhsteve suhsteve changed the title [WIP] Expose DataStreamWriter.Foreach API Expose DataStreamWriter.Foreach API Jan 9, 2020
suhsteve and others added 2 commits January 22, 2020 22:30
// Mark the StreamWriter as ThreadStatic. If there are multiple Tasks
// running this ForeachWriter, on the same Worker, and in parallel, then
// it may be possible that they may use the same StreamWriter object. This
// may cause an unintended side effect of a Task writing the output to a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just remove this side effect? You can use locks such that there is always only one StreamWriter per file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also see this comment A single instance of this class is responsible of all the data generated by a single task. Do we need to handle thread issue here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally I this a static field because the StreamWriter was not marked as [Serializable]. But since we are creating the StreamWriter object in the open call, marking the field as [NonSerialized] addresses the issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also see this comment A single instance of this class is responsible of all the data generated by a single task. Do we need to handle thread issue here?

If all the fields of the ForeachWriter are non static then I don't think there will be issues, but if there are static fields then I think it'll be up to the user to determine how to handle them.

src/csharp/Microsoft.Spark/Sql/ForeachWriter.cs Outdated Show resolved Hide resolved
src/csharp/Microsoft.Spark/TaskContext.cs Outdated Show resolved Hide resolved
src/csharp/Microsoft.Spark/TaskContext.cs Show resolved Hide resolved
src/csharp/Microsoft.Spark/TaskContext.cs Show resolved Hide resolved
@imback82 imback82 added the enhancement New feature or request label Jan 24, 2020
Copy link
Contributor

@imback82 imback82 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have few questions / comments, but generally looks good to me.

src/csharp/Microsoft.Spark/Sql/ForeachWriter.cs Outdated Show resolved Hide resolved
src/csharp/Microsoft.Spark/Sql/ForeachWriter.cs Outdated Show resolved Hide resolved
src/csharp/Microsoft.Spark/TaskContext.cs Show resolved Hide resolved
src/csharp/Microsoft.Spark/RDD/Collector.cs Outdated Show resolved Hide resolved
Copy link
Contributor

@imback82 imback82 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very minor comment / question.

Copy link
Contributor

@imback82 imback82 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks @suhsteve!

@imback82
Copy link
Contributor

@elvaliuliuliu we need to update the breaking change section in release notes with this change.

@imback82 imback82 merged commit 5f53809 into dotnet:master Jan 30, 2020
@suhsteve suhsteve deleted the foreach branch September 6, 2020 05:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants