-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Disable all broken connections when source is refreshed #20208
Changes from 21 commits
26bc09d
ac4c0a4
9ee3347
8c2901e
5aad0e9
12d8d90
4a63b49
7b7724d
6d46a4d
8703218
cd57cd5
e724939
c5019d7
c4f2c6d
d655bfa
5e8a5fe
ca2cda3
8432701
9c9e01d
83567c9
3b52950
46e876c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum; | ||
import io.airbyte.api.model.generated.ConnectionIdRequestBody; | ||
import io.airbyte.api.model.generated.ConnectionRead; | ||
import io.airbyte.api.model.generated.ConnectionReadList; | ||
import io.airbyte.api.model.generated.ConnectionStatus; | ||
import io.airbyte.api.model.generated.ConnectionUpdate; | ||
import io.airbyte.api.model.generated.DestinationCoreConfig; | ||
|
@@ -264,7 +265,8 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source | |
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId); | ||
|
||
if (discoverSchemaRequestBody.getConnectionId() != null) { | ||
discoveredSchemaWithCatalogDiff(discoveredSchema, discoverSchemaRequestBody); | ||
// modify discoveredSchema object to add CatalogDiff, containsBreakingChange, and connectionStatus | ||
generateCatalogDiffsAndDisableConnectionsIfNeeded(discoveredSchema, discoverSchemaRequestBody); | ||
} | ||
|
||
return discoveredSchema; | ||
|
@@ -383,30 +385,32 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE | |
return submitCancellationToWorker(jobIdRequestBody.getId()); | ||
} | ||
|
||
private void discoveredSchemaWithCatalogDiff(final SourceDiscoverSchemaRead discoveredSchema, | ||
final SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) | ||
private void generateCatalogDiffsAndDisableConnectionsIfNeeded(SourceDiscoverSchemaRead discoveredSchema, | ||
SourceDiscoverSchemaRequestBody discoverSchemaRequestBody) | ||
throws JsonValidationException, ConfigNotFoundException, IOException { | ||
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler | ||
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId()); | ||
final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()); | ||
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = | ||
connectionRead.getSyncCatalog(); | ||
final CatalogDiff diff = | ||
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), | ||
CatalogConverter.toProtocol(currentAirbyteCatalog)); | ||
final boolean containsBreakingChange = containsBreakingChange(diff); | ||
final ConnectionUpdate updateObject = | ||
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId()); | ||
final ConnectionStatus connectionStatus; | ||
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) { | ||
connectionStatus = ConnectionStatus.INACTIVE; | ||
} else { | ||
connectionStatus = connectionRead.getStatus(); | ||
final ConnectionReadList connectionsForSource = connectionsHandler.listConnectionsForSource(discoverSchemaRequestBody.getSourceId(), false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would advocate for avoiding having handlers depend on other handlers. At the handler level, I am currently leaning towards using our DB abstraction directly until we introduce a better layer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh interesting, you mean use configPersistence directly instead of going through the connection handler? I may have misunderstood how we wanted to migrate this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes and I don't think we agreed on a strategy here. |
||
for (final ConnectionRead connectionRead : connectionsForSource.getConnections()) { | ||
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler | ||
.getConnectionAirbyteCatalog(connectionRead.getConnectionId()); | ||
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog = | ||
connectionRead.getSyncCatalog(); | ||
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), | ||
CatalogConverter.toProtocol(currentAirbyteCatalog)); | ||
boolean containsBreakingChange = containsBreakingChange(diff); | ||
ConnectionUpdate updateObject = | ||
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(connectionRead.getConnectionId()); | ||
ConnectionStatus connectionStatus; | ||
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) { | ||
connectionStatus = ConnectionStatus.INACTIVE; | ||
} else { | ||
connectionStatus = connectionRead.getStatus(); | ||
} | ||
updateObject.status(connectionStatus); | ||
connectionsHandler.updateConnection(updateObject); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feels very broad, from this function, I expect we only modify syncCatalog and the breaking change flag, but this function will actually override the whole object. Should we introduce a more scoped update here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is actually a patch update even though the object is named ConnectionUpdate, so it is only modifying the breaking change field and status There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, my point is that if we add other fields to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you mean adding other fields within this method here? we would only be updating other fields here if we explicitly stated them on the ConnectionUpdate object, but I think that would be expected |
||
if (connectionRead.getConnectionId().equals(discoverSchemaRequestBody.getConnectionId())) { | ||
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add some description to the function to highlight that we are modifying the input? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just added a comment where we're calling the function - do you think that works? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd say it should be part of the documentation of this function. |
||
} | ||
} | ||
updateObject.status(connectionStatus); | ||
connectionsHandler.updateConnection(updateObject); | ||
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus); | ||
|
||
} | ||
|
||
private boolean shouldDisableConnection(final boolean containsBreakingChange, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just put up a PR with a more complete version of this: https://github.com/airbytehq/airbyte/pull/20264/files#diff-2d29b293778e290157be96e22294c2df1fdfa7c72a48961d03396776446deeddR852
However, my PR might take longer to merge due to breaking UI changes.