Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable all broken connections when source is refreshed #20208

Merged
merged 22 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,20 @@ public List<StandardSync> listWorkspaceStandardSyncs(final UUID workspaceId, fin
return getStandardSyncsFromResult(connectionAndOperationIdsResult);
}

public List<StandardSync> listConnectionsBySource(final UUID sourceId, final boolean includeDeleted) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just put up a PR with a more complete version of this: https://github.com/airbytehq/airbyte/pull/20264/files#diff-2d29b293778e290157be96e22294c2df1fdfa7c72a48961d03396776446deeddR852
However, my PR might take longer to merge due to breaking UI changes.

final Result<Record> connectionAndOperationIdsResult = database.query(ctx -> ctx
.select(
CONNECTION.asterisk(),
groupConcat(CONNECTION_OPERATION.OPERATION_ID).separator(OPERATION_IDS_AGG_DELIMITER).as(OPERATION_IDS_AGG_FIELD))
.from(CONNECTION)
.leftJoin(CONNECTION_OPERATION).on(CONNECTION_OPERATION.CONNECTION_ID.eq(CONNECTION.ID))
.where(CONNECTION.SOURCE_ID.eq(sourceId)
.and(includeDeleted ? noCondition() : CONNECTION.STATUS.notEqual(StatusType.deprecated)))
.groupBy(CONNECTION.ID)).fetch();

return getStandardSyncsFromResult(connectionAndOperationIdsResult);
}

private List<StandardSync> getStandardSyncsFromResult(final Result<Record> connectionAndOperationIdsResult) {
final List<StandardSync> standardSyncs = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,14 @@ public ConnectionReadList listConnectionsForWorkspace(final WorkspaceIdRequestBo
return new ConnectionReadList().connections(connectionReads);
}

public ConnectionReadList listConnectionsForSource(final UUID sourceId, final boolean includeDeleted) throws IOException {
final List<ConnectionRead> connectionReads = Lists.newArrayList();
for (final StandardSync standardSync : configRepository.listConnectionsBySource(sourceId, includeDeleted)) {
connectionReads.add(ApiPojoConverters.internalToConnectionRead(standardSync));
}
return new ConnectionReadList().connections(connectionReads);
}

public ConnectionReadList listConnections() throws JsonValidationException, ConfigNotFoundException, IOException {
final List<ConnectionRead> connectionReads = Lists.newArrayList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionReadList;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationCoreConfig;
Expand Down Expand Up @@ -264,7 +265,8 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId);

if (discoverSchemaRequestBody.getConnectionId() != null) {
discoveredSchemaWithCatalogDiff(discoveredSchema, discoverSchemaRequestBody);
// modify discoveredSchema object to add CatalogDiff, containsBreakingChange, and connectionStatus
generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody);
}

return discoveredSchema;
Expand Down Expand Up @@ -383,30 +385,32 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
return submitCancellationToWorker(jobIdRequestBody.getId());
}

private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead discoveredSchema,
final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
private void generateCatalogDiffsAndDisableConnectionsIfNeeded(SourceDiscoverSchemaRead discoveredSchema,
SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
throws JsonValidationException, ConfigNotFoundException, IOException {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
final CatalogDiff diff =
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
final boolean containsBreakingChange = containsBreakingChange(diff);
final ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId());
final ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
connectionStatus = connectionRead.getStatus();
final ConnectionReadList connectionsForSource = connectionsHandler.listConnectionsForSource(discoverSchemaRequestBody.getSourceId(), false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would advocate for avoiding having handlers depend on other handlers. At the handler level, I am currently leaning towards using our DB abstraction directly until we introduce a better layer.
It might be worth checking with the team to agree on a direction moving forward here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh interesting, you mean use configPersistence directly instead of going through the connection handler? I may have misunderstood how we wanted to migrate this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and I don't think we agreed on a strategy here.

for (final ConnectionRead connectionRead : connectionsForSource.getConnections()) {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(connectionRead.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
boolean containsBreakingChange = containsBreakingChange(diff);
ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(connectionRead.getConnectionId());
ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
connectionStatus = connectionRead.getStatus();
}
updateObject.status(connectionStatus);
connectionsHandler.updateConnection(updateObject);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels very broad, from this function, I expect we only modify syncCatalog and the breaking change flag, but this function will actually override the whole object. Should we introduce a more scoped update here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a patch update even though the object is named ConnectionUpdate, so it is only modifying the breaking change field and status

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, my point is that if we add other fields to the updateObject we could end up overriding fields by mistake.
I don't think it is a big issue currently, but this could lead to some surprises.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean adding other fields within this method here? we would only be updating other fields here if we explicitly stated them on the ConnectionUpdate object, but I think that would be expected

if (connectionRead.getConnectionId().equals(discoverSchemaRequestBody.getConnectionId())) {
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some description to the function to highlight that we are modifying the input?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just added a comment where we're calling the function - do you think that works?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say it should be part of the documentation of this function.

}
}
updateObject.status(connectionStatus);
connectionsHandler.updateConnection(updateObject);
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus);

}

private boolean shouldDisableConnection(final boolean containsBreakingChange,
Expand Down
Loading