Skip to content

Commit

Permalink
SqlStatementExecution with exponential backoff (#346)
Browse files Browse the repository at this point in the history
Introduces an exponential backoff strategy for retrieving response from Databricks SQL Statement Execution API
  • Loading branch information
HenrikSommer authored Sep 10, 2024
1 parent 081d6e8 commit 64627d4
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 11 deletions.
4 changes: 4 additions & 0 deletions source/Databricks/documents/release-notes/release-notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Databricks Release Notes

## Version 11.2.1

- Introduce an exponential backoff strategy for retrieving statement query response from the Databricks SQL Statement Execution API.

## Version 11.2.0

- Add support for downloading chunks in parallel from Databricks.
Expand Down
2 changes: 1 addition & 1 deletion source/Databricks/source/Jobs/Jobs.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ limitations under the License.

<PropertyGroup>
<PackageId>Energinet.DataHub.Core.Databricks.Jobs</PackageId>
<PackageVersion>11.2.0$(VersionSuffix)</PackageVersion>
<PackageVersion>11.2.1$(VersionSuffix)</PackageVersion>
<Title>Databricks Jobs</Title>
<Company>Energinet-DataHub</Company>
<Authors>Energinet-DataHub</Authors>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Energinet.DataHub.Core.Databricks.SqlStatementExecution.UnitTests;

public class FibonacciTests
{
[Fact]
public void GetNextNumber_WhenCalled_ReturnsNextNumberInSequence()
{
// Arrange
var fibonacci = new Fibonacci();
var result = 0;

// Act
for (int i = 0; i < 5; i++)
{
result = fibonacci.GetNextNumber();
}

// Assert
Assert.Equal(8, result);
}
}
32 changes: 32 additions & 0 deletions source/Databricks/source/SqlStatementExecution/Fibonacci.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2020 Energinet DataHub A/S
//
// Licensed under the Apache License, Version 2.0 (the "License2");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Energinet.DataHub.Core.Databricks.SqlStatementExecution;

internal sealed class Fibonacci
{
private int _firstNumber;
private int _secondNumber = 1;

/// <summary>
/// Get the next number in the Fibonacci sequence.
/// </summary>
public int GetNextNumber()
{
var result = _firstNumber + _secondNumber;
_firstNumber = _secondNumber;
_secondNumber = result;
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ limitations under the License.

<PropertyGroup>
<PackageId>Energinet.DataHub.Core.Databricks.SqlStatementExecution</PackageId>
<PackageVersion>11.2.0$(VersionSuffix)</PackageVersion>
<PackageVersion>11.2.1$(VersionSuffix)</PackageVersion>
<Title>Databricks SQL Statement Execution</Title>
<Company>Energinet-DataHub</Company>
<Authors>Energinet-DataHub</Authors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ internal class DatabricksStatementRequest
{
internal const string JsonFormat = "JSON_ARRAY";
internal const string ArrowFormat = "ARROW_STREAM";
private const int MaxWaitTimeForLoopInMilliseconds = 10000;

internal DatabricksStatementRequest(string warehouseId, DatabricksStatement statement, string format)
{
Expand Down Expand Up @@ -58,16 +59,18 @@ internal DatabricksStatementRequest(string warehouseId, DatabricksStatement stat
public string WaitTimeout { get; init; }

[JsonPropertyName("format")]
public string Format { get; set; }
public string Format { get; init; }

public async ValueTask<DatabricksStatementResponse> WaitForSqlWarehouseResultAsync(HttpClient client, string endpoint, CancellationToken cancellationToken)
{
DatabricksStatementResponse? response = null;
var fibonacci = new Fibonacci();
do
{
try
{
response = await GetResponseFromDataWarehouseAsync(client, endpoint, response, cancellationToken).ConfigureAwait(false);
var delayInMilliseconds = Math.Min(fibonacci.GetNextNumber() * 10, MaxWaitTimeForLoopInMilliseconds);
response = await GetResponseFromDataWarehouseAsync(client, endpoint, response, delayInMilliseconds, cancellationToken).ConfigureAwait(false);
if (response.IsSucceeded)
return response;

Expand Down Expand Up @@ -97,30 +100,30 @@ private async Task<DatabricksStatementResponse> GetResponseFromDataWarehouseAsyn
HttpClient client,
string endpoint,
DatabricksStatementResponse? response,
int delayInMilliseconds,
CancellationToken cancellationToken)
{
if (response == null)
{
// No cancellation token is used because we want to wait for the result
// With the response we are able to cancel the statement if needed
// ReSharper disable MethodSupportsCancellation
#pragma warning disable CA2016
using var httpResponse = await client.PostAsJsonAsync(endpoint, this).ConfigureAwait(false);
response = await httpResponse.Content.ReadFromJsonAsync<DatabricksStatementResponse>().ConfigureAwait(false);
#pragma warning restore CA2016
// ReSharper restore MethodSupportsCancellation
}
else
{
await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMilliseconds(delayInMilliseconds), cancellationToken).ConfigureAwait(false);

var path = $"{endpoint}/{response.statement_id}";
using var httpResponse = await client.GetAsync(path, cancellationToken).ConfigureAwait(false);
response = await httpResponse.Content.ReadFromJsonAsync<DatabricksStatementResponse>(cancellationToken: cancellationToken).ConfigureAwait(false);
}

if (response == null)
{
throw new DatabricksException(this);
}

return response;
return response ?? throw new DatabricksException(this);
}

private static async Task CancelStatementAsync(HttpClient client, string endpoint, string statementId)
Expand Down

0 comments on commit 64627d4

Please sign in to comment.