
Simplify use of transactions when performing overwrites and when creating new tables #157

Closed

Conversation

JoshRosen
Contributor

This patch aims to significantly simplify our use of transactions when performing overwrites and when creating new tables. It addresses a few shortcomings in the existing code: previously, we didn't issue the CREATE TABLE and COPY commands as part of the same transaction when creating new tables, and when performing overwrites we created a temporary table outside of a transaction, opening the possibility for that table to be leaked in rare corner cases.

To address this, this patch refactors the main write logic so that it uses a JDBC connection with auto-commit disabled. Explicitly issuing commit() lets us significantly simplify the code and reduce duplication between the branches that handle the different SaveModes.

One important change: previously, usestagingtable would literally create a separate staging table, whereas now we simply delete the existing table and re-create it inside a transaction; this frees us from the burden of having to find a unique name for the staging table. This should be safe thanks to Redshift's serializable snapshot isolation semantics. When usestagingtable is disabled, we now simply issue an extra commit() after the original table's deletion so that its resources are freed, which lets us write the new rows without having to maintain both the old and new tables at the same time.
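
For readers skimming the diff, here is a minimal sketch of the overwrite path described above; `createAndCopy` is a placeholder for the real CREATE TABLE + COPY logic, and none of these helper names are taken from the actual patch:

```scala
import java.sql.Connection
import scala.util.control.NonFatal

object OverwriteSketch {
  // Drop and re-create the table inside a single transaction so that the
  // swap is atomic under Redshift's serializable isolation. `createAndCopy`
  // stands in for the real CREATE TABLE + COPY logic.
  def overwrite(conn: Connection, table: String)(createAndCopy: Connection => Unit): Unit = {
    conn.setAutoCommit(false) // we manage the transaction explicitly
    try {
      val stmt = conn.createStatement()
      try stmt.executeUpdate(s"DROP TABLE IF EXISTS $table")
      finally stmt.close()
      createAndCopy(conn) // CREATE TABLE and COPY run on the same connection
      conn.commit()       // old table vanishes and new table appears atomically
    } catch {
      case NonFatal(e) =>
        conn.rollback() // a failure leaves the original table intact
        throw e
    }
  }
}
```

When usestagingtable is disabled, the only difference is an extra conn.commit() immediately after the DROP, which lets Redshift free the old table's storage before the new rows are written (at the cost of losing the old data if the write fails).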

/cc @jaley, who wrote the original staging-table-handling code; it would be good if you could confirm whether the changes here will undermine the performance benefits / semantics of the old usestagingtable=false path.

Fixes #153 and #154.

@JoshRosen JoshRosen added the bug label Jan 17, 2016
@JoshRosen JoshRosen added this to the 0.6.1 milestone Jan 17, 2016
@@ -48,6 +47,8 @@ import org.apache.spark.sql.types._
* non-empty. After the write operation completes, we use this to construct a list of non-empty
* Avro partition files.
*
* - Using JDBC, start a new tra
Contributor Author

TODO: this entire block comment needs to be updated.

@JoshRosen
Contributor Author

I'd love feedback on whether the tests here are adequate; my gut says that we need a test which counts the number of times that commit() is called when not using a staging table. I can add this tomorrow.

}
log.info(s"Loading new Redshift data to: $table")
doRedshiftLoad(conn, data, params, creds, manifestUrl)
conn.commit()
Contributor Author

Does this need to handle InterruptedException? i.e. can this block for a really long time?

Contributor

It has been my experience that virtually any Redshift command, COMMIT included, can block for several minutes under certain circumstances. I think the most likely cause is that the cluster has WLM parameters configured to put the connected client on a limited pool of some type, such that all commands will be queued when all slots are taken by other queries.

I have a feeling, though, that this means that if you plan to send a ROLLBACK command in response to the interruption, it will also block for many minutes...

Contributor Author

I'm not sure that it's safe for us to wrap this in order to catch InterruptedException since I don't think that it's safe to call .rollback() while the other thread is in the middle of executing commit(). Therefore, I'm going to leave this as-is for now and will revisit later if this turns out to be a problem in practice.

@codecov-io

Current coverage is 88.52%

Merging #157 into master will decrease coverage by 0.47% as of a2e241b

@@            master    #157   diff @@
======================================
  Files           13      13       
  Stmts          645     636     -9
  Branches       142     138     -4
  Methods          0       0       
======================================
- Hit            574     563    -11
  Partial          0       0       
- Missed          71      73     +2

Review entire Coverage Diff as of a2e241b

Powered by Codecov. Updated on successful CI builds.

@JoshRosen
Contributor Author

Ping @jaley, do you have any cycles to help review this? I think that you might be a good person to look at this because you have a lot of the context RE: the original implementation of this functionality.

@jaley
Contributor

jaley commented Jan 25, 2016

Hey @JoshRosen ,

Sure. Sorry for being slow to notice this - been playing catch up after a long vacation. I'll take a look in the next day or so if that's cool with you?

@JoshRosen
Contributor Author

Sure, that's fine.

} catch {
case NonFatal(e) =>
try {
conn.rollback()
Contributor

Might a log.info or log.warn be good here, to inform anyone testing their application code that the load failed and that what they're now waiting for is a rollback to finish?

Contributor Author

Done.
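
For reference, the resulting pattern looks roughly like the following sketch; the logger setup and the `writeData` callback are illustrative assumptions, not the actual code:

```scala
import java.sql.Connection
import org.slf4j.LoggerFactory
import scala.util.control.NonFatal

object RollbackLoggingSketch {
  private val log = LoggerFactory.getLogger(getClass)

  def writeWithRollback(conn: Connection)(writeData: Connection => Unit): Unit = {
    try {
      writeData(conn)
      conn.commit()
    } catch {
      case NonFatal(e) =>
        // Warn first so anyone watching the logs knows the load failed and
        // that the (possibly slow) rollback is what they are now waiting on.
        log.warn("Load failed; rolling back the transaction (this may block for a while)", e)
        try conn.rollback()
        catch { case NonFatal(e2) => log.error("Exception while rolling back the transaction", e2) }
        throw e
    }
  }
}
```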

@jaley
Contributor

jaley commented Jan 26, 2016

Do we currently run the integration tests with just the Redshift JDBC driver? Now that we're using the driver to manage transactions, I wonder if we might start to see different behaviour between the Redshift and Postgres drivers. Amazon's recommendation is to use their official driver, but I think we've previously been lucky in that users could safely make use of either for the small set of commands we rely on. This change effectively exercises different code paths in the driver, so it might be worth making sure that, if anything no longer works with the Postgres driver, we're at least able to communicate that clearly to users and explicitly state that it's not supported any more. (Hopefully it's still fine anyway!)

@jaley
Contributor

jaley commented Jan 26, 2016

The change looks good to me. I wonder if perhaps the useStagingTable parameter name is now a little confusing? This is certainly the equivalent behaviour, and so in the interest of maintaining compatibility it should certainly be supported, but I suspect if we were choosing a name for that now, we probably wouldn't refer to staging tables.

I also wonder if it really communicates the fact that if you set this to false, you save disk space but run the risk of losing your existing data? If not, that was always the case anyway; this change just caused me to think about it some more!

}
mockRedshift.verifyThatConnectionsWereClosed()
mockRedshift.verifyThatCommitWasNotCalled()
Contributor

Should this also verify that rollback was called?

Contributor Author

Done.
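
The updated assertion presumably looks something like the snippet below; `verifyThatRollbackWasCalled` is a guess by analogy with the verifiers visible in the diff, not a name quoted from the patch:

```scala
// `mockRedshift` is the test helper shown in the diff above; the rollback
// verifier's name is assumed by analogy with the existing ones.
mockRedshift.verifyThatConnectionsWereClosed()
mockRedshift.verifyThatCommitWasNotCalled()
mockRedshift.verifyThatRollbackWasCalled()
```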

@ssimeonov

ssimeonov commented May 31, 2016

@JoshRosen, what's your plan for merging this PR? I've now observed a number of situations where Spark thinks a COPY operation has failed (due to a connection failure) yet it succeeds in Redshift, leading to duplicated data when using SaveMode.Append. Putting everything in an end-to-end transaction with an explicit COMMIT should help a lot.

@codecov-io

codecov-io commented Jul 7, 2016

Current coverage is 89.42%

Merging #157 into master will decrease coverage by 0.49%

@@             master       #157   diff @@
==========================================
  Files            13         13          
  Lines           685        681     -4   
  Methods         604        596     -8   
  Messages          0          0          
  Branches         81         85     +4   
==========================================
- Hits            616        609     -7   
- Misses           69         72     +3   
  Partials          0          0          

Powered by Codecov. Last updated by 7ae3b9a...b2804bf

@JoshRosen JoshRosen mentioned this pull request Jul 7, 2016
@JoshRosen
Contributor Author

JoshRosen commented Jul 8, 2016

@jaley:

The change looks good to me. I wonder if perhaps the useStagingTable parameter name is now a little confusing? This is certainly the equivalent behaviour, and so in the interest of maintaining compatibility it should certainly be supported, but I suspect if we were choosing a name for that now, we probably wouldn't refer to staging tables.

I think that we should just deprecate this parameter. If a user wants to avoid the performance penalty of using a transaction for overwrites then I don't think it's unreasonable to suggest that they drop the old table themselves.

I also wonder if it really communicates the fact that if you set this to false, you save disk space but run the risk of losing your existing data? If not, that was always the case anyway; this change just caused me to think about it some more!

Yep, this is the old behavior. It looks like you documented this in one of the original commits that added this functionality: https://github.com/databricks/spark-redshift/blame/v0.5.0/README.md#L280

@JoshRosen
Contributor Author

I'm going to go ahead and merge this now and will address any minor comments in follow-ups. I'll give this a test with both drivers and then begin work on packaging a new release.

@JoshRosen JoshRosen closed this in e5d825f Jul 10, 2016