Skip to content

Commit

Permalink
Support Trigger in DataStreamWriter (#153)
Browse files Browse the repository at this point in the history
  • Loading branch information
danny8002 authored and imback82 committed Jul 9, 2019
1 parent cd91a13 commit 73158a3
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using Xunit;

namespace Microsoft.Spark.E2ETest.IpcTests
{
[Collection("Spark E2E Tests")]
public class TriggerTests
{
private readonly SparkSession _spark;

public TriggerTests(SparkFixture fixture)
{
_spark = fixture.Spark;
}

/// <summary>
/// Test Trigger's static functions
/// </summary>
[Fact]
public void TestSignatures()
{
Assert.IsType<Trigger>(Trigger.Once());

Assert.IsType<Trigger>(Trigger.Continuous("1 seconds"));
Assert.IsType<Trigger>(Trigger.Continuous(1000));

Assert.IsType<Trigger>(Trigger.ProcessingTime("1 seconds"));
Assert.IsType<Trigger>(Trigger.ProcessingTime(1000));
}
}
}
25 changes: 25 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/Streaming/DataStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,31 @@ public DataStreamWriter Options(Dictionary<string, string> options)
return this;
}

/// <summary>
/// Sets the trigger for the stream query.
/// </summary>
/// <param name="trigger">Trigger object</param>
/// <returns>This DataStreamReader object</returns>
public DataStreamWriter Trigger(Trigger trigger)
{
_jvmObject.Invoke("trigger", trigger);
return this;
}

/// <summary>
/// Specifies the name of the <see cref="StreamingQuery"/>
/// that can be started with `start()`.
/// This name must be unique among all the currently active queries
/// in the associated SQLContext.
/// </summary>
/// <param name="queryName">Query name</param>
/// <returns>This DataStreamReader object</returns>
public DataStreamWriter QueryName(string queryName)
{
_jvmObject.Invoke("queryName", queryName);
return this;
}

/// <summary>
/// Starts the execution of the streaming query.
/// </summary>
Expand Down
91 changes: 91 additions & 0 deletions src/csharp/Microsoft.Spark/Sql/Streaming/Trigger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.Spark.Interop;
using Microsoft.Spark.Interop.Ipc;

namespace Microsoft.Spark.Sql.Streaming
{
/// <summary>
/// Policy used to indicate how often results should be produced by a
/// <see cref="StreamingQuery"/>
/// </summary>
public sealed class Trigger : IJvmObjectReferenceProvider
{
private static IJvmBridge Jvm { get; } = SparkEnvironment.JvmBridge;
private static readonly string s_triggerClassName =
"org.apache.spark.sql.streaming.Trigger";

private readonly JvmObjectReference _jvmObject;

internal Trigger(JvmObjectReference jvmObject) => _jvmObject = jvmObject;

JvmObjectReference IJvmObjectReferenceProvider.Reference => _jvmObject;

/// <summary>
/// A trigger policy that runs a query periodically based on an interval
/// in processing time.
/// If `interval` is 0, the query will run as fast as possible.
/// </summary>
/// <param name="intervalMs">Milliseconds</param>
/// <returns>Trigger Object</returns>
public static Trigger ProcessingTime(long intervalMs)
{
return Apply("ProcessingTime", intervalMs);
}

/// <summary>
/// A trigger policy that runs a query periodically based on an interval
/// in processing time.
/// If `interval` is effectively 0, the query will run as fast as possible.
/// </summary>
/// <param name="interval">string representation for interval. eg. "10 seconds"</param>
/// <returns>Trigger Object</returns>
public static Trigger ProcessingTime(string interval)
{
return Apply("ProcessingTime", interval);
}

/// <summary>
/// A trigger that process only one batch of data in a streaming query
/// then terminates the query.
/// </summary>
/// <returns>Trigger Object</returns>
public static Trigger Once()
{
return Apply("Once");
}

/// <summary>
/// A trigger that continuously processes streaming data,
/// asynchronously checkpointing at the specified interval.
/// </summary>
/// <param name="intervalMs">Milliseconds</param>
/// <returns>Trigger Object</returns>
public static Trigger Continuous(long intervalMs)
{
return Apply("Continuous", intervalMs);
}

/// <summary>
/// A trigger that continuously processes streaming data,
/// asynchronously checkpointing at the specified interval.
/// </summary>
/// <param name="interval">string representation for interval. eg. "10 seconds"</param>
/// <returns>Trigger Object</returns>
public static Trigger Continuous(string interval)
{
return Apply("Continuous", interval);
}

private static Trigger Apply(string funcName, params object[] args)
{
return new Trigger(
(JvmObjectReference)Jvm.CallStaticJavaMethod(
s_triggerClassName,
funcName,
args));
}
}
}

0 comments on commit 73158a3

Please sign in to comment.