Skip to content

Commit

Permalink
Cancel statement if token cancellation is requested (#288)
Browse files Browse the repository at this point in the history
* Change request behaviour

We want to get the response back from the cluster as fast as possible. When we have the statement_id from the response, we can cancel the statement if the cancellation token is cancelled.

We aim to minimize the time where user-code can't cancel the interaction with databricks cluster.

* Enhance comment to why no cancellation token is provided

* Use same exception types.
If the cancellation token is canceled within a task it raises a TaskCanceledException. Catch this exception and wrap it in an OperationCanceledException

* Bump package version

* Update release notes

* Update source/Databricks/source/SqlStatementExecution/Statement/DatabricksStatementRequest.cs

Co-authored-by: Dan Stenrøjl <51327761+dstenroejl@users.noreply.github.com>

* Fix cancellation of Databricks statements

* Add cancellation token test and helper classes

* Update release notes.
Using asynchronous mode for communication with job cluster

* Refactor method names

---------

Co-authored-by: Dan Stenrøjl <51327761+dstenroejl@users.noreply.github.com>
  • Loading branch information
Sondergaard and dstenroejl authored Mar 7, 2024
1 parent 5812054 commit 9c9d85c
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 10 deletions.
8 changes: 8 additions & 0 deletions source/Databricks/documents/release-notes/release-notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Databricks Release Notes

## Version 9.0.2

Cancel statement if token cancellation is requested.

When the request is canceled the cluster is notified to also cancel execution of that statement.

With this change the communication with the job cluster is using the [**Asynchronous mode**](https://docs.databricks.com/api/azure/workspace/statementexecution/executestatement#wait_timeout).

## Version 9.0.1

Added try-catch block to health checks to prevent both logging exceptions and indicate unhealthy state.
Expand Down
2 changes: 1 addition & 1 deletion source/Databricks/source/Jobs/Jobs.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ limitations under the License.

<PropertyGroup>
<PackageId>Energinet.DataHub.Core.Databricks.Jobs</PackageId>
<PackageVersion>9.0.1$(VersionSuffix)</PackageVersion>
<PackageVersion>9.0.2$(VersionSuffix)</PackageVersion>
<Title>Databricks Jobs</Title>
<Company>Energinet-DataHub</Company>
<Authors>Energinet-DataHub</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Net.Http.Json;
using System.Text.Json.Serialization;
using Energinet.DataHub.Core.Databricks.SqlStatementExecution.Formats;
using Energinet.DataHub.Core.Databricks.SqlStatementExecution.IntegrationTests.Client.Statements;
using Energinet.DataHub.Core.Databricks.SqlStatementExecution.IntegrationTests.Fixtures;
Expand Down Expand Up @@ -168,9 +170,69 @@ public async Task ExecuteStatementAsync_WhenCancelled_IsCancelledDuringTheInitia
task.IsCanceled.Should().BeTrue();
}

[Fact]
public async Task GivenCancellationToken_WhenTokenIsCancelled_ThenClusterJobIsCancelled()
{
var client = _sqlWarehouseFixture.CreateSqlStatementClient();
var statement = new AtLeast10SecStatement();

var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1));

await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
var result = client.ExecuteStatementAsync(statement, cts.Token);
_ = await result.CountAsync(cts.Token);
});

await AssertThatStatementIsCancelledAsync(_sqlWarehouseFixture.CreateHttpClient(), statement.HelperId.ToString());
}

public static IEnumerable<object[]> GetFormats()
{
yield return new object[] { Format.ApacheArrow };
yield return new object[] { Format.JsonArray };
}

public class QueryHistory
{
[JsonPropertyName("next_page_token")]
public string NextPageToken { get; init; } = string.Empty;

[JsonPropertyName("has_next_page")]
public bool HasNextPage { get; init; }

[JsonPropertyName("res")]
public Query[] Queries { get; set; } = Array.Empty<Query>();
}

public class Query
{
[JsonPropertyName("query_id")]
public string QueryId { get; init; } = string.Empty;

[JsonPropertyName("status")]
public string Status { get; init; } = string.Empty;

[JsonPropertyName("query_text")]
public string QueryText { get; init; } = string.Empty;
}

