From 3888b97395f34822deaef49a2bb556ed55c780cf Mon Sep 17 00:00:00 2001 From: Parker Mossman Date: Wed, 6 Mar 2024 09:36:21 -0800 Subject: [PATCH] AirbyteApiClient2: Enable retries on 5xx responses (#11524) --- .../src/main/kotlin/AirbyteApiClient2.kt | 20 ++++++++++++++++++- .../acceptance/VersioningAcceptanceTests.java | 2 +- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/airbyte-api/src/main/kotlin/AirbyteApiClient2.kt b/airbyte-api/src/main/kotlin/AirbyteApiClient2.kt index 9c8e9fcfbd0..ace9f4a9c1e 100644 --- a/airbyte-api/src/main/kotlin/AirbyteApiClient2.kt +++ b/airbyte-api/src/main/kotlin/AirbyteApiClient2.kt @@ -24,6 +24,7 @@ import io.airbyte.api.client2.generated.StateApi import io.airbyte.api.client2.generated.StreamStatusesApi import io.airbyte.api.client2.generated.WorkspaceApi import okhttp3.OkHttpClient +import java.io.IOException /** * This class wraps all the generated API clients and provides a single entry point. This class is meant @@ -55,8 +56,15 @@ class AirbyteApiClient2 constructor( basePath: String, policy: RetryPolicy = RetryPolicy.ofDefaults(), - httpClient: OkHttpClient = OkHttpClient(), + var httpClient: OkHttpClient = OkHttpClient(), + throwOn5xx: Boolean = true, ) { + init { + if (throwOn5xx) { + httpClient = httpClient.newBuilder().addInterceptor(ThrowOn5xxInterceptor()).build() + } + } + val connectionApi = ConnectionApi(basePath = basePath, client = httpClient, policy = policy) val connectorBuilderProjectApi = ConnectorBuilderProjectApi(basePath = basePath, client = httpClient, policy = policy) val deploymentMetadataApi = DeploymentMetadataApi(basePath = basePath, client = httpClient, policy = policy) @@ -76,3 +84,13 @@ class AirbyteApiClient2 val streamStatusesApi = StreamStatusesApi(basePath = basePath, client = httpClient, policy = policy) val secretPersistenceConfigApi = SecretsPersistenceConfigApi(basePath = basePath, client = httpClient, policy = policy) } + +class ThrowOn5xxInterceptor : okhttp3.Interceptor { + override fun intercept(chain: okhttp3.Interceptor.Chain): okhttp3.Response { + val response = chain.proceed(chain.request()) + if (response.code >= 500) { + throw IOException("HTTP error: ${response.code} ${response.message}") + } + return response + } +} diff --git a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/VersioningAcceptanceTests.java b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/VersioningAcceptanceTests.java index 3df67a6899b..7d0c4da462e 100644 --- a/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/VersioningAcceptanceTests.java +++ b/airbyte-tests/src/test-acceptance/java/io/airbyte/test/acceptance/VersioningAcceptanceTests.java @@ -44,7 +44,7 @@ static void init() throws IOException, URISyntaxException { .withBackoff(Duration.ofSeconds(1), Duration.ofSeconds(60)).build(); final OkHttpClient client = new OkHttpClient.Builder().readTimeout(Duration.ofSeconds(60)).build(); - apiClient2 = new AirbyteApiClient2(String.format("%s/api", AIRBYTE_SERVER_HOST), policy, client); + apiClient2 = new AirbyteApiClient2(String.format("%s/api", AIRBYTE_SERVER_HOST), policy, client, /* throwOn5xx */ true); workspaceId = apiClient2.getWorkspaceApi().listWorkspaces().getWorkspaces().get(0).getWorkspaceId(); }