Skip to content

Commit

Permalink
DataFrameWriter Option fails (#59)
Browse files Browse the repository at this point in the history
  • Loading branch information
suhsteve authored and imback82 committed Apr 28, 2019
1 parent 8f33a16 commit f67140a
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 34 deletions.
2 changes: 1 addition & 1 deletion src/csharp/Microsoft.Spark.E2ETest/IpcTests/RDDTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void TestParallelize()
[Fact]
public void TestTextFile()
{
RDD<string> rdd = _sc.TextFile(TestEnvironment.ResourceDirectory + "people.txt");
RDD<string> rdd = _sc.TextFile($"{TestEnvironment.ResourceDirectory}people.txt");
var strs = new[] { "Michael, 29", "Andy, 30", "Justin, 19" };
Assert.Equal(strs, rdd.Collect());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.Spark.E2ETest.Utils;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
Expand Down Expand Up @@ -30,11 +31,14 @@ public void TestSignaturesV2_3_X()

sc.ClearJobGroup();

string filePath = TestEnvironment.ResourceDirectory + "people.txt";
string filePath = $"{TestEnvironment.ResourceDirectory}people.txt";
sc.AddFile(filePath);
sc.AddFile(filePath, true);

sc.SetCheckpointDir(TestEnvironment.ResourceDirectory);
using (var tempDir = new TemporaryDirectory())
{
sc.SetCheckpointDir(TestEnvironment.ResourceDirectory);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public DataFrameFunctionsTests(SparkFixture fixture)
_spark = fixture.Spark;
_df = _spark
.Read()
.Json(TestEnvironment.ResourceDirectory + "people.json");
.Json($"{TestEnvironment.ResourceDirectory}people.json");
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
    [Collection("Spark E2E Tests")]
    public class DataFrameReaderTests
    {
        private readonly SparkSession _spark;

        public DataFrameReaderTests(SparkFixture fixture)
        {
            _spark = fixture.Spark;
        }

        /// <summary>
        /// Test signatures for APIs up to Spark 2.3.*.
        /// </summary>
        [Fact]
        public void TestSignaturesV2_3_X()
        {
            // Resolves a test resource file name to its full path.
            string Resource(string fileName) => $"{TestEnvironment.ResourceDirectory}{fileName}";

            DataFrameReader reader = _spark.Read();

            // Every builder-style configuration call must return the reader for chaining.
            Assert.IsType<DataFrameReader>(reader.Format("json"));

            Assert.IsType<DataFrameReader>(reader.Schema("age INT, name STRING"));

            Assert.IsType<DataFrameReader>(reader.Option("stringOption", "value"));
            Assert.IsType<DataFrameReader>(reader.Option("boolOption", true));
            Assert.IsType<DataFrameReader>(reader.Option("longOption", 1L));
            Assert.IsType<DataFrameReader>(reader.Option("doubleOption", 3D));

            var options = new Dictionary<string, string>
            {
                { "option1", "value1" },
                { "option2", "value2" }
            };
            Assert.IsType<DataFrameReader>(reader.Options(options));

            // Each load/format method must produce a DataFrame, for both the
            // single-path and multi-path overloads.
            string json = Resource("people.json");
            Assert.IsType<DataFrame>(reader.Load(json));
            Assert.IsType<DataFrame>(reader.Load(json, json));

            Assert.IsType<DataFrame>(reader.Json(json));
            Assert.IsType<DataFrame>(reader.Json(json, json));

            string csv = Resource("people.csv");
            Assert.IsType<DataFrame>(reader.Csv(csv));
            Assert.IsType<DataFrame>(reader.Csv(csv, csv));

            string parquet = Resource("users.parquet");
            Assert.IsType<DataFrame>(reader.Parquet(parquet));
            Assert.IsType<DataFrame>(reader.Parquet(parquet, parquet));

            string orc = Resource("users.orc");
            Assert.IsType<DataFrame>(reader.Orc(orc));
            Assert.IsType<DataFrame>(reader.Orc(orc, orc));

            // NOTE(review): Text() deliberately starts from a fresh reader rather
            // than the one configured above — presumably because the previously set
            // schema/options do not apply to text sources; confirm before merging
            // the two.
            reader = _spark.Read();
            string text = Resource("people.txt");
            Assert.IsType<DataFrame>(reader.Text(text));
            Assert.IsType<DataFrame>(reader.Text(text, text));
        }
    }
}
21 changes: 12 additions & 9 deletions src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/DataFrameTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public DataFrameTests(SparkFixture fixture)
_df = _spark
.Read()
.Schema("age INT, name STRING")
.Json(TestEnvironment.ResourceDirectory + "people.json");
.Json($"{TestEnvironment.ResourceDirectory}people.json");
}

[Fact]
Expand Down Expand Up @@ -135,14 +135,17 @@ public void TestSignaturesV2_3_X()

_df.IsStreaming();

// The following is required for *CheckPoint().
_spark.SparkContext.SetCheckpointDir(TestEnvironment.ResourceDirectory);

_df.Checkpoint();
_df.Checkpoint(false);

_df.LocalCheckpoint();
_df.LocalCheckpoint(false);
using (var tempDir = new TemporaryDirectory())
{
// The following is required for *CheckPoint().
_spark.SparkContext.SetCheckpointDir(tempDir.Path);

_df.Checkpoint();
_df.Checkpoint(false);

_df.LocalCheckpoint();
_df.LocalCheckpoint(false);
}

_df.WithWatermark("time", "10 minutes");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using Microsoft.Spark.E2ETest.Utils;
using Microsoft.Spark.Sql;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
    [Collection("Spark E2E Tests")]
    public class DataFrameWriterTests
    {
        private readonly SparkSession _spark;

        public DataFrameWriterTests(SparkFixture fixture)
        {
            _spark = fixture.Spark;
        }

        /// <summary>
        /// Test signatures for APIs up to Spark 2.3.*.
        /// </summary>
        [Fact]
        public void TestSignaturesV2_3_X()
        {
            // Builder-style calls: each must return the writer itself for chaining.
            {
                DataFrameWriter writer = _spark
                    .Read()
                    .Schema("age INT, name STRING")
                    .Json($"{TestEnvironment.ResourceDirectory}people.json")
                    .Write();

                Assert.IsType<DataFrameWriter>(writer.Mode(SaveMode.Ignore));

                Assert.IsType<DataFrameWriter>(writer.Mode("overwrite"));

                Assert.IsType<DataFrameWriter>(writer.Format("json"));

                Assert.IsType<DataFrameWriter>(writer.Option("stringOption", "value"));
                Assert.IsType<DataFrameWriter>(writer.Option("boolOption", true));
                Assert.IsType<DataFrameWriter>(writer.Option("longOption", 1L));
                Assert.IsType<DataFrameWriter>(writer.Option("doubleOption", 3D));

                var options = new Dictionary<string, string>
                {
                    { "option1", "value1" },
                    { "option2", "value2" }
                };
                Assert.IsType<DataFrameWriter>(writer.Options(options));

                Assert.IsType<DataFrameWriter>(writer.PartitionBy("age"));
                Assert.IsType<DataFrameWriter>(writer.PartitionBy("age", "name"));

                Assert.IsType<DataFrameWriter>(writer.BucketBy(3, "age"));
                Assert.IsType<DataFrameWriter>(writer.BucketBy(3, "age", "name"));

                Assert.IsType<DataFrameWriter>(writer.SortBy("name"));
            }

            // Terminal write operations: all output goes to a temp folder that is
            // removed when the using block ends.
            using (var tempDir = new TemporaryDirectory())
            {
                DataFrameWriter writer = _spark
                    .Read()
                    .Csv($"{TestEnvironment.ResourceDirectory}people.csv")
                    .Write();

                // TODO: Test writer.Jdbc without running a local db.

                writer.Option("path", tempDir.Path).SaveAsTable("TestTable");

                writer.InsertInto("TestTable");

                writer.Option("path", $"{tempDir.Path}TestSavePath1").Save();
                writer.Save($"{tempDir.Path}TestSavePath2");

                writer.Json($"{tempDir.Path}TestJsonPath");

                writer.Parquet($"{tempDir.Path}TestParquetPath");

                writer.Orc($"{tempDir.Path}TestOrcPath");

                writer.Text($"{tempDir.Path}TestTextPath");

                writer.Csv($"{tempDir.Path}TestCsvPath");
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void TestSignaturesV2_3_X()

DataFrame df = _spark
.Read()
.Json(TestEnvironment.ResourceDirectory + "people.json");
.Json($"{TestEnvironment.ResourceDirectory}people.json");
df = Broadcast(df);

col = Coalesce();
Expand Down
Binary file not shown.
63 changes: 63 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/Utils/TemporaryDirectory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.IO;

namespace Microsoft.Spark.E2ETest.Utils
{
    /// <summary>
    /// Creates a temporary folder that is automatically cleaned up when disposed.
    /// </summary>
    internal sealed class TemporaryDirectory : IDisposable
    {
        // Standard C# convention for private instance fields is _camelCase.
        private bool _disposed = false;

        /// <summary>
        /// Path to the temporary folder, including a trailing directory separator.
        /// </summary>
        public string Path { get; }

        /// <summary>
        /// Creates a uniquely-named folder under the system temp directory.
        /// </summary>
        public TemporaryDirectory()
        {
            Path = System.IO.Path.Combine(System.IO.Path.GetTempPath(), Guid.NewGuid().ToString());
            // Guid collisions are practically impossible, but clear any leftover
            // entry at this path before creating the directory.
            Cleanup();
            Directory.CreateDirectory(Path);
            // Append the separator so callers can build paths via concatenation,
            // e.g. $"{tempDir.Path}file.txt".
            Path = $"{Path}{System.IO.Path.DirectorySeparatorChar}";
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Removes whatever currently exists at <see cref="Path"/> — a stray file
        /// or the directory tree itself.
        /// </summary>
        private void Cleanup()
        {
            if (File.Exists(Path))
            {
                File.Delete(Path);
            }
            else if (Directory.Exists(Path))
            {
                Directory.Delete(Path, true);
            }
        }

        private void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                try
                {
                    Cleanup();
                }
                catch (IOException)
                {
                    // Best-effort cleanup: Dispose must not throw. A file still in
                    // use (e.g. held by an external Spark process) leaves the temp
                    // folder behind for the OS to reclaim.
                }
            }

            _disposed = true;
        }
    }
}
16 changes: 6 additions & 10 deletions src/csharp/Microsoft.Spark/Sql/DataFrameReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public DataFrameReader Schema(string schemaString)
/// <returns>This DataFrameReader object</returns>
public DataFrameReader Option(string key, string value)
{
OptionInternal(key, value);
return this;
return OptionInternal(key, value);
}

/// <summary>
Expand All @@ -70,8 +69,7 @@ public DataFrameReader Option(string key, string value)
/// <returns>This DataFrameReader object</returns>
public DataFrameReader Option(string key, bool value)
{
OptionInternal(key, value);
return this;
return OptionInternal(key, value);
}

/// <summary>
Expand All @@ -82,8 +80,7 @@ public DataFrameReader Option(string key, bool value)
/// <returns>This DataFrameReader object</returns>
public DataFrameReader Option(string key, long value)
{
OptionInternal(key, value);
return this;
return OptionInternal(key, value);
}

/// <summary>
Expand All @@ -94,8 +91,7 @@ public DataFrameReader Option(string key, long value)
/// <returns>This DataFrameReader object</returns>
public DataFrameReader Option(string key, double value)
{
OptionInternal(key, value);
return this;
return OptionInternal(key, value);
}

/// <summary>
Expand All @@ -119,7 +115,7 @@ public DataFrameReader Options(Dictionary<string, string> options)
/// <param name="paths">Input paths</param>
/// <returns>DataFrame object</returns>
public DataFrame Load(params string[] paths) =>
new DataFrame((JvmObjectReference)_jvmObject.Invoke("load", paths));
new DataFrame((JvmObjectReference)_jvmObject.Invoke("load", (object)paths));

/// <summary>
/// Loads a JSON file (one object per line) and returns the result as a DataFrame.
Expand Down Expand Up @@ -182,7 +178,7 @@ private DataFrame LoadSource(string source, params string[] paths)
throw new ArgumentException($"paths cannot be empty for source: {source}");
}

return new DataFrame((JvmObjectReference)_jvmObject.Invoke(source, paths));
return new DataFrame((JvmObjectReference)_jvmObject.Invoke(source, (object)paths));
}
}
}
Loading

0 comments on commit f67140a

Please sign in to comment.