Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure migration progress is not lost for PG, mysql and sqlite #1991

Merged
merged 10 commits into from
Sep 13, 2022
15 changes: 15 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ name = "sqlite-test-attr"
path = "tests/sqlite/test-attr.rs"
required-features = ["sqlite", "macros", "migrate"]

[[test]]
name = "sqlite-migrate"
path = "tests/sqlite/migrate.rs"
required-features = ["sqlite", "macros", "migrate"]

#
# MySQL
#
Expand Down Expand Up @@ -232,6 +237,11 @@ name = "mysql-test-attr"
path = "tests/mysql/test-attr.rs"
required-features = ["mysql", "macros", "migrate"]

[[test]]
name = "mysql-migrate"
path = "tests/mysql/migrate.rs"
required-features = ["mysql", "macros", "migrate"]

#
# PostgreSQL
#
Expand Down Expand Up @@ -266,6 +276,11 @@ name = "postgres-test-attr"
path = "tests/postgres/test-attr.rs"
required-features = ["postgres", "macros", "migrate"]

[[test]]
name = "postgres-migrate"
path = "tests/postgres/migrate.rs"
required-features = ["postgres", "macros", "migrate"]

#
# Microsoft SQL Server (MSSQL)
#
Expand Down
2 changes: 1 addition & 1 deletion sqlx-core/src/migrate/migration_type.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// Migration Type represents the type of migration
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum MigrationType {
/// Simple migration are single file migrations with no up / down queries
Simple,
Expand Down
85 changes: 73 additions & 12 deletions sqlx-core/src/mysql/migrate.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::connection::ConnectOptions;
use crate::connection::{ConnectOptions, Connection};
use crate::error::Error;
use crate::executor::Executor;
use crate::migrate::MigrateError;
Expand Down Expand Up @@ -209,29 +209,67 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
migration: &'m Migration,
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
Box::pin(async move {
// Use a single transaction for the actual migration script and the essential bookeeping so we never
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
// The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
// data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
// and update it once the actual transaction completed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MySQL is a problem case because any DDL statements like CREATE TABLE implicitly commit any open transaction: https://dev.mysql.com/doc/refman/5.6/en/implicit-commit.html
https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html

There's no real way we can actually get any isolation with MySQL. All the user can really do is either manually reverse the changes or restore from a backup.

What we probably should be doing is inserting into _sqlx_migrations first with success = FALSE and then update it if we successfully apply the migration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, this is why this construct was there, I was wondering why the mysql code looked so much different. I'll apply your suggestion and add some doc comment explaining it (including the links you've provided).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied. However I needed a similar trick for the reverse direction and there I first toggle the flag from true to false and then delete the entry (have a look at the code), let me know if that makes sense to you.

let mut tx = self.begin().await?;
let start = Instant::now();

let res = self.execute(&*migration.sql).await;

let elapsed = start.elapsed();

// For MySQL we cannot really isolate migrations due to implicit commits caused by table modification, see
// https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html
//
// To somewhat try to detect this, we first insert the migration into the migration table with
// `success=FALSE` and later modify the flag.
//
// language=MySQL
let _ = query(
r#"
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
VALUES ( ?, ?, ?, ?, ? )
VALUES ( ?, ?, FALSE, ?, -1 )
"#,
)
.bind(migration.version)
.bind(&*migration.description)
.bind(res.is_ok())
.bind(&*migration.checksum)
.execute(&mut tx)
.await?;

let _ = tx.execute(&*migration.sql).await?;

// language=MySQL
let _ = query(
r#"
UPDATE _sqlx_migrations
SET success = TRUE
WHERE version = ?
"#,
)
.bind(migration.version)
.execute(&mut tx)
.await?;

tx.commit().await?;

// Update `elapsed_time`.
// NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
// this small risk since this value is not super important.

let elapsed = start.elapsed();

let _ = query(
r#"
UPDATE _sqlx_migrations
SET execution_time = ?
WHERE version = ?
"#,
)
.bind(elapsed.as_nanos() as i64)
.bind(migration.version)
.execute(self)
.await?;

res?;

Ok(elapsed)
})
}
Expand All @@ -241,18 +279,41 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
migration: &'m Migration,
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
Box::pin(async move {
// Use a single transaction for the actual migration script and the essential bookeeping so we never
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
let mut tx = self.begin().await?;
let start = Instant::now();

self.execute(&*migration.sql).await?;
// For MySQL we cannot really isolate migrations due to implicit commits caused by table modification, see
// https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html
//
// To somewhat try to detect this, we first insert the migration into the migration table with
// `success=FALSE` and later remove the migration altogether.
//
// language=MySQL
let _ = query(
r#"
UPDATE _sqlx_migrations
SET success = FALSE
WHERE version = ?
"#,
)
.bind(migration.version)
.execute(&mut tx)
.await?;

let elapsed = start.elapsed();
tx.execute(&*migration.sql).await?;

// language=SQL
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#)
.bind(migration.version)
.execute(self)
.execute(&mut tx)
.await?;

tx.commit().await?;

let elapsed = start.elapsed();

Ok(elapsed)
})
}
Expand Down
43 changes: 33 additions & 10 deletions sqlx-core/src/postgres/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,44 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
let mut tx = self.begin().await?;
let start = Instant::now();

// Use a single transaction for the actual migration script and the essential bookeeping so we never
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
// The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
// data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
// and update it once the actual transaction completed.
let _ = tx.execute(&*migration.sql).await?;

