-
Notifications
You must be signed in to change notification settings - Fork 321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Broadcast variable support #414
Broadcast variable support #414
Conversation
Can you fix the test failures first? |
@elvaliuliuliu can you help with review as well? |
src/scala/microsoft-spark-2.4.x/src/main/scala/org/apache/spark/sql/api/dotnet/SQLUtils.scala
Show resolved
Hide resolved
…adutta/spark into nidutta/BroadcastVariableSupport
/// </summary> | ||
/// <param name="bid">Id of the Broadcast variable object to add</param> | ||
/// <param name="value">Value of the Broadcast variable</param> | ||
internal static void Add(long bid, object value) => s_registry.TryAdd(bid, value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we at least log or do something if TryAdd
returns false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If TryAdd
returns false, that means the key already exists in the registry. Should that warrant a log or exception?
Version version = SparkEnvironment.SparkVersion; | ||
|
||
var javaSparkContext = (JvmObjectReference)((IJvmObjectReferenceProvider)sc).Reference | ||
.Jvm.CallStaticJavaMethod( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking like the following:
[Serializable]
public sealed class Broadcast<T> : IJvmObjectReferenceProvider
{
[NonSerialized]
private readonly string _path;
[NonSerialized]
private readonly JvmObjectReference _jvmObject;
private readonly long _bid;
internal Broadcast(SparkContext sc, T value)
{
_path = CreateTempFilePath(sc.GetConf());
_jvmObject = CreateBroadcast(sc, value);
_bid = (long)_jvmObject.Invoke("id");
}
JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;
public T Value()
{
return (T)BroadcastRegistry.Use(_bid);
}
public void Unpersist()
{
_jvmObject.Invoke("unpersist");
}
public void Unpersist(bool blocking)
{
_jvmObject.Invoke("unpersist", blocking);
}
public void Destroy()
{
_jvmObject.Invoke("destroy");
File.Delete(_path);
}
/// <param name="context"></param>
[OnSerialized]
internal void OnSerialized(StreamingContext context)
{
JvmBroadcastRegistry.Add(_jvmObject);
}
private string CreateTempFilePath(SparkConf conf)
{
IJvmBridge jvm = ((IJvmObjectReferenceProvider)conf).Reference.Jvm;
var localDir = (string)jvm.CallStaticJavaMethod(
"org.apache.spark.util.Utils",
"getLocalDir",
conf);
string dir = Path.Combine(localDir, "sparkdotnet");
Directory.CreateDirectory(dir);
return Path.Combine(dir, Path.GetRandomFileName());
}
private JvmObjectReference CreateBroadcast(SparkContext sc, T value)
{
IJvmBridge jvm = ((IJvmObjectReferenceProvider)sc).Reference.Jvm;
var javaSparkContext = (JvmObjectReference)jvm.CallStaticJavaMethod(
"org.apache.spark.api.java.JavaSparkContext",
"fromSparkContext",
sc);
Version version = SparkEnvironment.SparkVersion;
return (version.Major, version.Minor) switch
{
(2, 3) when version.Build == 0 || version.Build == 1 =>
CreateBroadcast_V2_3_1_AndBelow(javaSparkContext, value),
(2, 3) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
(2, 4) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
_ => throw new NotSupportedException($"Spark {version} not supported.")
};
}
private JvmObjectReference CreateBroadcast_V2_3_1_AndBelow(
JvmObjectReference javaSparkContext,
object value)
{
WriteToFile(value);
return (JvmObjectReference)javaSparkContext.Jvm.CallStaticJavaMethod(
"org.apache.spark.api.python.PythonRDD",
"readBroadcastFromFile",
javaSparkContext,
_path);
}
private JvmObjectReference CreateBroadcast_V2_3_2_AndAbove(
JvmObjectReference javaSparkContext,
SparkContext sc,
object value)
{
bool encryptionEnabled = bool.Parse(
sc.GetConf().Get("spark.io.encryption.enabled", "false"));
if (encryptionEnabled)
{
throw new NotImplementedException("Broadcast encryption is not supported yet.");
}
else
{
WriteToFile(value);
}
var pythonBroadcast = (JvmObjectReference)javaSparkContext.Jvm.CallStaticJavaMethod(
"org.apache.spark.api.python.PythonRDD",
"setupBroadcast",
_path);
return (JvmObjectReference)javaSparkContext.Jvm.CallNonStaticJavaMethod(
javaSparkContext,
"broadcast",
pythonBroadcast);
}
private void WriteToFile(object value)
{
using FileStream f = File.Create(_path);
Dump(value, f);
}
private void Dump(object value, Stream stream)
{
var formatter = new BinaryFormatter();
formatter.Serialize(stream, value);
}
}
…) to get value of encryption.enabled spark config
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have minor comments, but generally looks good to me.
/// executor only once. | ||
/// </summary> | ||
/// <typeparam name="T">Type of the variable being broadcast</typeparam> | ||
/// <param name="value">Value/variable to be broadcast</param> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is Value/variable
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed the wording to 'Value of the broadcast variable'. Please let me know if that looks fine. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @Niharikadutta!
Thanks for reviewing @imback82 , @suhsteve , @AFFogarty @elvaliuliuliu ! Appreciate all the feedbacks and time! |
This PR addresses issue #253
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
Through this PR, user will be able to create a broadcast variable that will be copied to and accessible from all the executors, and can be used within a UDF.
Note: This PR does not support Encryption/Decryption of broadcast variables.