Support Spark 3.1.1 (#836)

suhsteve authored Mar 17, 2021
1 parent 04b9a33 commit dbad951
Showing 11 changed files with 92 additions and 14 deletions.
58 changes: 58 additions & 0 deletions azure-pipelines.yml
@@ -44,6 +44,54 @@ variables:
forwardCompatibleTestOptions_Windows_3_0_2: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Linux_3_0_2: $(forwardCompatibleTestOptions_Windows_3_0_2)

# Skip backwardCompatible tests because Microsoft.Spark.Worker requires Spark 3.1 support in
# CommandProcessor.cs and TaskContextProcessor.cs. Support added in https://github.com/dotnet/spark/pull/836
backwardCompatibleTestOptions_Windows_3_1: "--filter \
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameGroupedMapUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestGroupedMapUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfRegistrationWithReturnAsRowType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithArrayChain)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithSimpleArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithMapType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithRowArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithReturnAsMapType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithReturnAsArrayOfArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithArrayOfArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithMapOfMapType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithReturnAsSimpleArrayType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithRowType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfComplexTypesTests.TestUdfWithReturnAsRowType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSerDeTests.TestExternalStaticMethodCall)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSerDeTests.TestInitExternalClassInUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSerDeTests.TestUdfClosure)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSimpleTypesTests.TestUdfWithReturnAsDateType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSimpleTypesTests.TestUdfWithReturnAsTimestampType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSimpleTypesTests.TestUdfWithDateType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.UdfTests.UdfSimpleTypesTests.TestUdfWithTimestampType)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestMultipleBroadcast)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestUnpersist)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.BroadcastTests.TestDestroy)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.PairRDDFunctionsTests.TestCollect)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestPipelinedRDD)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestMap)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestFlatMap)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestMapPartitions)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestMapPartitionsWithIndex)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestTextFile)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.RDDTests.TestFilter)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestDataFrameVectorUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestVectorUdf)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestWithColumn)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataFrameTests.TestUDF)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.SparkSessionExtensionsTests.TestVersion)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataStreamWriterTests.TestForeachBatch)&\
(FullyQualifiedName!=Microsoft.Spark.E2ETest.IpcTests.DataStreamWriterTests.TestForeach)"
# Skip all forwardCompatible tests since the microsoft-spark-3-1 jar does not get built when
# building the forwardCompatible repo.
forwardCompatibleTestOptions_Windows_3_1: "--filter FullyQualifiedName=NONE"
backwardCompatibleTestOptions_Linux_3_1: $(backwardCompatibleTestOptions_Windows_3_1)
forwardCompatibleTestOptions_Linux_3_1: $(forwardCompatibleTestOptions_Windows_3_1)

# Azure DevOps variables are transformed into environment variables; with these variables we
# skip the .NET first-run experience and opt out of telemetry to speed up the build.
DOTNET_CLI_TELEMETRY_OPTOUT: 1
@@ -361,3 +409,13 @@ stages:
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_0)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_0_2)
- version: '3.1.1'
jobOptions:
- pool: 'Hosted VS2017'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_1)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_1)
- pool: 'Hosted Ubuntu 1604'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_1)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_1)
@@ -30,7 +30,11 @@ public DeltaTableTests(DeltaFixture fixture)
/// Run the end-to-end scenario from the Delta Quickstart tutorial.
/// </summary>
/// <see cref="https://docs.delta.io/latest/quick-start.html"/>
[SkipIfSparkVersionIsLessThan(Versions.V2_4_2)]
///
/// Delta 0.8.0 is not compatible with Spark 3.1.1.
/// Disable Delta tests that have code paths that create an
/// `org.apache.spark.sql.catalyst.expressions.Alias` object.
[SkipIfSparkVersionIsNotInRange(Versions.V2_4_2, Versions.V3_1_1)]
public void TestTutorialScenario()
{
using var tempDirectory = new TemporaryDirectory();
@@ -223,7 +227,11 @@ void testWrapper(
/// <summary>
/// Test that methods return the expected signature.
/// </summary>
[SkipIfSparkVersionIsLessThan(Versions.V2_4_2)]
///
/// Delta 0.8.0 is not compatible with Spark 3.1.1.
/// Disable Delta tests that have code paths that create an
/// `org.apache.spark.sql.catalyst.expressions.Alias` object.
[SkipIfSparkVersionIsNotInRange(Versions.V2_4_2, Versions.V3_1_1)]
public void TestSignaturesV2_4_X()
{
using var tempDirectory = new TemporaryDirectory();
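Aside (not part of the commit): the two tests above trade the open-ended `SkipIfSparkVersionIsLessThan` gate for a bounded `SkipIfSparkVersionIsNotInRange` gate. A minimal sketch of how such an xunit attribute can work, assuming the Spark version under test is exposed through an environment variable (the repo's real attribute and its version lookup may differ):

using System;
using Xunit;

// Hypothetical sketch, not the repo's actual implementation.
public sealed class SkipIfSparkVersionIsNotInRangeAttribute : FactAttribute
{
    public SkipIfSparkVersionIsNotInRangeAttribute(string minInclusive, string maxExclusive)
    {
        // Stand-in for however the test suite discovers the Spark version under test.
        var version = new Version(
            Environment.GetEnvironmentVariable("SPARK_VERSION") ?? "0.0.0");

        if (version < new Version(minInclusive) || version >= new Version(maxExclusive))
        {
            // Assigning Skip makes xunit report the fact as skipped with this reason.
            Skip = $"Requires Spark in [{minInclusive}, {maxExclusive}); found {version}.";
        }
    }
}

With this shape, `[SkipIfSparkVersionIsNotInRange(Versions.V2_4_2, Versions.V3_1_1)]` runs from Spark 2.4.2 up to (but excluding) 3.1.1, matching the Delta 0.8.0 constraint described in the comments.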
@@ -69,12 +69,7 @@ public void TestSignaturesV2_3_X()

// TODO: Test dfw.Jdbc without running a local db.

dfw.Option("path", tempDir.Path).SaveAsTable("TestTable");

dfw.InsertInto("TestTable");

dfw.Option("path", $"{tempDir.Path}TestSavePath1").Save();
dfw.Save($"{tempDir.Path}TestSavePath2");
dfw.Save($"{tempDir.Path}TestSavePath1");

dfw.Json($"{tempDir.Path}TestJsonPath");

@@ -85,6 +80,16 @@ public void TestSignaturesV2_3_X()
dfw.Text($"{tempDir.Path}TestTextPath");

dfw.Csv($"{tempDir.Path}TestCsvPath");

dfw.Option("path", tempDir.Path).SaveAsTable("TestTable");

dfw.InsertInto("TestTable");

// In Spark 3.1.1+ setting the `path` Option and then calling .Save(path) is not
// supported unless `spark.sql.legacy.pathOptionBehavior.enabled` conf is set.
// .Json(path), .Parquet(path), etc follow the same code path so the conf
// needs to be set in these scenarios as well.
dfw.Option("path", $"{tempDir.Path}TestSavePath2").Save();
}
}
}
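Aside (not part of the commit): the comment above names the escape hatch. A minimal sketch of enabling it, assuming an existing SparkSession and DataFrame (identifiers here are illustrative):

using Microsoft.Spark.Sql;

internal static class LegacyPathOptionSketch
{
    // Sketch only: with the legacy conf enabled, a `path` option may coexist
    // with an explicit path argument on Spark 3.1.1+. Reads share the same
    // code path, so the conf also covers Load/Json/Parquet on the reader side.
    public static void WriteBothWays(SparkSession spark, DataFrame df, string path)
    {
        spark.Conf().Set("spark.sql.legacy.pathOptionBehavior.enabled", "true");

        // Without the conf, this combination throws on Spark 3.1.1+.
        df.Write().Option("path", path).Json(path);
    }
}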
@@ -52,7 +52,6 @@ public void TestSignaturesV2_3_X()
}));

string jsonFilePath = Path.Combine(TestEnvironment.ResourceDirectory, "people.json");
Assert.IsType<DataFrame>(dsr.Format("json").Option("path", jsonFilePath).Load());
Assert.IsType<DataFrame>(dsr.Format("json").Load(jsonFilePath));
Assert.IsType<DataFrame>(dsr.Json(jsonFilePath));
Assert.IsType<DataFrame>(
@@ -63,6 +62,12 @@ public void TestSignaturesV2_3_X()
dsr.Parquet(Path.Combine(TestEnvironment.ResourceDirectory, "users.parquet")));
Assert.IsType<DataFrame>(
    dsr.Text(Path.Combine(TestEnvironment.ResourceDirectory, "people.txt")));

// In Spark 3.1.1+ setting the `path` Option and then calling .Load(path) is not
// supported unless `spark.sql.legacy.pathOptionBehavior.enabled` conf is set.
// .Json(path), .Parquet(path), etc follow the same code path so the conf
// needs to be set in these scenarios as well.
Assert.IsType<DataFrame>(dsr.Format("json").Option("path", jsonFilePath).Load());
}
}
}
@@ -99,7 +99,7 @@ private static SqlCommand[] ReadSqlCommands(
{
(2, 3) => SqlCommandProcessorV2_3_X.Process(evalType, stream),
(2, 4) => SqlCommandProcessorV2_4_X.Process(evalType, stream),
(3, 0) => SqlCommandProcessorV2_4_X.Process(evalType, stream),
(3, _) => SqlCommandProcessorV2_4_X.Process(evalType, stream),
_ => throw new NotSupportedException($"Spark {version} not supported.")
};
}
@@ -23,7 +23,7 @@ internal TaskContext Process(Stream stream)
{
(2, 3) => TaskContextProcessorV2_3_X.Process(stream),
(2, 4) => TaskContextProcessorV2_4_X.Process(stream),
(3, 0) => TaskContextProcessorV3_0_X.Process(stream),
(3, _) => TaskContextProcessorV3_0_X.Process(stream),
_ => throw new NotSupportedException($"Spark {_version} not supported.")
};
}
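Aside (not part of the commit): the change from `(3, 0)` to `(3, _)` in the two worker processors above (CommandProcessor.cs and TaskContextProcessor.cs, per the pipeline comment) and in Broadcast.cs and DataFrame.cs below leans on C# tuple patterns, where the discard `_` matches any minor version, so every Spark 3.x release reaches the existing 3.0 code path instead of the NotSupportedException arm. A self-contained sketch of the pattern, with processor names reduced to strings:

using System;

internal static class VersionDispatchSketch
{
    // Switch on (major, minor); the discard `_` absorbs every 3.x minor version.
    private static string Dispatch(Version v) => (v.Major, v.Minor) switch
    {
        (2, 3) => "V2_3_X",
        (2, 4) => "V2_4_X",
        (3, _) => "V3_0_X", // 3.0, 3.1, 3.1.1, ...
        _ => throw new NotSupportedException($"Spark {v} not supported.")
    };

    public static void Main() =>
        Console.WriteLine(Dispatch(new Version("3.1.1"))); // prints "V3_0_X"
}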
2 changes: 1 addition & 1 deletion src/csharp/Microsoft.Spark/Broadcast.cs
@@ -129,7 +129,7 @@ private JvmObjectReference CreateBroadcast(SparkContext sc, T value)
CreateBroadcast_V2_3_1_AndBelow(javaSparkContext, value),
(2, 3) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
(2, 4) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
(3, 0) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
(3, _) => CreateBroadcast_V2_3_2_AndAbove(javaSparkContext, sc, value),
_ => throw new NotSupportedException($"Spark {version} not supported.")
};
}
2 changes: 1 addition & 1 deletion src/csharp/Microsoft.Spark/Sql/DataFrame.cs
@@ -1057,7 +1057,7 @@ private IEnumerable<Row> GetRows(string funcName, params object[] args)
// string to use for the authentication.
(2, 3, _) => ParseConnectionInfo(result, false),
(2, 4, _) => ParseConnectionInfo(result, false),
(3, 0, _) => ParseConnectionInfo(result, false),
(3, _, _) => ParseConnectionInfo(result, false),
_ => throw new NotSupportedException($"Spark {version} not supported.")
};
}
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark/Versions.cs
@@ -13,5 +13,6 @@ internal static class Versions
internal const string V2_4_0 = "2.4.0";
internal const string V2_4_2 = "2.4.2";
internal const string V3_0_0 = "3.0.0";
internal const string V3_1_1 = "3.1.1";
}
}
2 changes: 1 addition & 1 deletion src/scala/microsoft-spark-3-0/pom.xml
@@ -10,7 +10,7 @@
<inceptionYear>2019</inceptionYear>
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.12.8</scala.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.0</spark.version>
</properties>
1 change: 1 addition & 0 deletions src/scala/pom.xml
@@ -14,6 +14,7 @@
<module>microsoft-spark-2-3</module>
<module>microsoft-spark-2-4</module>
<module>microsoft-spark-3-0</module>
<module>microsoft-spark-3-1</module>
</modules>

<pluginRepositories>