DataFrameWriter Option fails #59

Merged
8 commits merged on Apr 28, 2019
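In short, as the diffs below show: DataFrameWriter.Option was forwarding only the option value to the JVM (dropping the key), and Csv was invoking a misspelled "csvs" method; DataFrameReader gets a matching refactor plus new end-to-end tests. A minimal before/after sketch of the interop calls (the _jvmObject member is internal to Microsoft.Spark and is shown here only to illustrate the change):

// Before (buggy): the option key was never sent, and the CSV method name was wrong.
_jvmObject.Invoke("option", value);
_jvmObject.Invoke("csvs", path);

// After: key and value are both forwarded through a shared OptionInternal helper,
// and Csv invokes the correctly named "csv" method.
_jvmObject.Invoke("option", key, value);
_jvmObject.Invoke("csv", path);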
85 changes: 85 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/IpcTests/DataFrameReaderTests.cs
@@ -0,0 +1,85 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using Microsoft.Spark.Sql;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
[Collection("Spark E2E Tests")]
public class DataFrameReaderTests
{
private readonly SparkSession _spark;

public DataFrameReaderTests(SparkFixture fixture)
{
_spark = fixture.Spark;
}

/// <summary>
/// Test signatures for APIs up to Spark 2.3.*.
/// </summary>
[Fact]
public void TestSignaturesV2_3_X()
{
DataFrameReader dfr = _spark.Read();

Assert.IsType<DataFrameReader>(dfr.Format("json"));

Assert.IsType<DataFrameReader>(dfr.Schema("age INT, name STRING"));

Assert.IsType<DataFrameReader>(dfr.Option("stringOption", "value"));
Assert.IsType<DataFrameReader>(dfr.Option("boolOption", true));
Assert.IsType<DataFrameReader>(dfr.Option("longOption", 1L));
Assert.IsType<DataFrameReader>(dfr.Option("doubleOption", 3D));

Assert.IsType<DataFrameReader>(
dfr.Options(
new Dictionary<string, string>
{
{ "option1", "value1" },
{ "option2", "value2" }
}));

Assert.IsType<DataFrame>(dfr.Load(TestEnvironment.ResourceDirectory + "people.json"));
Assert.IsType<DataFrame>(
dfr.Load(
TestEnvironment.ResourceDirectory + "people.csv",
TestEnvironment.ResourceDirectory + "people.csv"));

Assert.IsType<DataFrame>(dfr.Json(TestEnvironment.ResourceDirectory + "people.json"));
Assert.IsType<DataFrame>(
dfr.Json(
TestEnvironment.ResourceDirectory + "people.json",
TestEnvironment.ResourceDirectory + "people.json"));

Assert.IsType<DataFrame>(dfr.Csv(TestEnvironment.ResourceDirectory + "people.csv"));
Assert.IsType<DataFrame>(
dfr.Csv(
TestEnvironment.ResourceDirectory + "people.csv",
TestEnvironment.ResourceDirectory + "people.csv"));

Assert.IsType<DataFrame>(
dfr.Parquet(TestEnvironment.ResourceDirectory + "users.parquet"));
Assert.IsType<DataFrame>(
dfr.Parquet(
TestEnvironment.ResourceDirectory + "users.parquet",
TestEnvironment.ResourceDirectory + "users.parquet"));

Assert.IsType<DataFrame>(dfr.Orc(TestEnvironment.ResourceDirectory + "users.orc"));
Assert.IsType<DataFrame>(
dfr.Orc(
TestEnvironment.ResourceDirectory + "users.orc",
TestEnvironment.ResourceDirectory + "users.orc"));

dfr = _spark.Read();
Assert.IsType<DataFrame>(dfr.Text(TestEnvironment.ResourceDirectory + "people.txt"));
Assert.IsType<DataFrame>(
dfr.Text(
TestEnvironment.ResourceDirectory + "people.txt",
TestEnvironment.ResourceDirectory + "people.txt"));
}
}
}
93 changes: 93 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/IpcTests/DataFrameWriterTests.cs
@@ -0,0 +1,93 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using Microsoft.Spark.E2ETest.Utils;
using Microsoft.Spark.Sql;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
[Collection("Spark E2E Tests")]
public class DataFrameWriterTests
{
private readonly SparkSession _spark;

public DataFrameWriterTests(SparkFixture fixture)
{
_spark = fixture.Spark;
}

/// <summary>
/// Test signatures for APIs up to Spark 2.3.*.
/// </summary>
[Fact]
public void TestSignaturesV2_3_X()
{
{
DataFrameWriter dfw = _spark
.Read()
.Schema("age INT, name STRING")
.Json(TestEnvironment.ResourceDirectory + "people.json")
.Write();

Assert.IsType<DataFrameWriter>(dfw.Mode(SaveMode.Ignore));

Assert.IsType<DataFrameWriter>(dfw.Mode("overwrite"));

Assert.IsType<DataFrameWriter>(dfw.Format("json"));

Assert.IsType<DataFrameWriter>(dfw.Option("stringOption", "value"));
Assert.IsType<DataFrameWriter>(dfw.Option("boolOption", true));
Assert.IsType<DataFrameWriter>(dfw.Option("longOption", 1L));
Assert.IsType<DataFrameWriter>(dfw.Option("doubleOption", 3D));

Assert.IsType<DataFrameWriter>(
dfw.Options(
new Dictionary<string, string>
{
{ "option1", "value1" },
{ "option2", "value2" }
}));

Assert.IsType<DataFrameWriter>(dfw.PartitionBy("age"));
Assert.IsType<DataFrameWriter>(dfw.PartitionBy("age", "name"));

Assert.IsType<DataFrameWriter>(dfw.BucketBy(3, "age"));
Assert.IsType<DataFrameWriter>(dfw.BucketBy(3, "age", "name"));

Assert.IsType<DataFrameWriter>(dfw.SortBy("name"));
}

using (var tempDir = new TemporaryDirectory(AppDomain.CurrentDomain.BaseDirectory))
{
DataFrameWriter dfw = _spark
.Read()
.Csv(TestEnvironment.ResourceDirectory + "people.csv")
.Write();

// TODO: Test dfw.Jdbc without running a local db.

dfw.Option("path", tempDir.Path).SaveAsTable("TestTable");

dfw.InsertInto("TestTable");

dfw.Option("path", tempDir.Path + "TestSavePath1").Save();
dfw.Save(tempDir.Path + "TestSavePath2");

dfw.Json(tempDir.Path + "TestJsonPath");

dfw.Parquet(tempDir.Path + "TestParquetPath");

dfw.Orc(tempDir.Path + "TestOrcPath");

dfw.Text(tempDir.Path + "TestTextPath");

dfw.Csv(tempDir.Path + "TestCsvPath");
}
}
}
}
Binary file not shown.
64 changes: 64 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/Utils/TemporaryDirectory.cs
@@ -0,0 +1,64 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.IO;

namespace Microsoft.Spark.E2ETest.Utils
{
/// <summary>
/// Creates a temporary folder at a specified root path that is automatically
/// cleaned up when disposed.
/// </summary>
internal sealed class TemporaryDirectory : IDisposable
{
private bool disposed = false;

/// <summary>
/// Path to temporary folder.
/// </summary>
public string Path { get; }

public TemporaryDirectory(string rootDirectory)
{
Path = System.IO.Path.Combine(rootDirectory, Guid.NewGuid().ToString());
Cleanup();
Directory.CreateDirectory(Path);
Path += System.IO.Path.DirectorySeparatorChar;
}

public void Dispose()
{
// Dispose of unmanaged resources.
Dispose(true);
// Suppress finalization.
GC.SuppressFinalize(this);
}

private void Cleanup()
{
if (File.Exists(Path))
{
File.Delete(Path);
}
else if (Directory.Exists(Path))
{
Directory.Delete(Path, true);
}
}

private void Dispose(bool disposing)
{
if (disposed)
return;

if (disposing)
{
Cleanup();
}

disposed = true;
}
}
}
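A short usage sketch of this helper, mirroring DataFrameWriterTests above (the output name is a placeholder). Note that the constructor appends a trailing directory separator to Path, which is why the tests build sub-paths by plain concatenation:

using (var tempDir = new TemporaryDirectory(AppDomain.CurrentDomain.BaseDirectory))
{
    // tempDir.Path ends with a directory separator, so concatenation yields a valid path.
    string outputPath = tempDir.Path + "TestOutput";
    // ... write test artifacts under outputPath ...
}   // Dispose() calls Cleanup(), deleting the folder and everything in it.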
17 changes: 6 additions & 11 deletions src/csharp/Microsoft.Spark/Sql/DataFrameReader.cs
@@ -58,8 +58,7 @@ public DataFrameReader Schema(string schemaString)
/// <returns>This DataFrameReader object</returns>
public DataFrameReader Option(string key, string value)
{
-OptionInternal(key, value);
-return this;
+return OptionInternal(key, value);
}

/// <summary>
@@ -70,8 +69,7 @@ public DataFrameReader Option(string key, string value)
/// <returns>This DataFrameReader object</returns>
public DataFrameReader Option(string key, bool value)
{
-OptionInternal(key, value);
-return this;
+return OptionInternal(key, value);
}

/// <summary>
@@ -82,8 +80,7 @@ public DataFrameReader Option(string key, bool value)
/// <returns>This DataFrameReader object</returns>
public DataFrameReader Option(string key, long value)
{
-OptionInternal(key, value);
-return this;
+return OptionInternal(key, value);
}

/// <summary>
@@ -94,8 +91,7 @@ public DataFrameReader Option(string key, long value)
/// <returns>This DataFrameReader object</returns>
public DataFrameReader Option(string key, double value)
{
-OptionInternal(key, value);
-return this;
+return OptionInternal(key, value);
}

/// <summary>
@@ -118,8 +114,7 @@ public DataFrameReader Options(Dictionary<string, string> options)
/// </remarks>
/// <param name="paths">Input paths</param>
/// <returns>DataFrame object</returns>
-public DataFrame Load(params string[] paths) =>
-new DataFrame((JvmObjectReference)_jvmObject.Invoke("load", paths));
+public DataFrame Load(params string[] paths) => LoadSource("load", paths);

/// <summary>
/// Loads a JSON file (one object per line) and returns the result as a DataFrame.
@@ -182,7 +177,7 @@ private DataFrame LoadSource(string source, params string[] paths)
throw new ArgumentException($"paths cannot be empty for source: {source}");
}

-return new DataFrame((JvmObjectReference)_jvmObject.Invoke(source, paths));
+return new DataFrame((JvmObjectReference)_jvmObject.Invoke(source, (object)paths));
}
}
}
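One note on the LoadSource change above, assuming IJvmObject.Invoke takes a params object[] argument list (an assumption; the exact signature is not shown in this diff): passing the string[] directly binds it as the params array itself, so each path becomes a separate JVM argument, whereas boxing it with (object) forces the whole array through as a single argument. A hypothetical illustration:

string[] paths = { "a.json", "b.json" };
_jvmObject.Invoke("json", paths);          // string[] binds as the args array: two separate arguments
_jvmObject.Invoke("json", (object)paths);  // array is boxed: one argument, the string[] itself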
26 changes: 17 additions & 9 deletions src/csharp/Microsoft.Spark/Sql/DataFrameWriter.cs
@@ -71,8 +71,7 @@ public DataFrameWriter Format(string source)
/// <returns>This DataFrameWriter object</returns>
public DataFrameWriter Option(string key, string value)
{
_jvmObject.Invoke("option", value);
return this;
return OptionInternal(key, value);
}

/// <summary>
@@ -83,8 +82,7 @@ public DataFrameWriter Option(string key, string value)
/// <returns>This DataFrameWriter object</returns>
public DataFrameWriter Option(string key, bool value)
{
_jvmObject.Invoke("option", value);
return this;
return OptionInternal(key, value);
}

/// <summary>
@@ -95,8 +93,7 @@ public DataFrameWriter Option(string key, bool value)
/// <returns>This DataFrameWriter object</returns>
public DataFrameWriter Option(string key, long value)
{
_jvmObject.Invoke("option", value);
return this;
return OptionInternal(key, value);
}

/// <summary>
@@ -107,8 +104,7 @@ public DataFrameWriter Option(string key, double value)
/// <returns>This DataFrameWriter object</returns>
public DataFrameWriter Option(string key, double value)
{
_jvmObject.Invoke("option", value);
return this;
return OptionInternal(key, value);
}

/// <summary>
@@ -240,6 +236,18 @@ public void Jdbc(string url, string table, Dictionary<string, string> properties
/// Saves the content of the DataFrame in CSV format at the specified path.
/// </summary>
/// <param name="path">Path to save the content</param>
public void Csv(string path) => _jvmObject.Invoke("csvs", path);
public void Csv(string path) => _jvmObject.Invoke("csv", path);

/// <summary>
/// Helper function to add given key/value pair as a new option.
/// </summary>
/// <param name="key">Name of the option</param>
/// <param name="value">Value of the option</param>
/// <returns>This DataFrameWriter object</returns>
private DataFrameWriter OptionInternal(string key, object value)
{
_jvmObject.Invoke("option", key, value);
return this;
}
}
}
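For context, a minimal end-to-end sketch of the writer path exercised by the new tests (spark is an existing SparkSession; the file paths and the "header" option key are placeholders):

DataFrame df = spark
    .Read()
    .Schema("age INT, name STRING")
    .Json("people.json");

df.Write()
    .Mode(SaveMode.Ignore)
    .Option("header", true)     // now forwards both the key and the value to the JVM
    .Csv("output/people-csv");  // now invokes the correctly named "csv" method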
1 change: 0 additions & 1 deletion src/csharp/Microsoft.Spark/Sql/SparkSession.cs
@@ -4,7 +4,6 @@

using System;
using Microsoft.Spark.Interop.Ipc;
-using Microsoft.Spark.Services;
using Microsoft.Spark.Sql.Streaming;

namespace Microsoft.Spark.Sql