Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

destination-redshift: add option for drop cascade #38189

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 2.6.1
dockerImageTag: 2.6.2
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public static JsonNode getJdbcConfig(final JsonNode redshiftConfig) {

@Override
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
return new RedshiftSqlGenerator(super.getNamingResolver());
return new RedshiftSqlGenerator(super.getNamingResolver(), config);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {

@Override
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
return new RedshiftSqlGenerator(getNamingResolver());
return new RedshiftSqlGenerator(getNamingResolver(), config);
}

@Override
Expand Down Expand Up @@ -229,7 +229,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
stream.getStream().setNamespace(defaultNamespace);
}
}
final RedshiftSqlGenerator sqlGenerator = new RedshiftSqlGenerator(getNamingResolver());

final RedshiftSqlGenerator sqlGenerator = new RedshiftSqlGenerator(getNamingResolver(), config);
final ParsedCatalog parsedCatalog;
final TyperDeduper typerDeduper;
final JdbcDatabase database = getDatabase(getDataSource(config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ private RedshiftDestinationConstants() {}

public static final DataType<String> SUPER_TYPE = new DefaultDataType<>(null, String.class, "super");

public static final String DROP_CASCADE_OPTION = "drop_cascade";

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
Expand Down Expand Up @@ -49,6 +50,12 @@ public void execute(final Sql sql) throws Exception {
getJdbcDatabase().executeWithinTransaction(modifiedStatements);
} catch (final SQLException e) {
log.error("Sql {}-{} failed", queryId, transactionId, e);
// This is a big hammer for something that should be much more targeted, only when executing the
// DROP TABLE command.
if (e.getMessage().contains("ERROR: cannot drop table") && e.getMessage().contains("because other objects depend on it")) {
throw new ConfigErrorException(
"Failed to drop table without the CASCADE option. Consider changing the drop_cascade configuration parameter", e);
}
Comment on lines +55 to +58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly the type of 'translation' we'll want to move into a more formal 'usability layer' error handling class soon

throw e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.jooq.impl.DSL.rowNumber;
import static org.jooq.impl.DSL.val;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
Expand Down Expand Up @@ -49,8 +50,20 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {

private static final String AIRBYTE_META_COLUMN_CHANGES_KEY = "changes";

public RedshiftSqlGenerator(final NamingConventionTransformer namingTransformer) {
super(namingTransformer);
private final boolean dropCascade;

/**
 * Reads the drop_cascade option from the connector configuration.
 * A missing or null node is treated as false (cascade disabled).
 */
private static boolean isDropCascade(JsonNode config) {
  // path() never returns null (MissingNode instead), so no explicit null check is needed.
  return config.path(RedshiftDestinationConstants.DROP_CASCADE_OPTION).asBoolean(false);
}

/**
 * Convenience constructor: derives the cascade-drop behavior from the connector
 * configuration's drop_cascade option (absent/null treated as false).
 */
public RedshiftSqlGenerator(final NamingConventionTransformer namingTransformer, JsonNode config) {
this(namingTransformer, isDropCascade(config));
}

/**
 * @param dropCascade when true, generated DROP TABLE statements are expected to use
 *                    CASCADE, which also removes dependent objects such as views.
 *                    The flag is forwarded to the superclass and kept locally.
 */
public RedshiftSqlGenerator(final NamingConventionTransformer namingTransformer, boolean dropCascade) {
super(namingTransformer, dropCascade);
this.dropCascade = dropCascade;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,13 @@
"title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)",
"order": 11,
"group": "connection"
},
"drop_cascade": {
"type": "boolean",
"default": false,
"description": "Drop tables with CASCADE. WARNING! This will delete all data in all dependent objects (views, etc.). Use with caution. This option is intended for usecases which can easily rebuild the dependent objects.",
"title": "Drop tables with CASCADE. (WARNING! Risk of unrecoverable data loss)",
"order": 12
}
},
"groups": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ protected JdbcCompatibleSourceOperations<?> getSourceOperations() {

@Override
protected SqlGenerator getSqlGenerator() {
return new RedshiftSqlGenerator(new RedshiftSQLNameTransformer()) {
return new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), false) {

// Override only for tests to print formatted SQL. The actual implementation should use unformatted
// to save bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@

import static io.airbyte.cdk.db.jdbc.DateTimeConverter.putJavaSQLTime;
import static io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations.escapeStringLiteral;
import static org.jooq.impl.DSL.createView;
import static org.jooq.impl.DSL.quotedName;
import static org.jooq.impl.DSL.select;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -19,6 +24,7 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
Expand All @@ -39,6 +45,7 @@
import org.jooq.DataType;
import org.jooq.Field;
import org.jooq.SQLDialect;
import org.jooq.conf.ParamType;
import org.jooq.conf.Settings;
import org.jooq.impl.DSL;
import org.jooq.impl.DefaultDataType;
Expand Down Expand Up @@ -143,7 +150,7 @@ public static void teardownRedshift() throws Exception {

@Override
protected JdbcSqlGenerator getSqlGenerator() {
return new RedshiftSqlGenerator(new RedshiftSQLNameTransformer()) {
return new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), false) {

// Override only for tests to print formatted SQL. The actual implementation should use unformatted
// to save bytes.
Expand Down Expand Up @@ -193,4 +200,41 @@ public void testCreateTableIncremental() throws Exception {
// TODO assert on table clustering, etc.
}

/**
 * Verify that we correctly DROP...CASCADE the final table when cascadeDrop is enabled:
 * overwriting a table that has a dependent view must succeed.
 */
@Test
public void testCascadeDropEnabled() throws Exception {
// Build a generator with cascadeDrop explicitly enabled.
final RedshiftSqlGenerator generator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), true);

// Create the final table, then a view that depends on it.
getDestinationHandler().execute(generator.createTable(getIncrementalAppendStream(), "", false));
final String finalNamespace = getIncrementalAppendStream().getId().getFinalNamespace();
final String finalName = getIncrementalAppendStream().getId().getFinalName();
database.execute(createView(quotedName(finalNamespace, "example_view"))
.as(select().from(quotedName(finalNamespace, finalName)))
.getSQL(ParamType.INLINED));

// Create a "soft reset" table to overwrite the final table with.
getDestinationHandler().execute(generator.createTable(getIncrementalDedupStream(), "_soft_reset", false));

// With CASCADE enabled, replacing the table (and its dependent view) must not throw.
assertDoesNotThrow(() -> getDestinationHandler().execute(generator.overwriteFinalTable(getIncrementalDedupStream().getId(), "_soft_reset")));
}

/**
 * Verify that, with cascadeDrop disabled, overwriting a final table that has a
 * dependent view fails and is surfaced to the user as a ConfigErrorException
 * with an actionable message.
 */
@Test
public void testCascadeDropDisabled() throws Exception {
// Explicitly create a sqlgenerator with cascadeDrop=false
final RedshiftSqlGenerator generator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), false);
// Create a table, then create a view referencing it
getDestinationHandler().execute(generator.createTable(getIncrementalAppendStream(), "", false));
database.execute(createView(quotedName(getIncrementalAppendStream().getId().getFinalNamespace(), "example_view"))
.as(select().from(quotedName(getIncrementalAppendStream().getId().getFinalNamespace(), getIncrementalAppendStream().getId().getFinalName())))
.getSQL(ParamType.INLINED));
// Create a "soft reset" table
getDestinationHandler().execute(generator.createTable(getIncrementalDedupStream(), "_soft_reset", false));

// Overwriting the first table with the second table should fail with a ConfigErrorException.
final Throwable t = assertThrowsExactly(ConfigErrorException.class,
() -> getDestinationHandler().execute(generator.overwriteFinalTable(getIncrementalDedupStream().getId(), "_soft_reset")));
// assertEquals (statically imported above) gives a readable diff on failure,
// unlike assertTrue(String.equals(...)).
assertEquals("Failed to drop table without the CASCADE option. Consider changing the drop_cascade configuration parameter", t.getMessage());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class RedshiftSqlGeneratorTest {

private static final Random RANDOM = new Random();

private static final RedshiftSqlGenerator redshiftSqlGenerator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer()) {
private static final RedshiftSqlGenerator redshiftSqlGenerator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), false) {

// Override only for tests to print formatted SQL. The actual implementation should use unformatted
// to save bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public class RedshiftSuperLimitationTransformerTest {

private RedshiftSuperLimitationTransformer transformer;
private static final RedshiftSqlGenerator redshiftSqlGenerator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer());
private static final RedshiftSqlGenerator redshiftSqlGenerator = new RedshiftSqlGenerator(new RedshiftSQLNameTransformer(), false);

@BeforeEach
public void setup() {
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.6.2 | 2024-05-14 | [\#38189](https://github.com/airbytehq/airbyte/pull/38189) | Adding an option to DROP CASCADE on resets |
| 2.6.1 | 2024-05-13 | [\#38126](https://github.com/airbytehq/airbyte/pull/38126) | Adapt to signature changes in `StreamConfig` |
| 2.6.0 | 2024-05-08 | [\#37713](https://github.com/airbytehq/airbyte/pull/37713) | Remove option for incremental typing and deduping |
| 2.5.0 | 2024-05-06 | [\#34613](https://github.com/airbytehq/airbyte/pull/34613) | Upgrade Redshift driver to work with Cluster patch 181; Adapt to CDK 0.33.0; Minor signature changes |
Expand Down
Loading