From 9c9d85c402be8b59a77d1ab4717f52f9a73e5f12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20S=C3=B8ndergaard?= Date: Thu, 7 Mar 2024 15:52:11 +0100 Subject: [PATCH] Cancel statement if token cancellation is requested (#288) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Change request behaviour We want to get the response back from the cluster as fast as possible. When we have the statement_id from the response, we can cancel the statement if the cancellation token is cancelled. We aim to minimize the time during which user code can't cancel the interaction with the Databricks cluster. * Enhance comment to explain why no cancellation token is provided * Use same exception types. If the cancellation token is canceled within a task it raises a TaskCanceledException. Catch this exception and wrap it in an OperationCanceledException * Bump package version * Update release notes * Update source/Databricks/source/SqlStatementExecution/Statement/DatabricksStatementRequest.cs Co-authored-by: Dan Stenrøjl <51327761+dstenroejl@users.noreply.github.com> * Fix cancellation of Databricks statements * Add cancellation token test and helper classes * Update release notes. 
Using asynchronous mode for communication with job cluster * Refactor method names --------- Co-authored-by: Dan Stenrøjl <51327761+dstenroejl@users.noreply.github.com> --- .../documents/release-notes/release-notes.md | 8 +++ source/Databricks/source/Jobs/Jobs.csproj | 2 +- .../Client/DatabricksStatementsTests.cs | 62 +++++++++++++++++++ .../Statements/AtLeast10SecStatement.cs | 9 ++- .../Fixtures/DatabricksSqlWarehouseFixture.cs | 9 +++ .../SqlStatementExecution.csproj | 2 +- .../Statement/DatabricksStatementRequest.cs | 37 ++++++++--- 7 files changed, 119 insertions(+), 10 deletions(-) diff --git a/source/Databricks/documents/release-notes/release-notes.md b/source/Databricks/documents/release-notes/release-notes.md index a24e737a1..95219aa9f 100644 --- a/source/Databricks/documents/release-notes/release-notes.md +++ b/source/Databricks/documents/release-notes/release-notes.md @@ -1,5 +1,13 @@ # Databricks Release Notes +## Version 9.0.2 + +Cancel statement if token cancellation is requested. + +When the request is canceled, the cluster is also notified to cancel execution of that statement. + +With this change, communication with the job cluster uses the [**Asynchronous mode**](https://docs.databricks.com/api/azure/workspace/statementexecution/executestatement#wait_timeout). + ## Version 9.0.1 Added try-catch block to health checks to prevent both logging exceptions and indicate unhealthy state. diff --git a/source/Databricks/source/Jobs/Jobs.csproj b/source/Databricks/source/Jobs/Jobs.csproj index 3a0e20607..299c21443 100644 --- a/source/Databricks/source/Jobs/Jobs.csproj +++ b/source/Databricks/source/Jobs/Jobs.csproj @@ -31,7 +31,7 @@ limitations under the License. 
Energinet.DataHub.Core.Databricks.Jobs - 9.0.1$(VersionSuffix) + 9.0.2$(VersionSuffix) Databricks Jobs Energinet-DataHub Energinet-DataHub diff --git a/source/Databricks/source/SqlStatementExecution.IntegrationTests/Client/DatabricksStatementsTests.cs b/source/Databricks/source/SqlStatementExecution.IntegrationTests/Client/DatabricksStatementsTests.cs index 305df00ea..518dffba8 100644 --- a/source/Databricks/source/SqlStatementExecution.IntegrationTests/Client/DatabricksStatementsTests.cs +++ b/source/Databricks/source/SqlStatementExecution.IntegrationTests/Client/DatabricksStatementsTests.cs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +using System.Net.Http.Json; +using System.Text.Json.Serialization; using Energinet.DataHub.Core.Databricks.SqlStatementExecution.Formats; using Energinet.DataHub.Core.Databricks.SqlStatementExecution.IntegrationTests.Client.Statements; using Energinet.DataHub.Core.Databricks.SqlStatementExecution.IntegrationTests.Fixtures; @@ -168,9 +170,69 @@ public async Task ExecuteStatementAsync_WhenCancelled_IsCancelledDuringTheInitia task.IsCanceled.Should().BeTrue(); } + [Fact] + public async Task GivenCancellationToken_WhenTokenIsCancelled_ThenClusterJobIsCancelled() + { + var client = _sqlWarehouseFixture.CreateSqlStatementClient(); + var statement = new AtLeast10SecStatement(); + + var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromSeconds(1)); + + await Assert.ThrowsAsync(async () => + { + var result = client.ExecuteStatementAsync(statement, cts.Token); + _ = await result.CountAsync(cts.Token); + }); + + await AssertThatStatementIsCancelledAsync(_sqlWarehouseFixture.CreateHttpClient(), statement.HelperId.ToString()); + } + public static IEnumerable GetFormats() { yield return new object[] { Format.ApacheArrow }; yield return new object[] { Format.JsonArray }; } + + public class QueryHistory + { + [JsonPropertyName("next_page_token")] + 
public string NextPageToken { get; init; } = string.Empty; + + [JsonPropertyName("has_next_page")] + public bool HasNextPage { get; init; } + + [JsonPropertyName("res")] + public Query[] Queries { get; set; } = Array.Empty(); + } + + public class Query + { + [JsonPropertyName("query_id")] + public string QueryId { get; init; } = string.Empty; + + [JsonPropertyName("status")] + public string Status { get; init; } = string.Empty; + + [JsonPropertyName("query_text")] + public string QueryText { get; init; } = string.Empty; + } + + private static async Task AssertThatStatementIsCancelledAsync(HttpClient client, string statementId) + { + var retriesLeft = 6; + while (retriesLeft-- > 0) + { + await Task.Delay(750); + var response = await client.GetAsync($"api/2.0/sql/history/queries?include_metrics=false"); + var history = await response.Content.ReadFromJsonAsync(); + + var query = history?.Queries.FirstOrDefault(q => q.QueryText.EndsWith(statementId, StringComparison.InvariantCultureIgnoreCase)); + if (query == null) continue; + + if (query.Status.Equals("Canceled", StringComparison.OrdinalIgnoreCase)) return; + } + + Assert.Fail("No cancelled query found in history for statementId: " + statementId); + } } diff --git a/source/Databricks/source/SqlStatementExecution.IntegrationTests/Client/Statements/AtLeast10SecStatement.cs b/source/Databricks/source/SqlStatementExecution.IntegrationTests/Client/Statements/AtLeast10SecStatement.cs index de958aa1a..cae879dee 100644 --- a/source/Databricks/source/SqlStatementExecution.IntegrationTests/Client/Statements/AtLeast10SecStatement.cs +++ b/source/Databricks/source/SqlStatementExecution.IntegrationTests/Client/Statements/AtLeast10SecStatement.cs @@ -19,8 +19,15 @@ namespace Energinet.DataHub.Core.Databricks.SqlStatementExecution.IntegrationTes /// public class AtLeast10SecStatement : DatabricksStatement { + public AtLeast10SecStatement() + { + HelperId = Guid.NewGuid(); + } + + public Guid HelperId { get; private set; } + 
protected internal override string GetSqlStatement() { - return "SELECT concat_ws('-', M.id, N.id, random()) as ID FROM range(1000000) AS M, range(1000000) AS N"; + return $"SELECT concat_ws('-', M.id, N.id, random()) as ID FROM range(1000000) AS M, range(1000000) AS N; --HelperId:{HelperId}"; } } diff --git a/source/Databricks/source/SqlStatementExecution.IntegrationTests/Fixtures/DatabricksSqlWarehouseFixture.cs b/source/Databricks/source/SqlStatementExecution.IntegrationTests/Fixtures/DatabricksSqlWarehouseFixture.cs index e6ea9ae9a..07935e00d 100644 --- a/source/Databricks/source/SqlStatementExecution.IntegrationTests/Fixtures/DatabricksSqlWarehouseFixture.cs +++ b/source/Databricks/source/SqlStatementExecution.IntegrationTests/Fixtures/DatabricksSqlWarehouseFixture.cs @@ -29,6 +29,15 @@ public DatabricksSqlWarehouseQueryExecutor CreateSqlStatementClient() return serviceProvider.GetRequiredService(); } + public HttpClient CreateHttpClient() + { + var services = CreateServiceCollection(); + var serviceProvider = services.BuildServiceProvider(); + var factory = serviceProvider.GetRequiredService(); + + return factory.CreateClient("Databricks"); + } + private static ServiceCollection CreateServiceCollection() { var integrationTestConfiguration = _lazyConfiguration.Value; diff --git a/source/Databricks/source/SqlStatementExecution/SqlStatementExecution.csproj b/source/Databricks/source/SqlStatementExecution/SqlStatementExecution.csproj index 32865f68d..22c8cf0b2 100644 --- a/source/Databricks/source/SqlStatementExecution/SqlStatementExecution.csproj +++ b/source/Databricks/source/SqlStatementExecution/SqlStatementExecution.csproj @@ -30,7 +30,7 @@ limitations under the License. 
Energinet.DataHub.Core.Databricks.SqlStatementExecution - 9.0.1$(VersionSuffix) + 9.0.2$(VersionSuffix) Databricks SQL Statement Execution Energinet-DataHub Energinet-DataHub diff --git a/source/Databricks/source/SqlStatementExecution/Statement/DatabricksStatementRequest.cs b/source/Databricks/source/SqlStatementExecution/Statement/DatabricksStatementRequest.cs index fe137c4a1..6a529db30 100644 --- a/source/Databricks/source/SqlStatementExecution/Statement/DatabricksStatementRequest.cs +++ b/source/Databricks/source/SqlStatementExecution/Statement/DatabricksStatementRequest.cs @@ -38,7 +38,7 @@ internal DatabricksStatementRequest(string warehouseId, DatabricksStatement stat Parameters = statement.GetParameters().ToArray(); WarehouseId = warehouseId; Disposition = "EXTERNAL_LINKS"; - WaitTimeout = "30s"; + WaitTimeout = "0s"; Format = format; } @@ -70,10 +70,25 @@ public async ValueTask WaitForSqlWarehouseResultAsy DatabricksStatementResponse? response = null; do { - response = await GetResponseFromDataWarehouseAsync(client, endpoint, response, cancellationToken); - if (response.IsSucceeded) return response; - - cancellationToken.ThrowIfCancellationRequested(); + try + { + response = await GetResponseFromDataWarehouseAsync(client, endpoint, response, cancellationToken); + if (response.IsSucceeded) return response; + + if (cancellationToken.IsCancellationRequested) + { + if (string.IsNullOrEmpty(response.statement_id)) throw new InvalidOperationException("The statement_id is missing from databricks response"); + + // Cancel the statement without cancellation token since it is already cancelled + await CancelStatementAsync(client, endpoint, response.statement_id); + cancellationToken.ThrowIfCancellationRequested(); + } + } + catch (TaskCanceledException tce) + { + if (!string.IsNullOrEmpty(response?.statement_id)) await CancelStatementAsync(client, endpoint, response.statement_id); + throw new OperationCanceledException("The operation was cancelled", tce, 
cancellationToken); + } } while (response.IsPending || response.IsRunning); @@ -88,8 +103,10 @@ private async Task GetResponseFromDataWarehouseAsyn { if (response == null) { - using var httpResponse = await client.PostAsJsonAsync(endpoint, this, cancellationToken); - response = await httpResponse.Content.ReadFromJsonAsync(cancellationToken: cancellationToken); + // No cancellation token is used because we want to wait for the result + // With the response we are able to cancel the statement if needed + using var httpResponse = await client.PostAsJsonAsync(endpoint, this); + response = await httpResponse.Content.ReadFromJsonAsync(); } else { @@ -107,4 +124,10 @@ private async Task GetResponseFromDataWarehouseAsyn return response; } + + private static async Task CancelStatementAsync(HttpClient client, string endpoint, string statementId) + { + var path = $"{endpoint}/{statementId}/cancel"; + using var httpResponse = await client.PostAsync(path, new StringContent(string.Empty)); + } }