Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
alovew committed Dec 7, 2022
1 parent 67fb4de commit fcb37b8
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,25 +384,31 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
return submitCancellationToWorker(jobIdRequestBody.getId());
}

private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead discoveredSchema,
final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
private void generateCatalogDiffsAndDisableConnectionsIfNeeded(SourceDiscoverSchemaRead discoveredSchema,
SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
throws JsonValidationException, ConfigNotFoundException, IOException {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
final CatalogDiff diff =
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
final boolean containsBreakingChange = containsBreakingChange(diff);
final ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId());
final ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
connectionStatus = ConnectionStatus.ACTIVE;
final ConnectionReadList connectionsForSource = connectionsHandler.listConnectionsForSource(discoverSchemaRequestBody.getSourceId(), false);
for (final ConnectionRead connectionRead : connectionsForSource.getConnections()) {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(connectionRead.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
boolean containsBreakingChange = containsBreakingChange(diff);
ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(connectionRead.getConnectionId());
ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
connectionStatus = ConnectionStatus.ACTIVE;
}
updateObject.status(connectionStatus);
connectionsHandler.updateConnection(updateObject);
if (connectionRead.getConnectionId().equals(discoverSchemaRequestBody.getConnectionId())) {
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
Expand All @@ -27,16 +26,13 @@
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WebhookOperationSummary;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.temporal.workflow.Workflow;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody;
Expand All @@ -25,7 +24,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
Expand Down
12 changes: 2 additions & 10 deletions docs/reference/api/generated-api-html/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,9 @@ <h1>Airbyte Configuration API</h1>
<li>The naming convention for endpoints is: localhost:8000/{VERSION}/{METHOD_FAMILY}/{METHOD_NAME} e.g. <code>localhost:8000/v1/connections/create</code>.</li>
<li>For all <code>update</code> methods, the whole object must be passed in, even the fields that did not change.</li>
</ul>
<p>Change Management:</p>
<p>Authentication (OSS):</p>
<ul>
<li>The major version of the API endpoint can be determined / specified in the URL <code>localhost:8080/v1/connections/create</code></li>
<li>Minor version bumps will be invisible to the end user. The user cannot specify minor versions in requests.</li>
<li>All backwards incompatible changes will happen in major version bumps. We will not make backwards incompatible changes in minor version bumps. Examples of non-breaking changes (includes but not limited to...):
<ul>
<li>Adding fields to request or response bodies.</li>
<li>Adding new HTTP endpoints.</li>
</ul>
</li>
<li>All <code>web_backend</code> APIs are not considered public APIs and are not guaranteeing backwards compatibility.</li>
<li>When authenticating to the Configuration API, you must use Basic Authentication by setting the Authentication Header to Basic and base64 encoding the username and password (which are <code>airbyte</code> and <code>password</code> by default - so base64 encoding <code>airbyte:password</code> results in <code>YWlyYnl0ZTpwYXNzd29yZA==</code>). So the full header reads <code>'Authorization': &quot;Basic YWlyYnl0ZTpwYXNzd29yZA==&quot;</code></li>
</ul>
</div>
<div class="app-desc">More information: <a href="https://openapi-generator.tech">https://openapi-generator.tech</a></div>
Expand Down

0 comments on commit fcb37b8

Please sign in to comment.