tx.commit().await?;

let elapsed = start.elapsed();

// language=SQL
let _ = query(
r#"
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
VALUES ( $1, $2, TRUE, $3, $4 )
VALUES ( $1, $2, TRUE, $3, -1 )
"#,
)
.bind(migration.version)
.bind(&*migration.description)
.bind(&*migration.checksum)
.execute(&mut tx)
.await?;

tx.commit().await?;

// Update `elapsed_time`.
// NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
// this small risk since this value is not super important.

let elapsed = start.elapsed();

// language=SQL
let _ = query(
r#"
UPDATE _sqlx_migrations
SET execution_time = $1
WHERE version = $2
"#,
)
.bind(elapsed.as_nanos() as i64)
.bind(migration.version)
.execute(self)
.await?;

Expand All @@ -251,21 +272,23 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
migration: &'m Migration,
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
Box::pin(async move {
// Use a single transaction for the actual migration script and the essential bookeeping so we never
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
let mut tx = self.begin().await?;
let start = Instant::now();

let _ = tx.execute(&*migration.sql).await?;

tx.commit().await?;

let elapsed = start.elapsed();

// language=SQL
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
.bind(migration.version)
.execute(self)
.execute(&mut tx)
.await?;

tx.commit().await?;

let elapsed = start.elapsed();

Ok(elapsed)
})
}
Expand Down
43 changes: 33 additions & 10 deletions sqlx-core/src/sqlite/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,23 +173,44 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
let mut tx = self.begin().await?;
let start = Instant::now();

// Use a single transaction for the actual migration script and the essential bookeeping so we never
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
// The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
// data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
// and update it once the actual transaction completed.
let _ = tx.execute(&*migration.sql).await?;

tx.commit().await?;

let elapsed = start.elapsed();

// language=SQL
let _ = query(
r#"
INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
VALUES ( ?1, ?2, TRUE, ?3, ?4 )
VALUES ( ?1, ?2, TRUE, ?3, -1 )
"#,
)
.bind(migration.version)
.bind(&*migration.description)
.bind(&*migration.checksum)
.execute(&mut tx)
.await?;

tx.commit().await?;

// Update `elapsed_time`.
// NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
// this small risk since this value is not super important.

let elapsed = start.elapsed();

// language=SQL
let _ = query(
r#"
UPDATE _sqlx_migrations
SET execution_time = ?1
WHERE version = ?2
"#,
)
.bind(elapsed.as_nanos() as i64)
.bind(migration.version)
.execute(self)
.await?;

Expand All @@ -202,21 +223,23 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
migration: &'m Migration,
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
Box::pin(async move {
// Use a single transaction for the actual migration script and the essential bookeeping so we never
// execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
let mut tx = self.begin().await?;
let start = Instant::now();

let _ = tx.execute(&*migration.sql).await?;

tx.commit().await?;

let elapsed = start.elapsed();

// language=SQL
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?1"#)
.bind(migration.version)
.execute(self)
.execute(&mut tx)
.await?;

tx.commit().await?;

let elapsed = start.elapsed();

Ok(elapsed)
})
}
Expand Down
21 changes: 15 additions & 6 deletions tests/migrate/macro.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
use sqlx::migrate::Migrator;
use std::path::Path;

static EMBEDDED: Migrator = sqlx::migrate!("tests/migrate/migrations");
static EMBEDDED_SIMPLE: Migrator = sqlx::migrate!("tests/migrate/migrations_simple");
static EMBEDDED_REVERSIBLE: Migrator = sqlx::migrate!("tests/migrate/migrations_reversible");

#[sqlx_macros::test]
async fn same_output() -> anyhow::Result<()> {
let runtime = Migrator::new(Path::new("tests/migrate/migrations")).await?;
let runtime_simple = Migrator::new(Path::new("tests/migrate/migrations_simple")).await?;
let runtime_reversible =
Migrator::new(Path::new("tests/migrate/migrations_reversible")).await?;

assert_eq!(runtime.migrations.len(), EMBEDDED.migrations.len());
assert_same(&EMBEDDED_SIMPLE, &runtime_simple);
assert_same(&EMBEDDED_REVERSIBLE, &runtime_reversible);

for (e, r) in EMBEDDED.iter().zip(runtime.iter()) {
Ok(())
}

fn assert_same(embedded: &Migrator, runtime: &Migrator) {
assert_eq!(runtime.migrations.len(), embedded.migrations.len());

for (e, r) in embedded.iter().zip(runtime.iter()) {
assert_eq!(e.version, r.version);
assert_eq!(e.description, r.description);
assert_eq!(e.migration_type, r.migration_type);
assert_eq!(e.sql, r.sql);
assert_eq!(e.checksum, r.checksum);
}

Ok(())
}
6 changes: 0 additions & 6 deletions tests/migrate/migrations/20200723212833_tweet.sql

This file was deleted.

5 changes: 0 additions & 5 deletions tests/migrate/migrations/20200723212841_accounts.sql

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE migrations_reversible_test;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE migrations_reversible_test (
some_id BIGINT NOT NULL PRIMARY KEY,
some_payload BIGINT NOT NUll
);

INSERT INTO migrations_reversible_test (some_id, some_payload)
VALUES (1, 100);
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
UPDATE migrations_reversible_test
SET some_payload = some_payload - 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
UPDATE migrations_reversible_test
SET some_payload = some_payload + 1;
Loading