diff --git a/README.md b/README.md index 1e5fa18c..95f6b4c7 100644 --- a/README.md +++ b/README.md @@ -350,17 +350,15 @@ must also set a distribution key with the distkey option. - usestagingtable + usestagingtable (Deprecated) No true -

When performing an overwrite of existing data, this setting can be used to stage the new data in a temporary -table, such that we make sure the COPY finishes successfully before making any changes to the existing table. -This means that we minimize the amount of time that the target table will be unavailable and restore the old -data should the COPY fail.

+

+ Setting this deprecated option to false will cause an overwrite operation's destination table to be dropped immediately at the beginning of the write, making the overwrite operation non-atomic and reducing the availability of the destination table. This may reduce the temporary disk space requirements for overwrites. +

-

You may wish to disable this by setting the parameter to false if you can't spare the disk space in your -Redshift cluster and/or don't have requirements to keep the table availability high.

+

Since setting usestagingtable=false risks data loss or unavailability, we have chosen to deprecate it in favor of requiring users to manually drop the destination table themselves.

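As a concrete illustration of the recommended migration away from `usestagingtable=false`, the sketch below drops the destination table by hand over JDBC and then performs an ordinary overwrite. It is hypothetical: the JDBC URL, table name, and tempdir are placeholders, and `df` stands for whatever DataFrame is being saved.

```scala
import java.sql.DriverManager
import org.apache.spark.sql.SaveMode

// Recommended replacement for usestagingtable=false: drop the destination table
// yourself (accepting that it will be briefly missing), then overwrite as usual.
val jdbcUrl = "jdbc:redshift://host:5439/db?user=user&password=pass" // placeholder
val conn = DriverManager.getConnection(jdbcUrl)
try {
  conn.createStatement().executeUpdate("DROP TABLE IF EXISTS my_table")
} finally {
  conn.close()
}

// df is the DataFrame being saved; the format and option names are the ones
// documented elsewhere in this README.
df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("dbtable", "my_table")
  .option("tempdir", "s3n://my-bucket/tmp") // placeholder temp location
  .mode(SaveMode.Overwrite)
  .save()
```

The manual DROP is committed immediately, so the table is briefly unavailable; this is the same trade-off that the deprecated option made implicitly.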
@@ -470,15 +468,17 @@ When reading from / writing to Redshift, this library reads and writes data in S ### Guarantees of the Redshift data source for Spark -**Creating a new table**: Creating a new table is a two-step process, consisting of a `CREATE TABLE` command followed by a [`COPY`](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) command to append the initial set of rows. Currently, these two steps are performed in separate transactions, so their effects may become visible at different times to readers. The `COPY` itself is atomic, so the table will never be visible in a state where it contains a non-empty subset of the saved rows. In a future release, this will be changed so that the `CREATE TABLE` and `COPY` statements are issued as part of the same transaction. **Appending to an existing table**: In the [`COPY`](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) command, this library uses [manifests](https://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html) to guard against certain eventually-consistent S3 operations. As a result, it appends to existing tables have the same atomic and transactional properties as regular Redshift `COPY` commands. -**Overwriting an existing table**: By default, this library uses transactions to perform overwrites. Outside of a transaction, it will create an empty temporary table and append the new rows using a `COPY` statement. If the `COPY` succeeds, it will use a transaction to atomically delete the overwritten table and rename the temporary table to destination table. +**Appending to an existing table**: When inserting rows into Redshift, this library uses the [`COPY`](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) command and specifies [manifests](https://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html) to guard against certain eventually-consistent S3 operations. As a result, appends to existing tables made by `spark-redshift` have the same atomic and transactional properties as regular Redshift `COPY` commands. -In a future release, this will be changed so that the temporary table is created in the same transaction as the `COPY`. -This use of a staging table can be disabled by setting `usestagingtable` to `false`, in which case the destination table will be deleted before the `COPY`, sacrificing the atomicity of the overwrite operation. +**Creating a new table (`SaveMode.CreateIfNotExists`)**: Creating a new table is a two-step process, consisting of a `CREATE TABLE` command followed by a [`COPY`](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) command to append the initial set of rows. Both of these operations are performed in a single transaction. + +**Overwriting an existing table**: By default, this library uses transactions to perform overwrites, which are implemented by deleting the destination table, creating a new empty table, and appending rows to it. + +If the deprecated `usestagingtable` setting is set to `false`, then this library will commit the `DROP TABLE` command before appending rows to the new table, sacrificing the atomicity of the overwrite operation but reducing the amount of staging space that Redshift needs during the overwrite.
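To make the manifest-based guarantees above concrete, here is a rough sketch of what a load manifest and the corresponding `COPY` statement look like. This is illustrative only: the bucket, file names, and credentials are placeholders, not values generated by the library.

```scala
// Hypothetical manifest written to S3 alongside the Avro part files; only the
// non-empty partition files are listed, so empty Avro files and files that are
// not yet visible in directory listings cannot affect the load.
val manifestJson =
  """{
    |  "entries": [
    |    {"url": "s3://my-bucket/tmp/part-r-00000.avro", "mandatory": true},
    |    {"url": "s3://my-bucket/tmp/part-r-00002.avro", "mandatory": true}
    |  ]
    |}""".stripMargin

// The COPY then names that manifest explicitly instead of a directory prefix.
val copyStatement =
  """COPY "PUBLIC"."test_table"
    |FROM 's3://my-bucket/tmp/manifest.json'
    |CREDENTIALS 'aws_access_key_id=<id>;aws_secret_access_key=<key>'
    |FORMAT AS AVRO 'auto' manifest""".stripMargin
```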
**Querying Redshift tables**: Queries use Redshift's [`UNLOAD`](https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html) command to execute a query and save its results to S3 and use [manifests](https://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html) to guard against certain eventually-consistent S3 operations. As a result, queries from Redshift data source for Spark should have the same consistency properties as regular Redshift queries. diff --git a/src/main/scala/com/databricks/spark/redshift/Parameters.scala b/src/main/scala/com/databricks/spark/redshift/Parameters.scala index fe8d22f1..4288ffb2 100644 --- a/src/main/scala/com/databricks/spark/redshift/Parameters.scala +++ b/src/main/scala/com/databricks/spark/redshift/Parameters.scala @@ -191,6 +191,8 @@ private[redshift] object Parameters { def sortKeySpec: Option[String] = parameters.get("sortkeyspec") /** + * DEPRECATED: see PR #157. + * * When true, data is always loaded into a new temporary table when performing an overwrite. * This is to ensure that the whole load process succeeds before dropping any data from * Redshift, which can be useful if, in the event of failures, stale data is better than no data diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala index 0feff4fe..620a35cd 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala @@ -26,7 +26,6 @@ import org.apache.spark.TaskContext import org.slf4j.LoggerFactory import scala.collection.mutable -import scala.util.Random import scala.util.control.NonFatal import com.databricks.spark.redshift.Parameters.MergedParameters @@ -48,18 +47,15 @@ import org.apache.spark.sql.types._ * non-empty. After the write operation completes, we use this to construct a list of non-empty * Avro partition files. * - * - Use JDBC to issue any CREATE TABLE commands, if required. - * * - If there is data to be written (i.e. not all partitions were empty), then use the list of * non-empty Avro files to construct a JSON manifest file to tell Redshift to load those files. * This manifest is written to S3 alongside the Avro files themselves. We need to use an * explicit manifest, as opposed to simply passing the name of the directory containing the * Avro files, in order to work around a bug related to parsing of empty Avro files (see #96). * - * - Use JDBC to issue a COPY command in order to instruct Redshift to load the Avro data into - * the appropriate table. If the Overwrite SaveMode is being used, then by default the data - * will be loaded into a temporary staging table, which later will atomically replace the - * original table via a transaction. + * - Start a new JDBC transaction and disable auto-commit. Depending on the SaveMode, issue + * DROP TABLE or CREATE TABLE commands, then use the COPY command to instruct Redshift to load + * the Avro data into the appropriate table. */ private[redshift] class RedshiftWriter( jdbcWrapper: JDBCWrapper, @@ -101,43 +97,6 @@ private[redshift] class RedshiftWriter( s"AVRO 'auto' manifest ${params.extraCopyOptions}" } - /** - * Sets up a staging table then runs the given action, passing the temporary table name - * as a parameter. 
- */ - private def withStagingTable( - conn: Connection, - table: TableName, - action: (String) => Unit) { - val randomSuffix = Math.abs(Random.nextInt()).toString - val tempTable = - table.copy(unescapedTableName = s"${table.unescapedTableName}_staging_$randomSuffix") - val backupTable = - table.copy(unescapedTableName = s"${table.unescapedTableName}_backup_$randomSuffix") - log.info("Loading new Redshift data to: " + tempTable) - log.info("Existing data will be backed up in: " + backupTable) - - try { - action(tempTable.toString) - - if (jdbcWrapper.tableExists(conn, table.toString)) { - jdbcWrapper.executeInterruptibly(conn.prepareStatement( - s""" - | BEGIN; - | ALTER TABLE $table RENAME TO ${backupTable.escapedTableName}; - | ALTER TABLE $tempTable RENAME TO ${table.escapedTableName}; - | DROP TABLE $backupTable; - | END; - """.stripMargin.trim)) - } else { - jdbcWrapper.executeInterruptibly(conn.prepareStatement( - s"ALTER TABLE $tempTable RENAME TO ${table.escapedTableName}")) - } - } finally { - jdbcWrapper.executeInterruptibly(conn.prepareStatement(s"DROP TABLE IF EXISTS $tempTable")) - } - } - /** * Generate COMMENT SQL statements for the table and columns. */ @@ -151,23 +110,15 @@ private[redshift] class RedshiftWriter( } /** - * Perform the Redshift load, including deletion of existing data in the case of an overwrite, - * and creating the table if it doesn't already exist. + * Perform the Redshift load by issuing a COPY statement. */ private def doRedshiftLoad( conn: Connection, data: DataFrame, - saveMode: SaveMode, params: MergedParameters, creds: AWSCredentials, manifestUrl: Option[String]): Unit = { - // Overwrites must drop the table, in case there has been a schema update - if (saveMode == SaveMode.Overwrite) { - jdbcWrapper.executeInterruptibly( - conn.prepareStatement(s"DROP TABLE IF EXISTS ${params.table.get}")) - } - // If the table doesn't exist, we need to create it first, using JDBC to infer column types val createStatement = createTableSql(data, params) log.info(createStatement) @@ -190,8 +141,11 @@ private[redshift] class RedshiftWriter( jdbcWrapper.executeInterruptibly(conn.prepareStatement(copyStatement)) } catch { case e: SQLException => + log.error("SQLException thrown while running COPY query; will attempt to retrieve " + + "more information by querying the STL_LOAD_ERRORS table", e) // Try to query Redshift's STL_LOAD_ERRORS table to figure out why the load failed. // See http://docs.aws.amazon.com/redshift/latest/dg/r_STL_LOAD_ERRORS.html for details. + conn.rollback() val errorLookupQuery = """ | SELECT * @@ -374,6 +328,12 @@ private[redshift] class RedshiftWriter( "For save operations you must specify a Redshift table name with the 'dbtable' parameter") } + if (!params.useStagingTable) { + log.warn("Setting useStagingTable=false is deprecated; instead, we recommend that you " + + "drop the target table yourself. 
For more details on this deprecation, see " + "https://github.com/databricks/spark-redshift/pull/157") + } + val creds: AWSCredentials = AWSCredentialsUtils.load(params, sqlContext.sparkContext.hadoopConfiguration) @@ -382,19 +342,35 @@ private[redshift] class RedshiftWriter( Utils.checkThatBucketHasObjectLifecycleConfiguration(params.rootTempDir, s3ClientFactory(creds)) + // Save the table's rows to S3: + val manifestUrl = unloadData(sqlContext, data, params.createPerQueryTempDir()) val conn = jdbcWrapper.getConnector(params.jdbcDriver, params.jdbcUrl, params.credentials) - + conn.setAutoCommit(false) try { - val tempDir = params.createPerQueryTempDir() - val manifestUrl = unloadData(sqlContext, data, tempDir) - if (saveMode == SaveMode.Overwrite && params.useStagingTable) { - withStagingTable(conn, params.table.get, stagingTable => { - val updatedParams = MergedParameters(params.parameters.updated("dbtable", stagingTable)) - doRedshiftLoad(conn, data, saveMode, updatedParams, creds, manifestUrl) - }) - } else { - doRedshiftLoad(conn, data, saveMode, params, creds, manifestUrl) + val table: TableName = params.table.get + if (saveMode == SaveMode.Overwrite) { + // Overwrites must drop the table in case there has been a schema update + jdbcWrapper.executeInterruptibly(conn.prepareStatement(s"DROP TABLE IF EXISTS $table;")) + if (!params.useStagingTable) { + // If we're not using a staging table, commit now so that Redshift doesn't have to + // maintain a snapshot of the old table during the COPY; this sacrifices atomicity for + // performance. + conn.commit() + } } + log.info(s"Loading new Redshift data to: $table") + doRedshiftLoad(conn, data, params, creds, manifestUrl) + conn.commit() + } catch { + case NonFatal(e) => + try { + log.error("Exception thrown during Redshift load; will roll back transaction", e) + conn.rollback() + } catch { + case NonFatal(e2) => + log.error("Exception while rolling back transaction", e2) + } + throw e } finally { conn.close() } diff --git a/src/test/scala/com/databricks/spark/redshift/MockRedshift.scala b/src/test/scala/com/databricks/spark/redshift/MockRedshift.scala index 6cc90b42..576ee46f 100644 --- a/src/test/scala/com/databricks/spark/redshift/MockRedshift.scala +++ b/src/test/scala/com/databricks/spark/redshift/MockRedshift.scala @@ -89,6 +89,18 @@ class MockRedshift( } } + def verifyThatRollbackWasCalled(): Unit = { + jdbcConnections.foreach { conn => + verify(conn, atLeastOnce()).rollback() + } + } + + def verifyThatCommitWasNotCalled(): Unit = { + jdbcConnections.foreach { conn => + verify(conn, never()).commit() + } + } + def verifyThatExpectedQueriesWereIssued(expectedQueries: Seq[Regex]): Unit = { expectedQueries.zip(queriesIssued).foreach { case (expected, actual) => if (expected.findFirstMatchIn(actual).isEmpty) { diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala index 37a2d5d6..18e19cad 100644 --- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala +++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala @@ -297,20 +297,12 @@ class RedshiftSourceSuite "usestagingtable" -> "true") val expectedCommands = Seq( - "DROP TABLE IF EXISTS \"PUBLIC\".\"test_table_staging_.*\"".r, - "CREATE TABLE IF NOT EXISTS \"PUBLIC\".\"test_table_staging_.*\"".r, - "DELETE FROM \"PUBLIC\".\"test_table_staging_.*\" WHERE id < 100".r, - "DELETE FROM \"PUBLIC\".\"test_table_staging_.*\" WHERE id > 100".r, - "DELETE FROM 
\"PUBLIC\".\"test_table_staging_.*\" WHERE id = -1".r, - "COPY \"PUBLIC\".\"test_table_staging_.*\"".r, - """ - | BEGIN; - | ALTER TABLE "PUBLIC"\."test_table" RENAME TO "test_table_backup_.*"; - | ALTER TABLE "PUBLIC"\."test_table_staging_.*" RENAME TO "test_table"; - | DROP TABLE "PUBLIC"\."test_table_backup_.*"; - | END; - """.stripMargin.trim.r, - "DROP TABLE IF EXISTS \"PUBLIC\"\\.\"test_table_staging_.*\"".r) + "DROP TABLE IF EXISTS \"PUBLIC\".\"test_table.*\"".r, + "CREATE TABLE IF NOT EXISTS \"PUBLIC\".\"test_table.*\"".r, + "DELETE FROM \"PUBLIC\".\"test_table.*\" WHERE id < 100".r, + "DELETE FROM \"PUBLIC\".\"test_table.*\" WHERE id > 100".r, + "DELETE FROM \"PUBLIC\".\"test_table.*\" WHERE id = -1".r, + "COPY \"PUBLIC\".\"test_table.*\"".r) source.createRelation(testSqlContext, SaveMode.Overwrite, params, expectedDataDF) mockRedshift.verifyThatExpectedQueriesWereIssued(expectedCommands) @@ -324,19 +316,11 @@ class RedshiftSourceSuite "distkey" -> "testint") val expectedCommands = Seq( - "DROP TABLE IF EXISTS \"PUBLIC\"\\.\"test_table_staging_.*\"".r, - ("CREATE TABLE IF NOT EXISTS \"PUBLIC\"\\.\"test_table_staging.*" + + "DROP TABLE IF EXISTS \"PUBLIC\"\\.\"test_table.*\"".r, + ("CREATE TABLE IF NOT EXISTS \"PUBLIC\"\\.\"test_table.*" + " DISTSTYLE KEY DISTKEY \\(testint\\).*").r, - "COPY \"PUBLIC\"\\.\"test_table_staging_.*\"".r, - "GRANT SELECT ON \"PUBLIC\"\\.\"test_table_staging.+\" TO jeremy".r, - """ - | BEGIN; - | ALTER TABLE "PUBLIC"\."test_table" RENAME TO "test_table_backup_.*"; - | ALTER TABLE "PUBLIC"\."test_table_staging_.*" RENAME TO "test_table"; - | DROP TABLE "PUBLIC"\."test_table_backup_.*"; - | END; - """.stripMargin.trim.r, - "DROP TABLE IF EXISTS \"PUBLIC\"\\.\"test_table_staging_.*\"".r) + "COPY \"PUBLIC\"\\.\"test_table.*\"".r, + "GRANT SELECT ON \"PUBLIC\"\\.\"test_table\" TO jeremy".r) val mockRedshift = new MockRedshift( defaultParams("url"), @@ -374,6 +358,8 @@ class RedshiftSourceSuite testSqlContext, df, SaveMode.Append, Parameters.mergeParameters(defaultParams)) } mockRedshift.verifyThatConnectionsWereClosed() + mockRedshift.verifyThatCommitWasNotCalled() + mockRedshift.verifyThatRollbackWasCalled() mockRedshift.verifyThatExpectedQueriesWereIssued(Seq.empty) } @@ -383,23 +369,22 @@ class RedshiftSourceSuite val mockRedshift = new MockRedshift( defaultParams("url"), Map(TableName.parseFromEscaped("test_table").toString -> TestUtils.testSchema), - jdbcQueriesThatShouldFail = Seq("COPY \"PUBLIC\".\"test_table_staging_.*\"".r)) + jdbcQueriesThatShouldFail = Seq("COPY \"PUBLIC\".\"test_table.*\"".r)) val expectedCommands = Seq( - "DROP TABLE IF EXISTS \"PUBLIC\".\"test_table_staging_.*\"".r, - "CREATE TABLE IF NOT EXISTS \"PUBLIC\".\"test_table_staging_.*\"".r, - "COPY \"PUBLIC\".\"test_table_staging_.*\"".r, - ".*FROM stl_load_errors.*".r, - "DROP TABLE IF EXISTS \"PUBLIC\".\"test_table_staging_.*\"".r + "DROP TABLE IF EXISTS \"PUBLIC\".\"test_table.*\"".r, + "CREATE TABLE IF NOT EXISTS \"PUBLIC\".\"test_table.*\"".r, + "COPY \"PUBLIC\".\"test_table.*\"".r, + ".*FROM stl_load_errors.*".r ) val source = new DefaultSource(mockRedshift.jdbcWrapper, _ => mockS3Client) intercept[Exception] { source.createRelation(testSqlContext, SaveMode.Overwrite, params, expectedDataDF) - mockRedshift.verifyThatConnectionsWereClosed() - mockRedshift.verifyThatExpectedQueriesWereIssued(expectedCommands) } mockRedshift.verifyThatConnectionsWereClosed() + mockRedshift.verifyThatCommitWasNotCalled() + mockRedshift.verifyThatRollbackWasCalled() 
mockRedshift.verifyThatExpectedQueriesWereIssued(expectedCommands) }
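For readers tracing the new commit/rollback expectations in these tests, the following is a minimal, simplified sketch of the transaction flow introduced in `RedshiftWriter`, written against plain JDBC with placeholder SQL rather than the library's `JDBCWrapper`:

```scala
import java.sql.Connection
import scala.util.control.NonFatal

// Simplified sketch of the overwrite flow exercised by the tests above: one JDBC
// transaction, with an early commit of the DROP only on the deprecated
// usestagingtable=false path. Table name, schema, and manifest URL are placeholders.
def overwriteSketch(conn: Connection, useStagingTable: Boolean): Unit = {
  conn.setAutoCommit(false)
  try {
    conn.prepareStatement("DROP TABLE IF EXISTS my_table").execute()
    if (!useStagingTable) {
      // Deprecated path: committing here frees Redshift from keeping the old rows
      // around during the COPY, but a failed COPY then leaves no table at all.
      conn.commit()
    }
    conn.prepareStatement("CREATE TABLE IF NOT EXISTS my_table (id INTEGER)").execute()
    conn.prepareStatement(
      "COPY my_table FROM 's3://my-bucket/tmp/manifest.json' FORMAT AS AVRO 'auto' manifest"
    ).execute()
    conn.commit()
  } catch {
    case NonFatal(e) =>
      // On the default path this leaves the existing table untouched, which is what
      // verifyThatRollbackWasCalled / verifyThatCommitWasNotCalled assert.
      conn.rollback()
      throw e
  } finally {
    conn.close()
  }
}
```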