Broadcast variable support #414

Merged

Conversation

Collaborator

@Niharikadutta Niharikadutta commented Feb 4, 2020

This PR addresses issue #253

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
With this PR, users can create a broadcast variable that is copied to and accessible from all executors, and that can be used within a UDF.

SparkContext sc = SparkContext.GetOrCreate(new SparkConf());
Broadcast<int[]> broadcastVariable = sc.Broadcast(new[] { 1, 2, 3, 4, 5 });

// The value of this broadcast variable can be accessed from any executor using the Value() method:
int[] values = broadcastVariable.Value();
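For example, a broadcast variable can be captured by a UDF and read on the executors. A minimal sketch, assuming a DataFrame df with a string column "name" (the DataFrame and column are illustrative, not part of this PR):

    // Requires Microsoft.Spark.Sql (DataFrame, Column, Functions).
    // Broadcast the lookup data once; executors read it via Value() instead of
    // receiving a copy with every task.
    Broadcast<int[]> lookup = sc.Broadcast(new[] { 1, 2, 3, 4, 5 });

    // The UDF closure captures the broadcast variable; only its id travels with
    // the serialized closure, and Value() resolves the cached data on the executor.
    Func<Column, Column> udf = Functions.Udf<string, int>(
        name => name.Length + lookup.Value().Length);

    df.Select(udf(df["name"])).Show();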

Note: This PR does not support Encryption/Decryption of broadcast variables.

@Niharikadutta Niharikadutta reopened this Feb 4, 2020
@imback82 imback82 added the enhancement (New feature or request) label Feb 7, 2020
@Niharikadutta Niharikadutta changed the title [WIP] Broadcast variable support Broadcast variable support Feb 22, 2020
@imback82
Contributor

Can you fix the test failures first?

@imback82
Contributor

@elvaliuliuliu can you help with review as well?

/// </summary>
/// <param name="bid">Id of the Broadcast variable object to add</param>
/// <param name="value">Value of the Broadcast variable</param>
internal static void Add(long bid, object value) => s_registry.TryAdd(bid, value);
Contributor

Should we at least log or do something if TryAdd returns false?

Collaborator Author

If TryAdd returns false, that means the key already exists in the registry. Should that warrant a log or exception?
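If logging is the preferred option, a minimal sketch (the s_logger field and its LogWarn method are assumptions here, standing in for whatever logging facility the class ends up using):

    // Sketch only: surface the unexpected case where the same broadcast id is
    // registered twice instead of silently ignoring it.
    internal static void Add(long bid, object value)
    {
        if (!s_registry.TryAdd(bid, value))
        {
            // s_logger is hypothetical; replace with the project's logger.
            s_logger.LogWarn($"Broadcast variable with id {bid} is already registered.");
        }
    }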

Version version = SparkEnvironment.SparkVersion;

var javaSparkContext = (JvmObjectReference)((IJvmObjectReferenceProvider)sc).Reference
.Jvm.CallStaticJavaMethod(
Contributor

I was thinking like the following:

    [Serializable]
    public sealed class Broadcast<T> : IJvmObjectReferenceProvider
    {
        [NonSerialized]
        private readonly string _path;
        [NonSerialized]
        private readonly JvmObjectReference _jvmObject;

        private readonly long _bid;

        internal Broadcast(SparkContext sc, T value)
        {
            _path = CreateTempFilePath(sc.GetConf());
            _jvmObject = CreateBroadcast(sc, value);
            _bid = (long)_jvmObject.Invoke("id");
        }

        JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

        public T Value()
        {
            return (T)BroadcastRegistry.Use(_bid);
        }

        public void Unpersist()
        {
            _jvmObject.Invoke("unpersist");
        }

        public void Unpersist(bool blocking)
        {
            _jvmObject.Invoke("unpersist", blocking);
        }

        public void Destroy()
        {
            _jvmObject.Invoke("destroy");
            File.Delete(_path);
        }

        /// <summary>
        /// Called by the serializer after this object has been serialized (for
        /// example as part of a UDF closure); registers the JVM broadcast object
        /// in JvmBroadcastRegistry.
        /// </summary>
        /// <param name="context">The StreamingContext supplied by the serializer.</param>
        [OnSerialized]
        internal void OnSerialized(StreamingContext context)
        {
            JvmBroadcastRegistry.Add(_jvmObject);
        }

        private string CreateTempFilePath(SparkConf conf)
        {
            IJvmBridge jvm = ((IJvmObjectReferenceProvider)conf).Reference.Jvm;
            var localDir = (string)jvm.CallStaticJavaMethod(
                "org.apache.spark.util.Utils",
                "getLocalDir",
                conf);
            string dir = Path.Combine(localDir, "sparkdotnet");
            Directory.CreateDirectory(dir);
            return Path.Combine(dir, Path.GetRandomFileName());
        }

        private JvmObjectReference CreateBroadcast(SparkContext sc, T value)
        {
            IJvmBridge jvm = ((IJvmObjectReferenceProvider)sc).Reference.Jvm; 
            var javaSparkContext = (JvmObjectReference)jvm.CallStaticJavaMethod(
                "org.apache.spark.api.java.JavaSparkContext",
                "fromSparkContext",
                sc);

            Version version = SparkEnvironment.SparkVersion;
            return (version.Major, version.Minor) switch
            {
                (2, 3) when version.Build == 0 || version.Build == 1 =>
                    CreateBroadcast_V2_3_1_AndBelow(javaSparkContext, value),
                (2, 3) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
                (2, 4) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
                _ => throw new NotSupportedException($"Spark {version} not supported.")
            };
        }

        private JvmObjectReference CreateBroadcast_V2_3_1_AndBelow(
            JvmObjectReference javaSparkContext,
            object value)
        {
            WriteToFile(value);
            return (JvmObjectReference)javaSparkContext.Jvm.CallStaticJavaMethod(
                "org.apache.spark.api.python.PythonRDD",
                "readBroadcastFromFile",
                javaSparkContext,
                _path);
        }

        private JvmObjectReference CreateBroadcast_V2_3_2_AndAbove(
            JvmObjectReference javaSparkContext,
            SparkContext sc,
            object value)
        {
            bool encryptionEnabled = bool.Parse(
                sc.GetConf().Get("spark.io.encryption.enabled", "false"));

            if (encryptionEnabled)
            {
                throw new NotImplementedException("Broadcast encryption is not supported yet.");
            }
            else
            {
                WriteToFile(value);
            }

            var pythonBroadcast = (JvmObjectReference)javaSparkContext.Jvm.CallStaticJavaMethod(
                "org.apache.spark.api.python.PythonRDD",
                "setupBroadcast",
                _path);

            return (JvmObjectReference)javaSparkContext.Jvm.CallNonStaticJavaMethod(
                javaSparkContext,
                "broadcast",
                pythonBroadcast);
        }

        private void WriteToFile(object value)
        {
            using FileStream f = File.Create(_path);
            Dump(value, f);
        }

        private void Dump(object value, Stream stream)
        {
            var formatter = new BinaryFormatter();
            formatter.Serialize(stream, value);
        }
    }
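For reference, the surface of this sketch would be used roughly as follows (illustrative only, assuming the SparkContext.Broadcast<T> factory this PR adds calls into this constructor; in this sketch Value() resolves the data from BroadcastRegistry, which the worker populates when it deserializes a UDF closure that captured the broadcast):

    // Driver side: serializes the value to a local temp file and creates the
    // JVM-side broadcast (code path depends on the Spark version).
    Broadcast<int[]> bc = sc.Broadcast(new[] { 1, 2, 3 });

    // Executor side, inside a UDF: reads the value cached by the worker.
    int first = bc.Value()[0];

    // Cleanup once the broadcast is no longer needed.
    bc.Unpersist(blocking: true);  // drop cached copies on the executors
    bc.Destroy();                  // remove all state and delete the local temp file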

Contributor

@imback82 imback82 left a comment

I have minor comments, but generally looks good to me.

/// executor only once.
/// </summary>
/// <typeparam name="T">Type of the variable being broadcast</typeparam>
/// <param name="value">Value/variable to be broadcast</param>
Contributor

What is Value/variable?

Collaborator Author

Changed the wording to 'Value of the broadcast variable'. Please let me know if that looks fine. Thanks!

Contributor

@imback82 imback82 left a comment

LGTM, thanks @Niharikadutta!

@Niharikadutta
Collaborator Author

Thanks for reviewing @imback82, @suhsteve, @AFFogarty, @elvaliuliuliu! Appreciate all the feedback and time!

@Niharikadutta Niharikadutta reopened this Apr 2, 2020
@imback82 imback82 merged commit ddafcfb into dotnet:master Apr 2, 2020
@Niharikadutta Niharikadutta deleted the nidutta/BroadcastVariableSupport branch April 2, 2020 23:10
@MikeRys MikeRys mentioned this pull request May 7, 2020
Labels
enhancement (New feature or request)
5 participants