private static async Task AssertThatStatementIsCancelledAsync(HttpClient client, string statementId)
{
var retriesLeft = 6;
while (retriesLeft-- > 0)
{
await Task.Delay(750);
var response = await client.GetAsync($"api/2.0/sql/history/queries?include_metrics=false");
var history = await response.Content.ReadFromJsonAsync<QueryHistory>();

var query = history?.Queries.FirstOrDefault(q => q.QueryText.EndsWith(statementId, StringComparison.InvariantCultureIgnoreCase));
if (query == null) continue;

if (query.Status.Equals("Canceled", StringComparison.OrdinalIgnoreCase)) return;
}

Assert.Fail("No cancelled query found in history for statementId: " + statementId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@ namespace Energinet.DataHub.Core.Databricks.SqlStatementExecution.IntegrationTes
/// </summary>
public class AtLeast10SecStatement : DatabricksStatement
{
public AtLeast10SecStatement()
{
HelperId = Guid.NewGuid();
}

public Guid HelperId { get; private set; }

protected internal override string GetSqlStatement()
{
return "SELECT concat_ws('-', M.id, N.id, random()) as ID FROM range(1000000) AS M, range(1000000) AS N";
return $"SELECT concat_ws('-', M.id, N.id, random()) as ID FROM range(1000000) AS M, range(1000000) AS N; --HelperId:{HelperId}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ public DatabricksSqlWarehouseQueryExecutor CreateSqlStatementClient()
return serviceProvider.GetRequiredService<DatabricksSqlWarehouseQueryExecutor>();
}

public HttpClient CreateHttpClient()
{
var services = CreateServiceCollection();
var serviceProvider = services.BuildServiceProvider();
var factory = serviceProvider.GetRequiredService<IHttpClientFactory>();

return factory.CreateClient("Databricks");
}

private static ServiceCollection CreateServiceCollection()
{
var integrationTestConfiguration = _lazyConfiguration.Value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ limitations under the License.

<PropertyGroup>
<PackageId>Energinet.DataHub.Core.Databricks.SqlStatementExecution</PackageId>
<PackageVersion>9.0.1$(VersionSuffix)</PackageVersion>
<PackageVersion>9.0.2$(VersionSuffix)</PackageVersion>
<Title>Databricks SQL Statement Execution</Title>
<Company>Energinet-DataHub</Company>
<Authors>Energinet-DataHub</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ internal DatabricksStatementRequest(string warehouseId, DatabricksStatement stat
Parameters = statement.GetParameters().ToArray();
WarehouseId = warehouseId;
Disposition = "EXTERNAL_LINKS";
WaitTimeout = "30s";
WaitTimeout = "0s";
Format = format;
}

Expand Down Expand Up @@ -70,10 +70,25 @@ public async ValueTask<DatabricksStatementResponse> WaitForSqlWarehouseResultAsy
DatabricksStatementResponse? response = null;
do
{
response = await GetResponseFromDataWarehouseAsync(client, endpoint, response, cancellationToken);
if (response.IsSucceeded) return response;

cancellationToken.ThrowIfCancellationRequested();
try
{
response = await GetResponseFromDataWarehouseAsync(client, endpoint, response, cancellationToken);
if (response.IsSucceeded) return response;

if (cancellationToken.IsCancellationRequested)
{
if (string.IsNullOrEmpty(response.statement_id)) throw new InvalidOperationException("The statement_id is missing from databricks response");

// Cancel the statement without cancellation token since it is already cancelled
await CancelStatementAsync(client, endpoint, response.statement_id);
cancellationToken.ThrowIfCancellationRequested();
}
}
catch (TaskCanceledException tce)
{
if (!string.IsNullOrEmpty(response?.statement_id)) await CancelStatementAsync(client, endpoint, response.statement_id);
throw new OperationCanceledException("The operation was cancelled", tce, cancellationToken);
}
}
while (response.IsPending || response.IsRunning);

Expand All @@ -88,8 +103,10 @@ private async Task<DatabricksStatementResponse> GetResponseFromDataWarehouseAsyn
{
if (response == null)
{
using var httpResponse = await client.PostAsJsonAsync(endpoint, this, cancellationToken);
response = await httpResponse.Content.ReadFromJsonAsync<DatabricksStatementResponse>(cancellationToken: cancellationToken);
// No cancellation token is used because we want to wait for the result
// With the response we are able to cancel the statement if needed
using var httpResponse = await client.PostAsJsonAsync(endpoint, this);
response = await httpResponse.Content.ReadFromJsonAsync<DatabricksStatementResponse>();
}
else
{
Expand All @@ -107,4 +124,10 @@ private async Task<DatabricksStatementResponse> GetResponseFromDataWarehouseAsyn

return response;
}

private static async Task CancelStatementAsync(HttpClient client, string endpoint, string statementId)
{
var path = $"{endpoint}/{statementId}/cancel";
using var httpResponse = await client.PostAsync(path, new StringContent(string.Empty));
}
}

0 comments on commit 9c9d85c

Please sign in to comment.