Support for new Delta v0.4.0 APIs #297

Merged (18 commits) on Oct 22, 2019
@@ -145,6 +145,73 @@ public void TestStreamingScenario()
}
}

/// <summary>
/// Test <c>DeltaTable.IsDeltaTable()</c> API.
/// </summary>
[SkipIfSparkVersionIsLessThan(Versions.V2_4_2)]
public void TestIsDeltaTable()
{
using (var tempDirectory = new TemporaryDirectory())
{
// Save the same data to a DeltaTable and to Parquet.
DataFrame data = _spark.Range(0, 5);
string parquetPath = Path.Combine(tempDirectory.Path, "parquet-data");
data.Write().Parquet(parquetPath);
string deltaTablePath = Path.Combine(tempDirectory.Path, "delta-table");
data.Write().Format("delta").Save(deltaTablePath);

Assert.False(DeltaTable.IsDeltaTable(parquetPath));
Assert.False(DeltaTable.IsDeltaTable(_spark, parquetPath));

Assert.True(DeltaTable.IsDeltaTable(deltaTablePath));
Assert.True(DeltaTable.IsDeltaTable(_spark, deltaTablePath));
}
}

/// <summary>
/// Test <c>DeltaTable.ConvertToDelta()</c> API.
/// </summary>
[SkipIfSparkVersionIsLessThan(Versions.V2_4_2)]
public void TestConvertToDelta()
{
string partitionColumnName = "id_plus_one";
DataFrame data = _spark.Range(0, 5).Select(
Functions.Col("id"),
Functions.Expr($"(`id` + 1) AS `{partitionColumnName}`"));

// Run the same test on the different overloads of DeltaTable.ConvertToDelta().
void testWrapper(DataFrame dataFrame, Func<string, DeltaTable> convertToDelta, string partitionColumn = null)
{
using (var tempDirectory = new TemporaryDirectory())
{
string path = Path.Combine(tempDirectory.Path, "parquet-data");
DataFrameWriter dataWriter = dataFrame.Write();

if (!string.IsNullOrEmpty(partitionColumn))
{
dataWriter = dataWriter.PartitionBy(partitionColumn);
}

dataWriter.Parquet(path);

Assert.False(DeltaTable.IsDeltaTable(path));

string identifier = $"parquet.`{path}`";
DeltaTable convertedDeltaTable = convertToDelta(identifier);

ValidateDataFrame(Enumerable.Range(0, 5), convertedDeltaTable.ToDF());
Assert.True(DeltaTable.IsDeltaTable(path));
}
}

testWrapper(data, identifier => DeltaTable.ConvertToDelta(_spark, identifier));
testWrapper(
data.Repartition(Functions.Col(partitionColumnName)),
identifier => DeltaTable.ConvertToDelta(_spark, identifier, $"{partitionColumnName} bigint"),
partitionColumnName);
// TODO: Test with StructType partition schema once StructType is supported.
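// Illustrative sketch only (not part of this change): once StructType can be passed
// through the interop layer, the StructType-based DeltaTable.ConvertToDelta overload
// could be exercised the same way. The schema below mirrors the "id_plus_one bigint"
// string schema above; StructType, StructField and LongType live in
// Microsoft.Spark.Sql.Types, which this sketch assumes is imported.
//
// testWrapper(
//     data.Repartition(Functions.Col(partitionColumnName)),
//     identifier => DeltaTable.ConvertToDelta(
//         _spark,
//         identifier,
//         new StructType(new List<StructField>()
//         {
//             new StructField(partitionColumnName, new LongType())
//         })),
//     partitionColumnName);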
}

/// <summary>
/// Test that methods return the expected signature.
/// </summary>
@@ -161,7 +228,11 @@ public void TestSignatures()
DeltaTable table = Assert.IsType<DeltaTable>(DeltaTable.ForPath(path));
table = Assert.IsType<DeltaTable>(DeltaTable.ForPath(_spark, path));

Assert.IsType<bool>(DeltaTable.IsDeltaTable(_spark, path));
Assert.IsType<bool>(DeltaTable.IsDeltaTable(path));

Assert.IsType<DeltaTable>(table.As("oldTable"));
Assert.IsType<DeltaTable>(table.Alias("oldTable"));
Assert.IsType<DataFrame>(table.History());
Assert.IsType<DataFrame>(table.History(200));
Assert.IsType<DataFrame>(table.ToDF());
@@ -221,6 +292,19 @@ public void TestSignatures()
.Option("path", path)
.Load());
Assert.IsType<DataFrame>(_spark.ReadStream().Format("delta").Load(path));

// Create Parquet data and convert it to DeltaTables.
string parquetIdentifier = $"parquet.`{path}`";
rangeRate.Write().Mode(SaveMode.Overwrite).Parquet(path);
Assert.IsType<DeltaTable>(DeltaTable.ConvertToDelta(_spark, parquetIdentifier));
rangeRate
.Select(Functions.Col("id"), Functions.Expr($"(`id` + 1) AS `id_plus_one`"))
.Write()
.PartitionBy("id")
.Mode(SaveMode.Overwrite)
.Parquet(path);
Assert.IsType<DeltaTable>(DeltaTable.ConvertToDelta(_spark, parquetIdentifier, "id bigint"));
// TODO: Test with StructType partition schema once StructType is supported.
}
}

@@ -6,6 +6,7 @@
using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;

namespace Microsoft.Spark.Extensions.Delta.Tables
{
@@ -19,6 +20,7 @@ namespace Microsoft.Spark.Extensions.Delta.Tables
/// </summary>
public class DeltaTable : IJvmObjectReferenceProvider
{
private const string JvmClassName = "io.delta.tables.DeltaTable";
private readonly JvmObjectReference _jvmObject;

internal DeltaTable(JvmObjectReference jvmObject)
@@ -28,6 +30,90 @@ internal DeltaTable(JvmObjectReference jvmObject)

JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

/// <summary>
/// Create a DeltaTable from the given parquet table and partition schema.
/// Takes an existing parquet table and constructs a delta transaction log in the base path
/// of that table.
///
/// Note: Any changes to the table during the conversion process may not result in a
/// consistent state at the end of the conversion. Users should stop any changes to the
/// table before the conversion is started.
///
/// An example usage would be
/// <code>
/// var partitionSchema = new StructType(new List&lt;StructField&gt;()
/// {
///     new StructField("key1", new LongType()),
///     new StructField("key2", new StringType())
/// });
/// DeltaTable.ConvertToDelta(spark, "parquet.`/path`", partitionSchema);
/// </code>
/// </summary>
/// <param name="spark">The relevant session.</param>
/// <param name="identifier">String used to identify the parquet table.</param>
/// <param name="partitionSchema">Struct representing the partition schema.</param>
/// <returns>The converted DeltaTable.</returns>
public static DeltaTable ConvertToDelta(SparkSession spark, string identifier, StructType partitionSchema) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
JvmClassName,
"convertToDelta",
spark,
identifier,
partitionSchema));

/// <summary>
/// Create a DeltaTable from the given parquet table and partition schema.
/// Takes an existing parquet table and constructs a delta transaction log in the base path of
/// that table.
///
/// Note: Any changes to the table during the conversion process may not result in a consistent
/// state at the end of the conversion. Users should stop any changes to the table before the
/// conversion is started.
///
/// An example usage would be
/// <code>
/// DeltaTable.ConvertToDelta(spark, "parquet.`/path`", "key1 long, key2 string")
/// </code>
/// </summary>
/// <param name="spark">The relevant session.</param>
/// <param name="identifier">String used to identify the parquet table.</param>
/// <param name="partitionSchema">String representing the partition schema.</param>
/// <returns>The converted DeltaTable.</returns>
public static DeltaTable ConvertToDelta(SparkSession spark, string identifier, string partitionSchema) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
JvmClassName,
"convertToDelta",
spark,
identifier,
partitionSchema));
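
// Illustrative usage sketch (not part of this change; the DataFrame, session and path
// below are assumptions): the partition schema is a Spark SQL DDL string describing the
// columns the Parquet data is already physically partitioned by on disk.
//
//     data.Write().PartitionBy("key1", "key2").Parquet("/path");
//     DeltaTable converted =
//         DeltaTable.ConvertToDelta(spark, "parquet.`/path`", "key1 long, key2 string");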

/// <summary>
/// Create a DeltaTable from the given parquet table. Takes an existing parquet table and
/// constructs a delta transaction log in the base path of the table.
///
/// Note: Any changes to the table during the conversion process may not result in a consistent
/// state at the end of the conversion. Users should stop any changes to the table before the
/// conversion is started.
///
/// An example would be
/// <code>
/// DeltaTable.ConvertToDelta(spark, "parquet.`/path`")
/// </code>
/// </summary>
/// <param name="spark">The relevant session.</param>
/// <param name="identifier">String used to identify the parquet table.</param>
/// <returns>The converted DeltaTable.</returns>
public static DeltaTable ConvertToDelta(SparkSession spark, string identifier) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
JvmClassName,
"convertToDelta",
spark,
identifier));

/// <summary>
/// Create a DeltaTable for the data at the given <c>path</c>.
///
@@ -40,7 +126,7 @@ internal DeltaTable(JvmObjectReference jvmObject)
public static DeltaTable ForPath(string path) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
"io.delta.tables.DeltaTable",
JvmClassName,
"forPath",
path));

@@ -54,11 +140,51 @@ public static DeltaTable ForPath(string path) =>
public static DeltaTable ForPath(SparkSession sparkSession, string path) =>
new DeltaTable(
(JvmObjectReference)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
"io.delta.tables.DeltaTable",
JvmClassName,
"forPath",
sparkSession,
path));

/// <summary>
/// Check if the provided <c>identifier</c> string, in this case a file path,
/// is the root of a Delta table using the given SparkSession.
///
/// An example would be
/// <code>
/// DeltaTable.IsDeltaTable(spark, "path/to/table")
/// </code>
/// </summary>
/// <param name="sparkSession">The relevant session.</param>
/// <param name="identifier">String that identifies the table, e.g. path to table.</param>
/// <returns>True if the table is a DeltaTable.</returns>
public static bool IsDeltaTable(SparkSession sparkSession, string identifier) =>
(bool)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
JvmClassName,
"isDeltaTable",
sparkSession,
identifier);

/// <summary>
/// Check if the provided <c>identifier</c> string, in this case a file path,
/// is the root of a Delta table.
///
/// Note: This uses the active SparkSession in the current thread to search for the table. Hence,
/// this throws an error if the active SparkSession has not been set, that is,
/// <c>SparkSession.GetActiveSession()</c> is empty.
///
/// An example would be
/// <code>
/// DeltaTable.IsDeltaTable("/path/to/table")
/// </code>
/// </summary>
/// <param name="identifier">String that identifies the table, e.g. path to table.</param>
/// <returns>True if the table is a DeltaTable.</returns>
public static bool IsDeltaTable(string identifier) =>
(bool)SparkEnvironment.JvmBridge.CallStaticJavaMethod(
JvmClassName,
"isDeltaTable",
identifier);
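
// Illustrative usage sketch (not part of this change; the session, path and DataFrame
// names are assumptions): IsDeltaTable can be used to decide how to read a directory
// that may or may not already be the root of a Delta table.
//
//     DataFrame df = DeltaTable.IsDeltaTable(spark, inputPath)
//         ? DeltaTable.ForPath(spark, inputPath).ToDF()
//         : spark.Read().Parquet(inputPath);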

/// <summary>
/// Apply an alias to the DeltaTable. This is similar to <c>Dataset.As(alias)</c> or SQL
/// <c>tableName AS alias</c>.
@@ -68,6 +194,15 @@ public static DeltaTable ForPath(SparkSession sparkSession, string path) =>
public DeltaTable As(string alias) =>
new DeltaTable((JvmObjectReference)_jvmObject.Invoke("as", alias));

/// <summary>
/// Apply an alias to the DeltaTable. This is similar to <c>Dataset.as(alias)</c>
/// or SQL <c>tableName AS alias</c>.
/// </summary>
/// <param name="alias">The table alias.</param>
/// <returns>Aliased DeltaTable.</returns>
public DeltaTable Alias(string alias) =>
new DeltaTable((JvmObjectReference)_jvmObject.Invoke("alias", alias));

/// <summary>
/// Get a DataFrame (that is, Dataset[Row]) representation of this Delta table.
/// </summary>
@@ -91,7 +226,7 @@ public DataFrame Vacuum(double retentionHours) =>
/// for maintaining older versions up to the given retention threshold. This method will
/// return an empty DataFrame on successful completion.
///
/// Note: This will use the default retention period of 7 hours.
/// Note: This will use the default retention period of 7 days.
/// </summary>
/// <returns>Vacuumed DataFrame.</returns>
public DataFrame Vacuum() =>
@@ -207,16 +342,16 @@ public void UpdateExpr(string condition, Dictionary<string, string> set) =>

/// <summary>
/// Merge data from the <c>source</c> DataFrame based on the given merge <c>condition</c>.
/// This class returns a <c>DeltaMergeBuilder</c> object that can be used to specify the
/// This returns a <c>DeltaMergeBuilder</c> object that can be used to specify the
/// update, delete, or insert actions to be performed on rows based on whether the rows
/// matched the condition or not.
///
/// See the <see cref="DeltaMergeBuilder"/> for a full description of this operation and
/// what combination update, delete and insert operations are allowed.
/// what combinations of update, delete and insert operations are allowed.
/// </summary>
/// <example>
/// See the <c>DeltaMergeBuilder</c> for a full description of this operation and what combination
/// update, delete and insert operations are allowed.
/// See the <c>DeltaMergeBuilder</c> for a full description of this operation and what
/// combinations of update, delete and insert operations are allowed.
///
/// Example to update a key-value Delta table with new key-values from a source DataFrame:
/// <code>
@@ -256,7 +391,7 @@ public DeltaMergeBuilder Merge(DataFrame source, string condition) =>
/// matched the condition or not.
///
/// See the <see cref="DeltaMergeBuilder"/> for a full description of this operation and
/// what combination update, delete and insert operations are allowed.
/// what combinations of update, delete and insert operations are allowed.
/// </summary>
/// <example>
/// Example to update a key-value Delta table with new key-values from a source DataFrame: