Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Trigger in DataStreamWriter #153

Merged
merged 7 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
[Collection("Spark E2E Tests")]
public class TriggerTests
{
private readonly SparkSession _spark;

public TriggerTests(SparkFixture fixture)
{
_spark = fixture.Spark;
}

/// <summary>
/// Test Trigger's static functions
/// </summary>
[Fact]
public void TestSignatures()
{
Assert.IsType<Trigger>(Trigger.Once());

Assert.IsType<Trigger>(Trigger.Continuous("1 seconds"));
Assert.IsType<Trigger>(Trigger.Continuous(1000));

Assert.IsType<Trigger>(Trigger.ProcessingTime("1 seconds"));
Assert.IsType<Trigger>(Trigger.ProcessingTime(1000));
}
}
}
25 changes: 25 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/Streaming/DataStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,31 @@ public DataStreamWriter Options(Dictionary<string, string> options)
return this;
}

/// <summary>
/// Sets the trigger for the stream query.
/// </summary>
/// <param name="trigger">Trigger object</param>
/// <returns>This DataStreamReader object</returns>
public DataStreamWriter Trigger(Trigger trigger)
{
_jvmObject.Invoke("trigger", trigger);
return this;
}

/// <summary>
/// Specifies the name of the <see cref="StreamingQuery"/>
/// that can be started with `start()`.
/// This name must be unique among all the currently active queries
/// in the associated SQLContext.
/// </summary>
/// <param name="queryName">Query name</param>
/// <returns>This DataStreamReader object</returns>
public DataStreamWriter QueryName(string queryName)
{
_jvmObject.Invoke("queryName", queryName);
return this;
}

/// <summary>
/// Starts the execution of the streaming query.
/// </summary>
Expand Down
91 changes: 91 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/Streaming/Trigger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;

namespace Microsoft.Spark.Sql.Streaming
{
/// <summary>
/// Policy used to indicate how often results should be produced by a
/// <see cref="StreamingQuery"/>
/// </summary>
public sealed class Trigger : IJvmObjectReferenceProvider
{
private static IJvmBridge Jvm { get; } = SparkEnvironment.JvmBridge;
private static readonly string s_triggerClassName =
"org.apache.spark.sql.streaming.Trigger";

private readonly JvmObjectReference _jvmObject;

internal Trigger(JvmObjectReference jvmObject) => _jvmObject = jvmObject;

JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

/// <summary>
/// A trigger policy that runs a query periodically based on an interval
/// in processing time.
/// If `interval` is 0, the query will run as fast as possible.
/// </summary>
/// <param name="intervalMs">Milliseconds</param>
/// <returns>Trigger Object</returns>
public static Trigger ProcessingTime(long intervalMs)
{
return Apply("ProcessingTime", intervalMs);
}

/// <summary>
/// A trigger policy that runs a query periodically based on an interval
/// in processing time.
/// If `interval` is effectively 0, the query will run as fast as possible.
/// </summary>
/// <param name="interval">string representation for interval. eg. "10 seconds"</param>
/// <returns>Trigger Object</returns>
public static Trigger ProcessingTime(string interval)
{
return Apply("ProcessingTime", interval);
}

/// <summary>
/// A trigger that process only one batch of data in a streaming query
/// then terminates the query.
/// </summary>
/// <returns>Trigger Object</returns>
public static Trigger Once()
{
return Apply("Once");
}

/// <summary>
/// A trigger that continuously processes streaming data,
/// asynchronously checkpointing at the specified interval.
/// </summary>
/// <param name="intervalMs">Milliseconds</param>
/// <returns>Trigger Object</returns>
public static Trigger Continuous(long intervalMs)
{
return Apply("Continuous", intervalMs);
}

/// <summary>
/// A trigger that continuously processes streaming data,
/// asynchronously checkpointing at the specified interval.
/// </summary>
/// <param name="interval">string representation for interval. eg. "10 seconds"</param>
/// <returns>Trigger Object</returns>
public static Trigger Continuous(string interval)
{
return Apply("Continuous", interval);
}

private static Trigger Apply(string funcName, params object[] args)
{
return new Trigger(
(JvmObjectReference)Jvm.CallStaticJavaMethod(
s_triggerClassName,
funcName,
args));
}
}
}