Simplify use of transactions when performing overwrites and when creating new tables #157
Conversation
@@ -48,6 +47,8 @@ import org.apache.spark.sql.types._
* non-empty. After the write operation completes, we use this to construct a list of non-empty
* Avro partition files.
*
* - Using JDBC, start a new tra…
TODO: this entire block comment needs to be updated.
I'd love feedback on whether the tests here are adequate; my gut says that we need a test which counts the number of times that …
}
log.info(s"Loading new Redshift data to: $table")
doRedshiftLoad(conn, data, params, creds, manifestUrl)
conn.commit()
Does this need to handle InterruptedException? i.e. can this block for a really long time?
It has been my experience that virtually any Redshift command, `COMMIT` included, can block for several minutes under certain circumstances. I think the most likely cause is that the cluster has WLM parameters configured to put the connected client on a limited pool of some type, such that all commands will be queued when all slots are taken by other queries.
I have a feeling, though, that this means that if you plan to send a `ROLLBACK` command in response to the interruption, it will also block for many minutes...
I'm not sure that it's safe for us to wrap this in order to catch InterruptedException, since I don't think that it's safe to call `.rollback()` while the other thread is in the middle of executing `commit()`. Therefore, I'm going to leave this as-is for now and will revisit later if this turns out to be a problem in practice.
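For illustration, here is a rough sketch (not part of this patch; the names are hypothetical) of what a timeout-bounded commit might look like. It mainly shows why the approach was rejected: even with a timeout, there is no safe way to abandon or roll back a `commit()` that may still be executing on another thread.

```scala
import java.sql.Connection
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}

// Hypothetical sketch, not part of this patch: run the blocking commit() on a separate
// thread and wait with a timeout. The problem is that once the timeout fires, the
// connection is in an unknown state and calling rollback() concurrently is not known
// to be safe, which is exactly why the plain conn.commit() call is left as-is.
object CommitTimeoutSketch {
  def commitWithTimeout(conn: Connection, timeoutSeconds: Long): Unit = {
    val executor = Executors.newSingleThreadExecutor()
    try {
      val future = executor.submit(new Runnable {
        override def run(): Unit = conn.commit()
      })
      try {
        future.get(timeoutSeconds, TimeUnit.SECONDS)
      } catch {
        case _: TimeoutException =>
          // Cancelling does not make the in-flight commit safe to abandon.
          future.cancel(true)
          throw new IllegalStateException(
            s"commit() did not finish within $timeoutSeconds seconds")
      }
    } finally {
      executor.shutdownNow()
    }
  }
}
```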
Ping @jaley, do you have any cycles to help review this? I think that you might be a good person to look at this because you have a lot of the context RE: the original implementation of this functionality.
Hey @JoshRosen, sure. Sorry for being slow to notice this - been playing catch-up after a long vacation. I'll take a look in the next day or so, if that's cool with you?
Sure, that's fine.
} catch {
  case NonFatal(e) =>
    try {
      conn.rollback()
Might a log.info or log.warn be good here, to inform anyone testing their application code that the load failed and what they're now waiting for is a rollback to finish?
Done.
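A minimal sketch of the resulting pattern, assuming slf4j-style logging and illustrative names rather than the exact patch contents:

```scala
import java.sql.Connection
import org.slf4j.LoggerFactory
import scala.util.control.NonFatal

// Sketch only: log a warning before rolling back so that anyone watching the logs knows
// the load failed and that the pause they may now observe is the rollback finishing.
object RollbackLoggingSketch {
  private val log = LoggerFactory.getLogger(getClass)

  def runAndCommit(conn: Connection)(load: => Unit): Unit = {
    try {
      load
      conn.commit()
    } catch {
      case NonFatal(e) =>
        try {
          log.warn("Exception thrown during Redshift load; rolling back transaction", e)
          conn.rollback()
        } catch {
          case NonFatal(rollbackError) =>
            log.error("Exception while rolling back transaction", rollbackError)
        }
        throw e
    }
  }
}
```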
Do we currently run the integration test with just the Redshift JDBC driver? Now that we're using the driver to manage transactions, I wonder if we might start to see different behaviour between the Redshift and Postgres drivers. Amazon's recommendation is to use their official driver, but I think we've previously been lucky and users could safely make use of either for the small set of commands we rely on. This change effectively exercises different code paths in the driver, so it might be worth making sure that if anything no longer works with the Postgres driver, we're at least able to communicate that clearly to users and explicitly state that it's not supported any more. (Hopefully it's still fine anyway!)
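For context, the driver choice only shows up in the JDBC URL (and in which driver jar is on the classpath). A hedged sketch using the documented data source options, with hypothetical table and tempdir values:

```scala
import org.apache.spark.sql.DataFrame

// Sketch only: whichever driver is used, the write goes through the same data source,
// so both paths exercise the transaction handling introduced in this patch.
object DriverChoiceSketch {
  def write(df: DataFrame, jdbcUrl: String): Unit = {
    // jdbcUrl is e.g. "jdbc:redshift://host:5439/db?user=...&password=..."
    //            or   "jdbc:postgresql://host:5439/db?user=...&password=..."
    df.write
      .format("com.databricks.spark.redshift")
      .option("url", jdbcUrl)
      .option("dbtable", "my_table")
      .option("tempdir", "s3n://example-bucket/tmp/")
      .mode("overwrite")
      .save()
  }
}
```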
The change looks good to me. I wonder if perhaps the … I also wonder if it really communicates the fact that if you set this to false, you save disk space but you run the risk of losing your existing data? If it doesn't, well, that was always the case; this change just caused me to think about it some more!
}
mockRedshift.verifyThatConnectionsWereClosed()
mockRedshift.verifyThatCommitWasNotCalled()
Should this also verify that rollback was called?
Done.
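For illustration, an equivalent check written directly against Mockito (the project's tests actually go through their own MockRedshift helper, so this is only a sketch):

```scala
import java.sql.Connection
import org.mockito.Mockito.{mock, never, verify}

// Sketch only: after exercising a failing write path, we expect rollback() to have
// been invoked and commit() never to have been invoked on the mocked connection.
object RollbackVerificationSketch {
  def verifyRollbackOnly(conn: Connection): Unit = {
    verify(conn).rollback()
    verify(conn, never()).commit()
  }

  def example(): Unit = {
    val conn = mock(classOf[Connection])
    // ... exercise the failing write path against `conn` here ...
    verifyRollbackOnly(conn)
  }
}
```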
@JoshRosen, what's your plan for merging this PR? I've now observed a number of situations where Spark thinks a COPY operation has failed (due to a connection failure) yet it succeeds in Redshift, leading to duplicated data when using …
Current coverage is 89.42%

@@            master     #157   diff @@
=======================================
  Files           13       13
  Lines          685      681     -4
  Methods        604      596     -8
  Messages         0        0
  Branches        81       85     +4
=======================================
- Hits           616      609     -7
- Misses          69       72     +3
  Partials         0        0
I think that we should just deprecate this parameter. If a user wants to avoid the performance penalty of using a transaction for overwrites then I don't think it's unreasonable to suggest that they drop the old table themselves.
Yep, this is the old behavior. It looks like you documented this in one of the original commits that added this functionality: https://github.com/databricks/spark-redshift/blame/v0.5.0/README.md#L280
I'm going to go ahead and merge this now and will address any minor comments in follow-ups. I'll give this a test with both drivers and then will begin work on packaging a new release.
This patch aims to significantly simplify our use of transactions when performing overwrites and when creating new tables; it addresses a few shortcomings in the existing code: previously, we didn't issue the `CREATE TABLE` and `COPY` commands as part of the same transaction when creating new tables, and when performing overwrites we created a temporary table outside of a transaction, opening the possibility for that table to be leaked in rare corner cases.

To address this, this patch refactors the main write logic so that it uses a JDBC connection with auto-commit disabled. Explicitly issuing `commit()` lets us significantly simplify the code and reduces duplication between the branches for handling different SaveModes.

One important change: previously, `usestagingtable` would literally create a separate staging table, but now we simply delete the existing table and re-create it in a transaction; this frees us of the burden of having to find a unique name for the staging table. This should be safe due to Redshift's serializable snapshot isolation semantics. When `usestagingtable` is disabled, this now means that we simply issue an extra `commit()` after the original table's deletion so that the original table's resources are freed (which lets us write the new rows without having to maintain both the old and new tables at the same time).

/cc @jaley, who wrote the original staging-table-handling code; it would be good if you could confirm whether the changes here will undermine the performance benefits / semantics of the old `usestagingtable=false` path.

Fixes #153 and #154.
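As a rough sketch of the transaction pattern described above (illustrative identifiers and SQL only, not the actual spark-redshift internals):

```scala
import java.sql.{Connection, DriverManager}
import scala.util.control.NonFatal

// Sketch: disable auto-commit, perform the drop/create/COPY as a single unit of work,
// then commit explicitly, rolling back on any failure so nothing is leaked.
object OverwriteTransactionSketch {
  def overwrite(jdbcUrl: String, table: String, copySql: String): Unit = {
    val conn: Connection = DriverManager.getConnection(jdbcUrl)
    try {
      conn.setAutoCommit(false)
      val stmt = conn.createStatement()
      try {
        stmt.executeUpdate(s"DROP TABLE IF EXISTS $table")
        stmt.executeUpdate(s"CREATE TABLE $table (id INTEGER, name VARCHAR(64))")
        stmt.executeUpdate(copySql) // the COPY ... FROM 's3://...' command built elsewhere
      } finally {
        stmt.close()
      }
      conn.commit()
    } catch {
      case NonFatal(e) =>
        try conn.rollback() catch { case NonFatal(_) => () } // best-effort rollback
        throw e
    } finally {
      conn.close()
    }
  }
}
```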